2013-03-20 13 views
6

Ich verstehe, dass diese Frage dupliziert Frage an using rabbitmq to send a message not string but structsendet ein Objekt mit RabbitMQ

, wenn dies zu tun, um die erste Art und Weise unter Verwendung von

first way

Ich habe folgende Spur:

java.io.EOFException 
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304) 
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773) 
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798) 
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298) 
at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78) 
at com.mdnaRabbit.worker.App.main(App.java:41) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:601) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) 

Ich habe überprüft und sicher, dass die Nachricht in Bytes absolut in Absenderklasse umgewandelt wird, aber der Verbraucher kann nicht r Bekommen Sie es.

hier ist mein Produzent Klasse:

package com.mdnaRabbit.newt; 

import java.io.IOException; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.MessageProperties; 
import org.apache.commons.lang.SerializationUtils; 
import com.mdnaRabbit.worker.data.Data; 

public class App { 

    private static final String TASK_QUEUE_NAME = "task_queue"; 

    public static void main(String[] argv) throws IOException{ 

     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 

     int i = 0; 

     do { 
      Data message = getMessage(); 
      byte [] byteMessage = message.getBytes(); 
      //System.out.println(byteMessage); 
      channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage); 
      System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody()); 
      i++; 
     } while (i<15); 

     channel.close(); 
     connection.close(); 
    } 

    private static Data getMessage(){ 
     Data data = new Data(); 
     data.setHeader("header"); 
     data.setDomainId("abc.com"); 
     data.setReceiver("me"); 
     data.setSender("he"); 
     data.setBody("body"); 
     return data; 
    } 

    private static String joinStrings(String[] strings, String delimiter){ 
     int length = strings.length; 
     if (length == 0) return ""; 
     StringBuilder words = new StringBuilder(strings[0]); 
     for (int i = 1; i < length; i++){ 
      words.append(delimiter).append(strings[i]); 
     } 
     return words.toString(); 
    } 
} 

hier meine Consumer-Klasse ist:

package com.mdnaRabbit.worker; 

import java.io.IOException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import com.mdnaRabbit.worker.data.Data; 
import org.apache.commons.lang.SerializationUtils; 

public class App { 

    private static final String TASK_QUEUE_NAME = "task_queue"; 
    private static int i = 0; 
    public static void main(String[] argv) 
      throws IOException, 
      InterruptedException{ 

     ExecutorService threader = Executors.newFixedThreadPool(20); 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(threader); 
     final Channel channel = connection.createChannel(); 

     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     channel.basicQos(20); 

     final QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(TASK_QUEUE_NAME, false, consumer); 

     try { 

      while (true) { 

         try {QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
          Data message = Data.fromBytes(delivery.getBody()); 
          //Data message = (Data) SerializationUtils.deserialize(delivery.getBody()); 

          System.out.println(" [" + (i++) +"] Received" + message.getBody()); 

          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
         }catch (Exception e){ 
         } 
        } 
     } catch (Exception e){ 
      e.printStackTrace(); 
     } 
     channel.close(); 
     connection.close(); 
    } 

} 

hier ist meine Datenklasse:

package com.mdnaRabbit.worker.data; 

import java.io.*; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

public class Data implements Serializable{ 

    public String header; 
    public String body; 
    public String domainId; 
    public String sender; 
    public String receiver; 

    public void setHeader(String head){ 
     this.header = head; 
    } 

    public String getHeader(){ 
     return header; 
    } 

    public void setBody(String body){ 
     this.body = body; 
    } 

    public String getBody(){ 
     return body; 
    } 

    public void setDomainId(String domainId){ 
     this.domainId = domainId; 
    } 

    public String getDomainId(){ 
     return domainId; 
    } 

    public void setSender(String sender){ 
     this.sender = sender; 
    } 

    public String getSender(){ 
     return sender; 
    } 

    public String getReceiver(){ 
     return receiver; 
    } 

    public void setReceiver(String receiver){ 
     this.receiver = receiver; 
    } 


    public byte[] getBytes() { 
     byte[]bytes; 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     try{ 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(this); 
      oos.flush(); 
      oos.reset(); 
      bytes = baos.toByteArray(); 
      oos.close(); 
      baos.close(); 
     } catch(IOException e){ 
      bytes = new byte[] {}; 
      Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e); 
     } 
     return bytes; 
    } 

    public static Data fromBytes(byte[] body) { 
     Data obj = null; 
     try { 
      ByteArrayInputStream bis = new ByteArrayInputStream(body); 
      ObjectInputStream ois = new ObjectInputStream(bis); 
      obj = (Data) ois.readObject(); 
      ois.close(); 
      bis.close(); 
     } 
     catch (IOException e) { 
      e.printStackTrace(); 
     } 
     catch (ClassNotFoundException ex) { 
      ex.printStackTrace(); 
     } 
     return obj; 
    } 
} 

ich immer scheint, dass Verbraucher-Nachrichten empfängt , denn wenn ich nicht versuche, es in das Objekt zu transformieren, schreibe einfach System.out.println(delivery.getBody) Es zeigt Bytes

+0

ich den Fehler behoben haben in meiner anderen Antwort, die mit Message und Datamessage zur Verwirrung führen – robthewolf

Antwort

4

Es sieht so aus, als ob das Byte-Array, das Sie erhalten, leer ist. Dies geschieht, weil dies:

} catch(IOException e){ 
     bytes = new byte[] {}; 
    } 

Wenn eine Ausnahme erzeugt wird, wird der Code nicht Sie warnen, dass etwas gebrochen ist und sendet nur ein leeres Array statt. Sie sollten den Fehler zumindest protokollieren.

Die Ausnahme erzeugt wahrscheinlich wird, weil Sie eine Klasse serialisiert werden versuchen, die nicht serialisierbar ist. Um eine Klasse zu machen serializable Sie hinzufügen müssen „implementiert Serializable“ auf seine Erklärung:

public class Data implements Serializable { 
+0

making Klasse serializable hat keine Ursache Verbesserung –

+0

Ich habe einen Fehler in meinem Code gefunden: Ich habe keine Werte in der Datenklasse festgelegt. Aber auch danach habe ich das gleiche Problem –

+0

ich die Lösung hier gefunden habe: [link] (http://stackoverflow.com/a/13174951/2082631), aber es diesn't arbeiten sowieso –