2016-07-18 13 views
2

Ich versuche, die Funken Shell verwenden, um eine Accumulo Tabellevon Accumulo mit Spark-Shell Lesen

Ich lade Funken und die Bibliotheken ich so müssen verbinden:

$ bin/Funken Schale --jars /data/bigdata/installs/accumulo-1.7.2/lib/accumulo-fate.jar:/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-core.jar:/data/bigdata/installs/accumulo -1.7.2/lib/acumulo-trace.jar: /data/bigdata/installs/accumulo-1.7.2/lib/htrace-core.jar: /data/bigdata/installs/accumulo-1.7.2/lib/libthrift .jar

Um die Schale, Paste ich

import org.apache.hadoop.mapred.JobConf 
import org.apache.hadoop.conf.Configuration 
import org.apache.accumulo.core.client.mapred.{AbstractInputFormat, AccumuloInputFormat} 
import org.apache.accumulo.core.client.security.tokens.PasswordToken 

import org.apache.hadoop.conf.Configuration 
import org.apache.accumulo.core.security.Authorizations 
import org.apache.accumulo.core.client.ClientConfiguration 

import org.apache.spark.{SparkConf, SparkContext} 

import org.apache.accumulo.core.client.mapred.InputFormatBase 


val user  = "root" 
val tableName = "hd_history" 
val instanceName = "GISCIENCE" 
val zooKeepers = "localhost:2181" 
val token = new PasswordToken("***") 

val conf = new SparkConf() 
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
//conf.registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key],classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],classOf[org.apache.accumulo.core.data.Value],classOf[org.apache.spark.api.java.JavaSparkContext])) 
val sc = new SparkContext(conf) 

val jobConf = new JobConf() // Create a job conf 

// Configure the job conf with accumulo properties 
AbstractInputFormat.setConnectorInfo(jobConf, user, token) 
AbstractInputFormat.setScanAuthorizations(jobConf, new Authorizations) 
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers) 
AbstractInputFormat.setZooKeeperInstance(jobConf, clientConfig) 
InputFormatBase.setInputTableName(jobConf, tableName) 
// Create an RDD using the jobConf 
val rdd2 = sc.newAPIHadoopRDD(jobConf, 
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value] 
) 

Als ich rdd2.count versuchen()

ich

16/07/18 18:30:43 INFO spark.SparkContext: Starting job: count at <console>:38 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Got job 1 (count at <console>:38) with 1 output partitions 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (count at <console>:38) 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Parents of final stage: List() 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Missing parents: List() 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:35), which has no missing parents 
16/07/18 18:30:43 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 1776.0 B, free 148.9 KB) 
16/07/18 18:30:43 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1110.0 B, free 150.0 KB) 
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:39461 (size: 1110.0 B, free: 487.9 MB) 
16/07/18 18:30:43 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:35) 
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 
16/07/18 18:30:43 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2284 bytes) 
16/07/18 18:30:43 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1) 
16/07/18 18:30:43 INFO rdd.NewHadoopRDD: Input split: Range: (-inf,+inf) Locations: [localhost] Table: hd_history TableID: 8 InstanceName: GISCIENCE zooKeepers: localhost:2181 principal: root tokenSource: INLINE authenticationToken: [email protected]db28e3 authenticationTokenFile: null Authorizations: offlineScan: false mockInstance: false isolatedScan: false localIterators: false fetchColumns: [] iterators: [] logLevel: INFO 
16/07/18 18:30:43 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 2082 bytes result sent to driver 
16/07/18 18:30:43 ERROR scheduler.TaskResultGetter: Exception while getting task result 
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) 
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) 
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) 
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311) 
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97) 
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60) 
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) 
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) 
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: ResultStage 1 (count at <console>:38) failed in 0.029 s 
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Job 1 failed: count at <console>:38, took 0.040014 s 
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on localhost:39461 in memory (size: 1110.0 B, free: 487.9 MB) 
16/07/18 18:30:43 INFO spark.ContextCleaner: Cleaned accumulator 2 
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on localhost:39461 in memory (size: 1110.0 B, free: 487.9 MB) 
16/07/18 18:30:43 INFO spark.ContextCleaner: Cleaned accumulator 1 
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD.count(RDD.scala:1157) 
    ... 48 elided 

Es mir nicht klar ist, welche Klassen ich Kryo müssen registrieren (d Wie finde ich heraus, welche Klasse zu der referenzierten ID 13994 gehört und ob das wirklich das Problem ist?

Antwort

0

Das Problem war, dass ich einen zusätzlichen Spark Context zu dem, der bereits nach dem Start der Spark-Shell gegeben ist, erstellt habe. Aufruf

sc.stop() 

auf dem Spark-Kontext sc mein Problem gelöst.