2015-02-27 3 views
7

Lasst uns sagen, ich habe Reihen Telefonanruf zeichnet das Format:Emit mehrere Paare in der Karte Operation

[CallingUser, ReceivingUser, Duration] 

Wenn ich will, um die Gesamtmenge von Zeit wissen, dass ein bestimmter Benutzer am Telefon gewesen ist (Summe aus Dauer, in der der Benutzer der CallingUser oder der ReceivingUser war).

Effektiv, für einen bestimmten Datensatz möchte ich 2 Paare (CallingUser, Duration) und (ReceivingUser, Duration) erstellen.

Was ist der effizienteste Weg, dies zu tun? Ich kann RDDs zusammen 2 hinzufügen, aber ich bin nicht klar, ob dies ein guter Ansatz ist:

#Sample Data: 
callData = sc.parallelize([["User1", "User2", 2], ["User1", "User3", 4], ["User2", "User1", 8] ]) 


calls = callData.map(lambda record: (record[0], record[2])) 

#The potentially inefficient map in question: 
calls += callData.map(lambda record: (record[1], record[2])) 


reduce = calls.reduceByKey(lambda a, b: a + b) 

Antwort

10

Sie flache Karte mögen. Wenn Sie eine Funktion schreiben, die die Liste [(record[0], record[2]),(record[1],record[2])] zurückgibt, dann können Sie sie flach abbilden!

+4

Möchten Sie die Codezeile dafür bereitstellen? Vielen Dank. –

6

Verwenden Sie eine flatMap(), die für die Aufnahme einzelner Eingänge und das Generieren von mehreren zugeordneten Ausgängen geeignet ist. Komplett mit Code:

callData = sc.parallelize([["User1", "User2", 2], ["User1", "User3", 4], ["User2", "User1", 8]]) 

calls = callData.flatMap(lambda record: [(record[0], record[2]), (record[1], record[2])]) 
print calls.collect() 
# prints [('User1', 2), ('User2', 2), ('User1', 4), ('User3', 4), ('User2', 8), ('User1', 8)] 

reduce = calls.reduceByKey(lambda a, b: a + b) 
print reduce.collect() 
# prints [('User2', 10), ('User3', 4), ('User1', 14)]