2016-04-06 10 views
0

Ich möchte filesystem Zustand Backend und zookeeper Wiederherstellungsmodus konfigurieren:Ist es möglich, Riak CS mit Apache Flink zu verwenden?

state.backend: filesystem 
state.backend.fs.checkpointdir: ??? 

recovery.mode: zookeeper 
recovery.zookeeper.storageDir: ??? 

Wie du, ich sollte checkpointdir und storageDir Parameter angeben sehen können, aber ich habe keine Dateisysteme von Apache Flink (wie HDFS unterstützt oder Amazon S3). Aber ich habe Riak CS Cluster installiert (scheint wie compatible with S3).

Kann ich Riak CS zusammen mit Apache Flink verwenden? Wenn es möglich ist: Wie konfiguriert man Apache Flink für die Arbeit mit Riak CS?

Antwort

3

Antwort: Wie beitreten Apache Flink und Riak CS?

Riak CS hat S3 (Version 2) kompatible Schnittstelle. So ist es möglich, S3-Dateisystemadapter von Hadoop zu verwenden, um mit Riak CS zu arbeiten.

ich nicht bekannt, warum aber Apache Flink hat nur einen Teil der Hadoop-Dateisystem-Adapter innerhalb Fett jar (lib/flink-dist_2.11-1.0.1.jar) heißt es FTP-Dateisystem (org.apache.hadoop.fs.ftp.FTPFileSystem) aber S3-Dateisystem nicht hat (das heißt org.apache.hadoop.fs.s3a.S3AFileSystem). So haben Sie 2 Möglichkeiten, dieses Problem zu lösen:

  • Verwenden Sie diese Adapter von Hadoop Installation. Ich habe das nicht versucht, aber es sieht so aus, als sollten Sie nur die Variable HADOOP_CLASSPATH oder HADOOP_HOME evn konfigurieren.
  • monky Patch Apache Flink und Download erforderlich JAR-Dateien zu <flink home>/lib Verzeichnis

So wählen ich zweiten Weg, weil nicht will, für die Bereitstellung von Hadoop in meiner Umgebung. Sie können JAR-Dateien von Hadoop dist oder Internet kopieren:

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar 
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar 
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar 
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar 

Wie man sehen kann ich alte Versionen verwenden, weil ein solche Version in Hadoop mit 2.7.2 und ich verwende Flink kompatibel mit dieser Version von Hadoop.

FYI: Solche Hack kann Probleme verursachen, wenn Sie die neueste Version dieser JARs im eigenen Fluss verwenden. Um zu vermeiden, Probleme im Zusammenhang mit verschiedenen Versionen können Sie Pakete verlagern, wenn Sie Fett Glas bauen mit Strömungs Verwendung so etwas wie (ich benutze Gradle):

// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS) 
shadowJar { 
    dependencies { 
     include(dependency('.*:.*:.*')) 
    } 

    relocate 'org.apache.http', 'relocated.org.apache.http' 
    relocate 'org.apache.commons', 'relocated.org.apache.commons' 
} 

Dann sollten Sie Pfad zu core-site.xml in flink-conf.yaml weil Hadoop kompatible Dateisysteme angeben mit dieser Konfiguration zum Laden von Einstellungen:

... 
fs.hdfs.hadoopconf: /flink/conf 
... 

Wie Sie sehen, ich lege es nur zu <fink home>/conf Verzeichnis. Es hat die folgenden Einstellungen:

... 
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints 
... 
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery 
... 

und schaffen Eimer in Riak CS:

<?xml version="1.0" encoding="UTF-8" ?> 
<configuration> 
    <property> 
     <name>fs.s3a.impl</name> 
     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3 
    </property> 
    <property> 
     <name>fs.s3a.endpoint</name> 
     <value>my-riak-cs.stage.local</value> // this is my Riak CS host 
    </property> 
    <property> 
     <name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL 
     <value>false</value> 
    </property> 
    <property> 
     <name>fs.s3a.access.key</name> 
     <value>????</value> // this is my access key for Riak CS 
    </property> 
    <property> 
     <name>fs.s3a.secret.key</name> 
     <value>????</value> // this is my secret key for Riak CS 
    </property> 
</configuration> 

Dann sollten Sie Riak CS Eimer in flink-conf.yaml als Recommender here konfigurieren.Ich verwende s3cmd (über brew in meinem OS X dev env installiert ist):

s3cmd mb s3://example-staging-flink 

FYI: Bevor s3cmd verwenden Sie sollten es s3cmd --configure verwenden konfigurieren und dann einige Einstellungen in ~/.s3cmd Datei zu beheben:

signature_v2 = True // because Riak CS using S3 V2 interface 
use_https = False // if your don't use SSL 
access_key = ??? 
secret_key = ??? 
host_base = my-riak-cs.stage.local // your Riak CS host 
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS 

Das ist alles, was Sie für den Speicher-/Wiederherstellungsstatus des Standalone HA Apache Flink-Clusters in Riak CS konfigurieren sollten.