2016-05-30 15 views
1

Dies ist ein Teil meiner Funke App. Der erste Teil ist der Teil, wo ich alle Artikel innerhalb der letzten 1 Stunde bekomme und der zweite Teil des Codes greift alle diese Artikel Kommentare. Der dritte Teil fügt die Kommentare zu den Artikeln hinzu. Das Problem ist, dass der articles.map(lambda x:(x.id,x.id)).join(axes) Teil zu langsam ist, dauert es etwa 1 Minute. Ich möchte das auf 10 Sekunden oder noch weniger verbessern, weiß aber nicht wie? Danke für deine Antwort.Wie kann ich serverseitige Filterung mit dem Join in Spark Dataframe erreichen api

articles = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="articles", keyspace=source).load() \ 
         .map(lambda x:x).filter(lambda x:x.created_at!=None).filter(lambda x:x.created_at>=datetime.now()-timedelta(hours=1) and x.created_at<=datetime.now()-timedelta(hours=0)).cache() 

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().map(lambda x:(x.article,x)) 

speed_rdd = articles.map(lambda x:(x.id,x.id)).join(axes) 

EDIT

Das ist mein neuer Code, die ich Ihre Vorschläge geändert nach. Es ist jetzt schon 2 mal so schnell wie vorher, also danke dafür;). Nur eine weitere Verbesserung, die ich mit dem letzten Teil meines Codes in den Achsen Teil machen möchten, die immer noch zu langsam und benötigt 38 Sekunden für 30 Millionen Daten:

range_expr = col("created_at").between(
          datetime.now()-timedelta(hours=timespan), 
          datetime.now()-timedelta(hours=time_delta(timespan)) 
         ) 
     article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').persist() 


     axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 

Ich habe versucht, hier das (die ersetzen soll letzte Achsen Teil meines Codes), und dies ist auch die Lösung, die ich haben möchte, aber es scheint nicht richtig zu funktionieren:

in_expr = col("article").isin(article_ids.collect()) 
     axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().where(in_expr) 

ich bekomme immer diese Fehlermeldung:

in_expr = col("article").isin(article_ids.collect()) 
Traceback (most recent call last):            
    File "<stdin>", line 1, in <module> 
TypeError: 'Column' object is not callable 

Danke für Ihre Hilfe.

Antwort

2

Wie mentioned before Wenn Sie eine angemessene Leistung erzielen möchten, konvertieren Sie Ihre Daten nicht in RDD. Es macht nicht nur Optimierungen wie Prädikat-Pushdown unmöglich, sondern führt auch einen enormen Aufwand beim Verschieben von Daten aus JVM nach Python ein.

Stattdessen sollten Sie die Verwendung von SQL-Ausdrücken/DataFrame API in ähnlicher Weise wie diese verwenden:

from pyspark.sql.functions import col, expr, current_timestamp 

range_expr = col("created_at").between(
    current_timestamp() - expr("INTERVAL 1 HOUR"), 
    current_timestamp()) 

articles = (sqlContext.read.format("org.apache.spark.sql.cassandra") 
    .options(...).load() 
    .where(col("created_at").isNotNull()) # This is not really required 
    .where(range_expr)) 

Es soll auch möglich sein Prädikat Ausdruck zu formulieren, unter Verwendung von Standard-Python-Dienstprogramme, wie Sie vorher getan haben:

import datetime 

range_expr = col("created_at").between(
    datetime.datetime.now() - datetime.timedelta(hours=1), 
    datetime.datetime.now() 
) 

Nachfolgende join sollten Daten ohne sich zu bewegen als auch aus Datenrahmen durchgeführt werden:

axes = (sqlContext.read.format("org.apache.spark.sql.cassandra") 
    .options(...) 
    .load()) 

articles.join(axes, ["id"]) 
+1

a) 'isin' wurde in 1.5 b eingeführt) Ich bin mir ziemlich sicher, was Sie wollen, ist diese' article_ids.collect() 'zuerst zu reduzieren. – zero323

+0

Ich versuchte es zu reduzieren, aber es erhöhte nicht wirklich die Geschwindigkeit. Beim Laden dieses Teils sqlContext.read.Format hat eine Standard-Partition Nummer von 255 und ich möchte es kleiner machen, da dieser Teil von Cassandra nach Partitionen liest ist langsam, aber weiß nicht wie. Irgendwelche Ideen? Vielen Dank – peter

2

1) Predicate Push-Down automatisch durch den Funken Cassandra Verbinder, solange die Filterung ist möglich in Cassandra (unter Verwendung von Primärschlüsseln zum Filtern oder sekundären Index) festgestellt wird: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushing-down-clauses-to-cassandra

2) Für eine effizientere beitritt, Sie können die Methode repartitionByCassandraReplica aufrufen. Leider ist diese Methode möglicherweise nicht für PySpark, nur für Scala/Java API verfügbar. Lesen Sie das Dokument hier: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

3) Ein weiterer Tipp ist zu versuchen, zu debuggen und zu verstehen, wie der Connector Spark Partitionen erstellt. Es gibt einige Beispiele und Vorbehalte, die in der Dokumentation erwähnt werden: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md