2016-04-03 16 views
0

Im Versuch, Daten von einem Thema zu verbrauchen durch die Offset-Einstellung aber Assertionsfehler erhalten -AssertionError: Nicht zugeordnete Partition

from kafka import KafkaConsumer 

consumer = KafkaConsumer('foobar1', 
         bootstrap_servers=['localhost:9092']) 
print 'process started' 
print consumer.partitions_for_topic('foobar1') 
print 'done' 
consumer.seek(0,10) 

for message in consumer: 
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
              message.offset, message.key, 
              message.value)) 
print 'process ended' 

Fehler: -

Traceback (most recent call last): 
    File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module> 
    consumer.seek(0,10) 
    File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek 
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' 
AssertionError: Unassigned partition 

Antwort

1

Sie müssen rufen consumer.assign() mit einer Liste von TopicPartitions vor dem Aufruf von seek. Beachten Sie auch, dass das erste Argument für die Suche auch eine TopicPartition ist. Siehe KafkaConsumer API

0

In meinem Fall mit Kafka 0.9 und kafka-python wird Partitionszuordnung während for message in consumer passiert ist. Also sollte die Suchoperation nach der Iteration erfolgen. Ich setze den Offset meiner Gruppe durch den folgenden Code zurück:

import kafka 

ps = [] 
for i in xrange(topic_partition_number): 
    ps.append(kafka.TopicPartition(topic, i)) 

consumer = kafka.KafkaConsumer(topic, bootstrap_servers=address, group_id=group) 
for msg in consumer: 
    print msg 
    consumer.seek_to_beginning(*ps) 
    consumer.commit() 
    break