5

Ich habe Szenario, wo ich Streaming-Daten empfangen wird, die von meinem Spark-Streaming-Programm verarbeitet wird und die Ausgabe für jedes Intervall an meine angehängt wird vorhandene Cassandra-Tabelle.java.lang.UnsupportedOperationException: 'Schreiben in eine nicht leere Cassandra-Tabelle ist nicht erlaubt

Derzeit erzeugt mein Spark-Streaming-Programm einen Datenrahmen, den ich in meiner Cassandra-Tabelle speichern muss. Das Problem, das ich zur Zeit bin vor ist, ich bin nicht in der Lage Daten/Zeilen in meinem vorhandenen cassandra Tabelle anzufügen, wenn ich unter Befehl

dff.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "xxx", "yyy" -> "retail")).save() 

ich in folgenden Link http://rustyrazorblade.com/2015/08/migrating-from-mysql-to-cassandra-using-spark/ gelesen hatte, wo er Modus übergeben = „anhängen“ in Speichermethode, aber seine Wurfsyntaxfehler

auch war ich nt der Lage zu verstehen, wo brauche ich aus dem untenstehenden Link zu beheben https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/rlGGWQF2wnM

brauchen Sie Hilfe, wie man diese issue.I'm schreibe meine Funken zu beheben Streaming-Jobs in Scala

Antwort

8

Ich glaube, Sie es die folgende Art und Weise zu tun haben:

dff.write.format("org.apache.spark.sql.cassandra").mode(SaveMode.Append).options(Map("table" -> "xxx", "yyy" -> "retail")).save() 

Die Art und Weise cassandra Daten Kräfte behandelt man so genannte ‚upserts‘ tun - Sie müssen bedenken, dass ein Einsatz einige der Zeilen überschrieben werden kann wobei der Primärschlüssel des bereits gespeicherten Datensatzes mit dem Primärschlüssel des eingefügten Records übereinstimmt. Cassandra ist eine "Write-Fast" -Datenbank, so dass vor dem Schreiben nicht geprüft wird, ob Daten vorhanden sind.