2016-03-31 19 views
0

Ich versuche Zipkin Dependencies Spark job zu optimieren, um in weniger Stufen zu laufen, indem ich die Anzahl der reduceByKey Schritte minimiere, die es tut. Die Daten werden aus der folgenden Tabelle zu lesen:Ist es möglich, alle Zeilen der Cassandra-Partition in einem Spark-Worker zu lesen?

CREATE TABLE IF NOT EXISTS zipkin.traces (
    trace_id bigint, 
    ts  timestamp, 
    span_name text, 
    span  blob, 
    PRIMARY KEY (trace_id, ts, span_name) 
) 

Es wird eine einzelne Partition trace_id enthält eine komplette Spur, und enthält überall von wenigen bis zu einigen hundert Zeilen. Die gesamte Partition wird jedoch vom Spark-Job in eine sehr einfache RDD[((String, String), Long)] konvertiert, wodurch die Anzahl der Einträge von Milliarden auf einige Hundert reduziert wird.

Leider ist der aktuelle Code wird dabei durch alle Zeilen unabhängig über

Lesen
sc.cassandraTable(keyspace, "traces") 

und mit zwei reduceByKey Schritte mit RDD[((String, String), Long)] zu kommen. Wenn es eine Möglichkeit gäbe, die gesamte Partition auf einmal in einem Spark-Worker-Prozess zu lesen und alles im Speicher zu verarbeiten, würde dies eine enorme Geschwindigkeitsverbesserung bedeuten, die das Speichern/Streamen riesiger Datensätze aus der aktuellen Situation überflüssig macht erste Stufen.

- bearbeiten -

Um zu klären, muss der Job alle Daten aus der Tabelle, Milliarden von Partitionen lesen.

Antwort

1

Der Schlüssel alle Partitionsdaten auf dem gleichen Funken Arbeiter zu halten, ohne einen Shuffle zu tun ist spanByKey

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key

CREATE TABLE events (year int, month int, ts timestamp, data varchar, PRIMARY KEY (year,month,ts)); 

sc.cassandraTable("test", "events") 
    .spanBy(row => (row.getInt("year"), row.getInt("month"))) 

sc.cassandraTable("test", "events") 
    .keyBy(row => (row.getInt("year"), row.getInt("month"))) 
    .spanByKey 

zu verwenden Wenn kein Shuffle ist als alle Änderungen erfolgen in zusammenstellen und als Iterator zusammenführen.

Achten Sie auf die Einschränkung zu beachten:

Hinweis: Dies funktioniert nur, um sequentiell Daten bestellt. Da Daten in Cassandra durch die Clustering-Schlüssel geordnet sind, müssen alle realisierbaren Spannen der natürlichen Clusterschlüsselreihenfolge folgen.

+0

Ich war derjenige, der die Kommentare hinterlassen hat :-) –

+0

aber der Spark-Job wird erwartet, * alle * Partitionen zu lesen, so dass dieser Ansatz nicht funktioniert. –

+0

ah, vielleicht willst du einen SpanByKey machen? Grundsätzlich, wenn Sie den Shuffle mit dem ReduceByKey vermeiden, sollten Sie in Ordnung sein. https://github.com/datastax/spark-cassandra-connector/blob/edba853b9630f60de2b3f1b0db2118792a5a5a89/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/PairRDDFunctions.scala – RussS