Ich möchte ein Objekt vom Treiberknoten an andere Knoten übergeben, auf denen sich eine RDD befindet, so dass jede Partition der RDD auf dieses Objekt zugreifen kann, wie im folgenden Ausschnitt gezeigt.Wie lässt Spark ein Objekt mit Kryo serialisieren?
object HelloSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("Testing HelloSpark")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "xt.HelloKryoRegistrator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 20, 4)
val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))
rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
.collect()
.foreach(println)
sc.stop
}
}
// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) = {
kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
}
}
//My serializer
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
output.writeInt(obj.getLength)
output.writeInt(obj.getOffset)
output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
}
override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
val length = input.readInt()
val offset = input.readInt()
val bytes = new Array[Byte](length)
input.read(bytes, offset, length)
new ImmutableBytesWritable(bytes)
}
}
Im Snippet oben, habe ich versucht, ImmutableBytesWritable von Kryo in Funken zu serialisiert, also tat ich das follwing:
- configure die SparkConf Instanz übergeben Kontext entfachen, das heißt gesetzt "spark.serializer" auf "org.apache.spark.serializer.KryoSerializer" und setzen Sie "spark.kryo.registrator" auf "xt.HelloKryoRegistrator ";
- Schreiben Sie eine benutzerdefinierte Kryo-Registrator-Klasse, in der ich die Klasse ImmutableBytesWritable registrieren;
- einen Serializer für ImmutableBytesWritable schreiben
Allerdings, wenn ich meine Spark-Anwendung in Garn-Client-Modus einreichen, wurde die folgende Ausnahme ausgelöst:
Exception in thread "main" org. apache.spark.SparkException: Task nicht serialisierbar bei org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 166) bei org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 158) bei org.apache.sp ark.SparkContext.clean (SparkContext.scala: 1242) bei org.apache.spark.rdd.RDD.map (RDD.scala: 270) bei xt.HelloSpark $ .main (HelloSpark.scala: 23) bei xt .HelloSpark.main (HelloSpark.scala) bei sun.reflect.NativeMethodAccessorImpl.invoke0 (native Methode) bei sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:57) bei sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl. java: 43) bei java.lang.reflect.Method.invoke (Methode.java:606) bei org.apache.spark.deploy.Sparkmit $ $ .launch (SparkSubmit.scala: 325) bei org.apache.spark .deploy.SparkSubmit $ .main (SparkSubmit.scala: 75) bei org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala) Verursacht durch: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable bei java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1183) bei java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java : 1547) bei java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1508) bei java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1431) bei java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1177) bei java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:347) bei org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 42) bei org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSer ializer.scala: 73) bei org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 164) ...12 weitere
Es scheint, dass ImmutableBytesWritable nicht durch Kryo serialisiert werden. Also, wie kann Spark ein Objekt mit Kryo serialisieren? Kann Kryo jeden Typ serialisieren?
Das gleiche geschieht mit mir, auch mit einer viel einfacheren Konfiguration (nur die Serializer Config-Einstellung und Registrierung Klassen). Notieren Sie diese Zeile Ihres Stacks: 'org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 73)', aus irgendeinem Grund versucht er, die Java-Serialisierung zu verwenden, auch wenn Sie ihm gesagt haben, dass er es nicht tun soll. –
Haben Sie es geschafft, dies zu beheben? Ich habe das gleiche Problem. – Nilesh