1

Wir führen ein paar dataproc Aufträge mit dataproc image 1.0 und spark-redshift.DataProc Avro Version verursacht Fehler auf Image v1.0.0

Wir zwei Cluster haben, sind hier einige Details:

  • Ein Cluster -> Läuft PySpark Streaming Job, erstellt letzte 2016. Jul 15. 11:27:12 AEST
  • Cluster B -> Läuft PySpark Batchjobs wird der Cluster jedes Mal die erstellt Job wird ausgeführt und danach abgebaut.
  • A & B läuft die gleiche Codebasis verwenden die gleiche Init-Skript, gleiche Knotentypen usw.

Seit irgendwann am vergangenen Freitag (2016-08-05 AEST), unseren Code mit dem folgenden Arbeiten an Cluster B gestoppt Fehler, während Cluster A ohne Probleme ausgeführt wird.

Der folgende Code kann das Problem auf Cluster B (oder einen neuen Cluster mit Bild v1.0.0) wiederzugeben, während es feinen auf Cluster A. läuft

Probe PySpark Code:

from pyspark import SparkContext, SQLContext 
sc = SparkContext() 
sql_context = SQLContext(sc) 

rdd = sc.parallelize([{'user_id': 'test'}]) 
df = rdd.toDF() 

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "FOO") 
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "BAR") 

df\ 
    .write\ 
    .format("com.databricks.spark.redshift") \ 
    .option("url", "jdbc:redshift://foo.ap-southeast-2.redshift.amazonaws.com/bar") \ 
    .option("dbtable", 'foo') \ 
    .option("tempdir", "s3n://bar") \ 
    .option("extracopyoptions", "TRUNCATECOLUMNS") \ 
    .mode("append") \ 
    .save() 

Der obige Code schlägt in beiden folgenden Situationen auf Cluster B fehl, während auf 0 F ausgeführt wird. Beachten Sie, dass das RedshiftJDBC41-1.1.10.1010.jar über Cluster-Init-Skript erstellt wird.

  • Laufen im interaktiven Modus auf Master-Knoten:

    PYSPARK_DRIVER_PYTHON=ipython pyspark \ 
        --verbose \ 
        --master "local[*]"\ 
        --jars /usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar \ 
        --packages com.databricks:spark-redshift_2.10:1.0.0 
    
  • Senden Sie den Auftrag über gcloud dataproc

    gcloud --project foo \ 
        dataproc jobs submit pyspark \ 
        --cluster bar \ 
        --properties ^#^spark.jars.packages=com.databricks:spark-redshift_2.10:1.0.0#spark.jars=/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar \ 
        foo.bar.py 
    

Der Fehler es produziert (Trace):

2016-08-08 06:12:23 WARN TaskSetManager:70 - Lost task 6.0 in stage 45.0 (TID 121275, foo.bar.internal): 
    java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter; 
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.<init>(AvroKeyRecordWriter.java:55) 
    at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79) 
    at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105) 
    at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82) 
    at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

2016-08-08 06:12:24 ERROR YarnScheduler:74 - Lost executor 63 on kinesis-ma-sw-o7he.c.bupa-ma.internal: Container marked as failed: container_1470632577663_0003_01_000065 on host: kinesis-ma-sw-o7he.c.bupa-ma.internal. Exit status: 50. Diagnostics: Exception from container-launch. 
Container id: container_1470632577663_0003_01_000065 
Exit code: 50 
Stack trace: ExitCodeException exitCode=50: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:545) 
    at org.apache.hadoop.util.Shell.run(Shell.java:456) 
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722) 
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212) 
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) 
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

SparkRedshift: 1.0.0 com.databricks.spark-avro:2.0.1 erfordert, die org.apache.avro:1.7.6 erfordert.

Nach der Version von org.apache.avro.generic.GenericData auf Cluster A Überprüfung:

[email protected]:/home/foo# spark-shell \ 
>  --verbose \ 
>  --master "local[*]" \ 
>  --deploy-mode client \ 
>  --packages com.databricks:spark-redshift_2.10:1.0.0 \ 
>  --jars "/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar" 

Es produziert (Trace):

scala> import org.apache.avro.generic._ 
import org.apache.avro.generic._ 

scala> val c = GenericData.get() 
c: org.apache.avro.generic.GenericData = [email protected] 

scala> c.getClass.getProtectionDomain().getCodeSource() 
res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.5-hadoop2.jar <no signer certificates>) 

Während den gleichen Befehl auf Cluster B ausgeführt wird:

scala> import org.apache.avro.generic._ 
import org.apache.avro.generic._ 

scala> val c = GenericData.get() 
c: org.apache.avro.generic.GenericData = [email protected] 

scala> c.getClass.getProtectionDomain().getCodeSource() 
res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.7-hadoop2.jar <no signer certificates>) 

Screenshot of Env auf Cluster B. (Entschuldigung für alle Redaktionen). Wir haben die auf here und here beschriebene Methode ohne Erfolg versucht.

Das ist wirklich frustrierend, da das DataProc den Bildinhalt aktualisiert ohne die Freigabeversion als das komplette Gegenteil der unveränderlichen Freigaben stoßend. Jetzt ist unser Code pleite und wir können auf keinen Fall die vorherige Version wiederherstellen.

Antwort

1

Entschuldigung für das Problem! Es ist sicherlich nicht dafür vorgesehen, Änderungen innerhalb einer Image-Version aufzuheben. Beachten Sie, dass Subminor-Versionen "unter die Haube" gerollt werden, um Bug Fixes und Dataproc-spezifische Patches zu beheben.

können Sie zurückkommen, die 1,0 bis mit * Version von vor dem letzten Woche durch einfache --image-version 1.0.8 angeben, wenn Cluster aus der Befehlszeile bereitstellen.

gcloud dataproc clusters create --image-version 1.0.8 

Edit: Für zusätzliche Erläuterungen haben wir die Avro suchen Versionen in Frage und verifiziert, dass Avro-Versionsnummern tatsächlich nicht Änderung in jedem kürzlichen Subminor Dataproc-Release. Das Kernproblem ist, dass Hadoop selbst einen latenten Fehler hatte, bei dem Hadoop selbst avro-1.7.4 unter /usr/lib/hadoop/lib/ bringt und Spark avro-1.7.7 verwendet. Zufälligerweise verwendet Googles bigquery connectory auch avro-1.7.7, aber dies stellt sich als orthogonal zu dem bekannten Spark/Hadoop problem with 1.7.4 vs 1.7.7 heraus. Das letzte Bildupdate wurde als nichtbrechend angesehen, da Versionen sich tatsächlich nicht änderten, aber das Sortieren von Ordnern auf nichtdeterministische Weise geändert wurde, wo Hadoops schlechte Avro-Version aus reinem Glück vor dem Spark-Job verborgen war und nicht mehr versehentlich im letzten Bild versteckt ist .

Dataprocs preview Image enthält derzeit eine Fehlerbehebung für die Avro-Version in der Hadoop-Ebene, die es in eine zukünftige Dataproc 1.1-Version aufnehmen sollte, wenn es herauskommt; Vielleicht möchten Sie die preview-Version ausprobieren, um zu sehen, ob Spark 2.0 ein nahtloser Übergang ist.

+0

Danke für die Detailerklärung, es ist jetzt viel klarer. Beide Version 1.0.8 und Vorschau hat getestet, um gut zu funktionieren. Diese Frage kann etwas außerhalb des Kontextes liegen, was das Ladeproblem der Klassenpfad betrifft, wir haben es mit 'spark.driver.userClassPathFirst' und' spark.executor.userClassPathFirst' versucht und sie haben nicht funktioniert. Frage mich nur, ob du etwas davon hast? Vielen Dank. –

+0

Leider hatte ich nicht viel Gelegenheit, mit 'spark.driver.userClassPathFirst' herumzuspielen; wenn ich raten müsste, würde ich vermuten, dass es hauptsächlich für das Übergeben von Uber-Jars funktioniert, die die genauen Abhängigkeiten direkt bündeln, und ich wäre nicht überrascht, wenn Abhängigkeiten, auf die in "-Packages" verwiesen wird, nicht von den speziellen Classloader-Einstellungen profitieren. –

+0

Danke. es ist jetzt viel klarer. –