2016-06-20 8 views
0

Erste Schritte beim Schreiben von Hadoop MR-Jobs. Hoffentlich werden wir bald zu Spark wechseln, aber wir stecken jetzt fest.Hadoop Map Reduce - Wie trennt man die Gruppierung von der Sortierung?

Ich möchte Datensätze durch ein Hash ihres Wertes gruppieren. Aber ich würde sie gerne nach etwas völlig anderem ordnen - ein Zeitstempel in ihrem Wert. Ich bin verwirrt darüber, wie ich das am besten mache. Ich sehe zwei Optionen:

1) Haben Sie einen ersten MR-Job, der den Hash für jeden Wert in seinem Mapper berechnet und dann alle Datensätze dieses Hash auf den gleichen Wert reduziert, aber es will (ich habe tatsächlich so viel Arbeit genau wie wir jetzt brauchen). Dann verketten Sie einen zweiten MR-Auftrag, der die Ausgabe des Reduzierers oben um den Zeitstempel im Wert sortiert. Ineffizient?

2) Ich habe einige Blogs/Posts über die Verwendung von zusammengesetzten Schlüsseln gelesen, also könnte ich vielleicht alles in einem Schritt erreichen? Ich würde eine Art zusammengesetzten Schlüssel erstellen, der sowohl den Hash für die Gruppierung als auch den Zeitstempel für die Sortierung im Mapper enthielt. Aber ich bin mir nicht sicher, ob das möglich ist. Kann es immer noch korrekt gruppieren, wenn die Sortierung völlig unabhängig von der Gruppierung ist? Ich bin mir auch nicht sicher, welche Schnittstellen ich implementieren müsste und welche Klassen ich erstellen oder konfigurieren müsste.

Ich spreche nicht von einer sekundären Art. Die Reihenfolge der Objekte im Iterator für jeden Reduce-Aufruf ist mir egal. Ich bin besorgt über die Reihenfolge, in der die Dinge vom Reducer emittiert werden, muss eine globale Sortierung nach Timestamp sein.

Was ist der empfohlene Weg, um so etwas zu tun?

+0

Muss Ihr Reducer etwas anderes tun, als mit Hash zu aggregieren, sortieren und anzeigen? Außerdem interessiert Sie nicht die Reihenfolge der Hashes, aber alle Datensätze für den gleichen Hash müssen sortiert werden, richtig? – Jedi

+0

My reduce() muss nur mit Hash aggregieren. Aber ich möchte, dass die Ausgabe aller reduce-Aufrufe nach Zeitmarken sortiert wird. Ich brauche nicht alle Datensätze des gleichen Hash sortiert. Ich brauche die kumulative Ausgabe des Reduzierers sortiert nach Zeitstempel. Dieser Zeitstempel könnte aus einem der Werte im Iterator von reduce() stammen. – medloh

Antwort

1

Absolut möglich, wenn Sie einen zusammengesetzten Schlüssel haben können, der die Gruppierungs- und Sortiereigenschaften vor einkapselt, um zu reduzieren.

Angenommen, Sie benötigen einen Schlüssel, der int-Hash-Code und langen Zeitstempel enthält. Dann müssen Sie ein beschreibbares Tupel (zB IntLongPair) implementieren, in dem Sie alle Arten von Komparatoren und Partitionern definieren können, die für Ihren Anwendungsfall benötigt werden.

So setzen Sie Ihre Arbeit als so etwas wie dieses (ich später zurückkommen, um mögliche IntLongPair Implementierung):

job.setPartitionerClass(IntLongPair.IntOnlyPartitioner.class); //partition by your hash code stored in the int part of the part 
job.setGroupingComparatorClass(IntLongPair.IntAscComparator.class); //your hash code grouping - perhaps does not matter ascending or descending 
job.setSortComparatorClass(IntLongPair.IntDescLongAscComparator.class); //assuming you need newest items first 

L

Und hier ist die IntLongPair Sie verwenden können:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.RawComparator; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.mapreduce.Partitioner; 

public class IntLongPair implements WritableComparable<IntLongPair> { 

    private IntWritable intVal = new IntWritable(); 
    private LongWritable longVal = new LongWritable(); 

    public void write(DataOutput d) throws IOException { 
     intVal.write(d); 
     longVal.write(d); 
    } 

    public void readFields(DataInput di) throws IOException { 
     intVal.readFields(di); 
     longVal.readFields(di); 
    } 

    /** 
    * Natural order is int first, long next 
    * @param o 
    * @return 
    */ 
    public int compareTo(IntLongPair o) { 
     int diff = intVal.compareTo(o.intVal); 
     if (diff != 0) { 
      return diff; 
     } 
     return longVal.compareTo(o.longVal); 
    } 

    public IntWritable getInt() { 
     return intVal; 
    } 

    public void setInt(IntWritable intVal) { 
     this.intVal = intVal; 
    } 

    public void setInt(int intVal) { 
     this.intVal.set(intVal); 
    } 

    public LongWritable getLong() { 
     return longVal; 
    } 

    public void setLong(LongWritable longVal) { 
     this.longVal = longVal; 
    } 

    public void setLong(long longVal) { 
     this.longVal.set(longVal); 
    } 

    @Override 
    public boolean equals(Object obj) { 
     if (obj == null) { 
      return false; 
     } 
     if (getClass() != obj.getClass()) { 
      return false; 
     } 
     final IntLongPair other = (IntLongPair) obj; 
     if (this.intVal != other.intVal && (this.intVal == null || !this.intVal.equals(other.intVal))) { 
      return false; 
     } 
     if (this.longVal != other.longVal && (this.longVal == null || !this.longVal.equals(other.longVal))) { 
      return false; 
     } 
     return true; 
    } 

    @Override 
    public int hashCode() { 
     int hash = 3; 
     hash = 47 * hash + (this.intVal != null ? this.intVal.hashCode() : 0); 
     hash = 47 * hash + (this.longVal != null ? this.longVal.hashCode() : 0); 
     return hash; 
    } 

    @Override 
    public String toString() { 
     return "IntLongPair{" + intVal + ',' + longVal + '}'; 
    } 

    public IntWritable getFirst() { 
     return intVal; 
    } 

    public LongWritable getSecond() { 
     return longVal; 
    } 

    public void setFirst(IntWritable value) { 
     intVal.set(value.get()); 
    } 

    public void setSecond(LongWritable value) { 
     longVal.set(value.get()); 
    } 


    public static class Comparator extends WritableComparator { 

     public Comparator() { 
      super(IntLongPair.class); 
     } 

     @Override 
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return compareBytes(b1, s1, l1, b2, s2, l2); 
     } 
    } 

    static {          // register this comparator 
     WritableComparator.define(IntLongPair.class, new Comparator()); 
    } 

    public static class IntDescLongAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
      if (comp != 0) { 
       return -comp; 
      } 
      return LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getInt().compareTo(o2.getInt()); 
      if (comp != 0) { 
       return -comp; 
      } 
      return o1.getLong().compareTo(o2.getLong()); 
     } 
    } 

    public static class LongAscIntAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
      if (comp != 0) { 
       return comp; 
      } 
      return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getLong().compareTo(o2.getLong()); 
      if (comp != 0) { 
       return comp; 
      } 
      return o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class LongAscIntDescComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
      if (comp != 0) { 
       return comp; 
      } 
      return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getLong().compareTo(o2.getLong()); 
      if (comp != 0) { 
       return comp; 
      } 
      return -o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class LongDescIntAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
      if (comp != 0) { 
       return -comp; 
      } 
      return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getLong().compareTo(o2.getLong()); 
      if (comp != 0) { 
       return -comp; 
      } 
      return o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class LongDescIntDescComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
      if (comp != 0) { 
       return -comp; 
      } 
      return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getLong().compareTo(o2.getLong()); 
      if (comp != 0) { 
       return -comp; 
      } 
      return -o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class IntAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      return o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class IntDescComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      return -o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class LongAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      return o1.getLong().compareTo(o2.getLong()); 
     } 
    } 

    public static class LongDescComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return -LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      return -o1.getLong().compareTo(o2.getLong()); 
     } 
    } 

    /** 
    * Partition based on the long part of the pair. 
    */ 
    public static class LongOnlyPartitioner extends Partitioner<IntLongPair, Writable> { 

     @Override 
     public int getPartition(IntLongPair key, Writable value, 
       int numPartitions) { 
      return Math.abs(key.getLong().hashCode() & Integer.MAX_VALUE) % numPartitions; 
     } 
    } 

    /** 
    * Partition based on the int part of the pair. 
    */ 
    public static class IntOnlyPartitioner extends Partitioner<IntLongPair, Writable> { 

     @Override 
     public int getPartition(IntLongPair key, Writable value, 
       int numPartitions) { 
      return Math.abs(key.getInt().hashCode() & Integer.MAX_VALUE) % numPartitions; 
     } 
    } 
} 
+0

Ich brauchte nur die Bestätigung, dass es möglich ist, bevor ich mit Composite Keys aus dem tiefen Ende gehe. Sobald ich es funktioniere, komme ich zurück und markiere es als beantwortet. – medloh

+0

Kein Problem - ich hatte gerade diese Klasse für eines meiner früheren Projekte handlich geschrieben, so dachte, wäre es wert zu teilen, da es einige ziemlich langweilig Code für rohe Binärvergleich benötigt enthält. – yurgis

+0

Nachdem ich die Blogs gelesen hatte, war ich besorgt, dass wenn ich die Sortierung nach Timestamp änderte, es nicht mehr in der Lage wäre, richtig durch Hash zu gruppieren, aber das ist nicht der Fall, oder? Übrigens, vielen Dank für den Code. – medloh