2016-08-08 33 views
1

Ich versuche zu verstehen, wie beobachtbar funktioniert. Hier ist mein Code.Wie heißt OnNext in Scala rx

def make: Stream[Int] = { 
    Stream.cons(scala.util.Random.nextInt(10), { 
     println("Making ..") 
     Thread.sleep(1000) 
     make 
    }) 
    } 

    val y = Observable.from(make) 

    y.foreach(a => println(a)) 

emit wird alle 1 Sekunde neue Werte erzeugen. Ich mache eine Observable daraus. für jede Schleife wird für immer Drucken neu produzierter Werte fortfahren.

Wie ich verstehe, ist a => println (a) ein Rückrufwert, der onNext (t) in rx beobachtbar genannt wird.

Was ich herausfinden möchte, ist, wie das auf den Produzenten geklebt ist, also wenn neuer Wert produziert wird, wo der onNext genannt wird. Ich habe eine Weile nach dem RX-Code geschaut und kann immer noch nicht herausfinden.

Danke.

Antwort

0

Es scheint auf den Stream abonnieren ruft rx.Observable der

Subscription subscribe(Subscriber<? super T> subscriber) 

Dies wird IterableProducer.request Methode aufrufen, die dieses Stück Code haben.

if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) { 
     // fast-path without backpressure 
     while (it.hasNext()) { 
      if (o.isUnsubscribed()) { 
       return; 
      } 
      o.onNext(**it.next()**); 
     } 
     if (!o.isUnsubscribed()) { 
      o.onCompleted(); 
     } 
    } 

onNext ist der Beobachtercode und Eingabe zu es wäre Produzenten it.next. So ist es geklebt.