6

Wie sammle ich diese Metriken auf einer Konsole (Spark Shell oder Spark-Übergabeprojekt) direkt nach der Aufgabe oder dem Job.Wie können Metriken wie Ausgabegröße und Datensätze, die von Spark UI geschrieben wurden, abgerufen werden?

Wir verwenden Spark, um Daten von Mysql zu Cassandra zu laden, und es ist ziemlich groß (zB: ~ 200 GB und 600M Zeilen). Wenn die Aufgabe erledigt ist, wollen wir überprüfen, wie viele Zeilen genau verarbeitet wurden? Wir können die Nummer von der Spark UI abrufen, aber wie können wir diese Nummer ("Geschriebene Ausgabe-Datensätze") von der Spark-Shell oder im Spark-Submit-Job abrufen.

Beispiel Befehl zum Laden von Mysql zu Cassandra.

Ich möchte alle Spark UI Metriken auf der oben genannten Aufgabe hauptsächlich Output-Größe und Datensätze geschrieben.

Bitte helfen.

Danke für Ihre Zeit!

Antwort

3

Die Antwort gefunden. Sie können die Statistiken mithilfe von SparkListener abrufen.

Wenn Ihr Job keine Eingabe- oder Ausgabe-Metriken enthält, erhalten Sie möglicherweise None.get-Exceptions, die Sie ignorieren können, indem Sie stmt angeben.

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val metrics = taskEnd.taskMetrics 
    if(metrics.inputMetrics != None){ 
     inputRecords += metrics.inputMetrics.get.recordsRead} 
    if(metrics.outputMetrics != None){ 
     outputWritten += metrics.outputMetrics.get.recordsWritten } 
    } 
}) 

Das folgende Beispiel finden Sie.

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import com.datastax.spark.connector._ 
import org.apache.spark.sql._ 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 

val conf = new SparkConf() 
.set("spark.cassandra.connection.host", "...") 
.set("spark.driver.allowMultipleContexts","true") 
.set("spark.master","spark://....:7077") 
.set("spark.driver.memory","1g") 
.set("spark.executor.memory","10g") 
.set("spark.shuffle.spill","true") 
.set("spark.shuffle.memoryFraction","0.2") 
.setAppName("CassandraTest") 
sc.stop 
val sc = new SparkContext(conf) 
val sqlcontext = new org.apache.spark.sql.SQLContext(sc) 

var outputWritten = 0L 

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val metrics = taskEnd.taskMetrics 
    if(metrics.inputMetrics != None){ 
     inputRecords += metrics.inputMetrics.get.recordsRead} 
    if(metrics.outputMetrics != None){ 
     outputWritten += metrics.outputMetrics.get.recordsWritten } 
    } 
}) 

val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load() 
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map("table" -> "bucks_payments", "keyspace" -> "test")) 

println("outputWritten",outputWritten) 

Ergebnis:

scala> println("outputWritten",outputWritten) 
(outputWritten,16383)