Immer, wenn ich den folgenden Code zu starten:Wie kommt mein channel.basicConsume nicht für Nachrichten nicht warten
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "direct_logs";
channel.exchangeDeclare(exchangeName, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "red");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException{
String message = new String(body, "UTF-8");
System.out.println(message);
System.out.println("message received");
}
};
channel.basicConsume(queueName, true, consumer);
Es ist nicht eine Endlosschleife beginnen, wie in der Dokumentation impliziert. Stattdessen stoppt es sofort. Der einzige Weg, ich kann es für einige Zeit in Anspruch nehmen muß, ist channel.basicConsume
mit einer Schleife zu ersetzen, wie folgt:
DateTime startedAt = new DateTime();
DateTime stopAt = startedAt.plusSeconds(60);
long i=0;
try {
while (stopAt.compareTo(new DateTime()) > 0) {
channel.basicConsume(queueName, true, consumer);
i++;
}
}finally {
System.out.println(new DateTime());
System.out.println(startedAt);
System.out.println(stopAt);
System.out.println(i);
}
Es muss ein besserer Weg, um Nachrichten für eine Weile zu hören, nicht wahr? Was vermisse ich? Es hört sofort auf zu hören.
Haben Sie keine Ausnahme wie ConnectException: Verbindung verweigert? –