2016-07-29 19 views
0

Mahout des DrmRdd Typ ist definiert alsMüssen Spark-RDD-Tupelfelder serialisierbar sein? Mahout Drm scheint, nein zu sagen

type DrmRdd[K] = RDD[DrmTuple[K]] 

die zu

übersetzt
RDD[(K,Vector)] 

jedoch Mahout Dokumentation wird deutlich, dass die Vector-Klasse nicht serialisierbar ist.

Das führt mich zu etwas Kopfschütteln, wie man eine RDD [(K, Vector)] erzeugt, um in einen Mahout Drm zu wickeln, ohne über die Tatsache zu stolpern, dass Vector nicht serialisierbar ist.

Meine Frage ist, wann muss ein Funke RDD-Tupel serialisierbar sein? Oder müssen sie nur für bestimmte Funktionen serialisierbar sein, die ein Shuffle durchlaufen müssen?

Antwort

2

Nun, technisch gesehen, wenn es kein Grund zur Serialisierung (es gibt keine Shuffle, Caching mit Serialisierung oder ähnlichem Verfahren Sie RDD mit Daten haben können, die nicht serialisierbar sind zum Beispiel, wenn Sie Daten wie diese:.

class Foo(x: Int) 

val rdd = sc.parallelize(1 to 4, 4).map(i => (i, new Foo(i))) 

wo Foo nicht serialisierbar ist, können Sie zählen:

rdd.count 
// 4 

aber Sie können nicht distinct.count:

rdd.distinct.count 
// java.io.NotSerializableException: $line30.$read$$iwC$$iwC$Foo 
// Serialization stack: 
// - object not serializable (class: $line30.$read$$iwC$$iwC$Foo, value: ... 
// - field (class: scala.Tuple2, name: _2, type: class java.lang.Object) 
// - object (class scala.Tuple2, (1,[email protected])) 
// at ... 

Nicht serialisierbare Objekte sind also nur für temporären Speicher geeignet, der auf eine einzelne Task beschränkt ist.

Aber das ist nicht der Fall mit Mahout Vector. Mahout Spark-Bindungen bieten tatsächlich Kryo registration tools und Vectoris actually registered there:

kryo.addDefaultSerializer(classOf[Vector], new VectorKryoSerializer()) 

und bietet specialized serializer

Bitte beachten Sie auch, dass Kryo deutlich mehr so ​​nachsichtig sein kann, wenn man spark.serializer-org.apache.spark.serializer.KryoSerializerdistinct.count oben gegebenen Beispiel gesetzt funktioniert sogar ganz gut obwohl Foo ist nicht serialisierbar mit Java-Serialisierung.

+0

Ahhh Kryo Registrierung für die Serialisierung! Der Grund, warum ich besorgt war, war, dass Spark 1.6 eine explizite Überprüfung der Serialisierbarkeit von Tupelfeldern in Shuffle-Ops über org.apache.spark.util.ClosureCleaner $ .ensureSerializable hinzufügte, für die alle Vectors fehlschlagen, zB: java.io.NotSerializableException: org .apache.mahout.math.RandomAccessSparseVector. Aber wenn sie tatsächlich serialisierbar sind, da Kryo immer über Spark config vorhanden ist, kann ich Serializable einfach mixen und die Ausnahme entfernen. – Simplefish

+0

Ich sollte genauer sein - die hier verwendete Operation ist aggregateByKey, die einen Nullwert des Werttyps für die Aggregation nimmt. Es ist Teil der Schließung der Aggregation Funktionen, weshalb Spark wirft, obwohl es kryo-serialisierbar ist, da alle Closure-Elemente Java serialisierbar sein müssen. Die Lösung ist etwas komplizierter als nur Serializable zu mischen. – Simplefish

+1

Wenn dies ein Problem ist, gibt es verschiedene Problemumgehungen. Sie könnten zum Beispiel 'combineByKey' verwenden. Auch Brise-Vektoren sind serialisierbar und benötigen keine externen Deps. – zero323