2015-06-09 8 views
5

Ich benutze Kafka 0.8.2.1 SimpleConsumer. Könnte jemand die Bedeutung einiger Config-Parameter für SimpleConsumer und FetchRequestBuilder klären? Kurz vor dem Lesen von Kafkas Quellcode konnte ich keine Dokumente finden. (Ich habe versucht, diese Frage zu der kafka Benutzergruppe veröffentlichen - aber kein Glück):kafka: was bedeuten 'soTimeout', 'pufferSize' und 'minBytes' für SimpleConsumer?

- Q1: in der Unterzeichnung der SimpleConsumer Konstruktor ich den ‚soTimeout‘ Parameter Int sehen - , was die Bedeutung dieser ist Auszeit? Ist das ein Timeout um sich mit dem Kafka Broker zu verbinden? Zeitüberschreitung beim Erhalten einer Antwort von irgendeiner [oder spezifischen ??] Anfrage an Kafka (wie FetchRequest)? Etwas anderes?

kafka.javaapi.consumer.SimpleConsumer 
    (val host: String, 
    val port: Int, 
    val soTimeout: Int, 
    val bufferSize: Int, 
    val clientId: String) 

- Q2: SimpleConsumer-Konstruktor übernimmt auch Int 'pufferSize' Parameter. Was bedeutet das? Wie viele Bytes liest SimpleConsumer, wenn ein fetchRequest ausgegeben wird? Oder ist es die maximale Anzahl von Bytes, die pro Abruf von Kafka gelesen werden - und mehrere Abrufe passieren, wenn mehr Daten verfügbar sind?

- Wenn FetchRequest über FetchRequestBuilder Gebäude (siehe unten), muss ich auch 'fetchSize' angeben:

FetchRequest req= newFetchRequestBuilder() 
    .clientId(kafkaGroupId) 
    .addFetch(topic, partition, offset, fetchSizeInBytes) 
    .build(); 

auf den Quellcode des FetchRequestBuilder Sehen, denke ich (ich bin nicht ein Scala pro) diese Aufrufe übersetzen in die unten genannten Methodenaufrufe - und dort heißt der letzte Parameter in die FetchRequest übergeben wird "minBytes", hinweisend, dass dies nicht die genaue Abrufgröße ist, möglicherweise? . Bedeutet es, dass es nicht einmal etwas abholen wird, wenn nicht mindestens 'minBytes' an Daten verfügbar sind?

class FetchRequestBuilder(): 
    def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) 

    def build() = { 
     val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) 

FetchRequest(versionId: Short = FetchRequest.CurrentVersion, 
    correlationId: Int = FetchRequest.DefaultCorrelationId, 
    clientId: String = ConsumerConfig.DefaultClientId, 
    replicaId: Int = Request.OrdinaryConsumerId, 
    maxWait: Int = FetchRequest.DefaultMaxWait, 
    **minBytes: Int = FetchRequest.DefaultMinBytes**, 
...) 

Also, meine letzte Frage ist:

- Q3: Wie 'buffer' und 'fetchSize/minBytes' beziehen? Was genau definieren sie? Muss ich sicher machen, ist einer kleiner oder reifer als der andere?

Danke,

Marina

Antwort

2

soTimeout ist die Zeit in Millisekunden für eine Verbindung zu dem gewünschten Broker zu warten. Ich weiß nicht, dass mit der Verbindung etwas Besonderes passiert, außer dass Sie eine Bestätigung erhalten, dass dort ein Broker ist, der bereit ist, einige weitere Aktionen durchzuführen.

Ich glaube, dass die im Konstruktor verwendete Puffergröße die Größe des Puffers ist, der vom clientseitigen Socket zum Empfangen von Daten verwendet wird, die vom Broker gesendet werden.

Für Ihre letzte Frage, wenn die Gesamtanzahl der Bytes, die aus irgendeinem Grund durch eine Abrufanforderung zurückgegeben wird, größer als die angeforderte Socketpuffergröße ist, müssen mehr als eine niedrigere Ebene alle Daten abzurufen abrufen obwohl es einen einzigen Abruf auf einer höheren Ebene gibt.

+0

danke, Chris. Es macht Sinn, dass es mehr als einen Low-Level-Abruf gibt, wenn mehr Daten als die Puffergröße des Sockets vorhanden sind. Mir ist noch nicht klar, was die 'fetchSize/minBytes' angibt. Wenn, sagen wir, die minBytes = 200, aber es sind 100 Bytes von Nachrichten für den Verbrauch verfügbar - wird der Verbraucher warten, bis mindestens 200 Bytes verfügbar sind?oder ist es eine vorgeschlagene "Chunk" -Größe des Abrufens von Daten von Kafka in einem Abruf auf niedriger Ebene? Vielen Dank!! – Marina

+0

Wenn keine Nachrichten verfügbar sind, wird beim Abruf eine Ausnahme ausgelöst (es gibt einen bestimmten Zeitüberschreitungswert für diese Wartezeit). Andernfalls ruft der Abruf eine Anzahl von Nachrichten ab, deren Anzahl durch die fetchSize angezeigt wird. Im Idealfall haben die zurückgegebenen Nachrichten eine Gesamtgröße, die größer als die Abrufgröße ist. Wenn jedoch nicht genügend Nachrichten vorhanden sind, wird nur diese Zahl nach dem angegebenen Zeitraum zurückgegeben. –

+0

toll, danke !! – Marina