2015-01-22 3 views
9

streamen Ich versuche, Code unter Verwendung von Eclipse (mit Maven Conf) mit 2 Arbeiter ausführen und jeder haben 2 Kern oder auch mit Spark-Submit versucht.funken ssc.textFileStream ist keine Dateien aus dem Verzeichnis

public class StreamingWorkCount implements Serializable { 

    public static void main(String[] args) { 
     Logger.getLogger("org.apache.spark").setLevel(Level.WARN); 
     JavaStreamingContext jssc = new JavaStreamingContext(
       "spark://192.168.1.19:7077", "JavaWordCount", 
       new Duration(1000)); 
     JavaDStream<String> trainingData = jssc.textFileStream(
       "/home/bdi-user/kaushal-drive/spark/data/training").cache(); 
     trainingData.foreach(new Function<JavaRDD<String>, Void>() { 

      public Void call(JavaRDD<String> rdd) throws Exception { 
       List<String> output = rdd.collect(); 
       System.out.println("Sentences Collected from files " + output); 
       return null; 
      } 
     }); 

     trainingData.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

und melden Sie sich von diesem Code

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms: 

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms 
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms 
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33 
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s 
Sentences Collected from files [] 
------------------------------------------- 
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms 
Time: 1421944033000 ms 
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms 


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms 
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s) 
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list 
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() 

Das Problem ist, dass ich immer Daten bin nicht die Datei bilden, die in dem Verzeichnis befindet. Bitte hilf mir.

+0

Mit genau demselben Problem auf Windows-Maschine konfrontiert.Bitte schlagen Sie –

+0

Ich denke, das funktioniert nur in HDFS und nicht auf lokalen Dateisystem –

Antwort

8

Versuchen Sie es mit einem anderen Verzeichnis und kopieren Sie dann diese Dateien in dieses Verzeichnis, während der Job ausgeführt wird.

+0

ja, ich habe es auch mit einem anderen Dir versucht. Ich habe nicht verstanden, was ist das Problem und wie zu debuggen, auch was nicht im Protokoll angezeigt wird. – Kaushal

+1

Aber war das Verzeichnis leer, als Sie den Job starteten? – pzecevic

+0

Eigentlich sind einige Dateien bereits da und ich kopiere auch einige Dateien, wenn ich meine Arbeit starte. – Kaushal

1

Ich denke, Sie müssen das Schema, d. H. file:// oder hdfs:// vor Ihrem Weg hinzufügen.


rückgängig machen bearbeiten zu meinem Kommentar, weil: Es ist in der Tat file:// und hdfs://, die „vor“ hinzugefügt werden muss der Weg, so dass der gesamte Pfad wird file:///tmp/file.txt oder hdfs:///user/data. Wenn in der Konfiguration kein NameNode festgelegt ist, muss dieser hdfs://host:port/user/data sein.

+1

mit HDFS, es funktioniert, aber wenn ich lokale Dateisystem mit 'Datei: ///' (Funke unterstützt nicht Datei: //) Präfix, es funktioniert nicht. – Kaushal

+1

Dies liegt möglicherweise daran, dass Sie einen Cluster verwenden und der angegebene Pfad für alle Spark-Executoren zugänglich sein muss, d. H. Es ist nicht genug, wenn der Spark-Treiber darauf zugreifen kann. – tgpfeiffer

3

hatte das gleiche Problem. Hier ist mein Code:

Linien = jssc.textFileStream ("file: /// Users/Projekte/Funken/test/data ');

die TextFileSTream ist sehr empfindlich; was ich am Ende tun war:

1. Run Spark program 
2. touch datafile 
3. mv datafile datafile2 
4. mv datafile2 /Users/projects/spark/test/data 

und das tat es

+0

Ja, es hat gut funktioniert! – lihongxu

0

JavaDoc schlägt nur dann funktionieren, neue Datei-Streams. s.

Ref: https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html#textFileStream(java.lang.String)

einen Eingabestream erstellen, die ein Hadoop-kompatibles Dateisystem für neue Dateien überwacht und liest sie als Textdateien (Schlüssel als LongWritable, Wert als Text und Eingabeformat als TextInputFormat verwenden). Dateien müssen in das überwachte Verzeichnis geschrieben werden, indem sie von einem anderen Speicherort innerhalb desselben Dateisystems "verschoben" werden. Dateinamen beginnend mit. werden ignoriert.

0

textFileStream kann nur einen Ordner überwachen, wenn die Dateien in dem Ordner werden hinzugefügt oder aktualisiert. Wenn Sie nur Dateien lesen möchten, können Sie stattdessen SparkContext.textFile verwenden.

0

Sie müssen zählen, dass Spark Streaming nur die neuen Dateien im Verzeichnis liest, keine der aktualisierten (sobald sie im Verzeichnis sind) und sie alle müssen das gleiche Format haben.

Source