I have a Spark job that reads from Cassandra, processes/transforms/filters the data, and writes the results to Elasticsearch. I am using Docker for my integration tests, and I am having trouble writing from Spark to Elasticsearch: the elasticsearch-hadoop library cannot connect to the Docker container.
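
For orientation, here is a minimal sketch of the shape of such a job; the keyspace, table, and column names are placeholders, not taken from the question:

import com.datastax.spark.connector._            // enables sc.cassandraTable
import org.elasticsearch.spark.rdd.EsSpark

// Read rows from Cassandra, transform/filter them, write to Elasticsearch.
// "my_keyspace", "my_table", "id" and "value" are placeholder names.
val docs = sc.cassandraTable("my_keyspace", "my_table")
    .map(row => Map("id" -> row.getString("id"), "value" -> row.getString("value")))
    .filter(_("value").nonEmpty)                 // example filter step
EsSpark.saveToEs(docs, "hot/mytype", Map("es.mapping.id" -> "id"))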

Dependencies:

"joda-time"    % "joda-time"   % "2.9.4", 
"javax.servlet"   % "javax.servlet-api" % "3.1.0", 
"org.elasticsearch"  % "elasticsearch"  % "2.3.2", 
"org.scalatest"   %% "scalatest"   % "2.2.1", 
"com.github.nscala-time" %% "nscala-time"  % "2.10.0", 
"cascading"    % "cascading-hadoop" % "2.6.3", 
"cascading"    % "cascading-local" % "2.6.3", 
"com.datastax.spark"  %% "spark-cassandra-connector" % "1.4.2", 
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5", 
"org.elasticsearch"  % "elasticsearch-hadoop"  % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")), 
"org.apache.spark"  %% "spark-catalyst"   % "1.4.0" % "provided" 

In my unit tests I can connect to Elasticsearch with a TransportClient to set up my template and index, i.e. this works:

// Spark configuration pointing at the Cassandra and Elasticsearch containers
val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
    .set("spark.cassandra.input.split.size_in_mb", "67108864")
    .set("spark.cassandra.connection.host", cassandraHostString)
    .set("es.nodes", elasticsearchHostString)
    .set("es.port", "9200")
    .set("http.publish_host", "")
sc = new SparkContext(conf)
// TransportClient used by the tests to install the template, index and alias
esClient = TransportClient.builder().build()
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticsearchHostString), 9300))
esClient.admin().indices().preparePutTemplate(testTemplate).setSource(Source.fromInputStream(getClass.getResourceAsStream("/mytemplate.json")).mkString).execute().actionGet()
esClient.admin().indices().prepareCreate(esTestIndex).execute().actionGet()
esClient.admin().indices().prepareAliases().addAlias(esTestIndex, "hot").execute().actionGet()

But when I try to run

EsSpark.saveToEs(
    myRDD, 
    "hot/mytype", 
    Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id") 
) 

I get this stack trace:

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142) 
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434) 
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442) 
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518) 
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524) 
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491) 
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412) 
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400) 
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
16/08/08 12:30:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142) 
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434) 
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442) 
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518) 
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524) 
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491) 
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412) 
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400) 
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

I can verify with 'docker network inspect bridge' that it is trying to connect to the correct IP address:

docker network inspect bridge 
[ 
{ 
    "Name": "bridge", 
    "Id": "ef184e3be3637be28f854c3278f1c8647be822a9413120a8957de6d2d5355de1", 
    "Scope": "local", 
    "Driver": "bridge", 
    "EnableIPv6": false, 
    "IPAM": { 
     "Driver": "default", 
     "Options": null, 
     "Config": [ 
      { 
       "Subnet": "172.17.0.0/16", 
       "Gateway": "172.17.0.1" 
      } 
     ] 
    }, 
    "Internal": false, 
    "Containers": { 
     "0c79680de8ef815bbe4bdd297a6f845cce97ef18bb2f2c12da7fe364906c3676": { 
      "Name": "analytics_rabbitmq_1", 
      "EndpointID": "3f03fdabd015fa1e2af802558aa59523f4a3c8c72f1231d07c47a6c8e60ae0d4", 
      "MacAddress": "02:42:ac:11:00:04", 
      "IPv4Address": "172.17.0.4/16", 
      "IPv6Address": "" 
     }, 
     "9b1f37c8df344c50e042c4b3c75fcb2774888f93fd7a77719fb286bb13f76f38": { 
      "Name": "analytics_elasticsearch_1", 
      "EndpointID": "fb083d27aaf8c0db1aac90c2a1ea2f752c46d8ac045e365f4b9b7d1651038a56", 
      "MacAddress": "02:42:ac:11:00:02", 
      "IPv4Address": "172.17.0.2/16", 
      "IPv6Address": "" 
     }, 
     "ed0cfad868dbac29bda66de6bee93e7c8caf04d623d9442737a00de0d43c372a": { 
      "Name": "analytics_cassandra_1", 
      "EndpointID": "2efa95980d681b3627a7c5e952e2f01980cf5ffd0fe4ba6185b2cab735784df6", 
      "MacAddress": "02:42:ac:11:00:03", 
      "IPv4Address": "172.17.0.3/16", 
      "IPv6Address": "" 
     } 
    }, 
    "Options": { 
     "com.docker.network.bridge.default_bridge": "true", 
     "com.docker.network.bridge.enable_icc": "true", 
     "com.docker.network.bridge.enable_ip_masquerade": "true", 
     "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0", 
     "com.docker.network.bridge.name": "docker0", 
     "com.docker.network.driver.mtu": "1500" 
    }, 
    "Labels": {} 
} 
] 

I am running everything locally on a MacBook (OS X). I do not understand why I can connect to the Docker container with the TransportClient and through my browser, but the EsSpark.saveToEs(...) call always fails.

Does your app have the same name? – alpert

Can you try setting the 'es.nodes.wan.only' parameter to 'true'? –

Answer

Setting

.config("es.nodes.wan.only", "true") 

can solve this problem: with 'es.nodes.wan.only' enabled, the connector disables node discovery and connects only through the declared 'es.nodes' address, instead of the container-internal IP (here 172.17.0.2) that it discovers from the cluster but cannot reach from the host.
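
The .config(...) form above is the SparkSession builder style; with the plain SparkConf used in the question, the equivalent would be (a sketch, not tested against the asker's setup):

val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
    .set("spark.cassandra.connection.host", cassandraHostString)
    .set("es.nodes", elasticsearchHostString)
    .set("es.port", "9200")
    // Disable node discovery: talk only to the declared es.nodes address
    // instead of the published container IP (172.17.0.2).
    .set("es.nodes.wan.only", "true")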

es.nodes.ingest.only

(default false) Whether to use Elasticsearch ingest nodes only. When enabled, elasticsearch-hadoop routes all of its requests (after node discovery, if enabled) through the ingest nodes within the cluster. The purpose of this configuration setting is to avoid incurring the cost of forwarding data meant for a pipeline from non-ingest nodes; it is really only useful when writing data to an ingest pipeline (see es.ingest.pipeline above).
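
For illustration only, since the question's actual fix is 'es.nodes.wan.only': routing a write through ingest nodes would look roughly like the sketch below. Note that both settings ship with later elasticsearch-hadoop releases (5.x and up), not the 2.3.2 used in the question, and the pipeline name is hypothetical.

EsSpark.saveToEs(
    myRDD,
    "hot/mytype",
    Map(
        "es.mapping.id"        -> "id",
        // Send bulk requests only to ingest nodes, which can execute
        // the pipeline directly (assumes the cluster has ingest nodes).
        "es.nodes.ingest.only" -> "true",
        // Hypothetical pipeline name, not from the question.
        "es.ingest.pipeline"   -> "my_pipeline"
    )
)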