0

Ich habe Kafka-Client eingerichtet, die Nachrichten produzieren und konsumieren, es funktioniert wie erwartet, wenn wir Nutzdaten vom Produzenten zum Thema senden, also habe ich ein Problem mit dem Produzenten jetzt erste Nachricht, die ich zum Thema senden konnte und ich auch konnte konsumiere es von kafka topic, jetzt bin ich versucht, zweite nachrichten zu senden, aber verbraucher liest nicht die zweite nachricht von kafka thema, Irgendeine idee, was hier passiert?Wie sende ich mit kafka producer Nachrichten an dasselbe Thema?

producer.js

var config = require('./config.js'); 
var zk = require('node-zookeeper-client'); 
var kafkaConn = "130.8"; 
var kafka = require('kafka-node'), 
    HighLevelProducer = kafka.HighLevelProducer, 
    client = new kafka.Client(kafkaConn), 
    producer = new HighLevelProducer(client), 
    payloads = [ 
     { topic: 'test', messages: 'second message' } 
    ]; 
producer.on('ready', function() { 
    producer.send(payloads, function (err, data) { 
     console.log(data); 
    }); 
}); 

consumer.js

function start() { 
    topics = [{topic: 'test'}]; 
    var groupId = 'ulogGroup'; 
    var clientId = "consumer-" + Math.floor(Math.random() * 10000); 
    var options = {autoCommit: true, fetchMaxWaitMs: 100, fetchMaxBytes: 10 * 1024 * 1024, groupId: groupId}; 
    console.log("Started consumer: ", clientId); 
    var consumer_client = new kafka.Client(kafkaConn,clientId); 
    var client = new Client(consumer_client.connectionString,clientId); 
    var consumer = new HighLevelConsumer(client, topics, options); 
    console.log("Consumer topics:", getConsumerTopics(consumer).toString()); 
    // startConsumer(consumer); 
    consumer.on('message', function (message) { 
     //var topic = message.data; 
     console.log('Message',message); 
    }); 
}; 
start(); 

Antwort

1

Könnte die banalsten Vorschlag sein, aber sind Sie sicher, dass Ihre zweite Nachricht an kafka Warteschlange gesendet wird? Neben Ihrem Consumer möchten Sie möglicherweise den integrierten Befehlszeilen-Consumer verwenden, während Ihr Produzent Nachrichten veröffentlicht, um sicherzustellen, dass alle Ihre Nachrichten veröffentlicht werden.

bin/kafka-console-consumer.sh --zookeeper host:2181 --topic test --from-beginning 
+0

Danke für die Antwort! – hussain