2016-04-25 7 views
1

Ich habe einen Spark-Code, wo der Code in Call-Methode Aufruf an die memSQL-Datenbank zum Lesen aus einer Tabelle. Mein Code öffnet jedes Mal ein neues Verbindungsobjekt und schließt es, nachdem die Aufgabe erledigt ist. Dieser Aufruf erfolgt innerhalb der Call-Methode. Dies funktioniert gut, aber die Ausführungszeit für den Spark-Job wird hoch. Was wäre ein besserer Weg, dies zu tun, so dass die Ausführungszeit des Funkencodes reduziert wird.Was ist die richtige Art der Verwendung von memSQL Connection-Objekt in Call-Methode von Apache Spark-Code

Vielen Dank.

Antwort

1

Sie können eine Verbindung pro Partition wie folgt verwenden:

rdd.foreachPartition {records => 
    val connection = DB.createConnection() 
    //you can use your connection instance inside foreach 
    records.foreach { r=> 
    val externalData = connection.read(r.externaId) 
    //do something with your data 
    } 
    DB.save(records) 
    connection.close() 
} 

Wenn Sie Spark-Streaming verwenden:

dstream.foreachRDD { rdd => 
    rdd.foreachPartition { records => 
    val connection = DB.createConnection() 
    //you can use your connection instance inside foreach 
    records.foreach { r=> 
     val externalData = connection.read(r.externaId) 
     //do something with your data 
    } 
    DB.save(records) 
    connection.close() 
    } 
} 

Siehe http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

+0

Dank. Ich arbeite gerade daran, meinen Code zu ändern, um dies zu verwenden. –

+0

Dies dient zum Speichern in einem externen System, während ich es zum Lesen von einem externen System benötige. Die Funkenführung sagt "Diese Funktion sollte die Daten in jeder RDD zu einem externen System, wie das RDD in Dateien speichern oder über das Netzwerk in eine Datenbank schreiben." Ich benötige es zum Lesen von einem externen System. Danke –

+0

Ich habe die Antwort aktualisiert, hoffe, dass dies das ist, was Sie suchen. –