2016-01-25 26 views
24

Was ist der richtige Weg, um ein Bulk insertOrUpdate in Slick 3.0 zu tun?Slick 3.0 bulk insert oder update (upsert)

Ich verwende MySQL, wo die entsprechende Abfrage

INSERT INTO table (a,b,c) VALUES (1,2,3),(4,5,6) 
ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b); 

MySQL bulk INSERT or UPDATE

hier sein würde, ist meine aktuellen Code, der sehr langsam :-(ist

// FIXME -- this is slow but will stop repeats, an insertOrUpdate 
// functions for a list would be much better 
val rowsInserted = rows.map { 
    row => await(run(TableQuery[FooTable].insertOrUpdate(row))) 
}.sum 

Was ich suche entspricht

def insertOrUpdate(values: Iterable[U]): DriverAction[MultiInsertResult, NoStream, Effect.Write] 

Antwort

28

Es gibt mehrere Möglichkeiten, wie Sie diesen Code schneller machen können (jeweils sollte schneller sein als die vorhergehenden, aber es wird immer weniger idiomatische-Slick):

  • Führen sie Ihre DBIO Ereignisse auf einmal, anstatt für jeden warten begehen, bevor Sie den nächsten Lauf:

    val toBeInserted = rows.map { row => TableQuery[FooTable].insertOrUpdate(row) } 
    val inOneGo = DBIO.sequence(toBeInserted) 
    val dbioFuture = run(inOneGo) 
    // Optionally, you can add a `.transactionally` 
    // and/or `.withPinnedSession` here to pin all of these upserts 
    // to the same transaction/connection 
    // which *may* get you a little more speed: 
    // val dbioFuture = run(inOneGo.transactionally) 
    val rowsInserted = await(dbioFuture).sum 
    
  • Tropfen bis auf den JDBC-Ebene und führen Sie Ihre Upsert alle in einem Rutsch (idea via this answer):

    val SQL = """INSERT INTO table (a,b,c) VALUES (?, ?, ?) 
    ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);""" 
    
    SimpleDBIO[List[Int]] { session => 
        val statement = session.connection.prepareStatement(SQL) 
        rows.map { row => 
        statement.setInt(1, row.a) 
        statement.setInt(2, row.b) 
        statement.setInt(3, row.c) 
        statement.addBatch() 
        } 
        statement.executeBatch() 
    } 
    
+0

Cool ist. Danke besonders für die zweite Technik. Ich wusste nicht, – user1902291

+0

Nur zu doublecheck: die erste Lösung ist nicht im Batch einfügen, ist es? Es sieht so aus, als würde es alle Einsätze parallel machen und nicht gestapelt werden, oder? – ignasi35

+0

Korrekt @ Ignasi35 –

0

Wie Sie unter Slick examples sehen können, können Sie die Funktion ++= verwenden, um die Funktion JDBC Batch Insert einzufügen. Per Beispiel:

val foos = TableQuery[FooTable] 
val rows: Seq[Foo] = ... 
foos ++= rows // here slick will use batch insert 

Sie können auch „Größe“ Sie Charge von „Gruppierung“ der Reihen-Sequenz:

val batchSize = 1000 
rows.grouped(batchSize).foreach { group => foos ++= group } 
+9

Dank SQLU nicht, aber ich glaube nicht ++ = tut insertOrUpdate. Ich glaube, es ist nur einfügen, und in meinem Fall wird eine Integritätsausnahme zu werfen, wenn es eine doppelte Zeile – user1902291

0

Verwendung SQLU

dieses Demoarbeit

case ("insertOnDuplicateKey",answers:List[Answer])=>{ 
    def buildInsert(r: Answer): DBIO[Int] = 
    sqlu"insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values (${r.aid},${r.bid},${r.sbid},${r.qid},${r.ups},${r.author},${r.uid},${r.nick},${r.pub_time},${r.content},${r.good},${r.hot},${r.id},${r.reply},${r.pic},${r.spider_time}) ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)" 
    val inserts: Seq[DBIO[Int]] = answers.map(buildInsert) 
    val combined: DBIO[Seq[Int]] = DBIO.sequence(inserts) 
    DEST_DB.run(combined).onComplete(data=>{ 
    println("insertOnDuplicateKey data result",data.get.mkString) 
    if (data.isSuccess){ 
     println(data.get) 
     val lastid=answers.last.id 
     Sync.lastActor !("upsert",tablename,lastid) 
    }else{ 
     //retry 
     self !("insertOnDuplicateKey",answers) 
    } 
    }) 
} 

und ich versuche SQLU in einer einzigen SQL zu verwenden, aber Fehler vielleicht Versorgung String Interpolation

diese Demo nicht arbeiten

case ("insertOnDuplicateKeyError",answers:List[Answer])=>{ 
    def buildSql(execpre:String,values: String,execafter:String): DBIO[Int] = sqlu"$execpre $values $execafter" 
    val execpre="insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values " 
    val execafter=" ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)" 
    val valuesstr=answers.map(row=>("("+List(row.aid,row.bid,row.sbid,row.qid,row.ups,"'"+row.author+"'","'"+row.uid+"'","'"+row.nick+"'","'"+row.pub_time+"'","'"+row.content+"'",row.good,row.hot,row.id,row.reply,row.pic,"'"+row.spider_time+"'").mkString(",")+")")).mkString(",\n") 
    val insertOrUpdateAction=DBIO.seq(
    buildSql(execpre,valuesstr,execafter) 
) 
    DEST_DB.run(insertOrUpdateAction).onComplete(data=>{ 
    if (data.isSuccess){ 
     println("insertOnDuplicateKey data result",data) 
     //retry 
     val lastid=answers.last.id 
     Sync.lastActor !("upsert",tablename,lastid) 
    }else{ 
     self !("insertOnDuplicateKey2",answers) 
    } 
    }) 
} 

ein mysql-Sync-Tool mit scala Slick https://github.com/cclient/ScalaMysqlSync