2015-02-17 5 views
6

Ich möchte ein Objekt vom Treiberknoten an andere Knoten übergeben, auf denen sich eine RDD befindet, so dass jede Partition der RDD auf dieses Objekt zugreifen kann, wie im folgenden Ausschnitt gezeigt.Wie lässt Spark ein Objekt mit Kryo serialisieren?

object HelloSpark { 
    def main(args: Array[String]): Unit = { 
     val conf = new SparkConf() 
       .setAppName("Testing HelloSpark") 
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
       .set("spark.kryo.registrator", "xt.HelloKryoRegistrator") 

     val sc = new SparkContext(conf) 
     val rdd = sc.parallelize(1 to 20, 4) 
     val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test")) 

     rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !") 
      .collect() 
      .foreach(println) 

     sc.stop 
    } 
} 

// My registrator 
class HelloKryoRegistrator extends KryoRegistrator { 
    override def registerClasses(kryo: Kryo) = { 
     kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer()) 
    } 
} 

//My serializer 
class HelloSerializer extends Serializer[ImmutableBytesWritable] { 
    override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = { 
     output.writeInt(obj.getLength) 
     output.writeInt(obj.getOffset) 
     output.writeBytes(obj.get(), obj.getOffset, obj.getLength) 
    } 

    override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = { 
     val length = input.readInt() 
     val offset = input.readInt() 
     val bytes = new Array[Byte](length) 
     input.read(bytes, offset, length) 

     new ImmutableBytesWritable(bytes) 
    } 
} 

Im Snippet oben, habe ich versucht, ImmutableBytesWritable von Kryo in Funken zu serialisiert, also tat ich das follwing:

  1. configure die SparkConf Instanz übergeben Kontext entfachen, das heißt gesetzt "spark.serializer" auf "org.apache.spark.serializer.KryoSerializer" und setzen Sie "spark.kryo.registrator" auf "xt.HelloKryoRegistrator ";
  2. Schreiben Sie eine benutzerdefinierte Kryo-Registrator-Klasse, in der ich die Klasse ImmutableBytesWritable registrieren;
  3. einen Serializer für ImmutableBytesWritable schreiben

Allerdings, wenn ich meine Spark-Anwendung in Garn-Client-Modus einreichen, wurde die folgende Ausnahme ausgelöst:

Exception in thread "main" org. apache.spark.SparkException: Task nicht serialisierbar bei org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 166) bei org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 158) bei org.apache.sp ark.SparkContext.clean (SparkContext.scala: 1242) bei org.apache.spark.rdd.RDD.map (RDD.scala: 270) bei xt.HelloSpark $ .main (HelloSpark.scala: 23) bei xt .HelloSpark.main (HelloSpark.scala) bei sun.reflect.NativeMethodAccessorImpl.invoke0 (native Methode) bei sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:57) bei sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl. java: 43) bei java.lang.reflect.Method.invoke (Methode.java:606) bei org.apache.spark.deploy.Sparkmit $ $ .launch (SparkSubmit.scala: 325) bei org.apache.spark .deploy.SparkSubmit $ .main (SparkSubmit.scala: 75) bei org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala) Verursacht durch: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable bei java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1183) bei java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java : 1547) bei java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1508) bei java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1431) bei java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1177) bei java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:347) bei org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 42) bei org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSer ializer.scala: 73) bei org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 164) ...12 weitere

Es scheint, dass ImmutableBytesWritable nicht durch Kryo serialisiert werden. Also, wie kann Spark ein Objekt mit Kryo serialisieren? Kann Kryo jeden Typ serialisieren?

+0

Das gleiche geschieht mit mir, auch mit einer viel einfacheren Konfiguration (nur die Serializer Config-Einstellung und Registrierung Klassen). Notieren Sie diese Zeile Ihres Stacks: 'org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 73)', aus irgendeinem Grund versucht er, die Java-Serialisierung zu verwenden, auch wenn Sie ihm gesagt haben, dass er es nicht tun soll. –

+0

Haben Sie es geschafft, dies zu beheben? Ich habe das gleiche Problem. – Nilesh

Antwort

0

Dies passiert, weil Sie ImmutableBytesWritable in Ihrer Schließung verwenden. Spark unterstützt die Serienserialisierung mit Kryo noch nicht (nur Objekte in RDDs). Sie können die Hilfe dieser nehmen Ihr Problem zu lösen:

Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?

Sie müssen einfach die Objekte serialisiert werden, bevor sie durch den Verschluss vorbei, und Deserialisieren danach. Dieser Ansatz funktioniert auch dann, wenn Ihre Klassen nicht serialisierbar sind, da sie Kryo hinter den Kulissen verwenden. Alles, was Sie brauchen, ist etwas Curry. ;)

Hier ist ein Beispiel Skizze:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) 
       (foo: Foo) : Bar = { 
    kryoWrapper.value.apply(foo) 
} 
val mapper = genMapper(KryoSerializationWrapper(new ImmutableBytesWritable(Bytes.toBytes("This is a test")))) _ 
rdd.flatMap(mapper).collectAsMap() 

object ImmutableBytesWritable(bytes: Bytes) extends (Foo => Bar) { 
    def apply(foo: Foo) : Bar = { //This is the real function } 
}