Ich schreibe eine Funke App zu bewerten, wo ich brauche die Streaming-Daten über die historischen Daten zu bewerten, die nun in einer SQL Server-Datenbank sitztWie Funken DSTREAM Objekte mit einem Funkendatenrahmen
die Idee ist, , spark holt die historischen Daten aus der Datenbank und speichert sie im Speicher und wertet die Streaming-Daten dagegen aus.
Jetzt bekomme ich die Streaming-Daten als
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext,functions as func,Row
sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc,10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")
########Lets get the data from the db which is relavant for streaming ###
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"
########basic data for evaluation purpose ########
files_count = files.flatMap(lambda file: file.split())
pattern = '(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\\S+)(TranDescription=u.)([a-zA-z\\s]+)([\\S\\s]+)(dSc=u.)([A-Z]{2}.[0-9]+)'
tranfiles = "wasb://myserver.blob.core.windows.net/RealTimeInputFolder01/"
def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
return globals()['sqlContextSingletonInstance']
def pre_parse(logline):
"""
to read files as rows of sql in pyspark streaming using the pattern . for use of logging
added 0,1 in case there is any failure in processing by this pattern
"""
match = re.search(pattern,logline)
if match is None:
return(line,0)
else:
return(
Row(
customer_id = match.group(8)
trantype = match.group(5)
amount = float(match.group(2))
),1)
def parse():
"""
actual processing is happening here
"""
parsed_tran = ssc.textFileStream(tranfiles).map(preparse)
success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x:x[0])
fail = parsed_tran.filter(lambda s:s[1] == 0).map(lambda x:x[0])
if fail.count() > 0:
print "no of non parsed file : %d", % fail.count()
return success,fail
success ,fail = parse()
Jetzt möchte ich es von den Datenrahmen bewerten, die ich von den historischen Daten erhalten
base_data = sqlContext.read.format("jdbc").options(driver=driver,url=dataurl,database=db,user=credential,password=credential,dbtable=table).load()
Nun ist diese seit als zurückgegeben werden Datenrahmen wie verwende ich das für meinen Zweck. Der Streaming-Programmierleitfaden here sagt
"Sie müssen einen SQLContext mit dem SparkContext erstellen, den der StreamingContext verwendet."
Jetzt macht mich das noch verwirrter über die Verwendung des vorhandenen Datenrahmens mit dem Streaming-Objekt. Jede Hilfe wird sehr geschätzt. Diese 2 Kontexte (SqlContext und Streaming) wird im gleichen Job koexistieren, weil sie
sc = SparkContext("local[2]", "realtimeApp")
sqlc = SQLContext(sc)
ssc = StreamingContext(sc, 10)
:
Es gibt mehrere Wege, um darüber zu gehen. Ich würde empfehlen, die foreachRDD-Methode für DStreams zu betrachten, in der Sie Ihren vorhandenen Datenrahmen als Broadcast-Variable verwenden können. Sie können Ihren Datenrahmen auch in eine RDD konvertieren und diese mit jeder DStream RDD mithilfe von Transformationen verbinden. Wenn Sie den sparkContext von sparkStreamingContext in der foreachRDD-Methode benötigen, um den sqlContext zu erstellen, können Sie einfach 'ssc.sparkContext()' aufrufen –