2016-02-28 4 views
7

Nach dem Lesen mehrerer Dokumentation Seiten Apache Flink (official documentation, dataartisans) sowie die Beispiele in der official repository, sehe ich Beispiele, wo sie als Datenquelle für das Streaming einer Datei verwenden bereits heruntergeladen, immer mit dem localhost verbinden.Get JSON Elemente aus einem Web mit Apache Flink

Ich versuche Apache Flink zu verwenden, um JSON-Dateien herunterzuladen, die dynamische Daten enthalten. Meine Absicht ist es zu versuchen, die URL zu etablieren, wo ich auf die JSON-Datei als Eingangsquelle von Apache Flink zugreifen kann, anstatt sie mit einem anderen System herunterzuladen und die heruntergeladene Datei mit Apache Flink zu verarbeiten.

Ist es möglich, diese Netzwerkverbindung mit Apache Flink zu stabilisieren?

Antwort

4

Sie können die URLs, die Sie herunterladen möchten, als Ihre Eingabe DataStream definieren und dann die Dokumente aus einer MapFunction herunterladen. Der folgende Code demonstriert das:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html"); 

inputURLs.map(new MapFunction<String, String>() { 
    @Override 
    public String map(String s) throws Exception { 
     URL url = new URL(s); 
     InputStream is = url.openStream(); 

     BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); 

     StringBuilder builder = new StringBuilder(); 
     String line; 

     try { 
      while ((line = bufferedReader.readLine()) != null) { 
       builder.append(line + "\n"); 
      } 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     try { 
      bufferedReader.close(); 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     return builder.toString(); 
    } 
}).print(); 

env.execute("URL download job"); 
+0

Ich führe Beispielcode, aber es wird nur einmal ausgeführt und alle Datei gelesen. Allerdings streamt Iit nicht, ich dachte, dass es weiterlesen wird, wenn es in der JSON-Datei Probleme gibt. – zt1983811

+0

Dafür müssten Sie die 'ContinuousFileMonitoringFunction' verwenden. Streaming per se bedeutet nicht, dass der Job unendlich lange läuft. Dies geschieht nur, wenn Sie eine nicht endliche Quelle haben. In diesem Fall erzeugt die Funktion "env.fromElements" eine endliche Streaming-Quelle. Sobald diese Quelle ihr Ende erreicht hat, wird das Programm beendet. –