2016-03-21 9 views
0

Ich versuche verwenden multi Observer abonnieren Sie eine Observable welche onNext in einer Schleife aufgetreten ist.Es scheint nicht für jeden Beobachter funktionieren.RxScala Subscribe mit multi Observer nur Ereignis an erster Stelle senden

import rx.lang.scala.Observable 

object SubscribeMultiEvent extends App{ 
    val obv = Observable.apply[String]{ s => 
    def printForever: Unit = { 
     s.onNext("hi~") 
     Thread.sleep(1000) 
     printForever 
    } 
    printForever 
    } 

    obv.subscribe(s => println(s"first observer - $s")) 
    obv.subscribe(s => println(s"second observer - $s")) 

    Thread.currentThread().join() 
} 

Antwort nur für den ersten Beobachter

first observer - hi~ 
first observer - hi~ 
... 

Warum zweiten der Zeichnung nicht empfangen kann? Danke

Antwort

1

Das Problem in Ihren Codes ist Ihre Observable ist synchron. Es bedeutet, dass die zweite subscribe nicht läuft, bis die erste subscribe beendet ist. Und da Ihre Observable nie abgeschlossen ist, kann die zweite subscribe nicht ausgeführt werden.

Um dieses Problem zu beheben, müssen Sie Ihre Observable asynchron machen. Sie können subscribeOn verwenden, um in einem anderen Thread zu laufen. Zum Beispiel

import rx.lang.scala.Observable 
import rx.lang.scala.schedulers.NewThreadScheduler 

object SubscribeMultiEvent extends App{ 
    val obv = Observable.apply[String]{ s => 
    def printForever: Unit = { 
     s.onNext("hi~") 
     Thread.sleep(1000) 
     printForever 
    } 
    printForever 
    }.subscribeOn(NewThreadScheduler()) 

    obv.subscribe(s => println(s"first observer - $s")) 
    obv.subscribe(s => println(s"second observer - $s")) 

    Thread.sleep(60000) 
} 

Thread.sleep(60000) am Ende ist wichtig. Die Threads von RxJava sind standardmäßig Daemon, und wenn der Hauptthread beendet ist, weil es keine Nicht-Daemon-Threads mehr gibt, wird JVM beendet. Um zu verhindern, dass der Haupt-Thread anhält, müssen Sie etwas wie Thread.sleep(60000) hinzufügen.

+0

Danke, das ist großartig.Weiterhin kann ich ein Loop-Emit-Ereignis auf diese Weise konstruieren, aber es iteriere Body-Subscribe auf alle Beobachter jedes Mal? Ich möchte noch keinen neuen Thread für jedes Abo verwenden. – LoranceChen

+0

Sieht aus, als müssten Sie 'publish'. Siehe dieses Beispiel: https://github.com/ReactiveX/RxScala/blob/a385e1a474a05af5173d3a6c5f380b0f87b50dff/examples/src/test/scala/examples/RxScalaDemo.scala#L438 – zsxwing

+0

'publish' ist erstaunlich, ich muss Rx tiefer lernen. – LoranceChen