Ich versuche, Lese-Seite in meiner ES-CQRS-Architektur zu implementieren. Lassen Sie sich sagen, dass ich einen persistenten Schauspieler wie diese:Akka Persistence Query Ereignisstrom und CQRS
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
Soweit ich verstehe, jedes Mal, wenn ein Ereignis beibehalten wird, können wir es über Akka Persistence Query
veröffentlichen. Nun, ich bin mir nicht sicher, was wäre eine richtige Möglichkeit, um diese Ereignisse zu abonnieren, damit ich es in meiner Datenbank lesen kann. Eine der Ideen ist, zuerst eine Nachricht von meinem gelesenen Schauspieler an UsersStream
Nachricht an UserWrite
Schauspieler und "Senke" Ereignisse in diesem Schauspieler zu senden.
EDIT
Nach Anregung von @cmbaxter, Implementiert I Seite auf diese Weise lesen:
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
Es gibt einige Probleme wie: Ereignisstrom langsam zu sein scheint. I.e. UserRead
Darsteller kann mit einer Gruppe von Benutzern antworten, bevor der neu hinzugefügte Benutzer gespeichert wird.
EDIT 2
I erhöhte Aktualisierungsintervall von cassandra Abfrage Zeitschrift, das weniger mehr Problem mit langsamem Ereignisstrom gelöst. Es scheint, dass Cassandra-Ereignisjournal standardmäßig alle 3 Sekunden aufgerufen wird. In meinem application.conf
ich hinzugefügt:
cassandra-query-journal {
refresh-interval = 20ms
}
EDIT 3
Eigentlich nicht abnehmen Aktualisierungsintervall. Das wird die Speichernutzung erhöhen, aber das ist nicht gefährlich, auch kein Punkt. Im allgemeinen Konzept von CQRS ist, dass die Schreib- und Leseseite asynchron sind. Daher sind Daten nach dem Schreiben niemals sofort zum Lesen verfügbar. Umgang mit UI? Ich öffne einfach den Stream und schiebe Daten über Server gesendet Ereignisse, nachdem die Leseseite sie bestätigt.
Ich würde nur die Lese Journal basierten Code in Ihre Leseseite Projektion Schauspieler bewegen, anstatt sie eine Nachricht mit einem 'Source' auf sie zu senden. Dann streamen Sie diesen Stream in dem projizierten Aktor auf der Leseseite und projizieren diese Information in Elasticsearch. – cmbaxter
@cmbaxter Ich habe das getan. Es scheint eine sehr gute Idee zu sein. Ich habe meine Frage aktualisiert und akzeptiere immer noch Vorschläge, da ich immer noch Zweifel habe. –