2016-04-15 2 views
3

Ich fragte mich, ob es möglich war, verschiedene Zustände in einer Anwendung radikal zu erhalten? Zum Beispiel, haben die update function des ersten Zustandes den einen aus dem zweiten Zustand?Handle verschiedene Zustände

Ich erinnere mich nicht durch ein solches Beispiel gehen, noch fand ich eine Gegenanzeige ... Basierend auf dem Beispiel von https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html, weiß ich keinen Grund, warum ich nicht anders trackStateFunc s mit anderen haben könnte State s, und aktualisieren Sie immer noch diese dank ihrer Key, wie unten dargestellt:

def firstTrackStateFunc(batchTime: Time, 
         key: String, 
         value: Option[Int], 
         state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    state.update(sum) 
    Some(output) 
} 

und

def secondTrackStateFunc(batchTime: Time, 
         key: String, 
         value: Option[Int], 
         state: State[Int]): Option[(String, Long)] = { 
    // disregard problems this example would cause 
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0L) 
    val output = (key, dif) 
    state.update(dif) 
    Some(output) 
} 

ich denke, dass dies möglich ist, aber immer noch unsicher. Ich möchte jemanden bestätigen oder diese Annahme ungültig ...

+0

Sie wollen die Eingabe identisch sein, aber die Fähigkeit haben, einen Zustand in einem anderen zu aktualisieren? Oder möchten Sie den Status von Grund auf basierend auf dem Schlüssel abrufen und diesen verwenden, um den Status in zwei zu aktualisieren? –

+0

Ich möchte nicht unbedingt, dass die Eingabe identisch ist, aber die Zustände sind völlig verschieden (wie ein Feld und ein anderes Feld). In der Lage zu sein, die zweite innerhalb des ersten Zustands zu aktualisieren, wäre großartig, aber nicht der Hauptzweck. In der Tat würde ich gerne wissen, ob ich verschiedene Zustände in der gleichen Anwendung (durch verschiedene Update-Funktionen) radikal aktualisieren kann? Ist es für dich klarer? – wipman

+0

Ich denke schon. Zustände sind isoliert, Sie können nicht zwischen ihnen innerhalb verschiedener 'mapWithState'-Funktionen interagieren. Was Sie * tun * können, ist, diese Zustände zusammen zu ketten und sie als Wert an den nächsten 'mapWithState' zu ​​übergeben, aber ich denke nicht, dass Sie das tun wollen. –

Antwort

2

ich mich gefragt, ob es möglich war, über eine Anwendung radikal andere Staaten aufrecht zu erhalten?

Jeder Aufruf zu mapWithState auf einem DStream[(Key, Value)] kann man State[T] Objekt halten. Diese T muss für jeden Aufruf von mapWithState gleich sein. Um verschiedene Zustände zu verwenden, können Sie entweder mapWithState Anrufe ketten, wo man Option[U] anothers Eingang ist, oder Sie können die DStream teilen und einen unterschiedlichen mapWithState Anruf auf jeden anwenden. Sie können jedoch kein anderes State[T] Objekt in einem anderen Objekt aufrufen, da diese voneinander isoliert sind und der Zustand des anderen nicht mutiert werden kann.

1

@Yuval gab eine gute Antwort auf Chain MapWithState Funktionen. Ich habe jedoch einen anderen Ansatz. Anstelle von zwei mapWithState-Aufrufen können Sie sowohl die Summe als auch das Diff in denselben Status [(Int, Int)] setzen.

In diesem Fall würden Sie nur eine mapWithState Funktionen benötigen, wo Sie beide Dinge aktualisieren können. Etwas wie dieses:

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[(Long, Int)]): Option[(String, (Long, Int))] = 
{ 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0L) 
    val output = (key, (sum, diff)) 
    state.update((sum, diff)) 
    Some(output) 
}