2016-01-12 4 views
5

Zwei Fragen, die Antwort auf die allgemeine wird mich darauf hinweisen, wie minimal ich eine MVCE machen kann.Kryo Serializer verursacht Ausnahme auf zugrunde liegenden Scala-Klasse WrappedArray

1) Wie kann ich wissen, WrappedArray registrieren vorne, (und jede andere Klasse in Scala ich könnte)? Ist es normal, Klassen aus Bibliotheken mit Kryo registrieren zu müssen?

und die spezifische:

2) Wie kann ich dieses Problem beheben? (Willing zugeben, ich habe vielleicht etwas anderes verrückt los, dass, wenn ein falschen Fehler hier reflektiert, also nicht selbst töten versuchen, dies zu reproduzieren)

DETAILS

Testing aus einem Spark-Programm in Java unsere Kunden mit bezogenen Klassen Genetik und Statistik, auf Spark-1.4.1, Scala 2.11.5 mit den folgenden Einstellungen auf SparkConf:

// for kyro serializer it wants to register all classes that need to be serialized 
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class}; 

SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData") 
       <SNIP other settings to declare master> 
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
       //require registration of all classes with Kryo 
       .set("spark.kryo.registrationRequired", "true") 
       .registerKryoClasses(kryoClassArray); 

diese Fehler (am Ende der langen Fehlerliste wiederholt):

Caused by: java.lang.IllegalArgumentException: Class is not 
registered: scala.collection.mutable.WrappedArray$ofRef Note: To 
register this class use: 
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 

Aber ich nenne diese Klasse niemals aus meinem Code. Ich kann scala.collection.mutable.WrappedArray dem kryoClassArray hinzufügen, aber es behebt das Problem nicht. Wenn ich scala.collection.mutable.WrappedArray $ ofRef.class (wie im Fehler vorgeschlagen) hinzufügen, was ein Syntaxfehler ist, sehe ich, dass ich hier keine anonyme Funktion deklarieren kann?

MVCE: Ich habe ein MVCE aber das Problem ist, zu tun, ein mit unseren Klassen erfordert externe Bibliotheken und Text/Daten-Dateien gestartet. Sobald ich unsere Kurse ausgezogen habe, habe ich das Problem nicht. Wenn jemand die allgemeine Frage beantworten könnte, könnte es helfen, mir zu zeigen, wie viel MVCE ich mir vorstellen kann.

Während ich diese Frage schreibe, habe ich grünes Licht für die Aktualisierung auf 1.5.2 bekommen, werde sehen, ob es dort Änderungen gibt und die Frage aktualisieren, wenn es so ist.

Kurz ein MVCE hier sind meine Klassendeklarationen:

public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable { 

class PrintHetSharing implements VoidFunction<DropResult> { 

class SparkDoDrop implements Function<Integer, Integer> { 

Voll Fehler:

16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:55646/user/Executor#214759698]) with ID 0 
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it. 
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef 
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242) 
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) 
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) 
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) 
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167) 
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef 
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); 
+0

1. Zu wissen, welche Klassen erfordern Serialisierung erfordert Sie überprüfen Ihren Code und verstehen, was er tut (Sie haben nur das conf-Beispiel eingefügt, nicht die Verwendung). 2. Wie 1, ohne Code-Beispiel nicht zu beantworten. –

+0

Sicher, @DanielL. Ich werde in etwas Code bearbeiten. Aber es klingt, als ob du sagst, dass ich jede zugrunde liegende Klasse kennen muss? Als allgemeiner Grundsatz?Ich schreibe Java, also habe ich nicht erwartet, dass ich die zugrunde liegenden Scala-Klassen kennen muss, um Kryo zum Laufen zu bringen. Danke – JimLohse

+0

@DanielL. Ich schätze die MVCE-Anfrage, das Problem, auf das ich gestoßen bin, ist, dass man mit unseren Klassen externe Bibliotheken und Text-/Datendateien benötigt. Sobald ich unsere Klassen und die Notwendigkeit für unsere Dateien ausstreichen, habe ich das Problem nicht. Wenn jemand die allgemeine Frage beantworten könnte, könnte es mir helfen, zu zeigen, wie viel von einem MVCE ich mir vorstellen kann. Ich implementiere Serializable in allen Klassen, entweder explizit oder durch die Implementierung von Functions from Spark wie das Importieren von org.apache.spark.api.java.function.Function und org.apache.spark.api.java.function.VoidFunction – JimLohse

Antwort

6

In Scala Sie dieses Problem beheben sollte ‚scala.collection.mutable.WrappedArray.ofRef [_]‘ als registrierte Klasse wie im folgenden Ausschnitt Zugabe:

conf.registerKryoClasses(
    Array(
    ... 
    classOf[Person], 
    classOf[Array[Person]], 
    ... 
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]] 
) 
) 
+0

Zuerst sieht deine Antwort wie Scala aus aber ich bin in Java aber ich verstehe den Punkt :) Ich schätze die Antwort, aber die zugrundeliegende Frage ist unbeantwortet, warum sollte ich diese Klasse deklarieren müssen wenn ich nicht benutze es nicht? Ich muss nicht jede Klasse in Spark deklarieren, warum diese? Ich habe nicht versucht, Kryo für ein bisschen zu verwenden, ich sollte es jetzt neu implementieren, dass unsere Lösung viel weiter die Straße hinunter ist, und Spark ein paar Versionen neuer ist. +1, danke! – JimLohse

+0

nicht sicher, warum das Java-Tag war nicht auf der Frage, mein schlechtes, tut mir leid, es war in der Frage, aber nicht die Tags, oops – JimLohse

+2

akzeptierte diese Antwort jetzt, dass ich dieses Thema erneut, es bietet keine vollständige Antwort als Dieser Scala-Code funktioniert nicht in Java. Trotzdem ist es näher an einer bestimmten Antwort. Ich könnte auf eine andere Frage schwören, dass jemand geschrieben hat, wie man diese Scala-Klasse in ein Java-Array einfügt, wobei das Beispiel .ofRef [] oder das Java-esque $ ofRef nicht funktioniert. Für jetzt habe ich die "erforderlichen" Einstellung auf Kryo entspannt. – JimLohse

2

Sie müssen es nicht alles serializable machen, unabhängiger Teil einer Client-Bibliothek ist oder nicht. Aber Sie müssen jedes Lambda, das auf den Executoren serialisierbar wird, machen. Diese laufen nicht auf dem Master-Knoten, so dass es keine Möglichkeit gibt, die Serialisierung zu verhindern (und auch nicht, weil der gesamte Zweck von Spark verteilte Berechnung ist).

Für Beispiele und solche (und wenn Sie das Konzept noch nicht ganz begreifen), überprüfen Sie the official docs about this.

+0

Danke, durch clearing die allgemeine Frage erlaubt es mir zu wissen, wo ich meine Bemühungen konzentrieren kann, sehr nützlich! Ich bin immer noch ein wenig verwirrt, warum die Scala WrappedArray-Klasse als die gemeldet wird, die nicht serialisiert werden kann. Ich werde meinen Code ausziehen und ihn wieder zusammensetzen. Ich verstehe anonyme Funktionen und verwende sie bei der Verwendung von integrierten Klassen - wenn ich unsere Klassen verwende, deklariere ich sie separat. Ich werde immer noch an einem MVCE arbeiten Dank nochmal – JimLohse