2016-06-26 8 views
2

Ich bin ein einfacher Wortzählimpuls flink Job zu schreiben, aber ich halte diesen Fehler:konnte nicht impliziten Wert für Beweise Parameter finden

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String] 
[error] .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}} 

ich das Netz gesucht, aber keine nachvollziehbare Antwort bekommen konnte.

Hier ist mein Code:

object Job { 
    def main(args: Array[String]) { 
    // set up the execution environment 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/") 

    val count = dataStream 
       .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}} 
       .map{ (_,1) } 
       .groupBy(0) 
       .sum(1) 


    dataStream.print() 
    env.execute("Flink Scala API Skeleton") 
    } 
} 
+0

Versuchen Sie, die Antwort auf diese Frage, es könnte auch Ihnen helfen: http://stackoverflow.com/questions/29540121/flink-scala-api-not-enough-arguments – richj

+0

ich alle notwendigen Bibliotheken importiert haben einschließlich flink.api.scala._ und flink.streaming.api.scala._ – sidd607

+0

Das Problem ist, dass es in fink (Version 1.0.3.) keine groupBy (...) Methode für einen DataStream [(String, Int)] gibt). Es gibt eine keyBy (Int) -Methode, die einen KeyedStream [(String, Int), Tuple] erzeugt. – richj

Antwort

0

das Hinzufügen von: implicit val typeInfo = TypeInformation.of(classOf[(String)]) als die erste Zeile in def main(args: Array[String]) {...} es für mich fixiert.

object Job { 
    def main(args: Array[String]) { 
    implicit val typeInfo = TypeInformation.of(classOf[(String)]) //Add this here 
    // set up the execution environment 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/") 

    val count = dataStream 
       .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}} 
       .map{ (_,1) } 
       .groupBy(0) 
       .sum(1) 


    dataStream.print() 
    env.execute("Flink Scala API Skeleton") 
    } 
}