Ich habe benutzerdefinierte WindowAssigher
implementiert:Wie implementiert man Sicherungspunkte für benutzerdefinierte Auslöser mit Status?
public class SessionWindowAssigner extends WindowAssigner<LogItem, SessionWindow> {
@Override
public Collection<SessionWindow> assignWindows(LogItem element, long timestamp) {
return Collections.singletonList(new SessionWindow(element.getSessionUid()));
}
@Override
public Trigger<LogItem, SessionWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return new SessionTrigger(60_000L);
}
@Override
public TypeSerializer<SessionWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new SessionWindow.Serializer();
}
}
, Window
:
public class SessionWindow extends Window {
private final String sessionUid;
public SessionWindow(String sessionUid) {
this.sessionUid = sessionUid;
}
public String getSessionUid() {
return sessionUid;
}
@Override
public long maxTimestamp() {
return Long.MAX_VALUE;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SessionWindow that = (SessionWindow) o;
return sessionUid.equals(that.sessionUid);
}
@Override
public int hashCode() {
return sessionUid.hashCode();
}
public static class Serializer extends TypeSerializer<SessionWindow> {
private static final long serialVersionUID = 1L;
@Override
public boolean isImmutableType() {
return true;
}
@Override
public TypeSerializer<SessionWindow> duplicate() {
return this;
}
@Override
public SessionWindow createInstance() {
return null;
}
@Override
public SessionWindow copy(SessionWindow from) {
return from;
}
@Override
public SessionWindow copy(SessionWindow from, SessionWindow reuse) {
return from;
}
@Override
public int getLength() {
return 0;
}
@Override
public void serialize(SessionWindow record, DataOutputView target) throws IOException {
target.writeUTF(record.sessionUid);
}
@Override
public SessionWindow deserialize(DataInputView source) throws IOException {
return new SessionWindow(source.readUTF());
}
@Override
public SessionWindow deserialize(SessionWindow reuse, DataInputView source) throws IOException {
return new SessionWindow(source.readUTF());
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.writeUTF(source.readUTF());
}
@Override
public boolean equals(Object obj) {
return obj instanceof Serializer;
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof Serializer;
}
@Override
public int hashCode() {
return 0;
}
}
}
und Trigger
:
public class SessionTrigger extends Trigger<LogItem, SessionWindow> {
private final long sessionTimeout;
private final ValueStateDescriptor<Long> previousFinishTimestampDesc = new ValueStateDescriptor<>("SessionTrigger.timestamp", LongSerializer.INSTANCE, null);
public SessionTrigger(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
@Override
public TriggerResult onElement(LogItem element, long timestamp, SessionWindow window, TriggerContext ctx) throws Exception {
ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);
Long previousFinishTimestamp = previousFinishTimestampState.value();
Long newFinisTimestamp = timestamp + sessionTimeout;
if (previousFinishTimestamp != null) {
ctx.deleteEventTimeTimer(previousFinishTimestamp);
}
ctx.registerEventTimeTimer(newFinisTimestamp);
previousFinishTimestampState.update(newFinisTimestamp);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
throw new UnsupportedOperationException("This is not processing time trigger");
}
@Override
public void clear(SessionWindow window, TriggerContext ctx) throws Exception {
ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);
Long previousFinishTimestamp = previousFinishTimestampState.value();
ctx.deleteEventTimeTimer(previousFinishTimestamp);
previousFinishTimestampState.clear();
}
}
für durch Timeout dh Ende der Sitzung festzustellen, ob letzte Ereignis N Sekunden war vor wertet dann die Fensterfunktion aus. Wie Sie sehen können, speichere ich den letzten Ereigniszeitstempel in ValueState, weil ich ihn nach einem Fehler wiederherstellen möchte.
Scheint, ich sollte Checkpointed
Schnittstelle zum Speichern/Wiederherstellen Savepoint (und Checkpoint) Snapshots in diesem Trigger implementieren, weil ich Trigger-Status während der Neu-Bereitstellung meines Flusses nicht verlieren möchte.
Kann mir also jemand erklären, wie man den Zustand SessionTrigger
Trigger (und wahrscheinlich verwandte Fenster) während der Bereitstellung korrekt speichert?
Wie ich verstehe, sollte ich einfach Checkpointed
Schnittstelle für SessionTrigger
implementieren, weil nur es Zustand hat. Recht? Wie wäre es mit SessionWindow
-s und SessionWindowAssigner
? Werden sie nach der Bereitstellung automatisch wiederhergestellt oder sollte ich es manuell tun?