2015-09-03 6 views
5

Diese Nutzung ist ein nicht arbeiten versuchen Flink für die Verwendung fach mit scala anonymer Funktion:Wie flink Falzfunktion in scala

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1 
env.readFileStream(...). 
... 
.groupBy(1) 
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double]) 

Es kompiliert gut, aber bei der Ausführung, erhalte ich eine „Typ Löschung Ausgabe“ (siehe unten). In Java ist das in Ordnung, aber natürlich ausführlicher. Ich mag die prägnanten und klaren Lambdas. Wie kann ich das in Scala tun?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: 
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 

Antwort

3

Das Problem, auf das Sie gestoßen sind, ist ein Fehler in Flink [1]. Das Problem stammt von Flink's TypeExtractor und der Art, wie die Scala DataStream API über die Java Implementierung implementiert wird. Die TypeExtractor kann keine TypeInformation für den Scala-Typ generieren und gibt daher eine MissingTypeInformation zurück. Diese fehlenden Typinformationen werden nach dem Erstellen des Operators StreamFold manuell festgelegt. Der Operator StreamFold ist jedoch so implementiert, dass er MissingTypeInformation nicht akzeptiert und folglich fehlschlägt, bevor die richtige Typinformation gesetzt wird.

Ich habe eine Pull-Anforderung [2] geöffnet, um dieses Problem zu beheben. Es sollte innerhalb der nächsten zwei Tage zusammengeführt werden. Wenn Sie dann die neueste 0.10-Snapshot-Version verwenden, sollte Ihr Problem behoben sein.