3

i Sturm Topologie entwickelt JSONArray Daten von kafka Makler auf Hortonworks zu erhalten,Mein KafkaSpout verbraucht keine Nachrichten von Kafka Brokers in HDP

Ich weiß nicht, warum meine kafkaSpout Nachrichten konsumieren nicht von Kafka Brokers in HDP, aber die Sturm Topologie wurde erfolgreich übermittelt, aber wenn ich die Topologie visualisiere: 0% Daten wurden verbraucht !!

topology visualisation

Das ist mein Schema Klasse:

public class ClientInfosSheme implements Scheme{ 
private static final long serialVersionUID = -2990121166902741545L; 
private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class); 
public String codeBanque; 
public String codeAgence; 
public String codeGuichet; 
public String devise; 
public String numCompte; 
public String codeClient; 
public String codeOperation; 
public String sensOperation; 
public String montantOperation; 
public String dateValeur; 
public String dateComptable; 
public String utilisateur; 

public static final String CODEBANQUE="codeBanque"; 
public static final String CODEAGENCE="codeAgence"; 
public static final String CODEGUICHET="codeGuichet"; 
public static final String DEVISE="devise"; 
public static final String NUMCOMPTE="numCompte"; 
public static final String CODECLIENT="codeClient"; 
public static final String CODEOPERATION="codeOperation"; 
public static final String SENSOPERATION="sensOperation"; 
public static final String MONTANTOPERATION="montantOperation"; 
public static final String DATEVALEUR="dateValeur"; 
public static final String DATECOMPTABLE="dateComptable"; 
public static final String UTILISATEUR="utilisateur"; 

public List<Object> deserialize(byte[] bytes) { 

     try{ 
      String clientInfos = new String(bytes, "UTF-8"); 
       JSONArray JSON = new JSONArray(clientInfos); 
       for(int i=0;i<JSON.length();i++) { 
        JSONObject object_clientInfos=JSON.getJSONObject(i); 
       try{  

        //Récupérations des données 

         this.codeBanque=object_clientInfos.getString("codeBanque"); 
         this.codeAgence=object_clientInfos.getString("codeAgence"); 
         this.codeGuichet=object_clientInfos.getString("codeGuichet"); 
         this.devise=object_clientInfos.getString("devise"); 
         this.numCompte=object_clientInfos.getString("numCompte"); 
         this.codeClient=object_clientInfos.getString("codeClient"); 
         this.codeOperation=object_clientInfos.getString("codeOperation"); 
         this.sensOperation=object_clientInfos.getString("sensOperation"); 
         this.montantOperation=object_clientInfos.getString("montantOperation"); 
         this.dateValeur=object_clientInfos.getString("dateValeur"); 
         this.dateComptable=object_clientInfos.getString("dateComptable"); 
         this.utilisateur=object_clientInfos.getString("utilisateur"); 

        } 
        catch(Exception e) 
           { 
            e.printStackTrace(); 
           } 


    }// End For Loop 



     } catch (JSONException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } catch (UnsupportedEncodingException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } 
     return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte, codeClient, codeOperation, sensOperation, 
       montantOperation,dateValeur, dateComptable,utilisateur); 

}// End Function deserialize 

public Fields getOutputFields() { 
     return new Fields(CODEBANQUE,CODEAGENCE,CODEGUICHET,DEVISE,NUMCOMPTE, 
       CODECLIENT,CODEOPERATION, SENSOPERATION,MONTANTOPERATION,DATEVALEUR,DATECOMPTABLE,UTILISATEUR); 
    } 


} 

und die Eigenschaftendatei:

#Broker host 
kafka.zookeeper.host.port=sandbox.hortonworks.com 

#Kafka topic to consume. 
kafka.topic=INFOCLIENT 

#Location in ZK for the Kafka spout to store state. 
kafka.zkRoot=/client_infos_sprout 

#Kafka Spout Executors. 
spout.thread.count=1 

Wenn ich einen anderen Verbraucher die Daten verwenden storted in Kafka Brokers mag:

[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..}, 
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..}, 
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}] 

so mein Problem Warum konsumiert es keine Nachrichten von Kafka Brokers?

Bitte Ich brauche Hilfe

+0

Haben Sie den richtigen Themennamen, IP/Hostname usw. überprüft? Haben Sie Storm und Kafka Logs auf Fehlermeldungen überprüft? –

+0

Hallo @ MatthiasJ.Sax ich doppelt überprüft und ich fand, dass, wenn ich '#Broker Host' änderte:' kafka.zookeeper.host.port = 192.168.1.78: 2181' Ich habe dieses Problem: _java.lang.RuntimeException: java .lang.IllegalArgumentException: a || b || c || calculCleRib (a, b, c) existiert nicht bei backtype.storm.utils.DisruptorQueue.consumeBatchToCursor (DisruptorQueue.java:128) _ –

+0

Hallo, wenn ich die STORM UI überprüfe, sah ich, dass die MSSGs ausgesendet und übertragen werden, aber nicht acked! Ich habe diese msg: Die Anzahl der Tupel, die explizit fehlgeschlagen oder impered vor dem Ackern wurde abgeschlossen. ein Wert von 0 wird erwartet, kein Acking erfolgt –

Antwort

1

Wie Sie in den Protokollen festgestellt haben, Ihre Spout Nachrichten nicht „verbrauchen“, weil die Topologie einen Fehler hat und nicht die Tupel nicht ack - daher der Spout wird sie wiederholen . Dies funktioniert wie vorgesehen.

Sobald Ihre Topologie stabil ist, werden Sie beobachten, dass der Offset inkrementiert wird. Bis dahin sendet der Spout Nachrichten in die Topologie, aber Sie können keine Ergebnisse beobachten.

Ohne die calculCleRib-Methode zu sehen und wie sie in Ihre Topologie integriert ist, können wir Ihnen beim Debuggen dieses Aspekts nicht helfen.