Obwohl Sie eine Sink.foreach
verwenden können, um dies zu erreichen (wie von Ramon erwähnt) ist es sicherer und wahrscheinlich schneller (indem Sie die Einsätze parallel laufen), die mapAsync
Flow
zu verwenden. Das Problem, das Sie mit Sink.foreach
konfrontiert werden, ist, dass es keinen Rückgabewert hat. Einfügen in eine Datenbank über Slicks db.run
Methode gibt eine Future
zurück, die dann aus den Dampf zurückgegeben Future[Done]
zurückkehrt, die abgeschlossen wird, sobald die Sink.foreach
beendet.
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
Auf der anderen Seite der def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])
Flow
können Sie die Einsätze parallel über die Parallelität paramerter laufen und übernimmt eine Funktion aus dem Upstream-out Wert auf eine Zukunft eines bestimmten Typs. Dies entspricht unserer i => db.run(numbers += i)
Funktion. Das Tolle an diesem Flow
ist, dass es dann das Ergebnis dieser Futures
Downstream zuführt.
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done <-- stream Future[Done] returned after inserts finished
Um den Punkt zu beweisen, können Sie sogar ein echtes Ergebnis aus dem Strom zurückkehren eher als ein Future[Done]
(Mit Geschehen darstellt Unit).Dieser Stream fügt außerdem einen höheren Parallelitätswert und eine Stapelverarbeitung für zusätzliche Leistung hinzu. *
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
- Hinweis: Sie werden wahrscheinlich nicht eine bessere Leistung für eine so kleine Datenmenge sehen, aber wenn ich mit einer 1,7 M zu tun hatte einfügen ich in der Lage war, mit einem die beste Leistung auf meiner Maschine zu bekommen Stapelgröße von 1000 und Parallelitätswert von 8, lokal mit Postgresql. Das war ungefähr doppelt so gut wie nicht parallel laufen. Wie immer im Zusammenhang mit der Leistung können Ihre Ergebnisse variieren und Sie sollten selbst messen.