Mein Stack ist uwsgi mit gevents. Ich versuche, meine API-Endpunkte mit einem Dekorator zu umhüllen, um alle Anfragedaten (URL, Methode, Körper und Antwort) zu einem kafka Thema zu schieben, aber es funktioniert nicht. Meine Theorie ist, weil ich gevents verwende, und ich versuche, diese im asynchronen Modus auszuführen, der asynchrone Thread, der tatsächlich nach kafka geht, kann nicht mit gevents laufen. Und wenn ich versuche, die Methode zu synchronisieren, dann funktioniert es auch nicht, es stirbt im produce worker, d. H. Nach produce kehrt der Aufruf nie zurück. Obwohl beide Methoden gut auf Python-Shell funktionieren und wenn ich uwsgi auf Threads ausführen.Wie kafka-python oder pykafka als async producer mit uwsgi und gevent arbeiten?
Folgt der Beispielcode: 1. mit kafka-Python (async)
try:
kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(','))
except NoBrokersAvailable:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
kafka_producer = None
def send_message_to_kafka(topic, key, message):
"""
:param topic: topic name
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
if not kafka_producer:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
return
data = json.dumps(message)
try:
start = time.time()
kafka_producer.send(topic, key=str(key), value=data)
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except KafkaTimeoutError as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.info(e)
pass
except Exception as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.exception(e)
pass
mit PY-kafka (sync):
try: client = KafkaClient(hosts=KAFKAHOST) except Exception as e: logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST)) client = None def send_message_to_kafka(topic, key, message): """ :param topic: topic name :param key: key to decide partition :param message: json serializable object to send :return: """ if not client: logger.info(u'Kafka Host is None') return data = json.dumps(message) try: start = time.time() topic = client.topics[topic] with topic.get_sync_producer() as producer: producer.produce(data, partition_key='{}'.format(key)) logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) except Exception as e: logger.exception(e) pass
irgendwelche neuigkeiten über kafka-python? –