2016-06-06 11 views
1

Ich möchte versuchen, laden Sie eine CSV-Daten in Python und streamen jede Zeile Funken über SPark Streaming.Python senden CSV-Daten zum Funken Streaming

Ich bin ziemlich neu zu Netzwerk Zeug. Ich bin nicht genau, wenn ich ein Server-Python-Skript erstellen sollte, das, sobald es eine Verbindung herstellt (mit Spark-Streaming), es beginnt, jede Zeile zu senden. In der Spark-Streaming-Dokumentation machen sie einen nc -l 9999, der ein Netcat-Server ist, der Port 9999 überwacht, wenn er korrekt ist. Also habe ich versucht, ähnlich ein Python-Skript erstellen, die eine csv analysiert und sendet auf Port 60000

import socket     # Import socket module 
import csv 

port = 60000     # Reserve a port for your service. 
s = socket.socket()    # Create a socket object 
host = socket.gethostname()  # Get local machine name 
s.bind((host, port))   # Bind to the port 
s.listen(5)      # Now wait for client connection. 

print('Server listening....') 

while True: 
    conn, addr = s.accept()  # Establish connection with client. 
    print('Got connection from', addr) 



    csvfile = open('Titantic.csv', 'rb') 

    reader = csv.reader(csvfile, delimiter = ',') 
    for row in reader: 
     line = ','.join(row) 

     conn.send(line) 
     print(line) 

    csvfile.close() 

    print('Done sending') 
    conn.send('Thank you for connecting') 
    conn.close() 

SPARK Streaming-Skript -

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
ssc = StreamingContext(sc, 1) 

# Create a DStream that will connect to hostname:port, like localhost:9999 
linesRDD = ssc.socketTextStream("localhost", 60000) 

# Split each line into words 
dataRDD = lines.flatMap(lambda line: line.split(",")) 

dataRDD.pprint() 

ssc.start()    # Start the computation 
ssc.awaitTermination() # Wait for the computation to terminate 

Wenn der Funke Skript ausführen (Dies ist in Jupyter Notebooks btw) bekomme ich Dieser Fehler - IllegalArgumentException: 'Anforderung fehlgeschlagen: Keine Ausgabevorgänge registriert, also nichts auszuführen'

Ich denke nicht, dass ich mein Socket-Skript richtig mache, aber ich bin nicht wirklich sicher, was zu tun Im im Grunde versuche, w zu replizieren Hat nc -lk 9999 kann ich Textdaten über den Port senden und dann Funken Streaming ist es zu hören und empfängt die Daten und verarbeitet sie.

Jede Hilfe wäre sehr dankbar

Antwort

0

Ich versuche, etwas Ähnliches zu tun, aber ich möchte alle 10 Sekunden eine Zeile streamen. Ich löste mit diesem Skript:

import socket 
from time import sleep 

host = 'localhost' 
port = 12345 

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
s.bind((host, port)) 
s.listen(1) 
while True: 
    print('\nListening for a client at',host , port) 
    conn, addr = s.accept() 
    print('\nConnected by', addr) 
    try: 
     print('\nReading file...\n') 
     with open('iris_test.csv') as f: 
      for line in f: 
       out = line.encode('utf-8') 
       print('Sending line',line) 
       conn.send(out) 
       sleep(10) 
      print('End Of Stream.') 
    except socket.error: 
     print ('Error Occured.\n\nClient disconnected.\n') 
conn.close() 

Hoffe, dass dies hilft.