5

Ich versuche, ein kleines Beispiel von Apache flink in clojure laufen, aber gerade jetzt bin ich stecken, wegen der Art in clojure und eine seltsame Eigenart in flink andeuten.Stuck mit Typ Hinweise in clojure für generische Klasse

Hier ist mein Code:

(ns pipeline.core 
(:import 
(org.apache.flink.api.java ExecutionEnvironment) 
(org.apache.flink.api.common.functions FlatMapFunction) 
(org.apache.flink.api.java.tuple Tuple2) 
(org.apache.flink.util Collector) 
(java.lang String))) 

(def flink-env (ExecutionEnvironment/createLocalEnvironment)) 

(def dataset (.fromElements flink-env (to-array ["please test me"]))) 

(defn tokenizer [] (reify FlatMapFunction 
       (flatMap [this value collector] 
        (println value)))) 

(.flatMap dataset (tokenizer)) 

Wenn ich Hinweise nicht-Typ liefern, ich einen Fehler aus dem flink api erhalten:

Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point. 
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:662) 

Wenn ich Typ Hinweise geben:

(defn tokenizer [] (reify FlatMapFunction 
       (^void flatMap [this ^String value ^Collector collector] 
        (println value)))) 

Ich bekomme einen Fehler von der Clojure-Compiler:

Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match. 
at clojure.lang.Compiler$NewInstanceMethod.parse(Compiler.java:8065) 

Gibt es eine Möglichkeit, Typhinweise in clojure mit generischen Klassen hinzuzufügen? Es sollte wie folgt sein:

(defn tokenizer [] (reify FlatMapFunction 
       (^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector] 
        (println value)))) 

Aber das funktioniert nicht. Irgendwelche Ideen?

Die lein Config sieht wie folgt aus:

(defproject pipeline "0.1.0-SNAPSHOT" 
:description "FIXME: write description" 
:url "http://example.com/FIXME" 
:license {:name "Eclipse Public License" 
     :url "http://www.eclipse.org/legal/epl-v10.html"} 
:dependencies [[org.clojure/clojure "1.7.0"]    
      [org.apache.flink/flink-java "0.9.0"]    
      ] 
    :aot :all) 

Antwort

3

Clojure können Reflexionen nicht umgehen, so müssen Sie den Rückgabetyp manuell über Flink Methode returns angeben.

(.returns (.flatMap dataset (tokenizer)) String) 

Darüber hinaus müssen Sie deftype verwenden tokenizer zu definieren und ein neues Objekt instanziiert, wenn Sie es denn Flink keine anonymen Klassen umgehen kann:

(deftype tokenizer [] FlatMapFunction 
         (flatMap [this value collector] 
         (println value))) 

(.flatMap dataset (tokenizer.)) 

Hier ist ein voll „Wort-Count-Beispiel " das kann in ein Glas gepackt und ausgeführt werden.

Achten Sie auf die Typhinweise und -formen. Für tokenizer ist der Ausgang (int 1) erforderlich, sonst wäre Long der zweite Typ von Tuple2. Außerdem verwenden wir einen String, um den Ausgabetyp für tokenizer zu deklarieren (ein Klassentyp ist nicht ausreichend, da die Reflektionsarten ebenfalls angegeben werden müssen). Schließlich müssen wir den Tipp (int-array [0]) eingeben, um die Überladung von groupBy aufzulösen (ohne sie ist die Methode für den Clojure-Compiler mehrdeutig).

(ns org.apache.flink.flink-clojure.WordCount 
(:import 
(org.apache.flink.api.common.functions FlatMapFunction) 
(org.apache.flink.api.java DataSet) 
(org.apache.flink.api.java ExecutionEnvironment) 
(org.apache.flink.api.java.tuple Tuple2) 
(org.apache.flink.util Collector) 
(java.lang String)) 
(:require [clojure.string :as str]) 
(:gen-class)) 

(def flink-env (ExecutionEnvironment/createLocalEnvironment)) 

(def text (.fromElements flink-env (to-array ["please test me and me too"]))) 

(deftype tokenizer [] FlatMapFunction 
         (flatMap [this value collector] 
         (doseq [v (str/split value #"\s")] 
          (.collect collector (Tuple2. v (int 1)))))) 

(def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>")) 

(def counts (.sum (.groupBy tokens (int-array [0])) 1)) 

(defn -main [] 
    (.print counts) 
) 
+0

Hallo, leider, die nicht geholfen haben, bekomme ich immer noch die Ausnahme für 'Die Typen der Schnittstelle org.apache.flink.api.common.functions.FlatMapFunction nicht inferred.' werden könnte Aber dank Für Ihre Eingabe werde ich noch einmal auf SingleInputUdfOperator schauen. Übrigens sollte es sein '(. Returns (.flatMap Dataset (Tokenizer)) String)' – knuth

+0

Also habe ich das gleiche in Java direkt versucht. Ich habe das Wordcount-Beispiel geändert, so dass der Tokenizer nur "FlatMapFunction" (ohne Generika) implementiert. Sogar mit ". Returns (" Tuple2 ") bekomme ich die selbe Ausnahme. Also muss ich vielleicht einen zusätzlichen Java Wrapper für meinen Zweck schreiben, der die generischen Typen behandelt. – knuth

+0

Hi, ich habe mir das genauer angesehen und Was ich vorgeschlagen habe, sollte funktionieren. Es ist ein Fehler im System. Ich habe gerade eine JIRA dafür geöffnet: https: //issues.apache.org/jira/browse/FLINK-2557 Ich denke, im Moment müssen Sie den Weg mit dem zusätzlichen Java-Wrapper gehen. –