0

Ich habe eine cassandra Tabelle person_master (PersonId: int, customerId: Int, vorName: String, nachName: String, mrids: Set) primaryKey (PersonId und customerID)Update eine Spalte in cassandra Tabelle

Angenommen, ich habe eine Eingabe-RDD der Struktur [personId, customerId, Vorname, Nachname, Nachrichtentyp: Zeichenfolge, Quelle: Zeichenfolge, QuelleTyp: Zeichenfolge]

angenommen Wert der RDD: [1001,119, Keine, Keine, {abc. xyz} und Cassandra Zeile hat Wert [1001,119, Vikash, Singh, {aaa.bbb}]

Ich möchte auf Cachandra-Zeile auf RDD-Wert abrufen und aktualisieren Sie die Spalte Mrids der Tabelle Cassandra und alle anderen Spalte aus Cassandra Zeile verwenden.

z.B. in diesem möchte ich endgültigen RDD-Wert als [1001,119, Vikash, Singh, {aaa.bbb, abc.xyz}], die ich später auf Cassandra aktualisieren werde.

Kann mir jemand die Lösung, dies in Spark mit Cassandra Connector zu tun.

Antwort

0

sc Unter der Annahme sparkContext wie

val sparkConf = new SparkConf().setMaster(SPARK_MASTER) 
          .setAppName(SPARK_SCALA_APP_NAME) 
          .setJars(SPARK_SCALA_JAR) 
sparkConf.set("spark.cassandra.connection.host", value) 
sparkConf.set("spark.cassandra.auth.username", value) 
sparkConf.set("spark.cassandra.auth.password", value) 
val sc = new SparkContext(sparkConf) 

Sie verwenden können, oder ignorieren, wo Klausel (wo verwendet werden kann nur, wenn seine Partitionsschlüssel)

val selectedRow = sc.cassandraTable("keyspace", "tableName") 
     .select("key", "column2", "column3") 
     .where("key IN ?", keys) 
     .as((key: String, column2: String, column3: Integer) 
      =>(key, column2, column3)) 

Filterung und Modifikation Sie auf Ihrem rdd Dann speichern Sie es wie,

selectedRow.saveToCassandra("keyspace", 
          "tableName", 
          SomeColumns("key", "column2", "column3"))