I am trying to use the Spark shell to read an Accumulo table.
I launch Spark together with the libraries I need to connect, like so:
$ bin/spark-shell --jars /data/bigdata/installs/accumulo-1.7.2/lib/accumulo-fate.jar:/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-core.jar:/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-trace.jar:/data/bigdata/installs/accumulo-1.7.2/lib/htrace-core.jar:/data/bigdata/installs/accumulo-1.7.2/lib/libthrift.jar
Into the shell, I paste:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
import org.apache.accumulo.core.client.mapred.{AbstractInputFormat, AccumuloInputFormat}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.security.Authorizations
import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.accumulo.core.client.mapred.InputFormatBase
val user = "root"
val tableName = "hd_history"
val instanceName = "GISCIENCE"
val zooKeepers = "localhost:2181"
val token = new PasswordToken("***")
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//conf.registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key],classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],classOf[org.apache.accumulo.core.data.Value],classOf[org.apache.spark.api.java.JavaSparkContext]))
val sc = new SparkContext(conf)
val jobConf = new JobConf() // Create a job conf
// Configure the job conf with accumulo properties
AbstractInputFormat.setConnectorInfo(jobConf, user, token)
AbstractInputFormat.setScanAuthorizations(jobConf, new Authorizations)
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AbstractInputFormat.setZooKeeperInstance(jobConf, clientConfig)
InputFormatBase.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
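// Note: jobConf was configured through the old-API (mapred) classes imported
// above, while the fully-qualified AccumuloInputFormat passed to
// newAPIHadoopRDD below is the new-API (mapreduce) variant; I am not sure
// whether mixing the two matters here.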
val rdd2 = sc.newAPIHadoopRDD(jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
When I try rdd2.count(), I get:
16/07/18 18:30:43 INFO spark.SparkContext: Starting job: count at <console>:38
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Got job 1 (count at <console>:38) with 1 output partitions
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (count at <console>:38)
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Missing parents: List()
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:35), which has no missing parents
16/07/18 18:30:43 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 1776.0 B, free 148.9 KB)
16/07/18 18:30:43 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1110.0 B, free 150.0 KB)
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:39461 (size: 1110.0 B, free: 487.9 MB)
16/07/18 18:30:43 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:35)
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/07/18 18:30:43 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2284 bytes)
16/07/18 18:30:43 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1)
16/07/18 18:30:43 INFO rdd.NewHadoopRDD: Input split: Range: (-inf,+inf) Locations: [localhost] Table: hd_history TableID: 8 InstanceName: GISCIENCE zooKeepers: localhost:2181 principal: root tokenSource: INLINE authenticationToken: [email protected]db28e3 authenticationTokenFile: null Authorizations: offlineScan: false mockInstance: false isolatedScan: false localIterators: false fetchColumns: [] iterators: [] logLevel: INFO
16/07/18 18:30:43 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 2082 bytes result sent to driver
16/07/18 18:30:43 ERROR scheduler.TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
16/07/18 18:30:43 INFO scheduler.DAGScheduler: ResultStage 1 (count at <console>:38) failed in 0.029 s
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Job 1 failed: count at <console>:38, took 0.040014 s
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on localhost:39461 in memory (size: 1110.0 B, free: 487.9 MB)
16/07/18 18:30:43 INFO spark.ContextCleaner: Cleaned accumulator 2
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on localhost:39461 in memory (size: 1110.0 B, free: 487.9 MB)
16/07/18 18:30:43 INFO spark.ContextCleaner: Cleaned accumulator 1
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
... 48 elided
It is not clear to me which classes I need to register with Kryo (i.e. how do I find out which class the referenced ID 13994 belongs to, and whether that is really the problem?).
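For reference, here is a minimal sketch of the kind of registration I assume might be needed. The Key and Value classes are what the RDD yields, so I guess they are what gets serialized in the task results (class names taken from my imports above; whether this list is correct or complete is exactly my question):

import org.apache.spark.SparkConf
import org.apache.accumulo.core.data.{Key, Value}

// Hypothetical registration: I am not sure these are the right classes,
// or that the list is complete.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Key], classOf[Value]))

And for the ID lookup, I assume I could build a Kryo instance the same way Spark does and ask it what is registered under that ID (a sketch, assuming KryoSerializer.newKryo is callable from the shell):

import org.apache.spark.serializer.KryoSerializer

// Look up the class ID from the exception; getRegistration throws
// if nothing is registered under that ID.
val kryo = new KryoSerializer(conf).newKryo()
println(kryo.getRegistration(13994).getType)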