Antwort: Wie beitreten Apache Flink und Riak CS?
Riak CS hat S3 (Version 2) kompatible Schnittstelle. So ist es möglich, S3-Dateisystemadapter von Hadoop zu verwenden, um mit Riak CS zu arbeiten.
ich nicht bekannt, warum aber Apache Flink hat nur einen Teil der Hadoop-Dateisystem-Adapter innerhalb Fett jar (lib/flink-dist_2.11-1.0.1.jar
) heißt es FTP-Dateisystem (org.apache.hadoop.fs.ftp.FTPFileSystem
) aber S3-Dateisystem nicht hat (das heißt org.apache.hadoop.fs.s3a.S3AFileSystem
). So haben Sie 2 Möglichkeiten, dieses Problem zu lösen:
- Verwenden Sie diese Adapter von Hadoop Installation. Ich habe das nicht versucht, aber es sieht so aus, als sollten Sie nur die Variable HADOOP_CLASSPATH oder HADOOP_HOME evn konfigurieren.
- monky Patch Apache Flink und Download erforderlich JAR-Dateien zu
<flink home>/lib
Verzeichnis
So wählen ich zweiten Weg, weil nicht will, für die Bereitstellung von Hadoop in meiner Umgebung. Sie können JAR-Dateien von Hadoop dist oder Internet kopieren:
curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
Wie man sehen kann ich alte Versionen verwenden, weil ein solche Version in Hadoop mit 2.7.2 und ich verwende Flink kompatibel mit dieser Version von Hadoop.
FYI: Solche Hack kann Probleme verursachen, wenn Sie die neueste Version dieser JARs im eigenen Fluss verwenden. Um zu vermeiden, Probleme im Zusammenhang mit verschiedenen Versionen können Sie Pakete verlagern, wenn Sie Fett Glas bauen mit Strömungs Verwendung so etwas wie (ich benutze Gradle):
// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
dependencies {
include(dependency('.*:.*:.*'))
}
relocate 'org.apache.http', 'relocated.org.apache.http'
relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
Dann sollten Sie Pfad zu core-site.xml
in flink-conf.yaml
weil Hadoop kompatible Dateisysteme angeben mit dieser Konfiguration zum Laden von Einstellungen:
...
fs.hdfs.hadoopconf: /flink/conf
...
Wie Sie sehen, ich lege es nur zu <fink home>/conf
Verzeichnis. Es hat die folgenden Einstellungen:
...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...
und schaffen Eimer in Riak CS:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>my-riak-cs.stage.local</value> // this is my Riak CS host
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
<value>false</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>????</value> // this is my access key for Riak CS
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>????</value> // this is my secret key for Riak CS
</property>
</configuration>
Dann sollten Sie Riak CS Eimer in flink-conf.yaml
als Recommender here konfigurieren.Ich verwende s3cmd
(über brew
in meinem OS X dev env installiert ist):
s3cmd mb s3://example-staging-flink
FYI: Bevor s3cmd
verwenden Sie sollten es s3cmd --configure
verwenden konfigurieren und dann einige Einstellungen in ~/.s3cmd
Datei zu beheben:
signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS
Das ist alles, was Sie für den Speicher-/Wiederherstellungsstatus des Standalone HA Apache Flink-Clusters in Riak CS konfigurieren sollten.