Dies ist ein Teil meiner Funke App. Der erste Teil ist der Teil, wo ich alle Artikel innerhalb der letzten 1 Stunde bekomme und der zweite Teil des Codes greift alle diese Artikel Kommentare. Der dritte Teil fügt die Kommentare zu den Artikeln hinzu. Das Problem ist, dass der articles.map(lambda x:(x.id,x.id)).join(axes)
Teil zu langsam ist, dauert es etwa 1 Minute. Ich möchte das auf 10 Sekunden oder noch weniger verbessern, weiß aber nicht wie? Danke für deine Antwort.Wie kann ich serverseitige Filterung mit dem Join in Spark Dataframe erreichen api
articles = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="articles", keyspace=source).load() \
.map(lambda x:x).filter(lambda x:x.created_at!=None).filter(lambda x:x.created_at>=datetime.now()-timedelta(hours=1) and x.created_at<=datetime.now()-timedelta(hours=0)).cache()
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().map(lambda x:(x.article,x))
speed_rdd = articles.map(lambda x:(x.id,x.id)).join(axes)
EDIT
Das ist mein neuer Code, die ich Ihre Vorschläge geändert nach. Es ist jetzt schon 2 mal so schnell wie vorher, also danke dafür;). Nur eine weitere Verbesserung, die ich mit dem letzten Teil meines Codes in den Achsen Teil machen möchten, die immer noch zu langsam und benötigt 38 Sekunden für 30 Millionen Daten:
range_expr = col("created_at").between(
datetime.now()-timedelta(hours=timespan),
datetime.now()-timedelta(hours=time_delta(timespan))
)
article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').persist()
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load()
Ich habe versucht, hier das (die ersetzen soll letzte Achsen Teil meines Codes), und dies ist auch die Lösung, die ich haben möchte, aber es scheint nicht richtig zu funktionieren:
in_expr = col("article").isin(article_ids.collect())
axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().where(in_expr)
ich bekomme immer diese Fehlermeldung:
in_expr = col("article").isin(article_ids.collect())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'Column' object is not callable
Danke für Ihre Hilfe.
a) 'isin' wurde in 1.5 b eingeführt) Ich bin mir ziemlich sicher, was Sie wollen, ist diese' article_ids.collect() 'zuerst zu reduzieren. – zero323
Ich versuchte es zu reduzieren, aber es erhöhte nicht wirklich die Geschwindigkeit. Beim Laden dieses Teils sqlContext.read.Format hat eine Standard-Partition Nummer von 255 und ich möchte es kleiner machen, da dieser Teil von Cassandra nach Partitionen liest ist langsam, aber weiß nicht wie. Irgendwelche Ideen? Vielen Dank – peter