3

Ich versuche etwa 1 Million HTML-Dateien mit PySpark (Google Dataproc) zu analysieren und die relevanten Felder in eine komprimierte Datei zu schreiben. Jede HTML-Datei ist etwa 200 KB. Daher sind alle Daten etwa 200 GB.PySpark + Google Cloud Speicher (wholeTextFiles)

Der untenstehende Code funktioniert gut, wenn ich eine Teilmenge der Daten verwende, aber stundenlang läuft und dann abstürzt, wenn das ganze Dataset ausgeführt wird. Außerdem werden die Arbeiterknoten nicht verwendet (< 5% CPU), also weiß ich, dass es ein Problem gibt.

Ich glaube, das System verschluckt sich bei der Aufnahme der Daten von GCS. Gibt es einen besseren Weg, dies zu tun? Wenn ich auf diese Weise ganzeTextFiles verwende, versucht der Master dann, alle Dateien herunterzuladen und sie dann an die Executoren zu senden, oder lässt er sie von den Executoren herunterladen?

+0

Alle Fehlermeldungen, Stack-Traces usw. wären hilfreich. Der Master liest nicht alle enthaltenen Daten, aber er holt den Status für alle Eingabedateien, bevor er mit der Arbeit beginnt. Dataproc setzt die Eigenschaft "mapreduce.input.fileinputformat.list-status.num-threads" standardmäßig auf 20, um den Zeitpunkt dieser Suche zu verbessern. In GCS wird jedoch immer noch eine RPC-Datei ausgeführt. Eine Methode zur weiteren Verbesserung der Suche besteht darin, einen Teil dieser Nachschlagelogik über Spark auszuführen, indem RDD-Dateipräfixe erstellt werden, FlatMap verwendet wird, um diese Präfixe in Dateinamen umzuwandeln, und Dateinamen dann Dateiinhalten zugeordnet werden. –

+0

Ok. Angenommen, ich erstelle eine RDD von Dateinamen, wie Sie es vorschlagen. Wie sollte ich diesen Dateinamen dem Dateiinhalt zuordnen? Ich kann sc.wholeTextFile nicht innerhalb eines Executors aufrufen. Ich könnte die Boto-API innerhalb des Executor verwenden, um die Datei herunterzuladen. Ich habe es versucht, aber es ist noch langsamer. Mein Verdacht ist, dass die Boto-API bei jeder Anfrage einen hohen Authentifizierungsaufwand hat. –

Antwort

3

Um Ihre Frage zu beantworten, wird der Master nicht alle enthaltenen Daten lesen, sondern den Status aller Eingabedateien abrufen, bevor er mit der Arbeit beginnt. Dataproc setzt die Eigenschaft "mapreduce.input.fileinputformat.list-status.num-threads" standardmäßig auf 20, um den Zeitpunkt dieser Suche zu verbessern. In GCS wird jedoch immer noch eine RPC-Datei ausgeführt.

Es scheint, dass Sie einen Fall gefunden haben, in dem das Hinzufügen von Threads nicht sehr hilfreich ist und den Treiber schneller zu OOM führt.

Erweiterung auf wie das Lesen zu parallelisieren, habe ich zwei Ideen.

Aber zuerst, ein wenig Warnung: keine dieser Lösungen, wie sie sind, sind sehr robust gegenüber den Verzeichnissen in den Glob enthalten sind. Sie sollten sich wahrscheinlich vor Verzeichnissen schützen, die in der Liste der zu lesenden Dateien angezeigt werden.

Die erste ist mit Python und den hadoop-Befehlszeilenwerkzeugen gemacht (dies könnte auch mit gsutil gemacht werden). Die unten ist ein Beispiel dafür, wie es aussehen könnte, und führt eine Datei auf Arbeiter Auflistung liest Dateiinhalt in Paaren und berechnet schließlich Paare (Dateiname, Dateilänge):

from __future__ import print_function 

from pyspark.rdd import RDD 
from pyspark import SparkContext 

import sys 
import subprocess 


def hadoop_ls(file_glob): 
    lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n") 
    files = [line.split()[7] for line in lines if len(line) > 0] 
    return files 

def hadoop_cat(file): 
    return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8") 

if __name__ == "__main__": 
    if len(sys.argv) < 2: 
    print("Provide a list of path globs to read.") 
    exit(-1) 

    sc = SparkContext() 
    # This is just for testing. You'll want to generate a list 
    # of prefix globs instead of having a list passed in from the 
    # command line. 
    globs = sys.argv[1:] 
    # Desired listing partition count 
    lpc = 100 
    # Desired 'cat' partition count, should be less than total number of files 
    cpc = 1000 
    files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls) 
    files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)]) 
    files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])]) 
    local = files_and_char_count.collect() 
    for pair in local: 
    print("File {} had {} chars".format(pair[0], pair[1])) 

ich zum ersten Mal mit diesem Teilprozess beginnen würde Lösung und spielen mit der Partitionierung von hadoop_ls und hadoop_cat Anrufe und sehen, ob Sie etwas bekommen können, das akzeptabel ist.

Die zweite Lösung ist komplizierter, wird aber wahrscheinlich eine Pipeline ergeben, die leistungsfähiger ist, indem viele, viele Exec-Aufrufe vermieden werden.

In dieser zweiten Lösung kompilieren wir einen speziellen Helfer-Krug, verwenden eine Initialisierungsaktion, um diesen Krug an alle Arbeiter zu kopieren und schließlich den Helfer aus unserem Treiber zu verwenden.

Die letzte Verzeichnisstruktur unseres scala jar Projekt wird wie folgt aussehen:

helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala 
helper/build.sbt 

In unserer PysparkHelper.scala Datei werden wir eine kleine scala Klasse, die Lösung über viel wie unsere reine Python-Funktionen tut . Zuerst erstellen wir eine RDD von Datei-Globs, dann eine RDD von Dateinamen und schließlich eine RDD von Dateinamen und Datei-Content-Paaren.

package com.google.cloud.dataproc.support 

import collection.JavaConversions._ 

import org.apache.commons.io.IOUtils 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.{FileSystem, Path} 
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext} 

import java.util.ArrayList 
import java.nio.charset.StandardCharsets 

class PysparkHelper extends Serializable { 
    def wholeTextFiles(
    context: JavaSparkContext, 
    paths: ArrayList[String], 
    partitions: Int): JavaPairRDD[String, String] = { 

    val globRDD = context.sc.parallelize(paths).repartition(partitions) 
    // map globs to file names: 
    val filenameRDD = globRDD.flatMap(glob => { 
     val path = new Path(glob) 
     val fs: FileSystem = path.getFileSystem(new Configuration) 
     val statuses = fs.globStatus(path) 
     statuses.map(s => s.getPath.toString) 
    }) 
    // Map file name to (name, content) pairs: 
    // TODO: Consider adding a second parititon count parameter to repartition before 
    // the below map. 
    val fileNameContentRDD = filenameRDD.map(f => { 
     Pair(f, readPath(f, new Configuration)) 
    }) 

    new JavaPairRDD(fileNameContentRDD) 
    } 

    def readPath(file: String, conf: Configuration) = { 
    val path = new Path(file) 
    val fs: FileSystem = path.getFileSystem(conf) 
    val stream = fs.open(path) 
    try { 
     IOUtils.toString(stream, StandardCharsets.UTF_8) 
    } finally { 
     stream.close() 
    } 
    } 
} 

Der Helfer/Build.sbt Datei würde wie folgt aussehen:

organization := "com.google.cloud.dataproc.support" 
name := "pyspark_support" 
version := "0.1" 
scalaVersion := "2.10.5" 
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided" 
exportJars := true 

Wir können dann die Helfer mit sbt bauen:

$ cd helper && sbt package 

Der Ausgang Helfer jar sollte Ziel/scala-2.10/pyspark_support_2.10-0.1.jar sein

Jetzt müssen wir dieses jar auf unseren Cluster bringen, und dazu müssen wir zwei Dinge tun: 1) das jar in GCS hochladen und 2) eine Initialisierungsaktion in GCS erstellen, um das jar auf Clusterknoten zu kopieren.

Nehmen wir mal an, Ihr Bucket heißt MY_BUCKET (fügen Sie hier ein Walross-bezogenes Mem ein).

$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar 

Eine Initialisierungsaktion (nennen wir es pyspark_init_action.sh, MY_BUCKET ersetzt je nach Bedarf):

#!/bin/bash 

gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/ 

und schließlich die Initialisierungsaktion zu GCS hochladen:

$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh 

Ein Cluster kann jetzt gestartet werden, indem die folgenden Flags an gcloud übergeben werden:

--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh 

Nach dem Bau, das Hochladen und die Installation unserer neuen Bibliothek können wir schließlich nutzen es aus pyspark:

from __future__ import print_function 

from pyspark.rdd import RDD 
from pyspark import SparkContext 
from pyspark.serializers import PairDeserializer, UTF8Deserializer 

import sys 

class DataprocUtils(object): 

    @staticmethod 
    def wholeTextFiles(sc, glob_list, partitions): 
    """ 
    Read whole text file content from GCS. 
    :param sc: Spark context 
    :param glob_list: List of globs, each glob should be a prefix for part of the dataset. 
    :param partitions: number of partitions to use when creating the RDD 
    :return: RDD of filename, filecontent pairs. 
    """ 
    helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper() 
    return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc, 
       PairDeserializer(UTF8Deserializer(), UTF8Deserializer())) 

if __name__ == "__main__": 
    if len(sys.argv) < 2: 
    print("Provide a list of path globs to read.") 
    exit(-1) 

    sc = SparkContext() 
    globs = sys.argv[1:] 
    partitions = 10 
    files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions) 
    files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1]))) 
    local = files_and_char_count.collect() 
    for pair in local: 
    print("File {} had {} chars".format(pair[0], pair[1])) 
0

Dank! Ich habe die erste Methode ausprobiert. Es funktioniert, ist aber aufgrund der exec-Aufrufe und RPC/auth-Overhead nicht sehr performant. Es dauert ungefähr 10 Stunden, um auf einem Cluster mit 32 Knoten zu laufen. Ich konnte es in 30 Minuten auf einem 4-Knoten-Cluster mit Databricks auf aws mit dem Amazon S3-Connector ausführen. Es scheint, dass dort viel weniger Aufwand herrscht. Ich wünschte, Google würde eine bessere Möglichkeit bieten, Daten von GCS zu Spark zu importieren.

+0

Ich möchte mir das ein bisschen genauer ansehen; 10 Stunden scheinen zu hoch - können Sie das Layout Ihrer Dateien/Verzeichnisse teilen (z. B. 1 Million Objekte in einem einzigen Verzeichnis oder 1.000 Verzeichnisse mit jeweils 1.000 Objekten usw.)? Auch - können Sie die allgemeine Form von Sie glob teilen - (z. B./bucket/*/dir/* oder etwas anderes)? –