würde ich einen benutzerdefinierten Tupel/Werttyp wie folgt implementieren: Statt Elementvariablen für die Verwendung der Daten zu speichern, die jeweils Das Attribut wird einem festen Index in die Objektliste der vererbten Values
Typen zugeordnet. Dieser Ansatz vermeidet das "Feldgruppierungsproblem" eines normalen Bean.
- es in nicht zusätzliche Attribute für Felder hinzuzufügen erforderlich Gruppierung (was ziemlich unnatürlich)
- Datenduplizierung vermieden wird (die Anzahl der versendeten Bytes reduziert)
- es bewahrt den Vorteil der Bohnen Muster
Ein Beispiel für Wort Beispiel zählen so etwas wie dieses wäre:
public class WordCountTuple extends Values {
private final static long serialVersionUID = -4386109322233754497L;
// attribute indexes
/** The index of the word attribute. */
public final static int WRD_IDX = 0;
/** The index of the count attribute. */
public final static int CNT_IDX = 1;
// attribute names
/** The name of the word attribute. */
public final static String WRD_ATT = "word";
/** The name of the count attribute. */
public final static String CNT_ATT = "count";
// required for serialization
public WordCountTuple() {}
public WordCountTuple(String word, int count) {
super.add(WRD_IDX, word);
super.add(CNT_IDX, count);
}
public String getWord() {
return (String)super.get(WRD_IDX);
}
public void setWort(String word) {
super.set(WRD_IDX, word);
}
public int getCount() {
return (Integer)super.get(CNT_IDX);
}
public void setCount(int count) {
super.set(CNT_IDX, count);
}
public static Fields getSchema() {
return new Fields(WRD_ATT, CNT_ATT);
}
}
Um Inkonsistenzen zu vermeiden, werden final static
Variablen für das Attribut "word" und "count" verwendet.Ferner wird ein Verfahren getSchema()
das implementierte Schema kehrt zu verwendet werden, um Ausgangsströme in Spout/Bolt Methode zu deklarieren .declareOutputFields(...)
Für die Ausgabe Tupeln kann diese Art gerade nach vorne verwendet werden:
public MyOutBolt implements IRichBolt {
@Override
public void execute(Tuple tuple) {
// some more processing
String word = ...
int cnt = ...
collector.emit(new WordCountTuple(word, cnt));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(WordCountTuple.getSchema());
}
// other methods omitted
}
Für Eingabetupel, I würde das folgende Muster vorschlagen:
public MyInBolt implements IRichBolt {
// use a single instance for avoid GC trashing
private final WordCountTuple input = new WordCountTuple();
@Override
public void execute(Tuple tuple) {
this.input.clear();
this.input.addAll(tuple.getValues());
String word = input.getWord();
int count = input.getCount();
// do further processing
}
// other methods omitted
}
MyOutBolt
und MyInBolt
können wie folgt angeschlossen werden:
TopologyBuilder b = ...
b.setBolt("out", new MyOutBolt());
b.setBolt("in", new MyInBolt()).fieldsGrouping("out", WordCountTuple.WRD_ATT);
Die Verwendung der Feldgruppierung ist unkompliziert, da WordCountTuple
den Zugriff auf jedes Attribut einzeln ermöglicht.
das scheint wirklich eine gute Alternative, aber ich frage mich, wie oder wann Tupel zu verwenden (es ist eines der wichtigsten Konzepte des Sturms) ?! – dermoritz
Sie verwenden immer noch ein Tupel .. nur ein Objekt anstelle eines Primitivs übergeben. Wenn Sie jedes Mal die gleichen 100 Felder durchlaufen müssen, würde ich eine Bean wie oben gezeigt verwenden. Wenn die Felder, die Sie jedes Mal senden, sehr unterschiedlich sind, dann wäre es vielleicht nicht so nützlich. –
es ist vollkommen klar für mich, aber ich war neugierig, wie man "echte" Tupel (etwas mit mehreren Feldern) verwendet. aber wie es scheint, niemand benutzt es ... – dermoritz