Ich bin neu bei RxJava und versuche, ein Observable von Nachrichten aus einer RabbitMQ-Warteschlange zu implementieren, die verlustfreien Gegendruck unterstützt. Ich habe es geschafft, ein Observable aus einem Spring AMQP MessageListener zu erstellen. Dies behandelt den Gegendruck in einer synchronen Umgebung (z. B. callstack blocking), aber sobald mehrere Threads eingeführt werden, verlässt der Gegendruck das Fenster - wie zu erwarten ist. Die Klasse ist unter:Wie wird Gegendruck in einem RxJava RabbitMQ Observable implementiert?
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
import rx.Observable;
import rx.subscriptions.Subscriptions;
import javax.inject.Inject;
@Component
public class CommandExchange {
private final MessageConverter messageConverter;
private final ConnectionFactory connectionFactory;
@Inject
public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) {
this.messageConverter = messageConverter;
this.connectionFactory = connectionFactory;
}
public <T extends Command> Observable<T> observeQueue(String... queueNames) {
return Observable.create(subscriber -> {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueNames);
container.setMessageListener((MessageListener) message -> {
T command = (T) messageConverter.fromMessage(message);
if (!subscriber.isUnsubscribed()) {
System.out.println("Being asked for a message.");
subscriber.onNext(command);
}
});
container.start();
Subscriptions.create(container::shutdown);
});
}
}
ich nicht meinen Kopf herum bekommen, wie hier zu implementieren lossless backpressue ohne oder Pufferung zu blockieren. Es ist nicht sinnvoll, die Pufferung zu verwenden, da die Rabbit MQ-Warteschlange bereits ein Puffer ist. Daher sollte eine Nachricht nur dann aus der Warteschlange konsumiert werden, wenn ein Abonnent dafür bereit ist. Ist die Lösung eher, ein Pull-basiertes Observable zu verwenden (d. H. Stoppen Sie die Verwendung eines Listeners und stattdessen grab a message when there is demand from the subscriber)? Wenn ja, wie wäre es am besten, den Fall zu bearbeiten, in dem sich gerade keine Nachrichten in der Warteschlange befinden?
Benötigen Sie weitere Hilfe zu diesem Thema? Ich kann mehr Details hinzufügen, wenn Sie es brauchen. –
'public beobachtbare observeQueue (String warteschlange) { Observable.OnSubscribe staatenlos = SyncOnSubscribe.createStateless (s -> { s.onNext ((T) rabbitTemplate.receiveAndConvert (warteschlange, -1)); }); return Observable.create (statuslos) .subscribeOn (Schedulers.newThread()); } ' Das funktioniert gut, obwohl ich nicht sicher bin, ob der blockierende Beobachter, der auf einem neuen Thread läuft, hier die beste Vorgehensweise ist. –