2014-03-25 6 views
10

Gruppierung Ich versuche, eine einfache Transformation von gemeinsamen Crawl-Daten unter Verwendung von Spark-Host auf einem EC2 mit this guide, mein Code sieht wie folgt aus auszuführen:Funken läuft Speicher aus, wenn sie durch Schlüssel

package ccminer 

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 

object ccminer { 
    val english = "english|en|eng" 
    val spanish = "es|esp|spa|spanish|espanol" 
    val turkish = "turkish|tr|tur|turc" 
    val greek = "greek|el|ell" 
    val italian = "italian|it|ita|italien" 
    val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|") 

    def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*") 

    def main(args: Array[String]): Unit = { 
    if (args.length != 3) { 
     System.err.println("Bad command line") 
     System.exit(-1) 
    } 

    val cluster = "spark://???" 
    val sc = new SparkContext(cluster, "Common Crawl Miner", 
     System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar")) 

    sc.sequenceFile[String, String](args(0)).map { 
     case (k, v) => (langIndep(k), v) 
    } 
    .groupByKey(args(2).toInt) 
    .filter { 
     case (_, vs) => vs.size > 1 
    } 
    .saveAsTextFile(args(1)) 
    } 
} 

Und ich bin mit es mit dem Befehl wie folgt:

sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000" 

Aber sehr schnell scheitert es Fehler mit als

java.lang.OutOfMemoryError: Java heap space 
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59) 
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93) 
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40) 
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13) 
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31) 
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44) 
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61) 
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60) 
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803) 
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471) 
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471) 
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117) 
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174) 
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164) 
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) 
at org.apache.spark.scheduler.Task.run(Task.scala:53) 
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213) 
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:744) 
folgt

Also ist meine grundlegende Frage, was ist notwendig, um eine Spark-Aufgabe zu schreiben, die nach Schlüssel mit einer fast unbegrenzten Menge an Eingabe gruppieren kann, ohne den Speicher zu verlieren?

+0

In welcher Größe laufen Sie? – datasage

+0

http://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space/22742982#22742982 – samthebest

+0

Warum haben Sie keine Antwort angenommen? :/Oder nicht einmal kommentiert. – gsamaras

Antwort

14

Die häufigste Ursache für java.lang.OutOfMemoryError-Ausnahmen in Shuffle-Aufgaben (wie groupByKey, reduceByKey usw.) ist die niedrige Stufe parallelism.

Sie können den Standardwert erhöhen, indem Sie spark.default.parallelism Eigenschaft in configuration setzen.

+2

Auch zu zu wenig Partitionen zu koaleszieren kann dies verursachen. – jbrown

+0

@jbrown [hat einen Punkt] (http://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0)! – gsamaras

+0

Beachten Sie, dass [reduceByKey() tatsächlich scheint, unter den gleichen Fehlern zu leiden] (http://stackoverflow.com/questions/29156275/spark-scalability-what-am-i-doing-wrong). – gsamaras

4

So besagt, dass Sie Heap-Speicherplatz von JVM nicht mehr zugewiesen haben. Sie können die Größe des Heapspeichers erhöhen, dies wird jedoch durch die Systemfunktionen begrenzt (die Menge an physischem RAM kann nicht überschritten werden).

Auf der anderen Seite, wie von homutov erklärt, geschieht dies in großen Sammelvorgänge. Zum Beispiel groupByKey, reduceByKey, cartisien + mapToPair. Diese Operationen sammeln die RDD-Daten an einem Ort, so dass JVM keinen Heap-Speicherplatz mehr hat.

Was können Sie tun?

Mit meiner Erfahrung, wenn ein Cluster/System begrenzte Ressourcen haben, können Sie Spark tuning guide verwenden. spark.defualt.parallelism kann erhöht werden, bis Sie Aufgabe in Ihr Cluster/System begleiten können [Ich einmal eine KNN-Implementierung für 14000 Instanz ausgeführt, 1024 Feature-Dataset auf meinem Laptop virtuelle Maschine durch Optimieren der Parallelität].

Command line flag : --conf spark.default.parallelism=4 ; 4 is the parallelism value 

Denken Sie daran, Sie müssen TUNE diese Funktionen am effektivsten und scheitern Vermeidung (aus Heap ausgeführt wird) Einstellung die besten Ergebnisse aus Funken zu erhalten.

Zusätzlich

Denken Sie daran, Verwendung primitive Datentypen statt Wrapper zu verwenden. Und verwenden Sie Arrays anstelle von Sammlungen.

In Spark-Arrays können viele wertvollen Speicherplatz sparen und die Leistung verbessern.

Verwenden Sie auch BroadCast Variablen anstelle von kartesischen Produkt oder einer großen Kombinationsaufgabe.

+1

gute Einführung über, wann, wie und warum zu senden kann https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html gefunden werden – Boern