2016-07-01 8 views
3

Ich bin neu bei RxJava und versuche, ein Observable von Nachrichten aus einer RabbitMQ-Warteschlange zu implementieren, die verlustfreien Gegendruck unterstützt. Ich habe es geschafft, ein Observable aus einem Spring AMQP MessageListener zu erstellen. Dies behandelt den Gegendruck in einer synchronen Umgebung (z. B. callstack blocking), aber sobald mehrere Threads eingeführt werden, verlässt der Gegendruck das Fenster - wie zu erwarten ist. Die Klasse ist unter:Wie wird Gegendruck in einem RxJava RabbitMQ Observable implementiert?

import org.springframework.amqp.core.MessageListener; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.stereotype.Component; 
import rx.Observable; 
import rx.subscriptions.Subscriptions; 

import javax.inject.Inject; 

@Component 
public class CommandExchange { 
    private final MessageConverter messageConverter; 
    private final ConnectionFactory connectionFactory; 

    @Inject 
    public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) { 
     this.messageConverter = messageConverter; 
     this.connectionFactory = connectionFactory; 
    } 

    public <T extends Command> Observable<T> observeQueue(String... queueNames) { 
     return Observable.create(subscriber -> { 

      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
      container.setConnectionFactory(connectionFactory); 
      container.setQueueNames(queueNames); 
      container.setMessageListener((MessageListener) message -> { 
       T command = (T) messageConverter.fromMessage(message); 
       if (!subscriber.isUnsubscribed()) { 
        System.out.println("Being asked for a message."); 
        subscriber.onNext(command); 
       } 
      }); 
      container.start(); 

      Subscriptions.create(container::shutdown); 

     }); 
    } 
} 

ich nicht meinen Kopf herum bekommen, wie hier zu implementieren lossless backpressue ohne oder Pufferung zu blockieren. Es ist nicht sinnvoll, die Pufferung zu verwenden, da die Rabbit MQ-Warteschlange bereits ein Puffer ist. Daher sollte eine Nachricht nur dann aus der Warteschlange konsumiert werden, wenn ein Abonnent dafür bereit ist. Ist die Lösung eher, ein Pull-basiertes Observable zu verwenden (d. H. Stoppen Sie die Verwendung eines Listeners und stattdessen grab a message when there is demand from the subscriber)? Wenn ja, wie wäre es am besten, den Fall zu bearbeiten, in dem sich gerade keine Nachrichten in der Warteschlange befinden?

Antwort

1

Ja, ich würde aufhören mit einem Listener und greifen Nachrichten aus der Warteschlange bei Bedarf. Anfordern von Abrechnungs und Gegendruck wird alles für Sie dann behandelt, wenn Sie

verwenden
Observable.create(new SyncOnSubscribe<T>() {...}); 

In SyncOnSubscribe Sie mehr oder weniger nur die Aktion an, die genommen wird, eine Nachricht zu bekommen (oder keine, wenn es keine Warte ist).

+0

Benötigen Sie weitere Hilfe zu diesem Thema? Ich kann mehr Details hinzufügen, wenn Sie es brauchen. –

+0

'public beobachtbare observeQueue (String warteschlange) { Observable.OnSubscribe staatenlos = SyncOnSubscribe.createStateless (s -> { s.onNext ((T) rabbitTemplate.receiveAndConvert (warteschlange, -1)); }); return Observable.create (statuslos) .subscribeOn (Schedulers.newThread()); } ' Das funktioniert gut, obwohl ich nicht sicher bin, ob der blockierende Beobachter, der auf einem neuen Thread läuft, hier die beste Vorgehensweise ist. –