2016-03-28
I am implementing the example from the Lucene plugin for Cassandra page (https://github.com/Stratio/cassandra-lucene-index), and when I try to save data with saveToCassandra I get a NoSuchElementException. When I use CassandraConnector.withSessionDo I can insert rows into Cassandra and no exception is raised. Does saveToCassandra work with the Cassandra Lucene plugin?

The tables:

CREATE KEYSPACE demo 
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1}; 
USE demo; 
CREATE TABLE tweets (
    id INT PRIMARY KEY, 
    user TEXT, 
    body TEXT, 
    time TIMESTAMP, 
    latitude FLOAT, 
    longitude FLOAT 
); 

CREATE CUSTOM INDEX tweets_index ON tweets() 
USING 'com.stratio.cassandra.lucene.Index' 
WITH OPTIONS = { 
    'refresh_seconds' : '1', 
    'schema' : '{ 
     fields : { 
      id : {type : "integer"}, 
      user : {type : "string"}, 
      body : {type : "text", analyzer : "english"}, 
      time : {type : "date", pattern : "yyyy/MM/dd", sorted : true}, 
      place : {type : "geo_point", latitude:"latitude", longitude:"longitude"} 
     } 
    }' 
}; 
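For reference, with this row-based index (the new Cassandra 3.x style, created on the table rather than on a column), Lucene searches are issued through the expr() function. A hedged sketch, assuming the tweets_index name from above; the filter JSON is purely illustrative:

```sql
-- Query the row-based Lucene index via expr() (Cassandra 3.x syntax).
SELECT * FROM demo.tweets
WHERE expr(tweets_index, '{filter: {type: "match", field: "user", value: "Name"}}');
```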

The code:

import org.apache.spark.{SparkConf, SparkContext, Logging} 
import com.datastax.spark.connector.cql.CassandraConnector 
import com.datastax.spark.connector._ 

object App extends Logging{ 
    def main(args: Array[String]) { 

     // Get the cassandra IP and create the spark context 
     val cassandraIP = System.getenv("CASSANDRA_IP"); 
     val sparkConf = new SparkConf(true) 
         .set("spark.cassandra.connection.host", cassandraIP) 
         .set("spark.cleaner.ttl", "3600") 
         .setAppName("Simple Spark Cassandra Example") 


     val sc = new SparkContext(sparkConf) 

     // Works 
     CassandraConnector(sparkConf).withSessionDo { session => 
      session.execute("INSERT INTO demo.tweets(id, user, body, time, latitude, longitude) VALUES (19, 'Name', 'Body', '2016-03-19 09:00:00-0300', 39, 39)") 
     } 

     // Does not work 
     val demo = sc.parallelize(Seq((9, "Name", "Body", "2016-03-29 19:00:00-0300", 29, 29))) 
     // Raises the exception 
     demo.saveToCassandra("demo", "tweets", SomeColumns("id", "user", "body", "time", "latitude", "longitude")) 

    } 
} 

The exception:

16/03/28 14:15:41 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster 
Exception in thread "main" java.util.NoSuchElementException: Column not found in demo.tweets 
    at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName$2.apply(Schema.scala:60) 
    at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName$2.apply(Schema.scala:60) 
    at scala.collection.Map$WithDefault.default(Map.scala:52) 
    at scala.collection.MapLike$class.apply(MapLike.scala:141) 
    at scala.collection.AbstractMap.apply(Map.scala:58) 
    at com.datastax.spark.connector.cql.TableDef$$anonfun$9.apply(Schema.scala:153) 
    at com.datastax.spark.connector.cql.TableDef$$anonfun$9.apply(Schema.scala:152) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722) 
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) 
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721) 
    at com.datastax.spark.connector.cql.TableDef.<init>(Schema.scala:152) 
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:283) 
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:271) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722) 
    at scala.collection.immutable.Set$Set4.foreach(Set.scala:137) 
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721) 
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:271) 
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:295) 
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:294) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722) 
    at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153) 
    at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) 
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721) 
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:294) 
    at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:307) 
    at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:304) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109) 
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120) 
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:304) 
    at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:275) 
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36) 
    at com.webradar.spci.spark.cassandra.App$.main(App.scala:27) 
    at com.webradar.spci.spark.cassandra.App.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

EDITED: versions

  • Spark 1.6.0
  • Cassandra 3.0.3
  • Lucene plugin 3.0.3.1
  • To build the JAR I used the Maven assembly plugin to produce a fat JAR.

When I remove the custom index, I am able to use saveToCassandra.

+1

A couple of questions: which versions of Spark and the Cassandra connector are you using? And how do you submit your JAR: is it a thin JAR or a fat JAR? – user1314742

+0

I have added the versions. – Cleosson

Answer

4

It seems the problem is caused by an issue in the Cassandra Spark connector, not in the plugin.

Since CASSANDRA-10217, per-row indexes in Cassandra 3.x no longer need to be created on a fake column. Therefore, the column-based "CREATE CUSTOM INDEX %s ON %s (%s)" syntax is replaced in Cassandra 3.x by the new row-based "CREATE CUSTOM INDEX %s ON %s ()" syntax. However, the DataStax Spark connector does not seem to support this new feature yet.

When com.datastax.spark.connector.RDDFunctions.saveToCassandra is called, it tries to load the table schema and the schema of the index attached to a table column. Since this new index syntax no longer involves the fake column, it results in a NoSuchElementException due to an empty column name.

However, saveToCassandra works fine if you run the same example with the previous fake-column syntax:

CREATE KEYSPACE demo 
WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1}; 
USE demo; 
CREATE TABLE tweets (
    id INT PRIMARY KEY, 
    user TEXT, 
    body TEXT, 
    time TIMESTAMP, 
    latitude FLOAT, 
    longitude FLOAT, 
    lucene TEXT 
); 



CREATE CUSTOM INDEX tweets_index ON tweets (lucene) 
USING 'com.stratio.cassandra.lucene.Index' 
WITH OPTIONS = { 
    'refresh_seconds' : '1', 
    'schema' : '{ 
     fields : { 
      id : {type : "integer"}, 
      user : {type : "string"}, 
      body : {type : "text", analyzer : "english"}, 
      time : {type : "date", pattern : "yyyy/MM/dd", sorted : true}, 
      place : {type : "geo_point", latitude:"latitude", longitude:"longitude"} 
     } 
    }' 
}; 
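With this legacy fake-column syntax, searches are issued through the dummy lucene column instead of expr(). A sketch assuming the schema above; the filter JSON is illustrative only:

```sql
-- With the fake-column index, the search JSON is passed as the value
-- of the dummy 'lucene' column.
SELECT * FROM demo.tweets
WHERE lucene = '{filter: {type: "match", field: "body", value: "Body"}}';
```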
+1

Thanks, it works. – Cleosson