2015-08-17 4 views
7

Ich habe gerade mit Apache Storm begonnen. Ich lese das Tutorial und habe einen Blick in examples Mein Problem ist, dass alle Beispiele mit sehr einfachen Tupeln arbeiten (oft mit einem String). Die Tupel werden inline erstellt (mit neuen Werten (...)). In meinem Fall habe ich Tupel mit vielen Feldern (5..100). Meine Frage ist also, wie man ein solches Tupel mit Namen und Typ (alles Primitive) für jedes Feld implementiert?Wie Apache Sturm Tupel zu verwenden

Gibt es Beispiele? (Ich glaube, die Umsetzung direkt „Tuple“ ist keine gute Idee)

dank

Antwort

8

Eine Alternative das Tupel mit allen Feldern zu erstellen, wie ein Wert ist nur eine Bohne erstellen und übergeben, dass im Inneren des Tupels .

die folgende Klasse Gegeben:

public class DataBean implements Serializable { 
    private static final long serialVersionUID = 1L; 

    // add more properties as necessary 
    int id; 
    String word; 

    public DataBean(int id, String word) { 
     setId(id); 
     setWord(word); 
    } 
    public int getId() { 
     return id; 
    } 
    public void setId(int id) { 
     this.id = id; 
    } 
    public String getWord() { 
     return word; 
    } 
    public void setWord(String word) { 
     this.word = word; 
    } 
} 

erstellen und die Databeans in einer Schraube emittieren:

collector.emit(new Values(bean)); 

die Databeans im Zielbolzen Get:

@Override 
public void execute(Tuple tuple, BasicOutputCollector collector) { 
    try { 
     DataBean bean = (DataBean)tuple.getValue(0); 
     // do your bolt processing with the bean 
    } catch (Exception e) { 
     LOG.error("WordCountBolt error", e); 
     collector.reportError(e); 
    }  
} 

nicht tun vergessen Sie nicht, Ihre Bean serialisierbar zu machen und registrieren Sie sich beim Einrichten Ihrer Topologie:

Config stormConfig = new Config(); 
stormConfig.registerSerialization(DataBean.class); 
// more stuff 
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology()); 

Haftungsausschluss: Bohnen werden gut für Shuffle-Gruppierung funktionieren. Wenn Sie eine fieldsGrouping machen müssen, sollten Sie immer noch ein Primitiv verwenden. Zum Beispiel im Word-Count-Szenario, müssen Sie Gruppe von Wort gehen, so dass Sie emittieren könnten:

collector.emit(new Values(word, bean)); 
+0

das scheint wirklich eine gute Alternative, aber ich frage mich, wie oder wann Tupel zu verwenden (es ist eines der wichtigsten Konzepte des Sturms) ?! – dermoritz

+0

Sie verwenden immer noch ein Tupel .. nur ein Objekt anstelle eines Primitivs übergeben. Wenn Sie jedes Mal die gleichen 100 Felder durchlaufen müssen, würde ich eine Bean wie oben gezeigt verwenden. Wenn die Felder, die Sie jedes Mal senden, sehr unterschiedlich sind, dann wäre es vielleicht nicht so nützlich. –

+0

es ist vollkommen klar für mich, aber ich war neugierig, wie man "echte" Tupel (etwas mit mehreren Feldern) verwendet. aber wie es scheint, niemand benutzt es ... – dermoritz

4

würde ich einen benutzerdefinierten Tupel/Werttyp wie folgt implementieren: Statt Elementvariablen für die Verwendung der Daten zu speichern, die jeweils Das Attribut wird einem festen Index in die Objektliste der vererbten Values Typen zugeordnet. Dieser Ansatz vermeidet das "Feldgruppierungsproblem" eines normalen Bean.

  1. es in nicht zusätzliche Attribute für Felder hinzuzufügen erforderlich Gruppierung (was ziemlich unnatürlich)
  2. Datenduplizierung vermieden wird (die Anzahl der versendeten Bytes reduziert)
  3. es bewahrt den Vorteil der Bohnen Muster

Ein Beispiel für Wort Beispiel zählen so etwas wie dieses wäre:

public class WordCountTuple extends Values { 
    private final static long serialVersionUID = -4386109322233754497L; 

    // attribute indexes 
    /** The index of the word attribute. */ 
    public final static int WRD_IDX = 0; 
    /** The index of the count attribute. */ 
    public final static int CNT_IDX = 1; 

    // attribute names 
    /** The name of the word attribute. */ 
    public final static String WRD_ATT = "word"; 
    /** The name of the count attribute. */ 
    public final static String CNT_ATT = "count"; 

    // required for serialization 
    public WordCountTuple() {} 

    public WordCountTuple(String word, int count) { 
     super.add(WRD_IDX, word); 
     super.add(CNT_IDX, count); 
    } 

    public String getWord() { 
     return (String)super.get(WRD_IDX); 
    } 

    public void setWort(String word) { 
     super.set(WRD_IDX, word); 
    } 

    public int getCount() { 
     return (Integer)super.get(CNT_IDX); 
    } 

    public void setCount(int count) { 
     super.set(CNT_IDX, count); 
    } 

    public static Fields getSchema() { 
     return new Fields(WRD_ATT, CNT_ATT); 
    } 
} 

Um Inkonsistenzen zu vermeiden, werden final static Variablen für das Attribut "word" und "count" verwendet.Ferner wird ein Verfahren getSchema() das implementierte Schema kehrt zu verwendet werden, um Ausgangsströme in Spout/Bolt Methode zu deklarieren .declareOutputFields(...)

Für die Ausgabe Tupeln kann diese Art gerade nach vorne verwendet werden:

public MyOutBolt implements IRichBolt { 

    @Override 
    public void execute(Tuple tuple) { 
     // some more processing 
     String word = ... 
     int cnt = ... 
     collector.emit(new WordCountTuple(word, cnt)); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(WordCountTuple.getSchema()); 
    } 

    // other methods omitted 
} 

Für Eingabetupel, I würde das folgende Muster vorschlagen:

public MyInBolt implements IRichBolt { 
    // use a single instance for avoid GC trashing 
    private final WordCountTuple input = new WordCountTuple(); 

    @Override 
    public void execute(Tuple tuple) { 
     this.input.clear(); 
     this.input.addAll(tuple.getValues()); 

     String word = input.getWord(); 
     int count = input.getCount(); 

     // do further processing 
    } 

    // other methods omitted 
} 

MyOutBolt und MyInBolt können wie folgt angeschlossen werden:

TopologyBuilder b = ... 
b.setBolt("out", new MyOutBolt()); 
b.setBolt("in", new MyInBolt()).fieldsGrouping("out", WordCountTuple.WRD_ATT); 

Die Verwendung der Feldgruppierung ist unkompliziert, da WordCountTuple den Zugriff auf jedes Attribut einzeln ermöglicht.

+0

Danke dafür, im Moment Ich benutze den "akzeptierten" Ansatz, aber anstelle von Feldern verwende ich eine EnumMap. Die Enum gibt die Felder und den Typ der Felder an. Im Moment kämpfe ich um ein abstraktes Tupel, aber Enum kann nicht abstrakt sein, also muss ich entweder eine normale Map benutzen oder ich muss mit einiger Redundanz leben - wenn ich eine Lösung habe, werde ich sie hier posten. – dermoritz

+0

+1 Ich mag das sehr. Aber auf die Frage ... Da es bei der OP-Frage darum ging, viele Felder zu verwalten, wäre die Übergabe aller Felder über den Konstruktor nicht wirklich machbar. Außerdem spielt die Reihenfolge, in der Sie dem Array "Werte" Objekte hinzufügen, eine wichtige Rolle. Vielleicht sollte Ihr Konstruktor, der keine Argumente enthält, alles mit Nullen initialisieren? –

+0

Alle Felder über Konstruktor hinzugefügt ist eine Frage des Geschmacks, denke ich. Eine 'Null'-Initialisierung wäre eine gültige Verbesserung, denke ich. Für Setter ist es jedoch irrelevant, da sie den Indexzugriff verwenden. Und normale 'ArrayList'-Methoden (' Values' erbt davon) müssen natürlich mit Vorsicht verwendet werden. Sonst könntest du alles durcheinander bringen! –