2015-09-08 5 views
13

Ich habe Haupt die Funken Kontext schafft:Spark-SQL-Datenrahmen - Import sqlContext.implicits._

val sc = new SparkContext(sparkConf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

schafft Dann Datenrahmen und tut Filter und Validierungen auf dem Datenrahmen.

val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00") 

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0)) 
    // record length cannot be < 2 
    .na.drop(3) 
    // round to hours 
    .withColumn("time",convertToHourly($"time")) 

Das funktioniert großartig.

aber wenn ich auf eine andere Datei meine Validierungen versuchen zu bewegen, indem Sie den Datenrahmen zu

Senden
function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 

, dass der Datenrahmen wird & tut die Validierungen und Transformationen: Es scheint, wie ich das brauchen

import sqlContext.implicits._ 

To avoid the error: “value $ is not a member of StringContext” that happens on line: .withColumn("time",convertToHourly($"time"))

Aber um die zu verwenden, brauche ich auch die sqlContext entweder in der neuen definiert Datei wie folgt:

val sc = new SparkContext(sparkConf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

oder senden Sie es an die

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 
function 

Ich mag die Trennung fühle ich versuche zu 2-Dateien (Haupt & Validierung) zu tun, ist nicht richtig gemacht ...

Irgendeine Idee, wie man das gestaltet? Oder senden Sie einfach den sqlContext an die Funktion?

Danke!

+0

spark repository bei diesem Beispiel einen Blick Wenn ich Dinge wie trennen wollen, dass ich ich importieren sqlContext.implicits._ nur SqlContext im Konstruktor der neuen Klasse übergeben und dann einmal pro Klasse. Ich konnte mir nichts besseres einfallen lassen, also stimme ich diese Frage ab und warte auf bessere Vorschläge. – Niemand

Antwort

11

Sie können mit einer Singleton-Instanz von SQLContext arbeiten. Sie können in der

/** Lazily instantiated singleton instance of SQLContext */ 
object SQLContextSingleton { 

    @transient private var instance: SQLContext = _ 

    def getInstance(sparkContext: SparkContext): SQLContext = { 
    if (instance == null) { 
     instance = new SQLContext(sparkContext) 
    } 
    instance 
    } 
} 
... 
//And wherever you want you can do 
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
import sqlContext.implicits._ 
+1

Danke! Ich habe das Singleton-Objekt verwendet, aber in meinem Fall möchte ich, dass es nur einmal erstellt wurde: Objekt SQLContextSingleton { @Transient var instance: SQLContext = _ } initialisierte es dann von main und verwendete es für Validierungen. Danke für die Hilfe! –