2016-06-08 24 views
4

Mein Stack ist uwsgi mit gevents. Ich versuche, meine API-Endpunkte mit einem Dekorator zu umhüllen, um alle Anfragedaten (URL, Methode, Körper und Antwort) zu einem kafka Thema zu schieben, aber es funktioniert nicht. Meine Theorie ist, weil ich gevents verwende, und ich versuche, diese im asynchronen Modus auszuführen, der asynchrone Thread, der tatsächlich nach kafka geht, kann nicht mit gevents laufen. Und wenn ich versuche, die Methode zu synchronisieren, dann funktioniert es auch nicht, es stirbt im produce worker, d. H. Nach produce kehrt der Aufruf nie zurück. Obwohl beide Methoden gut auf Python-Shell funktionieren und wenn ich uwsgi auf Threads ausführen.Wie kafka-python oder pykafka als async producer mit uwsgi und gevent arbeiten?

Folgt der Beispielcode: 1. mit kafka-Python (async)

try: 
     kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(',')) 
    except NoBrokersAvailable: 
     logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST)) 
     kafka_producer = None 


    def send_message_to_kafka(topic, key, message): 
     """ 
     :param topic: topic name 
     :param key: key to decide partition 
     :param message: json serializable object to send 
     :return: 
     """ 
     if not kafka_producer: 
      logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST)) 
      return 
     data = json.dumps(message) 
     try: 
      start = time.time() 
      kafka_producer.send(topic, key=str(key), value=data) 
      logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) 
     except KafkaTimeoutError as e: 
      logger.info(u'Message not sent: {}'.format(KAFKAHOST)) 
      logger.info(e) 
      pass 
     except Exception as e: 
      logger.info(u'Message not sent: {}'.format(KAFKAHOST)) 
      logger.exception(e) 
      pass 
  1. mit PY-kafka (sync):

    try: 
        client = KafkaClient(hosts=KAFKAHOST) 
    except Exception as e: 
        logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST)) 
        client = None 
    
    
    def send_message_to_kafka(topic, key, message): 
        """ 
        :param topic: topic name 
        :param key: key to decide partition 
        :param message: json serializable object to send 
        :return: 
        """ 
        if not client: 
         logger.info(u'Kafka Host is None') 
         return 
        data = json.dumps(message) 
        try: 
         start = time.time() 
         topic = client.topics[topic] 
         with topic.get_sync_producer() as producer: 
          producer.produce(data, partition_key='{}'.format(key)) 
         logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) 
        except Exception as e: 
         logger.exception(e) 
         pass 
    
+0

irgendwelche neuigkeiten über kafka-python? –

Antwort

4

Ich habe mehr Erfahrung mit pykafka, damit ich diesen Abschnitt beantworten kann. pykafka verwendet einen Pluggable-Thread-Handler und Gevent-Unterstützung ist eingebaut. Sie müssen den KafkaClient mit use_greenlets=True instanziieren. Dokumente here

Andere Gedanken zu Ihrem Ansatz. Das Erstellen eines neuen Themenobjekts und Produzenten für jede Nachricht ist extrem teuer. Es ist besser, einmal zu erstellen und erneut zu verwenden.

# setup once 
client = KafkaClient(hosts=KAFKAHOST, use_greenlets=True) 
topic = client.topics[topic] 
producer = topic.get_sync_producer() 

def send_message_to_kafka(producer, key, message): 
    """ 
    :param producer: pykafka producer 
    :param key: key to decide partition 
    :param message: json serializable object to send 
    :return: 
    """ 

    data = json.dumps(message) 
    try: 
     start = time.time() 
     producer.produce(data, partition_key='{}'.format(key)) 
     logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start)) 
    except Exception as e: 
     logger.exception(e) 
     pass # for at least once delivery you will need to catch network errors and retry. 

Endlich, Kafka bekommt seine ganze Geschwindigkeit von Batching und Komprimierung. Die Verwendung des Sync-Producers verhindert, dass der Client diese Funktionen ausnutzt. Es wird funktionieren, ist aber langsamer und benötigt mehr Speicherplatz. Einige Anwendungen erfordern eine Synchronisierung, aber es könnte sinnvoll sein, Ihre Anwendung in Batchnachrichten zu überdenken, wenn Sie Leistungsengpässe haben.

+0

Danke, du hast mir das Leben gerettet ... drei Tage habe ich mit Kafka-Python gekämpft und mich entschieden, nach Pykafka und Use_greenlets zu ziehen –