2016-02-04 3 views
5

Wir verwenden derzeit Cassandra (http://cassandra.apache.org/) für Zeitreihendaten. Cassandra ist sehr schnell beim Lesen, aber wir müssen eine Reihe von Berechnungen an unseren Daten durchführen, bevor wir es präsentieren (effektiv imitieren wir die Funktionen SUM und GROUP BY von SQL - Etwas, das Cassandra nicht out of the box unterstützt)Große Web-Datensätze in Python - wie mit sehr großen Arrays umgehen?

Wir sind vertraut mit Python (zu einem gewissen Grad) und beschlossen, einen Skript zu bauen unseren Cassandra-Cluster sowie das Durchführen führen~~POS=HEADCOMP der Mathematik und präsentieren das Ergebnis in einem JSON-Format abfragen:

query = (
    "SELECT query here...") 

startTimeQuery = time.time() 

# Executes cassandra query 
rslt = cassession.execute(query) 

print("--- %s seconds to query ---" % (time.time() - startTimeQuery)) 

tally = {} 

startTimeCalcs = time.time() 
for row in rslt: 
    userid = row.site_user_id 

    revenue = (int(row.revenue) - int(row.reversals_revenue or 0)) 
    accepted = int(row.accepted or 0) 
    reversals_revenue = int(row.reversals_revenue or 0) 
    error = int(row.error or 0) 
    impressions_negative = int(row.impressions_negative or 0) 
    impressions_positive = int(row.impressions_positive or 0) 
    rejected = int(row.rejected or 0) 
    reversals_rejected = int(row.reversals_rejected or 0) 

    if tally.has_key(userid): 
     tally[userid]["revenue"] += revenue 
     tally[userid]["accepted"] += accepted 
     tally[userid]["reversals_revenue"] += reversals_revenue 
     tally[userid]["error"] += error 
     tally[userid]["impressions_negative"] += impressions_negative 
     tally[userid]["impressions_positive"] += impressions_positive 
     tally[userid]["rejected"] += rejected 
     tally[userid]["reversals_rejected"] += reversals_rejected 
    else: 
     tally[userid] = { 
      "accepted": accepted, 
      "error": error, 
      "impressions_negative": impressions_negative, 
      "impressions_positive": impressions_positive, 
      "rejected": rejected, 
      "revenue": revenue, 
      "reversals_rejected": reversals_rejected, 
      "reversals_revenue": reversals_revenue 
     } 


print("--- %s seconds to calculate results ---" % (time.time() - startTimeCalcs)) 

startTimeJson = time.time() 
jsonOutput =json.dumps(tally) 
print("--- %s seconds for json dump ---" % (time.time() - startTimeJson)) 

print("--- %s seconds total ---" % (time.time() - startTimeQuery)) 

print "Array Size: " + str(len(tally)) 

Dies ist die Art von Ausgang wir erhalten:

--- 0.493520975113 seconds to query --- 
--- 23.1472680569 seconds to calculate results --- 
--- 0.546246051788 seconds for json dump --- 
--- 24.1871240139 seconds total --- 
Array Size: 198124 

Wir verbringen viel Zeit mit unseren Berechnungen, wir wissen, dass das Problem nicht so sehr die Summen und Gruppenbys selbst sind: Es ist nur die schiere Größe des Arrays.

Wir haben ein paar gute Dinge über numpy gehört, aber die Art unserer Daten macht die Matrixgröße zu einem unbekannten.

Wir suchen nach Tipps, wie man das angehen kann. Einschließlich eines völlig anderen Programmieransatzes.

+2

Das Goto-Python-Paket für Zeitreihendaten ist 'Pandas', die' numpy' unter der Haube verwenden. Hast du das untersucht? –

+2

Wie groß ist "groß"? –

+0

Deserialisierung? –

Antwort

0

In Cassandra 2.2 und höher können Benutzer Aggregatfunktionen definieren. Sie können damit die Säulenkalkulation auf der Cassandra-Seite durchführen. Bitte beachten Sie DataStax article für Daten über benutzerdefinierte Aggregate

1

Ich habe ein sehr ähnliches Stück Verarbeitung gemacht und ich war auch besorgt über Bearbeitungszeiten. Ich denke, dass Sie nicht etwas Wichtiges berücksichtigen: das Ergebnisobjekt, das Sie von Cassandra erhalten, als Rückgabe der Funktion execute() enthält nicht alle Zeilen, die Sie möchten. Stattdessen enthält es ein paginiertes Ergebnis und erhält Linien, während Sie das Objekt innerhalb der for Liste durchlaufen. Dies beruht auf persönlicher Beobachtung, ich weiß jedoch nicht, dass es mehr technische Details darüber gibt.

Ich schlage vor, Sie Abfrage und Verarbeitung der Ergebnisse isolieren durch eine einfache rslt = list(rslt) direkt nach dem execute Befehl hinzufügen, dass Python durch alle Linien im Ergebnis zu gehen zwingen würde, bevor die Verarbeitung zu tun, auch die cassandra Fahrer zwingt alle zu erhalten die Zeilen, die Sie möchten, bevor Sie zur Verarbeitung gehen.

Ich denke, Sie werden feststellen, dass ein großer Teil der Bearbeitungszeit tatsächlich abgefragt wurde, aber vom Fahrer über ein paginiertes Ergebnis maskiert wurde.