2016-08-03 46 views
0

Ich habe den folgenden Anwendungsfall:Machen Akka Nachrichten einzigartige/von einigem Attribute versioniert

Ein Schauspieler Daten aus einer externen Quelle verbraucht, die aus key:value Paaren bestehen. Dann werden sie an einen Schauspieler übergeben, der einige Aktionen an key und die letzte value durchführen muss, wobei die älteren übersprungen werden. Grundsätzlich versuche ich die Mailbox in "Unterthemen" mit einer Kapazität von 1 zu teilen. Ist so etwas überhaupt mit Akka möglich? Wenn nicht, welche Alternativen sollte ich berücksichtigen?

+0

Warum werden Sie nicht vorfiltern und nur die neuesten Werte übergeben? Vielleicht würde ein Beispiel helfen, Ihren Anwendungsfall zu verstehen. Ich sehe den Grund für die Unterthemen nicht? – sascha10000

+0

Die Anwendung ist in HFT-Domäne und die Eingabe ist ein Strom von Markt-Ticker, Die Idee ist, dass, nachdem Sie den frischesten Ticker verarbeitet haben, hat es keinen Sinn, veraltete Marktdaten zu verarbeiten. Die "Themen" oder "Schlüssel" wären Tickersymbole und die Werte wären ihre Preise. – sdfdfjndfsd

+0

Okay, ich würde den Stream vielleicht über akka-Streams vorverarbeiten und dann nur die "tatsächlichen" Tickinfos an einen BalancingPool senden, wenn die Handhabung für die Schlüssel: Wert-Paare gleich ist. Andernfalls, wenn Sie mehr als 1 Actor implementiert haben, können Sie zuerst den Stream filtern und dann entscheiden, welchen Actor er behandeln soll. Grundsätzlich würde ich sogar ein Actor-System aufbauen, das RootActor/s die Filterung übernimmt und es dann an das entsprechende Kind für weitere Operationen sendet. Das sollte "eins" und das sehr gut machen. – sascha10000

Antwort

0

Ich denke, was Sie brauchen, ist eine Priorität Postfach, wo die neueren Nachrichten höhere Priorität haben. Sehen Sie sich die Standardimplementierung PriorityMailbox an.

Es könnte wie folgt aussehen (basierend auf dem Beispiel aus der Dokumentation):

import akka.dispatch.PriorityGenerator 
import akka.dispatch.UnboundedStablePriorityMailbox 
import com.typesafe.config.Config 

type Version = // Long, Date, Timestamp, smth else - must be ordered 
case class MyMessage(key: String, value: String, version: Version) 

class MyPrioMailbox(settings: ActorSystem.Settings, config: Config) 
    extends UnboundedStablePriorityMailbox(
    // Create a new PriorityGenerator, lower prio means more important 
    PriorityGenerator { 
     case MyMessage(_, _, version) => versionToInt(version) 

     // PoisonPill when no other left 
     case PoisonPill => 1 // or Int.MaxValue - 2 

     // We default to 1, which is in between high and low 
     case otherwise  => 0 // or Int.MaxValue - 1 
    }) 

Hier versionToInt sollten einige Int Wert zurück, der niedriger ist nach neueren Versionen von Nachrichten (höhere Priorität), sagen wir in 2 - Inf Angebot.

Anschließend können Sie die höchste Version verfolgen, die in Ihrem Akteur verarbeitet wurde, und alle anderen Nachrichten löschen, die älter als diese Version sind. Verwenden Sie entweder become oder nur einen var in Ihrem Schauspieler dafür.

Es ist erwähnenswert, dass die Reihenfolge der gesendeten Nachrichten von Akka garantiert wird, aber letztendlich hängt es vom Sender ab, in welcher Reihenfolge sie gesendet werden und von den Verzögerungen dazwischen, was sich darauf auswirken würde, welche Nachrichten verarbeitet werden.

+0

Ich dachte an diesen Ansatz, aber wäre es nicht langsam? Ich würde eine gemeinsame Variable für alle Actor-Instanzen benötigen, die mich zwingen würde, Synchronisations-/Atomwerte zu verwenden. Ich bin Akka ziemlich neu und verstehe nicht alles perfekt. Außerdem brauche ich die Nachrichten nur zwischen einigen "eindeutigen" (mehr Erklärungen in einem Kommentar zu der Frage), nicht über den gesamten Stream. Ich könnte versuchen, diesen Ansatz zu ändern, indem ich eine gemeinsame Karte von Versionen habe, aber es würde a) alles verlangsamen b) einen Mechanismus erfordern, um alte Schlüssel zu entfernen. – sdfdfjndfsd

+0

Es sollte keine gemeinsam genutzten Variablen geben. Sie müssen einen Akteur pro Schlüssel erstellen. Ignorieren von Nachrichten wird billig sein. Der änderbare Status, der die zuletzt gesehene Version einer Nachricht darstellt, ist in einem Akteur sicher eingeschlossen. Sollte für Millionen von Akteuren pro JVM sehr gut funktionieren –

+0

Aber wie stelle ich sicher, dass zukünftige Nachrichten an denselben Zielakteur weitergeleitet werden? Indem Sie einen benutzerdefinierten Router schreiben? – sdfdfjndfsd