1

Ich habe ein Problem mit Cloudera VM und Spark. Zunächst einmal bin ich bei Spark völlig neu, und mein Chef hat mich gebeten, Spark auf Scala in einer virtuellen Maschine zu testen.SparkSQL-Scala mit POM

Ich habe die virtuelle Maschine auf Virtual Box-Umgebung heruntergeladen, also öffne ich Eclipse und ich hatte ein neues Projekt auf Maven. Obliviously, nachdem ich zuvor die Cloudera-Umgebung ausgeführt habe und alle Dienste wie Spark, Yarn, Hive usw. gestartet habe. Alle Dienste funktionieren gut, und alle überprüfen, in Cloudera-Dienste sind grün. Ich habe einen Test mit Impala gemacht und das funktioniert perfekt.

Mit Eclipse und Scala-Maven-Umgebung wurden die Dinge Schlimmste: dass mein sehr einfachen Code in Scala ist:

package org.test.spark 

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SQLContext 

object TestSelectAlgorithm { 

    def main(args: Array[String]) = { 
    val conf = new SparkConf() 
     .setAppName("TestSelectAlgorithm") 
     .setMaster("local") 
    val sc = new SparkContext(conf) 

    val sqlContext = new SQLContext(sc) 

    val df = sqlContext.sql("SELECT * FROM products").show() 
    } 
} 

Der Test ist sehr einfach, da die Tabelle „Produkte“ vorhanden sein: wenn ich kopieren -and-paste die gleiche Abfrage auf Impala, die Abfrage funktioniert gut!

Auf der Umgebung Eclipse, sonst habe ich ein Problem:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
16/06/30 05:43:17 INFO SparkContext: Running Spark version 1.6.0 
16/06/30 05:43:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
16/06/30 05:43:18 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0) 
16/06/30 05:43:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
16/06/30 05:43:18 INFO SecurityManager: Changing view acls to: cloudera 
16/06/30 05:43:18 INFO SecurityManager: Changing modify acls to: cloudera 
16/06/30 05:43:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera) 
16/06/30 05:43:19 INFO Utils: Successfully started service 'sparkDriver' on port 53730. 
16/06/30 05:43:19 INFO Slf4jLogger: Slf4jLogger started 
16/06/30 05:43:19 INFO Remoting: Starting remoting 
16/06/30 05:43:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:39288] 
16/06/30 05:43:19 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 39288. 
16/06/30 05:43:19 INFO SparkEnv: Registering MapOutputTracker 
16/06/30 05:43:19 INFO SparkEnv: Registering BlockManagerMaster 
16/06/30 05:43:19 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7d685fc0-ea88-423a-9335-42ca12db85da 
16/06/30 05:43:19 INFO MemoryStore: MemoryStore started with capacity 1619.3 MB 
16/06/30 05:43:20 INFO SparkEnv: Registering OutputCommitCoordinator 
16/06/30 05:43:20 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
16/06/30 05:43:20 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040 
16/06/30 05:43:20 INFO Executor: Starting executor ID driver on host localhost 
16/06/30 05:43:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57294. 
16/06/30 05:43:20 INFO NettyBlockTransferService: Server created on 57294 
16/06/30 05:43:20 INFO BlockManagerMaster: Trying to register BlockManager 
16/06/30 05:43:20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:57294 with 1619.3 MB RAM, BlockManagerId(driver, localhost, 57294) 
16/06/30 05:43:20 INFO BlockManagerMaster: Registered BlockManager 
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: products; 
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:306) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:315) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:310) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) 
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305) 
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:310) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:300) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) 
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
    at scala.collection.immutable.List.foldLeft(List.scala:84) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) 
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36) 
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36) 
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34) 
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52) 
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817) 
    at org.test.spark.TestSelectAlgorithm$.main(TestSelectAlgorithm.scala:18) 
    at org.test.spark.TestSelectAlgorithm.main(TestSelectAlgorithm.scala) 
16/06/30 05:43:22 INFO SparkContext: Invoking stop() from shutdown hook 
16/06/30 05:43:22 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4040 
16/06/30 05:43:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/06/30 05:43:22 INFO MemoryStore: MemoryStore cleared 
16/06/30 05:43:22 INFO BlockManager: BlockManager stopped 
16/06/30 05:43:22 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/06/30 05:43:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/06/30 05:43:22 INFO SparkContext: Successfully stopped SparkContext 
16/06/30 05:43:22 INFO ShutdownHookManager: Shutdown hook called 
16/06/30 05:43:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-29d381e9-b5e7-485c-92f2-55dc57ca7d25 

Der Hauptfehler ist (für mich):

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: products; 

ich auf andere Website und Dokumentation gesucht, und ich gegründet dass das Problem mit der Hive-Tabelle verbunden ist ... aber ich benutze die Hive-Tabelle nicht, ich benutze SparkSql ...

Kann mir bitte jemand helfen? Vielen Dank für eine Antwort.

+0

wo gibt es diese 'products' Tabelle? in relationalen db? oder willst du eine Datei von hdfs lesen? –

+0

von hdfs: vielleicht führe ich die gleiche Abfrage auf http: //quickstart.cloudera: 8888/Impala/Ausführen/Abfrage/8 # Abfrage/Ergebnisse ==> IMPALA, in der virtuellen Maschine - und das funktioniert perfekt. – Alessandro

+0

müssen Sie Dataframe verwenden oder 'erstellen Sie ein Schema> Register temporäre Tabelle> Abfrage ausführen '- dieser Code wird Ihnen einen Hinweis geben - für Textdateiformat: https://gist.github.com/InvisibleTech/c71cb88b2390eb2223a8 für JSonfile-Format : http: //www.tutorialspoint.com/spark_sql/spark_sql_dataframes.htm –

Antwort

1

Können Sie überprüfen /user/cloudera/.sparkStaging/stagingArea Standort vorhanden ist oder es enthält .avro Datei ?? Und ändern Sie bitte "Ihre Ausgabeort" nach Verzeichnis Speicherort.
Bitte überprüfen avro github Seite für weitere Details. https://github.com/databricks/spark-avro

2

Im Funken, Für Impala gibt es keine direkte Unterstützung als Bienenstock hat .So, Sie müssen Datei laden. Wenn es csv Sie funken csv verwenden können,

val df = sqlContext.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .option("inferSchema", "true") 
     .load("your .csv file location") 

import sqlContext.implicits._ 
import sqlContext._ 

df.registerTempTable("products") 

sqlContext.sql("select * from products").show() 

pom Abhängigkeit für funken csv

<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10 --> 
<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-csv_2.10</artifactId> 
    <version>1.4.0</version> 
</dependency> 

für Avro dort funken avro

val sqlContext = new SQLContext(sc) 

val df = sqlContext.read.avro("your .avro file location") 

import sqlContext.implicits._ 
import sqlContext._ 

df.registerTempTable("products") 


val result= sqlContext.sql("select * from products") 
val result.show() 

result.write 
    .format("com.databricks.spark.avro") 
    .save("Your ouput location") 

pom Abhängigkeit für Avro ist

<!-- http://mvnrepository.com/artifact/com.databricks/spark-avro_2.10 --> 
      <dependency> 
       <groupId>com.databricks</groupId> 
       <artifactId>spark-avro_2.10</artifactId> 
       <version>2.0.1</version> 
      </dependency> 

und Parkett Funken hat in-Buil d Unterstützung

val sqlContext = new SQLContext(sc) 
    val parquetFile = sqlContext.read.parquet("your parquet file location") 

    parquetFile.registerTempTable("products") 

    sqlContext.sql("select * from products").show() 
+0

ok, vor allem: vielen Dank! Jetzt muss ich verstehen, welche Art von Pfad ich in "Ihren Speicherort" einfügen kann! Jetzt wird die Tabelle "Produkte" angelegt, dafür wurde das Problem gelöst !!! ;-) Aber wenn du mir ein anderes Mal hilfst, kannst du mir sagen, wo ich den Pfad für die Ausgabe im AVRO-Format gefunden habe? – Alessandro

+0

In diesem Code haben wir die Datei nicht an einem Ort gespeichert, für den Sie zusätzliche Arbeit leisten müssen. Bitte überprüfen Sie den aktualisierten Code. –