Absolut möglich, wenn Sie einen zusammengesetzten Schlüssel haben können, der die Gruppierungs- und Sortiereigenschaften vor einkapselt, um zu reduzieren.
Angenommen, Sie benötigen einen Schlüssel, der int-Hash-Code und langen Zeitstempel enthält. Dann müssen Sie ein beschreibbares Tupel (zB IntLongPair) implementieren, in dem Sie alle Arten von Komparatoren und Partitionern definieren können, die für Ihren Anwendungsfall benötigt werden.
So setzen Sie Ihre Arbeit als so etwas wie dieses (ich später zurückkommen, um mögliche IntLongPair Implementierung):
job.setPartitionerClass(IntLongPair.IntOnlyPartitioner.class); //partition by your hash code stored in the int part of the part
job.setGroupingComparatorClass(IntLongPair.IntAscComparator.class); //your hash code grouping - perhaps does not matter ascending or descending
job.setSortComparatorClass(IntLongPair.IntDescLongAscComparator.class); //assuming you need newest items first
L
Und hier ist die IntLongPair Sie verwenden können:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;
public class IntLongPair implements WritableComparable<IntLongPair> {
private IntWritable intVal = new IntWritable();
private LongWritable longVal = new LongWritable();
public void write(DataOutput d) throws IOException {
intVal.write(d);
longVal.write(d);
}
public void readFields(DataInput di) throws IOException {
intVal.readFields(di);
longVal.readFields(di);
}
/**
* Natural order is int first, long next
* @param o
* @return
*/
public int compareTo(IntLongPair o) {
int diff = intVal.compareTo(o.intVal);
if (diff != 0) {
return diff;
}
return longVal.compareTo(o.longVal);
}
public IntWritable getInt() {
return intVal;
}
public void setInt(IntWritable intVal) {
this.intVal = intVal;
}
public void setInt(int intVal) {
this.intVal.set(intVal);
}
public LongWritable getLong() {
return longVal;
}
public void setLong(LongWritable longVal) {
this.longVal = longVal;
}
public void setLong(long longVal) {
this.longVal.set(longVal);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final IntLongPair other = (IntLongPair) obj;
if (this.intVal != other.intVal && (this.intVal == null || !this.intVal.equals(other.intVal))) {
return false;
}
if (this.longVal != other.longVal && (this.longVal == null || !this.longVal.equals(other.longVal))) {
return false;
}
return true;
}
@Override
public int hashCode() {
int hash = 3;
hash = 47 * hash + (this.intVal != null ? this.intVal.hashCode() : 0);
hash = 47 * hash + (this.longVal != null ? this.longVal.hashCode() : 0);
return hash;
}
@Override
public String toString() {
return "IntLongPair{" + intVal + ',' + longVal + '}';
}
public IntWritable getFirst() {
return intVal;
}
public LongWritable getSecond() {
return longVal;
}
public void setFirst(IntWritable value) {
intVal.set(value.get());
}
public void setSecond(LongWritable value) {
longVal.set(value.get());
}
public static class Comparator extends WritableComparator {
public Comparator() {
super(IntLongPair.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return compareBytes(b1, s1, l1, b2, s2, l2);
}
}
static { // register this comparator
WritableComparator.define(IntLongPair.class, new Comparator());
}
public static class IntDescLongAscComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int comp = IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
if (comp != 0) {
return -comp;
}
return LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
}
public int compare(IntLongPair o1, IntLongPair o2) {
int comp = o1.getInt().compareTo(o2.getInt());
if (comp != 0) {
return -comp;
}
return o1.getLong().compareTo(o2.getLong());
}
}
public static class LongAscIntAscComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
if (comp != 0) {
return comp;
}
return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
}
public int compare(IntLongPair o1, IntLongPair o2) {
int comp = o1.getLong().compareTo(o2.getLong());
if (comp != 0) {
return comp;
}
return o1.getInt().compareTo(o2.getInt());
}
}
public static class LongAscIntDescComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
if (comp != 0) {
return comp;
}
return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
}
public int compare(IntLongPair o1, IntLongPair o2) {
int comp = o1.getLong().compareTo(o2.getLong());
if (comp != 0) {
return comp;
}
return -o1.getInt().compareTo(o2.getInt());
}
}
public static class LongDescIntAscComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
if (comp != 0) {
return -comp;
}
return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
}
public int compare(IntLongPair o1, IntLongPair o2) {
int comp = o1.getLong().compareTo(o2.getLong());
if (comp != 0) {
return -comp;
}
return o1.getInt().compareTo(o2.getInt());
}
}
public static class LongDescIntDescComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
if (comp != 0) {
return -comp;
}
return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
}
public int compare(IntLongPair o1, IntLongPair o2) {
int comp = o1.getLong().compareTo(o2.getLong());
if (comp != 0) {
return -comp;
}
return -o1.getInt().compareTo(o2.getInt());
}
}
public static class IntAscComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
}
public int compare(IntLongPair o1, IntLongPair o2) {
return o1.getInt().compareTo(o2.getInt());
}
}
public static class IntDescComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4);
}
public int compare(IntLongPair o1, IntLongPair o2) {
return -o1.getInt().compareTo(o2.getInt());
}
}
public static class LongAscComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
}
public int compare(IntLongPair o1, IntLongPair o2) {
return o1.getLong().compareTo(o2.getLong());
}
}
public static class LongDescComparator implements RawComparator<IntLongPair> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8);
}
public int compare(IntLongPair o1, IntLongPair o2) {
return -o1.getLong().compareTo(o2.getLong());
}
}
/**
* Partition based on the long part of the pair.
*/
public static class LongOnlyPartitioner extends Partitioner<IntLongPair, Writable> {
@Override
public int getPartition(IntLongPair key, Writable value,
int numPartitions) {
return Math.abs(key.getLong().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
/**
* Partition based on the int part of the pair.
*/
public static class IntOnlyPartitioner extends Partitioner<IntLongPair, Writable> {
@Override
public int getPartition(IntLongPair key, Writable value,
int numPartitions) {
return Math.abs(key.getInt().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
}
Muss Ihr Reducer etwas anderes tun, als mit Hash zu aggregieren, sortieren und anzeigen? Außerdem interessiert Sie nicht die Reihenfolge der Hashes, aber alle Datensätze für den gleichen Hash müssen sortiert werden, richtig? – Jedi
My reduce() muss nur mit Hash aggregieren. Aber ich möchte, dass die Ausgabe aller reduce-Aufrufe nach Zeitmarken sortiert wird. Ich brauche nicht alle Datensätze des gleichen Hash sortiert. Ich brauche die kumulative Ausgabe des Reduzierers sortiert nach Zeitstempel. Dieser Zeitstempel könnte aus einem der Werte im Iterator von reduce() stammen. – medloh