Zwei Fragen, die Antwort auf die allgemeine wird mich darauf hinweisen, wie minimal ich eine MVCE machen kann.Kryo Serializer verursacht Ausnahme auf zugrunde liegenden Scala-Klasse WrappedArray
1) Wie kann ich wissen, WrappedArray registrieren vorne, (und jede andere Klasse in Scala ich könnte)? Ist es normal, Klassen aus Bibliotheken mit Kryo registrieren zu müssen?
und die spezifische:
2) Wie kann ich dieses Problem beheben? (Willing zugeben, ich habe vielleicht etwas anderes verrückt los, dass, wenn ein falschen Fehler hier reflektiert, also nicht selbst töten versuchen, dies zu reproduzieren)
DETAILS
Testing aus einem Spark-Programm in Java unsere Kunden mit bezogenen Klassen Genetik und Statistik, auf Spark-1.4.1, Scala 2.11.5 mit den folgenden Einstellungen auf SparkConf:
// for kyro serializer it wants to register all classes that need to be serialized
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class};
SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData")
<SNIP other settings to declare master>
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//require registration of all classes with Kryo
.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(kryoClassArray);
diese Fehler (am Ende der langen Fehlerliste wiederholt):
Caused by: java.lang.IllegalArgumentException: Class is not
registered: scala.collection.mutable.WrappedArray$ofRef Note: To
register this class use:
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
Aber ich nenne diese Klasse niemals aus meinem Code. Ich kann scala.collection.mutable.WrappedArray
dem kryoClassArray hinzufügen, aber es behebt das Problem nicht. Wenn ich scala.collection.mutable.WrappedArray $ ofRef.class (wie im Fehler vorgeschlagen) hinzufügen, was ein Syntaxfehler ist, sehe ich, dass ich hier keine anonyme Funktion deklarieren kann?
MVCE: Ich habe ein MVCE aber das Problem ist, zu tun, ein mit unseren Klassen erfordert externe Bibliotheken und Text/Daten-Dateien gestartet. Sobald ich unsere Kurse ausgezogen habe, habe ich das Problem nicht. Wenn jemand die allgemeine Frage beantworten könnte, könnte es helfen, mir zu zeigen, wie viel MVCE ich mir vorstellen kann.
Während ich diese Frage schreibe, habe ich grünes Licht für die Aktualisierung auf 1.5.2 bekommen, werde sehen, ob es dort Änderungen gibt und die Frage aktualisieren, wenn es so ist.
Kurz ein MVCE hier sind meine Klassendeklarationen:
public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable {
class PrintHetSharing implements VoidFunction<DropResult> {
class SparkDoDrop implements Function<Integer, Integer> {
Voll Fehler:
16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:55646/user/Executor#214759698]) with ID 0
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
1. Zu wissen, welche Klassen erfordern Serialisierung erfordert Sie überprüfen Ihren Code und verstehen, was er tut (Sie haben nur das conf-Beispiel eingefügt, nicht die Verwendung). 2. Wie 1, ohne Code-Beispiel nicht zu beantworten. –
Sicher, @DanielL. Ich werde in etwas Code bearbeiten. Aber es klingt, als ob du sagst, dass ich jede zugrunde liegende Klasse kennen muss? Als allgemeiner Grundsatz?Ich schreibe Java, also habe ich nicht erwartet, dass ich die zugrunde liegenden Scala-Klassen kennen muss, um Kryo zum Laufen zu bringen. Danke – JimLohse
@DanielL. Ich schätze die MVCE-Anfrage, das Problem, auf das ich gestoßen bin, ist, dass man mit unseren Klassen externe Bibliotheken und Text-/Datendateien benötigt. Sobald ich unsere Klassen und die Notwendigkeit für unsere Dateien ausstreichen, habe ich das Problem nicht. Wenn jemand die allgemeine Frage beantworten könnte, könnte es mir helfen, zu zeigen, wie viel von einem MVCE ich mir vorstellen kann. Ich implementiere Serializable in allen Klassen, entweder explizit oder durch die Implementierung von Functions from Spark wie das Importieren von org.apache.spark.api.java.function.Function und org.apache.spark.api.java.function.VoidFunction – JimLohse