2016-07-07 34 views
7

Ich versuche, Lese-Seite in meiner ES-CQRS-Architektur zu implementieren. Lassen Sie sich sagen, dass ich einen persistenten Schauspieler wie diese:Akka Persistence Query Ereignisstrom und CQRS

object UserWrite { 

    sealed trait UserEvent 
    sealed trait State 
    case object Uninitialized extends State 
    case class User(username: String, password: String) extends State 
    case class AddUser(user: User) 
    case class UserAdded(user: User) extends UserEvent 
    case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed]) 
    case class UsersStream(fromSeqNo: Long) 
    case object GetCurrentUser 

    def props = Props(new UserWrite) 
} 

class UserWrite extends PersistentActor { 

    import UserWrite._ 

    private var currentUser: State = Uninitialized 

    override def persistenceId: String = "user-write" 

    override def receiveRecover: Receive = { 
    case UserAdded(user) => currentUser = user 
    } 

    override def receiveCommand: Receive = { 
    case AddUser(user: User) => persist(UserAdded(user)) { 
     case UserAdded(`user`) => currentUser = user 
    } 
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo) 
    case GetCurrentUser => sender() ! currentUser 
    } 

    def publishUserEvents(fromSeqNo: Long) = { 
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    val userEvents = readJournal 
     .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue) 
     .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event } 
    sender() ! UserEvents(userEvents) 
    } 
} 

Soweit ich verstehe, jedes Mal, wenn ein Ereignis beibehalten wird, können wir es über Akka Persistence Query veröffentlichen. Nun, ich bin mir nicht sicher, was wäre eine richtige Möglichkeit, um diese Ereignisse zu abonnieren, damit ich es in meiner Datenbank lesen kann. Eine der Ideen ist, zuerst eine Nachricht von meinem gelesenen Schauspieler an UsersStream Nachricht an UserWrite Schauspieler und "Senke" Ereignisse in diesem Schauspieler zu senden.

EDIT

Nach Anregung von @cmbaxter, Implementiert I Seite auf diese Weise lesen:

object UserRead { 

    case object GetUsers 
    case class GetUserByUsername(username: String) 
    case class LastProcessedEventOffset(seqNo: Long) 
    case object StreamCompleted 

    def props = Props(new UserRead) 
} 

class UserRead extends PersistentActor { 
    import UserRead._ 

    var inMemoryUsers = Set.empty[User] 
    var offset  = 0L 

    override val persistenceId: String = "user-read" 

    override def receiveRecover: Receive = { 
    // Recovery from snapshot will always give us last sequence number 
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo 
    case RecoveryCompleted         => recoveryCompleted() 
    } 

    // After recovery is being completed, events will be projected to UserRead actor 
    def recoveryCompleted(): Unit = { 
    implicit val materializer = ActorMaterializer() 
    PersistenceQuery(context.system) 
     .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
     .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue) 
     .map { 
     case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event 
     } 
     .runWith(Sink.actorRef(self, StreamCompleted)) 
    } 

    override def receiveCommand: Receive = { 
    case GetUsers     => sender() ! inMemoryUsers 
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username) 
    // Match projected event and update offset 
    case (seqNo: Long, UserAdded(user)) => 
     saveSnapshot(LastProcessedEventOffset(seqNo)) 
     inMemoryUsers += user 
    } 
} 

Es gibt einige Probleme wie: Ereignisstrom langsam zu sein scheint. I.e. UserRead Darsteller kann mit einer Gruppe von Benutzern antworten, bevor der neu hinzugefügte Benutzer gespeichert wird.

EDIT 2

I erhöhte Aktualisierungsintervall von cassandra Abfrage Zeitschrift, das weniger mehr Problem mit langsamem Ereignisstrom gelöst. Es scheint, dass Cassandra-Ereignisjournal standardmäßig alle 3 Sekunden aufgerufen wird. In meinem application.conf ich hinzugefügt:

cassandra-query-journal { 
    refresh-interval = 20ms 
} 

EDIT 3

Eigentlich nicht abnehmen Aktualisierungsintervall. Das wird die Speichernutzung erhöhen, aber das ist nicht gefährlich, auch kein Punkt. Im allgemeinen Konzept von CQRS ist, dass die Schreib- und Leseseite asynchron sind. Daher sind Daten nach dem Schreiben niemals sofort zum Lesen verfügbar. Umgang mit UI? Ich öffne einfach den Stream und schiebe Daten über Server gesendet Ereignisse, nachdem die Leseseite sie bestätigt.

+2

Ich würde nur die Lese Journal basierten Code in Ihre Leseseite Projektion Schauspieler bewegen, anstatt sie eine Nachricht mit einem 'Source' auf sie zu senden. Dann streamen Sie diesen Stream in dem projizierten Aktor auf der Leseseite und projizieren diese Information in Elasticsearch. – cmbaxter

+0

@cmbaxter Ich habe das getan. Es scheint eine sehr gute Idee zu sein. Ich habe meine Frage aktualisiert und akzeptiere immer noch Vorschläge, da ich immer noch Zweifel habe. –

Antwort

4

Es gibt einige Möglichkeiten, dies zu tun. Zum Beispiel habe ich in meiner App einen Akteur in meiner Abfrageseite, der eine PersistenceQuery hat, die ständig nach Änderungen sucht, aber Sie können auch einen Thread mit der gleichen Abfrage haben. Die Sache ist, um den Strom zu halten öffnen zu können, sobald die beibehaltenen Ereignis lesen, wie es

geschieht
val readJournal = 
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
    CassandraReadJournal.Identifier) 

// issue query to journal 
val source: Source[EventEnvelope, NotUsed] = 
    readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue) 

// materialize stream, consuming events 
implicit val mat = ActorMaterializer() 
source.map(_.event).runForeach{ 
    case userEvent: UserEvent => { 
    doSomething(userEvent) 
    } 
} 

Statt dessen können Sie einen Timer haben, die einen PersistenceQuery und speichert neue Ereignisse auslöst, aber ich denke, dass ein Strom offen ist der beste Weg,

2

Obwohl die Lösung mit PersistenceQuery nur genehmigt wurde, enthält es die folgenden Probleme:

  1. es Teil ist, gibt es nur Verfahren ist EventEnvelopes präsentiert zu lesen.
  2. Es kann nicht mit State Snapshots arbeiten und als Ergebnis sollte CQRS Reader Part über alle persistent Ereignisse immer beibehalten werden.

Die erste Lösung ist besser, hat aber die folgenden Probleme:

  1. Es ist zu kompliziert. Es verursacht dem Benutzer unnötigen Umgang mit Sequenznummern.
  2. Der Code behandelt auch Status (Abfrage/Update) mit der Actors-Implementierung gekoppelt.

Es gibt existiert einfacheres:

import akka.NotUsed 
import akka.actor.{Actor, ActorLogging} 
import akka.persistence.query.{EventEnvelope, PersistenceQuery} 
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal} 
import akka.persistence._ 
import akka.stream.ActorMaterializer 
import akka.stream.javadsl.Source 

/** 
    * Created by alexv on 4/26/2017. 
    */ 
class CQRSTest { 

    // User Command, will be transformed to User Event 
    sealed trait UserCommand 
    // User Event 
    // let's assume some conversion from Command to event here 
    case class PersistedEvent(command: UserCommand) extends Serializable 
    // User State, for simplicity assumed that all State will be snapshotted 
    sealed trait State extends Serializable{ 
    def clear(): Unit 
    def updateState(event: PersistedEvent): Unit 
    def validateCommand(command:UserCommand): Boolean 
    def applyShapshot(newState: State): Unit 
    def getShapshot() : State 
    } 
    case class SaveSnapshot() 

    /** 
    * Common code for Both reader and writer 
    * @param state - State 
    */ 
    abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging { 
    override def persistenceId: String = "CQRSPersistenceId" 

    override def preStart(): Unit = { 
     // Since the state is external and not depends to Actor's failure or restarts it should be cleared. 
     state.clear() 
    } 

    override def receiveRecover: Receive = { 
     case event : PersistedEvent => state.updateState(event) 
     case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot) 
     case RecoveryCompleted => onRecoveryCompleted(super.lastSequenceNr) 
    } 

    abstract def onRecoveryCompleted(lastSequenceNr:Long) 
    } 

    class CQRSWriter(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSWriter Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed") 
    } 

    override def receiveCommand: Receive = { 
     case command: UserCommand => 
     if(state.validateCommand(command)) { 
      // Persist events and call state.updateState with each persisted event 
      persistAll(List(PersistedEvent(command)))(state.updateState) 
     } 
     else { 
      log.error("Validation Failed for Command: {}", command) 
     } 
     case SaveSnapshot => saveSnapshot(state.getShapshot()) 
     case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata) 
     case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason) 
    } 
    } 

    class CQRSReader(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSReader Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed, Starting QueryStream") 

     // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests) 
     val readJournal = PersistenceQuery(context.system).readJournalFor(
     context.system.settings.config.getString("akka.persistence.query.my-read-journal")) 
     .asInstanceOf[ReadJournal 
     with EventsByPersistenceIdQuery] 
     val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
     OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue) 
     source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer()) 

    } 

    // Nothing received since it is Reader only 
    override def receiveCommand: Receive = Actor.emptyBehavior 
    } 
} 
+0

Angenommen CQRSRead Part sollte über seinen Zustand direkt abgefragt werden. CQRSReader stellt sicher, dass der Status dem von CQRSWriter ähnlich ist. Ich habe den Concrete-Status hier nicht implementiert, aber es kann alles von der einfachen Hash-Map bis zur In-Memory-Graph-DB sein –