2016-01-22 5 views
7

Ich bin eine benutzerdefinierte Dataflow unbeschränkte Datenquelle zu schreiben, die 0,8 von Kafka liest. Ich möchte es lokal mit dem DirectPipelineRunner ausführen. Aber ich bin immer die folgende stackstrace:Verwenden von benutzerdefinierten Dataflow unbeschränkte Quelle auf DirectPipelineRunner

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700) 
     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
     at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
     at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87) 
     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174) 

die einigen Sinn macht, da ich nicht einen Auswerter für meine benutzerdefinierte Quelle jederzeit registriert.

Lesen https://github.com/GoogleCloudPlatform/DataflowJavaSDK, es scheint nur Evaluatoren für begrenzt Quellen registriert sind. Was ist die empfohlene Methode zum Definieren und Registrieren eines Evaluators für eine benutzerdefinierte unbegrenzte Quelle?

Antwort

3

DirectPipelineRunner läuft derzeit nur über begrenzten Eingang. Wir arbeiten aktiv daran, diese Einschränkung aufzuheben und erwarten, sie in Kürze veröffentlichen zu können.

In der Zwischenzeit können Sie trivialer jede UnboundedSource in eine BoundedSource drehen, zu Testzwecken von withMaxNumRecords verwenden, wie im folgenden Beispiel:

UnboundedSource<String> unboundedSource = ...; // make a Kafka source 
PCollection<String> boundedKafkaCollection = 
    p.apply(Read.from(unboundedSource).withMaxNumRecords(10)); 

Weitere Details finden this issue on GitHub.


Separat gibt es mehrere Bemühungen, den Kafka-Stecker beizutragen. Vielleicht möchten Sie sich mit uns und anderen Mitwirkenden über our GitHub repository in Verbindung setzen.

+0

Im Grunde sind wir auf einer alten Version von kafka mit der älteren Verbraucher API, so dass unsere Bemühungen auf dem Stecker sind zur Zeit ein bisschen nutzlos Sie :-) Wir hoffen, würden wir unsere Version von kafka erhalten aktualisiert und am Ende mit ein Standardstecker bald. – bfabry