How to consume data with the Kafka Java API
This is my consumer code; please check it:
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
    private ConsumerConnector consumerConnector = null;
    private final String topic = "JsonTopic";

    public void initialize() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup");
        // Not a standard consumer setting; the topic is actually selected in consume()
        props.put("kafka.topic", "JsonTopic");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "300");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig conConfig = new ConsumerConfig(props);
        consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
    }

    public void consume() throws IOException {
        // Request one stream (one consuming thread) for the topic
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);
        // ConsumerConnector creates the message streams for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumerConnector.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);
        for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
            ConsumerIterator<byte[], byte[]> consumerItem = kStreams.iterator();
            // hasNext() blocks until a message arrives
            while (consumerItem.hasNext()) {
                System.out.println("Message consumed from topic[" + topic + "] : "
                        + new String(consumerItem.next().message()));
                // writeToFile(new String(consumerItem.next().message()),"/root/abc.txt");
            }
        }
        // Shut down the consumer connector
        if (consumerConnector != null)
            consumerConnector.shutdown();
    }
}
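The commented-out writeToFile call is not shown in the question. As a minimal sketch, assuming the helper is only meant to append each message as one line to a file, it could look like the following. The class name MessageFileWriter, the method body, and the temp file are hypothetical; only the method name and its two String arguments come from the commented line. Note that calling consumerItem.next() once for printing and again for writing would consume two different messages, so the sketch decodes each payload once and reuses the string.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MessageFileWriter {

    // Hypothetical helper: appends one message as a line to the given file,
    // creating the file first if it does not exist yet.
    public static void writeToFile(String message, String fileName) throws IOException {
        Path path = Paths.get(fileName);
        byte[] line = (message + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
        Files.write(path, line, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the payloads returned by consumerItem.next().message()
        byte[][] payloads = {
            "HI".getBytes(StandardCharsets.UTF_8),
            "Hello".getBytes(StandardCharsets.UTF_8)
        };
        // A temp file is used here instead of /root/abc.txt so the sketch runs anywhere
        Path out = Files.createTempFile("messages", ".txt");
        for (byte[] payload : payloads) {
            // Decode once, then reuse the same string for printing and writing
            String message = new String(payload, StandardCharsets.UTF_8);
            System.out.println("Message consumed: " + message);
            writeToFile(message, out.toString());
        }
        int lineCount = Files.readAllLines(out, StandardCharsets.UTF_8).size();
        System.out.println("Wrote " + lineCount + " lines to " + out);
    }
}
```

Inside the consume() loop, the same pattern would be to store new String(consumerItem.next().message()) in a local variable and pass that variable to both System.out.println and writeToFile.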
Starting the server:
./kafka-server-start.sh /usr/hdp/2.3.0.0-2557/etc/kafka/conf.default/server.properties
Starting the producer:
[[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic JsonTopic
HI
Hello
I am not getting any output in the console.
[[email protected] ~]# java -jar kafkaconsumer.jar
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
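Two settings may be worth checking here, though neither is confirmed as the cause by anything in the question. With the old ZooKeeper-based consumer, auto.offset.reset defaults to largest, so a group without committed offsets only sees messages produced after the consumer has started. In addition, a zookeeper.session.timeout.ms of 400 is far below the default of 6000. The sketch below only builds the properties; the class name ConsumerProps and the chosen values are assumptions for illustration.

```java
import java.util.Properties;

public class ConsumerProps {

    // Builds properties for the old ZooKeeper-based high-level consumer.
    public static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Assumption: 400 ms may be too short to establish the ZooKeeper session;
        // 6000 ms is the default for this setting.
        props.put("zookeeper.session.timeout.ms", "6000");
        props.put("zookeeper.sync.time.ms", "300");
        props.put("auto.commit.interval.ms", "1000");
        // Assumption: read from the earliest available offset when the group
        // has no committed offset yet (the default is "largest").
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "testgroup");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

The resulting Properties object would be passed to new ConsumerConfig(props) in initialize(). Since auto.offset.reset only applies when the group has no committed offsets, trying a fresh group.id may also be necessary to see older messages.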
This is not the solution – Aman