
Using SparkSQL to join two tables on Cassandra - ERROR: missing EOF

I have installed Cassandra and Spark with SparkSQL on my machine. Spark SQL supports the JOIN keyword:

https://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html

Supported syntax of Spark SQL - the following syntax defines a SELECT query:

SELECT [DISTINCT] [column names]|[wildcard] FROM [keyspace name.]table name [JOIN clause table name ON join condition] [WHERE condition] [GROUP BY column name] [HAVING conditions] [ORDER BY column names [ASC | DSC]]
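For example, a join written against this grammar (using the two tables from this question) would look like:

SELECT * FROM siem.report JOIN siem.netstat ON report.REPORTUUID = netstat.NETSTATREPORTUUID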

I have the following code:

SparkConf conf = new SparkConf().setAppName("My application").setMaster("local"); 
conf.set("spark.cassandra.connection.host", "localhost"); 
JavaSparkContext sc = new JavaSparkContext(conf); 
CassandraConnector connector = CassandraConnector.apply(sc.getConf()); 
Session session = connector.openSession(); 

ResultSet results; 
String sql =""; 


BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); 
sql = "SELECT * from siem.report JOIN siem.netstat on siem.report.REPORTUUID = siem.netstat.NETSTATREPORTUUID ALLOW FILTERING;"; 
results = session.execute(sql); 

and I get the following error:

Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:25 missing EOF at ',' (SELECT * from siem.report[,] siem...)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:63)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
    at com.sun.proxy.$Proxy59.execute(Unknown Source)
    at com.ge.predix.rmd.siem.boot.PersistenceTest.test_QuerySparkOnReport_GIACOMO_LogDao(PersistenceTest.java:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:73)
    at org.springframework.test.context.junit4.statements

I also tried:

SELECT * from siem.report JOIN siem.netstat on report.REPORTUUID = netstat.NETSTATREPORTUUID ALLOW FILTERING

and also:

SELECT * from siem.report R JOIN siem.netstat N on R.REPORTUUID = N.NETSTATREPORTUUID ALLOW FILTERING 

Can anyone help me? Am I actually using SparkSQL here, or CQL?

UPDATE

I tried:

public void test_JOIN_on_Cassandra() {

    SparkConf conf = new SparkConf().setAppName("My application").setMaster("local");
    conf.set("spark.cassandra.connection.host", "localhost");
    JavaSparkContext sc = new JavaSparkContext(conf);

    SQLContext sqlContext = new SQLContext(sc);
    try {
        //QueryExecution test1 = sqlContext.executeSql("SELECT * from siem.report");
        //QueryExecution test2 = sqlContext.executeSql("SELECT * from siem.report JOIN siem.netstat on report.REPORTUUID = netstat.NETSTATREPORTUUID");
        QueryExecution test3 = sqlContext.executeSql("SELECT * from siem.report JOIN siem.netstat on siem.report.REPORTUUID = siem.netstat.NETSTATREPORTUUID");
    } catch (Exception e) {
        e.printStackTrace();
    }

    // SchemaRDD results = sc.sql("SELECT * from siem.report JOIN siem.netstat on siem.report.REPORTUUID = siem.netstat.NETSTATREPORTUUID");
}

and I got:

== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
+- 'Join Inner, Some(('siem.report.REPORTUUID = 'siem.netstat.NETSTATREPORTUUID))
   :- 'UnresolvedRelation siem.report, None
   +- 'UnresolvedRelation siem.netstat, None

== Analyzed Logical Plan ==
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to toAttribute on unresolved object, tree: unresolvedalias(*)
'Project [unresolvedalias(*)]
+- 'Join Inner, Some(('siem.report.REPORTUUID = 'siem.netstat.NETSTATREPORTUUID))
   :- 'UnresolvedRelation siem.report, None
   +- 'UnresolvedRelation siem.netstat, None

== Optimized Logical Plan ==
org.apache.spark.sql.AnalysisException: Table not found: siem.report;

== Physical Plan ==
org.apache.spark.sql.AnalysisException: Table not found: siem.report;

Answer


It looks like you are mixing a couple of concepts here, and that is what produces the error. The Session you create opens a direct connection to Cassandra, which means it accepts CQL, not SQL - and CQL has no JOIN clause at all. If you want to run SQL, you can make a small change:

SparkConf conf = new SparkConf().setAppName("My application").setMaster("local");
conf.set("spark.cassandra.connection.host", "localhost");
JavaSparkContext sc = new JavaSparkContext(conf);

// Cassandra-aware SQL context from the spark-cassandra-connector:
// it resolves keyspace.table names such as siem.report automatically
CassandraSQLContext sqlContext = new CassandraSQLContext(sc.sc());
DataFrame results = sqlContext.sql("SELECT * from siem.report JOIN siem.netstat on siem.report.REPORTUUID = siem.netstat.NETSTATREPORTUUID");

This way you call SparkSQL through the Spark context instead of connecting to Cassandra directly. More here: http://docs.datastax.com/en/latest-dse/datastax_enterprise/spark/sparkSqlJava.html
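Alternatively, if you want to stay with the plain SQLContext from your update: the "Table not found: siem.report" in your plan output just means no relation is registered under that name. A minimal sketch, assuming spark-cassandra-connector 1.4+ with DataFrame support, is to register each Cassandra table as a temporary table first and then run the join in Spark:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

SQLContext sqlContext = new SQLContext(sc); // sc is the JavaSparkContext from above

// Register each Cassandra table as a temporary table so plain SQL can see it
sqlContext.read()
          .format("org.apache.spark.sql.cassandra")
          .option("keyspace", "siem")
          .option("table", "report")
          .load()
          .registerTempTable("report");

sqlContext.read()
          .format("org.apache.spark.sql.cassandra")
          .option("keyspace", "siem")
          .option("table", "netstat")
          .load()
          .registerTempTable("netstat");

// The join now runs inside Spark, not in Cassandra, so no ALLOW FILTERING is needed
DataFrame joined = sqlContext.sql(
        "SELECT * FROM report JOIN netstat ON report.REPORTUUID = netstat.NETSTATREPORTUUID");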