Ich denke, was Sie brauchen, ist eine Priorität Postfach, wo die neueren Nachrichten höhere Priorität haben. Sehen Sie sich die Standardimplementierung PriorityMailbox an.
Es könnte wie folgt aussehen (basierend auf dem Beispiel aus der Dokumentation):
import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config
type Version = // Long, Date, Timestamp, smth else - must be ordered
case class MyMessage(key: String, value: String, version: Version)
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedStablePriorityMailbox(
// Create a new PriorityGenerator, lower prio means more important
PriorityGenerator {
case MyMessage(_, _, version) => versionToInt(version)
// PoisonPill when no other left
case PoisonPill => 1 // or Int.MaxValue - 2
// We default to 1, which is in between high and low
case otherwise => 0 // or Int.MaxValue - 1
})
Hier versionToInt
sollten einige Int
Wert zurück, der niedriger ist nach neueren Versionen von Nachrichten (höhere Priorität), sagen wir in 2 - Inf
Angebot.
Anschließend können Sie die höchste Version verfolgen, die in Ihrem Akteur verarbeitet wurde, und alle anderen Nachrichten löschen, die älter als diese Version sind. Verwenden Sie entweder become
oder nur einen var
in Ihrem Schauspieler dafür.
Es ist erwähnenswert, dass die Reihenfolge der gesendeten Nachrichten von Akka garantiert wird, aber letztendlich hängt es vom Sender ab, in welcher Reihenfolge sie gesendet werden und von den Verzögerungen dazwischen, was sich darauf auswirken würde, welche Nachrichten verarbeitet werden.
Warum werden Sie nicht vorfiltern und nur die neuesten Werte übergeben? Vielleicht würde ein Beispiel helfen, Ihren Anwendungsfall zu verstehen. Ich sehe den Grund für die Unterthemen nicht? – sascha10000
Die Anwendung ist in HFT-Domäne und die Eingabe ist ein Strom von Markt-Ticker, Die Idee ist, dass, nachdem Sie den frischesten Ticker verarbeitet haben, hat es keinen Sinn, veraltete Marktdaten zu verarbeiten. Die "Themen" oder "Schlüssel" wären Tickersymbole und die Werte wären ihre Preise. – sdfdfjndfsd
Okay, ich würde den Stream vielleicht über akka-Streams vorverarbeiten und dann nur die "tatsächlichen" Tickinfos an einen BalancingPool senden, wenn die Handhabung für die Schlüssel: Wert-Paare gleich ist. Andernfalls, wenn Sie mehr als 1 Actor implementiert haben, können Sie zuerst den Stream filtern und dann entscheiden, welchen Actor er behandeln soll. Grundsätzlich würde ich sogar ein Actor-System aufbauen, das RootActor/s die Filterung übernimmt und es dann an das entsprechende Kind für weitere Operationen sendet. Das sollte "eins" und das sehr gut machen. – sascha10000