2016-06-15 11 views
0

Immer, wenn ich den folgenden Code zu starten:Wie kommt mein channel.basicConsume nicht für Nachrichten nicht warten

ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 
    String exchangeName = "direct_logs"; 
    channel.exchangeDeclare(exchangeName, "direct"); 
    String queueName = channel.queueDeclare().getQueue(); 
    channel.queueBind(queueName, exchangeName, "red"); 
    channel.basicQos(1); 

    final Consumer consumer = new DefaultConsumer(channel){ 
     @Override 
     public void handleDelivery(String consumerTag, 
            Envelope envelope, 
            AMQP.BasicProperties properties, 
            byte[] body) throws IOException{ 
      String message = new String(body, "UTF-8"); 
      System.out.println(message); 
      System.out.println("message received"); 
     } 
    }; 

    channel.basicConsume(queueName, true, consumer); 

Es ist nicht eine Endlosschleife beginnen, wie in der Dokumentation impliziert. Stattdessen stoppt es sofort. Der einzige Weg, ich kann es für einige Zeit in Anspruch nehmen muß, ist channel.basicConsume mit einer Schleife zu ersetzen, wie folgt:

DateTime startedAt = new DateTime(); 
    DateTime stopAt = startedAt.plusSeconds(60); 
    long i=0; 
    try { 
     while (stopAt.compareTo(new DateTime()) > 0) { 
      channel.basicConsume(queueName, true, consumer); 
      i++; 
     } 
    }finally { 
     System.out.println(new DateTime()); 
     System.out.println(startedAt); 
     System.out.println(stopAt); 
     System.out.println(i); 
    } 

Es muss ein besserer Weg, um Nachrichten für eine Weile zu hören, nicht wahr? Was vermisse ich? Es hört sofort auf zu hören.

+0

Haben Sie keine Ausnahme wie ConnectException: Verbindung verweigert? –

Antwort

3

Sind Sie sicher, dass es zu stoppen? Was basicConsume tut, ist ein Benutzer registrieren, um eine bestimmte Warteschlange zu hören, so dass es nicht notwendig ist, es in einer Schleife auszuführen. Sie führen es nur einmal aus und die handleDelivery Methode der Instanz Consumer, die Sie übergeben, wird aufgerufen, sobald eine Nachricht eintrifft.

Die Themen, die die rabbitmq Bibliothek erstellt von dem Verlassen des JVM halten sollte. Um das Programm zu beenden, sollten Sie tatsächlich connection.close()

Hier nennen, ist ein vollständiger Empfänger Beispiel von rabbitmq: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java

Es ist eigentlich so ziemlich das gleiche wie Sie.

0

Ich hatte das gleiche Problem. Der Grund war, dass ich am Ende connection.close angerufen habe. Die basicConsume() -Methode blockiert jedoch nicht den aktuellen Thread, sondern andere Threads, sodass der nachfolgende Code, d. h. die connection.close(), sofort aufgerufen wird.