2013-06-18 10 views
5

Wir haben ein Schwein verbinden zwischen einer kleinen (16M Zeilen) eindeutige Tabelle und eine große (6B Zeilen) verdrehte Tabelle. Ein regulärer Join endet in 2 Stunden (nach einigen Feinabstimmungen). Wir haben versucht using skewed und in der Lage, die Leistung auf 20 Minuten zu verbessern.Schwein schief Join mit einer großen Tabelle verursacht "Split Metadatengröße überschritten 10000000"

Wenn wir jedoch eine größere schiefe Tabelle (19B Zeilen) versuchen, bekommen wir diese Nachricht aus dem Job SAMPLER:

Split metadata size exceeded 10000000. Aborting job job_201305151351_21573 [ScriptRunner] 
at org.apache.hadoop.mapreduce.split.SplitMetaInfoReader.readSplitMetaInfo(SplitMetaInfoReader.java:48) 
at org.apache.hadoop.mapred.JobInProgress.createSplits(JobInProgress.java:817) [ScriptRunner] 

Dies ist reproduzierbar jedes Mal, wenn wir using skewed versuchen, und nicht geschieht, wenn wir verwenden der regelmäßige beitreten.

wir versuchten Einstellung mapreduce.jobtracker.split.metainfo.maxsize=-1 und wir können sehen, es ist da in der Datei job.xml, aber es ändert nichts!

Was passiert hier? Ist das ein Fehler mit dem von using skewed erstellten Verteilungsbeispiel? Warum hilft es nicht, den Parameter auf -1 zu ändern?

+0

entschieden, einen Jira-Bug zu speichern: https://issues.apache.org/jira/browse/PIG-3411, wird aktualisiert – ihadanny

+0

wir haben festgestellt, dass maptruce.jobtracker.split.mtainfo ändert. maxsize ist bekannt, nicht in der Job-Ebene zu arbeiten, nur in der JobTracker-Ebene, siehe hier: https://groups.google.com/a/cloudera.org/forum/#!topic/cdh-user/UWBMKplvGkg – ihadanny

+0

hast du jemals eine Lösung für dieses Problem finden? Wir stehen vor einem ähnlichen Problem. – KennethJ

Antwort

1

Kleine Tabelle von 1 MB ist klein genug, um in den Speicher zu passen, versuchen Sie replizierte Join. Der replizierte Join ist nur Map, verursacht keine Reduce-Phase als andere Join-Typen und ist somit immun gegen die Verzerrung in den Join-Schlüsseln. Es sollte schnell gehen.

big = LOAD 'big_data' AS (b1,b2,b3); 
tiny = LOAD 'tiny_data' AS (t1,t2,t3); 
mini = LOAD 'mini_data' AS (m1,m2,m3); 
C = JOIN big BY b1, tiny BY t1, mini BY m1 USING 'replicated'; 

Große Tabelle ist immer die erste in der Aussage.

AKTUALISIEREN 1: Wenn Tischchen in seiner ursprünglichen Form passt nicht in den Speicher, als als eine Arbeit um Sie müssten Ihre kleinen Tisch in Partitionen aufzuteilen, die in den Speicher klein genug sind, um fit und als gelten die gleiche Aufteilung in die große Tabelle, hoffentlich könnten Sie dem System den gleichen Partitionierungsalgorithmus hinzufügen, der große Tabelle erstellt, so dass Sie keine Zeit damit verschwenden, sie neu zu partitionieren. Nach der Partitionierung können Sie den replizierten Join verwenden, es wird jedoch ein separates Skript für jede Partition benötigt.

+0

nette Idee, aber die kleine Tabelle ist nicht 1MB (bearbeitete Frage) und wird nicht in den Hadoop-Cache passen (versuchte es) – ihadanny

+0

Aktualisiert die Antwort. Siehe Update 1. – alexeipab

+0

Nochmals vielen Dank, aber ich suche nach einer Erklärung für das ursprüngliche Problem. Dies ist ein cooler Workaround, aber ich werde es nicht tun, bis ich verstehe, was mit dem herkömmlichen Join falsch ist. – ihadanny

0

In neueren Versionen von Hadoop (> = 2.4.0, aber vielleicht sogar noch früher) sollten Sie mithilfe der folgenden Konfigurationseigenschaften die maximale Spaltgröße auf Jobebene festlegen, um:

mapreduce.job.split .metainfo.maxsize = -1