Um Ihre Frage zu beantworten, wird der Master nicht alle enthaltenen Daten lesen, sondern den Status aller Eingabedateien abrufen, bevor er mit der Arbeit beginnt. Dataproc setzt die Eigenschaft "mapreduce.input.fileinputformat.list-status.num-threads" standardmäßig auf 20, um den Zeitpunkt dieser Suche zu verbessern. In GCS wird jedoch immer noch eine RPC-Datei ausgeführt.
Es scheint, dass Sie einen Fall gefunden haben, in dem das Hinzufügen von Threads nicht sehr hilfreich ist und den Treiber schneller zu OOM führt.
Erweiterung auf wie das Lesen zu parallelisieren, habe ich zwei Ideen.
Aber zuerst, ein wenig Warnung: keine dieser Lösungen, wie sie sind, sind sehr robust gegenüber den Verzeichnissen in den Glob enthalten sind. Sie sollten sich wahrscheinlich vor Verzeichnissen schützen, die in der Liste der zu lesenden Dateien angezeigt werden.
Die erste ist mit Python und den hadoop-Befehlszeilenwerkzeugen gemacht (dies könnte auch mit gsutil gemacht werden). Die unten ist ein Beispiel dafür, wie es aussehen könnte, und führt eine Datei auf Arbeiter Auflistung liest Dateiinhalt in Paaren und berechnet schließlich Paare (Dateiname, Dateilänge):
from __future__ import print_function
from pyspark.rdd import RDD
from pyspark import SparkContext
import sys
import subprocess
def hadoop_ls(file_glob):
lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n")
files = [line.split()[7] for line in lines if len(line) > 0]
return files
def hadoop_cat(file):
return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Provide a list of path globs to read.")
exit(-1)
sc = SparkContext()
# This is just for testing. You'll want to generate a list
# of prefix globs instead of having a list passed in from the
# command line.
globs = sys.argv[1:]
# Desired listing partition count
lpc = 100
# Desired 'cat' partition count, should be less than total number of files
cpc = 1000
files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls)
files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)])
files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])])
local = files_and_char_count.collect()
for pair in local:
print("File {} had {} chars".format(pair[0], pair[1]))
ich zum ersten Mal mit diesem Teilprozess beginnen würde Lösung und spielen mit der Partitionierung von hadoop_ls und hadoop_cat Anrufe und sehen, ob Sie etwas bekommen können, das akzeptabel ist.
Die zweite Lösung ist komplizierter, wird aber wahrscheinlich eine Pipeline ergeben, die leistungsfähiger ist, indem viele, viele Exec-Aufrufe vermieden werden.
In dieser zweiten Lösung kompilieren wir einen speziellen Helfer-Krug, verwenden eine Initialisierungsaktion, um diesen Krug an alle Arbeiter zu kopieren und schließlich den Helfer aus unserem Treiber zu verwenden.
Die letzte Verzeichnisstruktur unseres scala jar Projekt wird wie folgt aussehen:
helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala
helper/build.sbt
In unserer PysparkHelper.scala Datei werden wir eine kleine scala Klasse, die Lösung über viel wie unsere reine Python-Funktionen tut . Zuerst erstellen wir eine RDD von Datei-Globs, dann eine RDD von Dateinamen und schließlich eine RDD von Dateinamen und Datei-Content-Paaren.
package com.google.cloud.dataproc.support
import collection.JavaConversions._
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}
import java.util.ArrayList
import java.nio.charset.StandardCharsets
class PysparkHelper extends Serializable {
def wholeTextFiles(
context: JavaSparkContext,
paths: ArrayList[String],
partitions: Int): JavaPairRDD[String, String] = {
val globRDD = context.sc.parallelize(paths).repartition(partitions)
// map globs to file names:
val filenameRDD = globRDD.flatMap(glob => {
val path = new Path(glob)
val fs: FileSystem = path.getFileSystem(new Configuration)
val statuses = fs.globStatus(path)
statuses.map(s => s.getPath.toString)
})
// Map file name to (name, content) pairs:
// TODO: Consider adding a second parititon count parameter to repartition before
// the below map.
val fileNameContentRDD = filenameRDD.map(f => {
Pair(f, readPath(f, new Configuration))
})
new JavaPairRDD(fileNameContentRDD)
}
def readPath(file: String, conf: Configuration) = {
val path = new Path(file)
val fs: FileSystem = path.getFileSystem(conf)
val stream = fs.open(path)
try {
IOUtils.toString(stream, StandardCharsets.UTF_8)
} finally {
stream.close()
}
}
}
Der Helfer/Build.sbt Datei würde wie folgt aussehen:
organization := "com.google.cloud.dataproc.support"
name := "pyspark_support"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided"
exportJars := true
Wir können dann die Helfer mit sbt bauen:
$ cd helper && sbt package
Der Ausgang Helfer jar sollte Ziel/scala-2.10/pyspark_support_2.10-0.1.jar sein
Jetzt müssen wir dieses jar auf unseren Cluster bringen, und dazu müssen wir zwei Dinge tun: 1) das jar in GCS hochladen und 2) eine Initialisierungsaktion in GCS erstellen, um das jar auf Clusterknoten zu kopieren.
Nehmen wir mal an, Ihr Bucket heißt MY_BUCKET (fügen Sie hier ein Walross-bezogenes Mem ein).
$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar
Eine Initialisierungsaktion (nennen wir es pyspark_init_action.sh, MY_BUCKET ersetzt je nach Bedarf):
#!/bin/bash
gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/
und schließlich die Initialisierungsaktion zu GCS hochladen:
$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh
Ein Cluster kann jetzt gestartet werden, indem die folgenden Flags an gcloud übergeben werden:
--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh
Nach dem Bau, das Hochladen und die Installation unserer neuen Bibliothek können wir schließlich nutzen es aus pyspark:
from __future__ import print_function
from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.serializers import PairDeserializer, UTF8Deserializer
import sys
class DataprocUtils(object):
@staticmethod
def wholeTextFiles(sc, glob_list, partitions):
"""
Read whole text file content from GCS.
:param sc: Spark context
:param glob_list: List of globs, each glob should be a prefix for part of the dataset.
:param partitions: number of partitions to use when creating the RDD
:return: RDD of filename, filecontent pairs.
"""
helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper()
return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Provide a list of path globs to read.")
exit(-1)
sc = SparkContext()
globs = sys.argv[1:]
partitions = 10
files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions)
files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1])))
local = files_and_char_count.collect()
for pair in local:
print("File {} had {} chars".format(pair[0], pair[1]))
Alle Fehlermeldungen, Stack-Traces usw. wären hilfreich. Der Master liest nicht alle enthaltenen Daten, aber er holt den Status für alle Eingabedateien, bevor er mit der Arbeit beginnt. Dataproc setzt die Eigenschaft "mapreduce.input.fileinputformat.list-status.num-threads" standardmäßig auf 20, um den Zeitpunkt dieser Suche zu verbessern. In GCS wird jedoch immer noch eine RPC-Datei ausgeführt. Eine Methode zur weiteren Verbesserung der Suche besteht darin, einen Teil dieser Nachschlagelogik über Spark auszuführen, indem RDD-Dateipräfixe erstellt werden, FlatMap verwendet wird, um diese Präfixe in Dateinamen umzuwandeln, und Dateinamen dann Dateiinhalten zugeordnet werden. –
Ok. Angenommen, ich erstelle eine RDD von Dateinamen, wie Sie es vorschlagen. Wie sollte ich diesen Dateinamen dem Dateiinhalt zuordnen? Ich kann sc.wholeTextFile nicht innerhalb eines Executors aufrufen. Ich könnte die Boto-API innerhalb des Executor verwenden, um die Datei herunterzuladen. Ich habe es versucht, aber es ist noch langsamer. Mein Verdacht ist, dass die Boto-API bei jeder Anfrage einen hohen Authentifizierungsaufwand hat. –