2016-07-22 12 views
0

ich einen Fehler bin schlagen, wenn für RDDs in Spark-Jobs java Protokollpuffer Klassen als Objektmodell verwendet,Fehler mit Java-Protokoll-Puffer mit Kryo Serializer Spark, die Arrays von Strings haben

für meine Anwendung, meine, proto-Datei hat Eigenschaften, die String wiederholt werden. Zum Beispiel

message OntologyHumanName 
{ 
repeated string family = 1; 
} 

Daraus die 2.5.0 Protoc Compiler generiert Java-Code wie

private com.google.protobuf.LazyStringList family_ = com.google.protobuf.LazyStringArrayList.EMPTY; 

Wenn ich einen Scala Spark-Job ausführen, der die Kryo Serializer ich die folgende Fehlermeldung erhalten verwendet

Caused by: java.lang.NullPointerException 
at com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:61) 
at java.util.AbstractList.add(AbstractList.java:108) 
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134) 
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40) 
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708) 
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) 
... 40 more 

Derselbe Code funktioniert gut mit spark.serializer = org.apache.spark.serializer.JavaSerializer.

Meine Umgebung ist CDH Quickstart 5.5 mit JDK 1.8.0_60

Antwort

0

Ich denke, Typ Ihre RDD Klasse OntologyHumanName enthält. wie: RDD [(String, OntologyHumanName)], und dieser Typ RDD in Zufallsphase durch Zufall. View this: https://github.com/EsotericSoftware/kryo#kryoserializable Kryo kann Serialisierung auf abstrakte Klasse nicht tun.

  1. den Funken doc lesen: http://spark.apache.org/docs/latest/tuning.html#data-serialization

    val conf = new SparkConf().setMaster(...).setAppName(...) 
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) 
    val sc = new SparkContext(conf) 
    
  2. auf Kryo doc:

    public class SomeClass implements KryoSerializable { 
        // ... 
    
        public void write (Kryo kryo, Output output) { 
         // ... 
        } 
    
        public void read (Kryo kryo, Input input) { 
         // ... 
        } 
    } 
    

aber die Klasse: OntologyHumanName automatisch von protobuf erzeugt wird. Ich denke also nicht, dass dies ein guter Weg ist.

  1. Versuchen Sie, die Fallklasse zu verwenden, um OntologyHumanName zu ersetzen, um eine direkte Serialisierung der Klasse OntologyHumanName zu vermeiden. So habe ich es nicht versucht, es funktioniert nicht möglich.

    case class OntologyHumanNameScalaCaseClass(val humanNames: OntologyHumanName) 
    
  2. Ein hässlicher Weg. Ich habe gerade die Klasse von Protobuf in Scala umgewandelt. Dieser Weg kann nicht fehlgeschlagen sein. wie:

    import scala.collection.JavaConverters._ 
    
    val humanNameObj: OntologyHumanName = ... 
    val families: List[String] = humamNameObj.getFamilyList.asScala //use this to replace the humanNameObj. 
    

Hoffnung Ihr Problem oben lösen.