
Spark Jobserver: unit testing jobs

I want to write unit tests for Spark jobs that run on the Spark Jobserver. This works fine as long as I do not touch the job configuration, e.g. to validate it for specific input values, as in:

import scala.util.Try

// inside the job's validate(...) implementation
Try(config.getString("myKey")) 
     .map(x => SparkJobValid) 
     .getOrElse(SparkJobInvalid("No value for myKey config param")) 
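For reference, a self-contained test around this validation might look like the sketch below (assuming MyJob implements spark-jobserver's SparkJob trait, whose validate takes a SparkContext, and a ScalaTest setup; if MyJob is a SQL job, validate takes the SQLContext instead, and the spec name and key values are illustrative):

import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import spark.jobserver.{SparkJobInvalid, SparkJobValid}

class MyJobSpec extends FunSpec with Matchers with BeforeAndAfterAll {
  // a local context for the test only; in production the jobserver supplies it
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext("local[2]", "MyJobSpec")
  }

  override def afterAll(): Unit = {
    sc.stop()
  }

  describe("MyJob.validate") {
    it("rejects a config without myKey") {
      val config = ConfigFactory.parseString("")
      MyJob.validate(sc, config) shouldBe SparkJobInvalid("No value for myKey config param")
    }

    it("accepts a config that defines myKey") {
      val config = ConfigFactory.parseString("myKey = someValue")
      MyJob.validate(sc, config) shouldBe SparkJobValid
    }
  }
}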

The config is created like this:

import com.typesafe.config.{Config, ConfigFactory} 
val myConfig = ConfigFactory.parseString("key=value") 
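Note that getString in the validation above looks up "myKey", so for that check to pass the parsed string has to define that exact path, e.g. (the value here is illustrative):

val myConfig = ConfigFactory.parseString("myKey = value") 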

The job is then executed like this:

MyJob.run(sqlCtx, myConfig) 
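For a self-contained test run, the sqlCtx can be a local one as well (a sketch; in Spark 1.6 a SQLContext wraps a SparkContext):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local[2]", "MyJobTest")  // local test context
val sqlCtx = new SQLContext(sc)                     // Spark 1.6-style SQL context
MyJob.run(sqlCtx, myConfig)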

This is the exception that is thrown:

ERROR Utils: Uncaught exception in thread driver-heartbeater 
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163) 
     at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) 
     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
     at org.apache.spark.util.Utils$.deserialize(Utils.scala:91) 
     at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:440) 
     at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:430) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:430) 
     at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:428) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
     at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428) 
     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472) 
     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472) 
     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472) 
     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) 
     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics 
     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) 
     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) 
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006) 
     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501) 
     at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220) 
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160) 
     ... 32 more 
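This particular ClassCastException in the driver-heartbeater thread is commonly reported when Spark runs inside the same JVM as the build tool's test runner (e.g. sbt), where layered classloaders confuse the deserialization of TaskMetrics. Independent of the Spark upgrade mentioned in the answer below, a frequently cited workaround is to fork the test JVM (sbt 0.13 syntax):

// build.sbt: run tests in a separate JVM so Spark classes are not
// loaded through sbt's classloader hierarchy
fork in Test := true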

Answer


I can confirm that with the latest Spark release, 1.6.1, this problem is gone. In any case, the jobs themselves never seemed to be affected by this exception.
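For completeness, picking up that release in an sbt build might look like this (a sketch; the "provided" scope assumes the jobserver supplies Spark at runtime, so adjust to your setup):

// build.sbt: move to the Spark release where the heartbeater
// deserialization error no longer shows up
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.6.1" % "provided"
)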