i Sturm Topologie entwickelt JSONArray Daten von kafka Makler auf Hortonworks zu erhalten,Mein KafkaSpout verbraucht keine Nachrichten von Kafka Brokers in HDP
Ich weiß nicht, warum meine kafkaSpout Nachrichten konsumieren nicht von Kafka Brokers in HDP, aber die Sturm Topologie wurde erfolgreich übermittelt, aber wenn ich die Topologie visualisiere: 0% Daten wurden verbraucht !!
Das ist mein Schema Klasse:
public class ClientInfosSheme implements Scheme{
private static final long serialVersionUID = -2990121166902741545L;
private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class);
public String codeBanque;
public String codeAgence;
public String codeGuichet;
public String devise;
public String numCompte;
public String codeClient;
public String codeOperation;
public String sensOperation;
public String montantOperation;
public String dateValeur;
public String dateComptable;
public String utilisateur;
public static final String CODEBANQUE="codeBanque";
public static final String CODEAGENCE="codeAgence";
public static final String CODEGUICHET="codeGuichet";
public static final String DEVISE="devise";
public static final String NUMCOMPTE="numCompte";
public static final String CODECLIENT="codeClient";
public static final String CODEOPERATION="codeOperation";
public static final String SENSOPERATION="sensOperation";
public static final String MONTANTOPERATION="montantOperation";
public static final String DATEVALEUR="dateValeur";
public static final String DATECOMPTABLE="dateComptable";
public static final String UTILISATEUR="utilisateur";
public List<Object> deserialize(byte[] bytes) {
try{
String clientInfos = new String(bytes, "UTF-8");
JSONArray JSON = new JSONArray(clientInfos);
for(int i=0;i<JSON.length();i++) {
JSONObject object_clientInfos=JSON.getJSONObject(i);
try{
//Récupérations des données
this.codeBanque=object_clientInfos.getString("codeBanque");
this.codeAgence=object_clientInfos.getString("codeAgence");
this.codeGuichet=object_clientInfos.getString("codeGuichet");
this.devise=object_clientInfos.getString("devise");
this.numCompte=object_clientInfos.getString("numCompte");
this.codeClient=object_clientInfos.getString("codeClient");
this.codeOperation=object_clientInfos.getString("codeOperation");
this.sensOperation=object_clientInfos.getString("sensOperation");
this.montantOperation=object_clientInfos.getString("montantOperation");
this.dateValeur=object_clientInfos.getString("dateValeur");
this.dateComptable=object_clientInfos.getString("dateComptable");
this.utilisateur=object_clientInfos.getString("utilisateur");
}
catch(Exception e)
{
e.printStackTrace();
}
}// End For Loop
} catch (JSONException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (UnsupportedEncodingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte, codeClient, codeOperation, sensOperation,
montantOperation,dateValeur, dateComptable,utilisateur);
}// End Function deserialize
public Fields getOutputFields() {
return new Fields(CODEBANQUE,CODEAGENCE,CODEGUICHET,DEVISE,NUMCOMPTE,
CODECLIENT,CODEOPERATION, SENSOPERATION,MONTANTOPERATION,DATEVALEUR,DATECOMPTABLE,UTILISATEUR);
}
}
und die Eigenschaftendatei:
#Broker host
kafka.zookeeper.host.port=sandbox.hortonworks.com
#Kafka topic to consume.
kafka.topic=INFOCLIENT
#Location in ZK for the Kafka spout to store state.
kafka.zkRoot=/client_infos_sprout
#Kafka Spout Executors.
spout.thread.count=1
Wenn ich einen anderen Verbraucher die Daten verwenden storted in Kafka Brokers mag:
[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..},
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..},
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}]
so mein Problem Warum konsumiert es keine Nachrichten von Kafka Brokers?
Bitte Ich brauche Hilfe
Haben Sie den richtigen Themennamen, IP/Hostname usw. überprüft? Haben Sie Storm und Kafka Logs auf Fehlermeldungen überprüft? –
Hallo @ MatthiasJ.Sax ich doppelt überprüft und ich fand, dass, wenn ich '#Broker Host' änderte:' kafka.zookeeper.host.port = 192.168.1.78: 2181' Ich habe dieses Problem: _java.lang.RuntimeException: java .lang.IllegalArgumentException: a || b || c || calculCleRib (a, b, c) existiert nicht bei backtype.storm.utils.DisruptorQueue.consumeBatchToCursor (DisruptorQueue.java:128) _ –
Hallo, wenn ich die STORM UI überprüfe, sah ich, dass die MSSGs ausgesendet und übertragen werden, aber nicht acked! Ich habe diese msg: Die Anzahl der Tupel, die explizit fehlgeschlagen oder impered vor dem Ackern wurde abgeschlossen. ein Wert von 0 wird erwartet, kein Acking erfolgt –