2016-05-29 11 views
2

Ich habe two case class geschrieben, die Base abstract class erweitert. Ich habe zwei Listen jeder Klasse (listA und listB). Wenn ich diese beiden Listen zusammenführen möchte, kann ich die endgültige Liste nicht in Apache Spark 1.6.1 Dataset konvertieren.Keine Java-Klasse entsprechend Produkt mit Serializable mit Base gefunden

abstract class Base 

case class A(name: String) extends Base 
case class B(age: Int) extends Base 

val listA: List[A] = A("foo")::A("bar")::Nil 
val listB: List[B] = B(10)::B(20)::Nil 
val list: List[Base with Product with Serializable] = listA ++ listB 

val result: RDD[Base with Product with Serializable] = sc.parallelize(list).toDS() 

Apache Spark diese Ausnahme steigen:

A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found 
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found 
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299) 
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192) 
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50) 
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41) 

Wenn ich will von list Spark-RDD erstellen keine Ausnahme werfen, aber wenn ich RDD konvertieren mit toDS() Verfahren dieser Stand der Ausnahme dataSet wird werfen.

+2

Ich bin nicht sicher Mixins sind noch mit DataSet-Encoder unterstützt. –

Antwort

2

Zuerst können Sie einen fehlerfreien Typ für list erhalten, indem Sie explizit List[Base] oder Base extends Product with Serializable hinzufügen, wenn die Absicht darin besteht, nur um Fallklassen/Objekte erweitert werden. Aber das ist nicht genug, denn

Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.

Beachten Sie, dass abstrakte Klassen wie Base werden nicht unterstützt. Benutzerdefinierte Encoder werden ebenfalls nicht unterstützt. Sie könnten auch versuchen, den kryo (oder javaSerialization, als letzten Ausweg) Encoder zu verwenden, siehe How to store custom objects in Dataset?. Hier

abgeschlossen Arbeitsbeispiel:

abstract class Base extends Serializable with Product 

case class A(name: String) extends Base 

case class B(age: Int) extends Base 

object BaseEncoder { 
    implicit def baseEncoder: org.apache.spark.Encoder[Base] = org.apache.spark.Encoders.kryo[Base] 
} 


val listA: Seq[A] = Seq(A("a"), A("b")) 
val listB: Seq[B] = Seq(B(1), B(2)) 
val list: Seq[Base] = listA ++ listB 

val ds = sc.parallelize(list).toDS 
+0

Tait Lösung hat nicht funktioniert, aber Schreiben Encoder funktioniert sehr gut. –

+0

Der zweite Absatz erklärt eigentlich, warum das erste nicht genug ist. Ich habe den Text korrigiert, um es klarer zu machen. –

+0

Warum Produkt erweitern? Ohne Produkt funktioniert es. –