2016-06-21 9 views
0

Ich benutze Databricks Community Edition mit Spark 2.0 Vorschau. habe ich versucht, den folgenden (einfachen) Code:SparkSQL Aggregator: Typ Mismatch Fehler

import org.apache.spark.sql.expressions.Aggregator 
import org.apache.spark.sql.Encoder 
import java.util.Calendar 
import spark.implicits._ 

case class C1(f1: String, f2: String, f3: String, f4:java.sql.Date, f5: Double) 
val teams = sc.parallelize(Seq(C1("hash1", "NLC", "Cubs", Java.sql.Date.valueOf("2016-01-23"), 3253.21), C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88), C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72))).toDS 

object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] { 
    def zero: Seq[C1] = Seq.empty[C1] //Nil 
    def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a 
    def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2 
    def finish(r: Seq[C1]): Seq[C1] = r 

    override def bufferEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1] 
    override def outputEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1] 
} 
val g_c1 = teams.groupByKey(_.f1).agg[Seq[C1]](C1Agg.toColumn).collect 

ich folgende Fehlermeldung:

error: type mismatch;
found: org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
required: org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
val g_c1 = teams.groupByKey(_.f1).aggSeq[C1]

Als ich

val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect 

verwende ich bekommen:

error: type mismatch;
found: org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
required: org.apache.spark.sql.TypedColumn[C1,?]
val g_c1 = teams.groupByKey(_.f1).aggSeq[C1]

Irgendwelche Hinweise?

Antwort

0

Ich fand den Grund: Das passiert, weil ich die Fallklasse auf einer Zelle (des Notebooks) deklariere, und dann in verschiedenen nachfolgenden Zellen verwende.

Durch das Einfügen des gesamten Codes in dieselbe Zelle wird dieses Problem gelöst. (Leider, aber jetzt habe ich ein anderes Problem konfrontiert MissingRequirementError)