2016-04-23 7 views
0

Ich versuche, Datensätze in eine Datenbank mit DB-Akteur einzufügen. Ich habe Millionen von Aufzeichnungen. Nach dem Betrieb hat die Datenbank jedoch nur zehn Datensätze. Ich weiß, dass die Datenbankverbindung ein Zustand ist, und ich denke, dass es ein Problem mit dieser Bedingung gibt. Hier ist mein Code.Was ist der beste Weg, um Datensätze in Akkas Actor with Slick einzufügen?

class DBActor extends Actor with DBConfig { 
    override def receive: Receive = { 
    case Message(id, title) => 
     db.run(products += Product(id, title)) 
    } 
} 

Die Datenbank ist eine relationale Datenbank, ‚Produkte‘ ist ein TableQuery und DBConfig verfügt über eine Datenbank-Verbindung und Sitzung. Was ist der beste Weg, Datensätze mit diesem Akteur unter Garantie einzufügen?

+0

Denken Sie daran, dass 'db.run' eigentlich eine Zukunft ist, die nützliche Informationen über das Schreiben enthält. Sie sollten das Ergebnis der Zukunft zumindest an diesen Akteur weiterleiten, damit Sie überprüfen können, ob der Schreibvorgang erfolgreich war. –

Antwort

2

Persistent Datensätze mit Batch statt einer nach dem anderen. db.run() Methode sind asynchron, so dass es Zukunft sofort zurückgibt und die Ausführung später auf anderen Thread so Actor ist nichts außer einem Methodenaufruf (db.run). Sie sollten Callback (onFailure) auf Ergebnis der db resistieren. run() -Methode, so dass Sie sehen können, ob ein Fehler auftritt. Siehe Beispiel (es ist kein kompilierter Code):

case object Insert 

case object Flush 

class DBActor extends Actor with DBConfig { 


    implicit val dispatcher = context.dispatcher 

    val bulkLimit:Int = 1000 

    val flushTime:Int = 10 

    var documents = List.empty[Product] 

/*** 
    * Start automatic flushing when actor start 
    */ 
    context.system.scheduler.scheduleOnce(flushTime second, self, Flush) 

def receive:Receive={ 

case document: Document => 
    documents =documents :+ document 
    log.info(s"DBActor received document [total count ${documents.length}]") 
    if (documents.length >= bulkLimit) self ! Insert 

case Insert => 
    if (documents.length > 0) { 
    val batch = documents.take(bulkLimit) 
    db.run(products ++= batch).onFailure { case ex: Exception => log.error("Getting error on persisting data", ex) } 
    documents = documents.drop(bulkLimit) 
    } 

case Flush => 
    if (documents.length > 0) self ! Insert 

    context.system.scheduler.scheduleOnce(flushTime second, self, Flush) 
} 

} 
+0

Nun, "Dokumente" sind jetzt veränderbar. Wird es eine Race Condition geben, wenn ich diese Art von Lösung verwende? Können wir db.run blockieren? –

+0

Nein und niemals. Innerhalb des Schauspielers können Sie veränderbare Variable behalten. Blockiere db.run nicht.Bitte versuche diese Lösung. Wenn irgendein Problem mich wissen lässt. – Sky

+0

Es funktioniert perfekt. –