2016-04-04 14 views
7

In Slick's documentation Beispiele für die Verwendung von Reactive Streams werden nur zum Lesen von Daten als Mittel eines DatabasePublisher vorgestellt. Aber was passiert, wenn Sie Ihre Datenbank basierend auf Ihrer Einfügerate als Sink- und Rückprojektion verwenden möchten?Wie werden reaktive Datenströme in Slick zum Einfügen von Daten verwendet?

Ich habe nach entsprechenden DatabaseSubscriber gesucht, aber es existiert nicht. Die Frage ist also, ob ich eine Quelle haben, sagen:

val source = Source(0 to 100)

wie kann ich Kreta ein Waschbecken mit Slick, die diese Werte in eine Tabelle mit Schema schreibt:

create table NumberTable (value INT)

Antwort

7

Serieneinlagen

Der einfachste Weg wäre innerhalb einer Sink.foreach zu tun.

Angenommen Sie haben die schema code generation benutzt haben und Ihre Tabelle weiter Annahme, daß dem Namen „NumberTable“

//Tables file was auto-generated by the schema code generation 
import Tables.{Numbertable, NumbertableRow} 

val numberTableDB = Database forConfig "NumberTableConfig" 

Wir können eine Funktion schreiben, die das Einfügen tut

def insertIntoDb(num : Int) = 
    numberTableDB run (Numbertable += NumbertableRow(num)) 

Und diese Funktion platziert werden können im Waschbecken

val insertSink = Sink[Int] foreach insertIntoDb 

Source(0 to 100) runWith insertSink 

Ba tched Einsätze

Sie könnten weiter die Sink verlängern Methodik durch batching N Einsätze zu einem Zeitpunkt:

def batchInsertIntoDb(nums : Seq[Int]) = 
    numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply)) 

val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb 

Diese batched kann Sink von einem Flow gespeist werden, der die Batch-Gruppierung hat:

val batchSize = 10 

Source(0 to 100).via(Flow[Int].grouped(batchSize)) 
       .runWith(batchInsertSink) 
2

Obwohl Sie eine Sink.foreach verwenden können, um dies zu erreichen (wie von Ramon erwähnt) ist es sicherer und wahrscheinlich schneller (indem Sie die Einsätze parallel laufen), die mapAsyncFlow zu verwenden. Das Problem, das Sie mit Sink.foreach konfrontiert werden, ist, dass es keinen Rückgabewert hat. Einfügen in eine Datenbank über Slicks db.run Methode gibt eine Future zurück, die dann aus den Dampf zurückgegeben Future[Done] zurückkehrt, die abgeschlossen wird, sobald die Sink.foreach beendet.

implicit val system = ActorSystem("system") 
implicit val materializer = ActorMaterializer() 

class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") { 
    def value = column[Int]("value") 
    def * = value 
} 

val numbers = TableQuery[Numbers] 

val db = Database.forConfig("postgres") 
Await.result(db.run(numbers.schema.create), Duration.Inf) 

val streamFuture: Future[Done] = Source(0 to 100) 
    .runWith(Sink.foreach[Int] { (i: Int) => 
    db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done")) 
    }) 
Await.result(streamFuture, Duration.Inf) 
println("stream 1 done") 

//// sample 1 output: //// 
// stream 1 insert 1 done 
// ... 
// stream 1 insert 99 done 
// stream 1 done <-- stream Future[Done] returned before inserts finished 
// stream 1 insert 100 done 

Auf der anderen Seite der def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])Flow können Sie die Einsätze parallel über die Parallelität paramerter laufen und übernimmt eine Funktion aus dem Upstream-out Wert auf eine Zukunft eines bestimmten Typs. Dies entspricht unserer i => db.run(numbers += i) Funktion. Das Tolle an diesem Flow ist, dass es dann das Ergebnis dieser Futures Downstream zuführt.

val streamFuture2: Future[Done] = Source(0 to 100) 
    .mapAsync(1) { (i: Int) => 
    db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r } 
    } 
    .runWith(Sink.ignore) 
Await.result(streamFuture2, Duration.Inf) 
println("stream 2 done") 

//// sample 2 output: //// 
// stream 2 insert 1 done 
// ... 
// stream 2 insert 100 done 
// stream 1 done <-- stream Future[Done] returned after inserts finished 

Um den Punkt zu beweisen, können Sie sogar ein echtes Ergebnis aus dem Strom zurückkehren eher als ein Future[Done] (Mit Geschehen darstellt Unit).Dieser Stream fügt außerdem einen höheren Parallelitätswert und eine Stapelverarbeitung für zusätzliche Leistung hinzu. *

val streamFuture3: Future[Int] = Source(0 to 100) 
    .via(Flow[Int].grouped(10)) // Batch in size 10 
    .mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count 
    .runWith(Sink.fold(0)(_+_)) // count all inserts and return total 
val rowsInserted = Await.result(streamFuture3, Duration.Inf) 
println(s"stream 3 done, inserted $rowsInserted rows") 

// sample 3 output: 
// stream 3 done, inserted 101 rows 
  • Hinweis: Sie werden wahrscheinlich nicht eine bessere Leistung für eine so kleine Datenmenge sehen, aber wenn ich mit einer 1,7 M zu tun hatte einfügen ich in der Lage war, mit einem die beste Leistung auf meiner Maschine zu bekommen Stapelgröße von 1000 und Parallelitätswert von 8, lokal mit Postgresql. Das war ungefähr doppelt so gut wie nicht parallel laufen. Wie immer im Zusammenhang mit der Leistung können Ihre Ergebnisse variieren und Sie sollten selbst messen.