Nun, technisch gesehen, wenn es kein Grund zur Serialisierung (es gibt keine Shuffle, Caching mit Serialisierung oder ähnlichem Verfahren Sie RDD mit Daten haben können, die nicht serialisierbar sind zum Beispiel, wenn Sie Daten wie diese:.
class Foo(x: Int)
val rdd = sc.parallelize(1 to 4, 4).map(i => (i, new Foo(i)))
wo Foo
nicht serialisierbar ist, können Sie zählen:
rdd.count
// 4
aber Sie können nicht distinct.count
:
rdd.distinct.count
// java.io.NotSerializableException: $line30.$read$$iwC$$iwC$Foo
// Serialization stack:
// - object not serializable (class: $line30.$read$$iwC$$iwC$Foo, value: ...
// - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
// - object (class scala.Tuple2, (1,[email protected]))
// at ...
Nicht serialisierbare Objekte sind also nur für temporären Speicher geeignet, der auf eine einzelne Task beschränkt ist.
Aber das ist nicht der Fall mit Mahout Vector. Mahout
Spark-Bindungen bieten tatsächlich Kryo registration tools und Vector
is actually registered there:
kryo.addDefaultSerializer(classOf[Vector], new VectorKryoSerializer())
und bietet specialized serializer
Bitte beachten Sie auch, dass Kryo deutlich mehr so nachsichtig sein kann, wenn man spark.serializer
-org.apache.spark.serializer.KryoSerializer
distinct.count
oben gegebenen Beispiel gesetzt funktioniert sogar ganz gut obwohl Foo
ist nicht serialisierbar mit Java-Serialisierung.
Ahhh Kryo Registrierung für die Serialisierung! Der Grund, warum ich besorgt war, war, dass Spark 1.6 eine explizite Überprüfung der Serialisierbarkeit von Tupelfeldern in Shuffle-Ops über org.apache.spark.util.ClosureCleaner $ .ensureSerializable hinzufügte, für die alle Vectors fehlschlagen, zB: java.io.NotSerializableException: org .apache.mahout.math.RandomAccessSparseVector. Aber wenn sie tatsächlich serialisierbar sind, da Kryo immer über Spark config vorhanden ist, kann ich Serializable einfach mixen und die Ausnahme entfernen. – Simplefish
Ich sollte genauer sein - die hier verwendete Operation ist aggregateByKey, die einen Nullwert des Werttyps für die Aggregation nimmt. Es ist Teil der Schließung der Aggregation Funktionen, weshalb Spark wirft, obwohl es kryo-serialisierbar ist, da alle Closure-Elemente Java serialisierbar sein müssen. Die Lösung ist etwas komplizierter als nur Serializable zu mischen. – Simplefish
Wenn dies ein Problem ist, gibt es verschiedene Problemumgehungen. Sie könnten zum Beispiel 'combineByKey' verwenden. Auch Brise-Vektoren sind serialisierbar und benötigen keine externen Deps. – zero323