2016-03-24 5 views
0

Wie implementiert man Savepoints für benutzerdefinierte Trigger mit Zustand? Ich habe einen benutzerdefinierten WindowAssigner implementiert:

/**
 * Window assigner that puts every {@link LogItem} into exactly one
 * {@link SessionWindow}, keyed by the session UID the element carries.
 */
public class SessionWindowAssigner extends WindowAssigner<LogItem, SessionWindow> {

    /**
     * Builds the single window the element belongs to.
     *
     * @param element   the incoming log item; its session UID identifies the window
     * @param timestamp the element timestamp (not used for the assignment)
     * @return a one-element collection holding the window of the element's session
     */
    @Override
    public Collection<SessionWindow> assignWindows(LogItem element, long timestamp) {
        final String sessionUid = element.getSessionUid();
        final SessionWindow sessionWindow = new SessionWindow(sessionUid);
        return Collections.singletonList(sessionWindow);
    }

    /**
     * Supplies the trigger used when the job does not configure one explicitly.
     *
     * @param env the stream execution environment (not used)
     * @return a new {@link SessionTrigger} with a timeout of 60000
     *         (presumably milliseconds - confirm against the timestamp source)
     */
    @Override
    public Trigger<LogItem, SessionWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        final long sessionTimeout = 60_000L;
        return new SessionTrigger(sessionTimeout);
    }

    /**
     * Supplies the serializer for {@link SessionWindow} instances.
     *
     * @param executionConfig the execution configuration (not used)
     * @return a new {@link SessionWindow.Serializer}
     */
    @Override
    public TypeSerializer<SessionWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        final TypeSerializer<SessionWindow> windowSerializer = new SessionWindow.Serializer();
        return windowSerializer;
    }
}

das zugehörige Window:

/**
 * A window identified solely by a session UID. Two windows are equal exactly
 * when their session UIDs are equal.
 */
public class SessionWindow extends Window {
    private final String sessionUid;

    /**
     * Creates the window for the given session.
     *
     * @param sessionUid identifier of the session; must not be {@code null}
     * @throws NullPointerException if {@code sessionUid} is {@code null}
     */
    public SessionWindow(String sessionUid) {
        // Fail fast: equals(), hashCode() and the serializer all dereference
        // the UID, so a null value would only blow up later and further away.
        if (sessionUid == null) {
            throw new NullPointerException("sessionUid must not be null");
        }
        this.sessionUid = sessionUid;
    }

    /**
     * @return the session identifier this window was created for
     */
    public String getSessionUid() {
        return sessionUid;
    }

    /**
     * @return always {@code Long.MAX_VALUE}; the window carries no time bound
     *         of its own
     */
    @Override
    public long maxTimestamp() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        SessionWindow that = (SessionWindow) o;

        return sessionUid.equals(that.sessionUid);
    }

    @Override
    public int hashCode() {
        return sessionUid.hashCode();
    }

    @Override
    public String toString() {
        return "SessionWindow{sessionUid='" + sessionUid + "'}";
    }

    /**
     * Serializer for {@link SessionWindow}. The wire format is the session UID
     * written with {@code writeUTF}. The serializer holds no state, so a single
     * instance can be shared.
     */
    public static class Serializer extends TypeSerializer<SessionWindow> {
        private static final long serialVersionUID = 1L;

        /** {@link SessionWindow} has only a final field, so instances need no copying. */
        @Override
        public boolean isImmutableType() {
            return true;
        }

        /** Stateless serializer: returning {@code this} is sufficient. */
        @Override
        public TypeSerializer<SessionWindow> duplicate() {
            return this;
        }

        /** No meaningful empty instance exists, hence {@code null}. */
        @Override
        public SessionWindow createInstance() {
            return null;
        }

        @Override
        public SessionWindow copy(SessionWindow from) {
            return from;
        }

        @Override
        public SessionWindow copy(SessionWindow from, SessionWindow reuse) {
            return from;
        }

        /**
         * @return {@code -1}, marking the type as variable-length: the
         *         serialized form is a UTF string whose size depends on the UID
         */
        @Override
        public int getLength() {
            return -1;
        }

        @Override
        public void serialize(SessionWindow record, DataOutputView target) throws IOException {
            target.writeUTF(record.sessionUid);
        }

        @Override
        public SessionWindow deserialize(DataInputView source) throws IOException {
            return new SessionWindow(source.readUTF());
        }

        /** The {@code reuse} instance is ignored because windows are immutable. */
        @Override
        public SessionWindow deserialize(SessionWindow reuse, DataInputView source) throws IOException {
            return new SessionWindow(source.readUTF());
        }

        /** Copies one serialized window from {@code source} to {@code target}. */
        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.writeUTF(source.readUTF());
        }

        /** All instances are interchangeable, so any two serializers of this class are equal. */
        @Override
        public boolean equals(Object obj) {
            return obj instanceof Serializer;
        }

        @Override
        public boolean canEqual(Object obj) {
            return obj instanceof Serializer;
        }

        /** Constant hash, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return 0;
        }
    }
}

und den zugehörigen Trigger:

/**
 * Event-time trigger that fires and purges a {@link SessionWindow} once no
 * element has arrived for {@code sessionTimeout} time units after the latest
 * element's timestamp.
 *
 * <p>The timestamp of the currently registered timer is kept in partitioned
 * {@link ValueState} so the previous timer can be removed whenever a newer
 * element arrives.
 */
public class SessionTrigger extends Trigger<LogItem, SessionWindow> {
    private final long sessionTimeout;

    // Holds the timestamp of the currently registered timer; null means none is registered.
    private final ValueStateDescriptor<Long> previousFinishTimestampDesc = new ValueStateDescriptor<>("SessionTrigger.timestamp", LongSerializer.INSTANCE, null);

    /**
     * @param sessionTimeout gap after the latest element at which the window
     *                       fires, in the unit of the element timestamps
     */
    public SessionTrigger(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Moves the session-end timer to {@code timestamp + sessionTimeout}.
     *
     * @return always {@link TriggerResult#CONTINUE}; firing happens only in
     *         {@link #onEventTime}
     */
    @Override
    public TriggerResult onElement(LogItem element, long timestamp, SessionWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> finishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);

        Long previousFinishTimestamp = finishTimestampState.value();
        long newFinishTimestamp = timestamp + sessionTimeout;

        // Drop the timer of the previous element so only the latest one can fire.
        if (previousFinishTimestamp != null) {
            ctx.deleteEventTimeTimer(previousFinishTimestamp);
        }

        ctx.registerEventTimeTimer(newFinishTimestamp);
        finishTimestampState.update(newFinishTimestamp);

        return TriggerResult.CONTINUE;
    }

    /**
     * Called when a registered event-time timer fires; emits the window
     * contents and discards them.
     */
    @Override
    public TriggerResult onEventTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    /**
     * @throws UnsupportedOperationException always; this trigger registers no
     *                                       processing-time timers
     */
    @Override
    public TriggerResult onProcessingTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
        throw new UnsupportedOperationException("This is not processing time trigger");
    }

    /**
     * Removes the pending timer (if any) and clears the stored timestamp.
     */
    @Override
    public void clear(SessionWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> finishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);

        Long previousFinishTimestamp = finishTimestampState.value();

        // The state defaults to null; unboxing it into deleteEventTimeTimer(long)
        // would throw a NullPointerException, so only delete when a timer exists.
        if (previousFinishTimestamp != null) {
            ctx.deleteEventTimeTimer(previousFinishTimestamp);
        }

        finishTimestampState.clear();
    }
}

Damit stelle ich das Ende einer Sitzung per Timeout fest: Liegt das letzte Ereignis N Sekunden zurück, wird die Fensterfunktion ausgewertet. Wie Sie sehen können, speichere ich den Zeitstempel des letzten Ereignisses in einem ValueState, weil ich ihn nach einem Fehler wiederherstellen möchte.

Es scheint, dass ich in diesem Trigger die Schnittstelle Checkpointed implementieren sollte, um Savepoint- (und Checkpoint-)Snapshots zu speichern und wiederherzustellen, weil ich den Trigger-Zustand beim erneuten Deployment meines Flows nicht verlieren möchte.

Kann mir also jemand erklären, wie man den Zustand des SessionTrigger (und vermutlich der zugehörigen Fenster) während des Deployments korrekt speichert?

So wie ich es verstehe, sollte ich einfach die Schnittstelle Checkpointed für SessionTrigger implementieren, weil nur er Zustand hat. Richtig? Was ist mit den SessionWindow-Instanzen und dem SessionWindowAssigner? Werden sie nach dem Deployment automatisch wiederhergestellt, oder muss ich das manuell tun?

Antwort

0

Entnommen aus SessionWindowing:

/**
 * Session trigger for {@link GlobalWindow}: remembers the timestamp of the last
 * element per key and fires (purging the window) once the gap to the next
 * element, or to a firing event-time timer, reaches {@code sessionTimeout}.
 */
private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {

    private static final long serialVersionUID = 1L;

    /** Default state value meaning "no element has been seen yet". */
    private static final long NEVER_SEEN = -1L;

    private final Long sessionTimeout;

    // Timestamp of the most recent element; NEVER_SEEN until the first element arrives.
    private final ValueStateDescriptor<Long> stateDesc =
            new ValueStateDescriptor<>("last-seen", Long.class, NEVER_SEEN);

    /**
     * @param sessionTimeout maximum gap between two elements of one session,
     *                       in the unit of the element timestamps
     */
    public SessionTrigger(Long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Records the element's timestamp, moves the timeout timer accordingly and
     * fires if the gap to the previous element exceeded the timeout.
     *
     * @return {@link TriggerResult#FIRE_AND_PURGE} when a previous element
     *         exists and lies more than {@code sessionTimeout} in the past,
     *         otherwise {@link TriggerResult#CONTINUE}
     */
    @Override
    public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
        long lastSeen = lastSeenState.value();
        boolean seenBefore = lastSeen != NEVER_SEEN;

        // Only remove a timer that was actually registered; on the first element
        // there is none, and NEVER_SEEN + sessionTimeout is not a real timer time.
        if (seenBefore) {
            ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);
        }

        // Update the last seen event time
        lastSeenState.update(timestamp);

        ctx.registerEventTimeTimer(timestamp + sessionTimeout);

        long timeSinceLastEvent = timestamp - lastSeen;
        if (seenBefore && timeSinceLastEvent > sessionTimeout) {
            // NOTE(review): debug output on stdout; consider a logger for production use.
            System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen);
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Fires when the timer time is at least {@code sessionTimeout} after the
     * last seen element; otherwise the timer is ignored.
     */
    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
        long lastSeen = lastSeenState.value();

        if (time - lastSeen >= sessionTimeout) {
            // NOTE(review): debug output on stdout; consider a logger for production use.
            System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen);
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /** Processing time is not used by this trigger. */
    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Removes the pending timeout timer (if an element was seen) and resets the
     * stored timestamp.
     */
    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
        long lastSeen = lastSeenState.value();
        if (lastSeen != NEVER_SEEN) {
            ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);
        }
        lastSeenState.clear();
    }
}