2016-05-26 15 views
1

Ich weiß, dass wir (wie cbind in R) zwei RDDs wie unten in pyspark kombinieren:Wie zwei Dstreams kombinieren Pyspark (ähnlich .zip auf normalen RDD) mit

rdd3 = rdd1.zip(rdd2) 

Ich möchte führen die Gleiches für zwei Dstreams in Pyspark. Ist es möglich oder Alternativen?

In der Tat verwende ich ein MLlib-Randomforest-Modell, um mithilfe von Spark-Streaming zu prognostizieren. Am Ende möchte ich das Merkmal Dstream & Vorhersage Dstream für weitere Downstream-Verarbeitung kombinieren.

Vielen Dank im Voraus.

-Obaid

Antwort

2

Am Ende verwende ich unten.

Der Trick ist die Verwendung von "native Python Map" zusammen mit "Spark spreaming transform". Darf kein elegent Weg, aber es funktioniert :).

def predictScore(texts, modelRF): 
    predictions = texts.map(lambda txt : (txt , getFeatures(txt))).\ 
    map(lambda (txt, features) : (txt ,(features.split(',')))).\ 
    map(lambda (txt, features) : (txt, ([float(i) for i in features]))).\ 
    transform(lambda rdd: sc.parallelize(\ 
     map(lambda x,y:(x,y), modelRF.predict(rdd.map(lambda (x,y):y)).collect(),rdd.map(lambda (x,y):x).collect())\ 
     )\ 
    ) 
    # in the transform operation: x=text and y=features 
    # Return will be tuple of (score,'original text') 
    return predictions 

Hoffe, es wird jemandem helfen, der mit demselben Problem konfrontiert ist. Wenn jemand eine bessere Idee hat, bitte posten Sie es hier.

-Obaid

Anmerkung: Ich legte auch das Problem auf Funken-Benutzerliste und poste meine Antwort auch dort.