2014-11-19 1 views
5

Ich bin der Auswertung von Spark-Cassandra-Connector und ich habe Schwierigkeiten zu versuchen, eine Bereichsabfrage auf Partition Schlüssel zu arbeiten.Spark Cassandra Connector - Bereich Abfrage auf Partition Schlüssel

Gemäß der Dokumentation des Connectors scheint es möglich zu sein, die serverseitige Filterung des Partitionsschlüssels mit Gleichheit oder IN-Operator durchzuführen, aber leider ist mein Partitionsschlüssel ein Zeitstempel, daher kann ich ihn nicht verwenden. So

Ich habe versucht, Spark-SQL mit der folgenden Abfrage (‚Zeitstempel‘ ist die Partition Schlüssel):

select * from datastore.data where timestamp >= '2013-01-01T00:00:00.000Z' and timestamp < '2013-12-31T00:00:00.000Z' 

Obwohl der Job 200 Aufgaben laicht, wird die Abfrage keine Daten zurückgegeben.

Ich kann auch versichern, dass Daten zurückgegeben werden, da die Abfrage auf Cqlsh ausgeführt wird (die entsprechende Konvertierung mit der Funktion "Token"). Gibt Daten zurück.

Ich benutze funken 1.1.0 mit Standalone-Modus. Cassandra ist 2.1.2 und Connector-Version ist 'b1.1' Zweig. Cassandra-Treiber ist DataStax 'Master' Zweig. Cassandra-Cluster mit drei Servern mit Replikationsfaktor von 1 auf Funken Cluster überlagert

Here is the job's full log

Jede jemand Ahnung?

Update: Beim Versuch, serverseitige Filterung basierend auf der Partition Schlüssel (mit CassandraRDD.where Methode) bekomme ich die folgende Ausnahme zu tun:

Exception in thread "main" java.lang.UnsupportedOperationException: Range predicates on partition key columns (here: timestamp) are not supported in where. Use filter instead. 

Aber leider weiß ich nicht, was " filter "ist ...

Antwort

8

Sie haben mehrere Optionen, um die gewünschte Lösung zu erhalten.

Die leistungsstärkste wäre die Verwendung von Lucene-Indizes, die in Cassandra von Stratio integriert sind. Damit können Sie nach jedem indizierten Feld auf der Serverseite suchen. Ihre Schreibzeit wird erhöht, andererseits können Sie jeden Zeitbereich abfragen. Sie können weitere Informationen über Lucene-Indizes in Cassandra here finden. Diese erweiterte Version von Cassandra ist vollständig in die deep-spark project integriert, so dass Sie alle Vorteile der Lucene-Indizes in Cassandra damit nutzen können. Ich würde Ihnen empfehlen, Lucene-Indizes zu verwenden, wenn Sie eine eingeschränkte Abfrage ausführen, die eine Ergebnismenge von kleinen bis mittleren Werten abruft. Wenn Sie ein großes Stück Ihres Datensatzes abrufen möchten, sollten Sie die dritte Option darunter verwenden.

Ein anderer Ansatz, abhängig von der Funktionsweise Ihrer Anwendung, besteht darin, das Zeitstempelfeld abzukürzen, damit Sie es mit einem IN-Operator suchen können.Das Problem ist, soweit ich weiß, dass Sie den Spark-Cassandra-Connector dafür nicht verwenden können, sollten Sie den direkten Cassandra-Treiber verwenden, der nicht in Spark integriert ist, oder Sie können sich das Deep-Spark-Projekt ansehen wo ein neues Feature, das dies ermöglicht, bald veröffentlicht wird. Ihre Abfrage würde wie folgt aussehen:

select * from datastore.data where timestamp IN ('2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', ... , '2013-12-31') 

, aber, wie ich schon sagte, ich weiß nicht, ob es Ihren Bedürfnissen passt, da Sie nicht auf Ihre Daten und die Gruppe, es könnte der Lage sein, nach Datum/trunkieren Zeit.

Die letzte Option, die Sie haben, aber die weniger effiziente, besteht darin, den vollständigen Datensatz in Ihren Funke-Cluster zu bringen und einen Filter auf die RDD anzuwenden.

Haftungsausschluss: Ich arbeite für Stratio :-) Zögern Sie nicht uns zu kontaktieren, wenn Sie Hilfe benötigen.

Ich hoffe, es hilft!

+0

Hallo, ich experimentiere gerade mit IN-Operator (es wird unterstützt), aber ich bin ein bisschen besorgt, dass ich höchstens 1 Jahr Daten abrufen muss, was zu einem IN-Ausdruck mit 365 Werten führt. Wie auch immer, ich werde Stratio überprüfen. Vielen Dank! – tsouza

8

ich denke, der CassandraRDD-Fehler sagt, dass die Abfrage, die Sie versuchen, in Cassandra nicht erlaubt ist, und Sie müssen die gesamte Tabelle in eine CassandraRDD laden und dann einen Funkenfilter durchführen diese KassandraRDD.

So Ihr Code (in scala) sollte wie folgt:

val cassRDD= sc.cassandraTable("keyspace name", "table name").filter(row=> row.getDate("timestamp")>=DateFormat('2013-01-01T00:00:00.000Z')&&row.getDate("timestamp") < DateFormat('2013-12-31T00:00:00.000Z')) 

Wenn Sie bei der Herstellung dieser Art von Anfragen interessiert sind, könnten Sie einen Blick auf andere Cassandra Anschlüsse nehmen, wie die, entwickelt von Stratio

+0

Hallo, das Laden aller Daten ist etwas, was ich nicht tun möchte. Ich überprüfe diesen Anschluss, danke für den Tipp! – tsouza