2016-08-04 42 views
0

Ich habe den folgenden Code unten auf der CERN-Website gefunden. FYI: Ich benutze funke 1.3 Der Beispielcode ist fantastisch, wenn Sie das Schema der Datenmenge kennen, die Sie zu elasticsearch indizieren möchten.Scala ElasticSearch Indexierung in Bezug auf dynamisch änderndes Schema

Allerdings könnte jemand mich in die richtige Richtung, so dass ich eine Methode erstellen können wie folgt:

Pass in als Argument die Schemastruktur von einer externen Quelle (col Name/Datentyp) (die harte Bit) zusammen mit dem zu indizierenden Dateinamen (easy bit)? Führen Sie die Schemazuordnungen innerhalb der Funktion dynamisch aus.

Mit einer solchen Methode könnte ich einen gemappten und indizierten Datensatz in ES generieren.

Beispielcode:

//import elasticsearch packages 
    import org.elasticsearch.spark._ 

    //define the schema 
    case class MemT(dt: String (link is external), server: String (link is external), memoryused: Integer (link is external)) 

    //load the csv file into rdd 
    val Memcsv = sc.textFile("/tmp/flume_memusage.csv") 

    //split the fields, trim and map it to the schema 
    val MemTrdd = Memcsv.map(line=>line.split(",")).map(line=>MemT(line(0).trim.toString,line(1).trim.toString,line(2).trim.toInt)) 

    //write the rdd to the elasticsearch 
    MemTrdd.saveToEs("fmem/logs") 

Vielen Dank!

Quelle: in der Lage sein Index direkt in ES von einem Datenrahmen https://db-blog.web.cern.ch/blog/prasanth-kothuri/2016-05-integrating-hadoop-and-elasticsearch-%E2%80%93-part-2-%E2%80%93-writing-and-querying

+0

ein wenig unklar, ES wird standardmäßig generieren Schema, das Sie versucht zu indizieren, um Dokumente der accoring, isn Was willst du? – Mysterion

+0

True, aber Spaltenname nicht im Header verfügbar. Es ist mein Verständnis, dass elasticsearch den Datentyp falsch einstellen kann, indem nur der erste Datensatz betrachtet wird. Bitte zitieren Sie mich, wenn meine Annahmen falsch sind. – Paul

+0

Siehe Abschnitt mit der Überschrift "Wann ein benutzerdefiniertes Mapping anzugeben ist" ... "diese dynamische Mapping-Generation enthält einige Einschränkungen: 1. Erkannte Datentypen sind möglicherweise nicht korrekt ... TimeStamp kann als langer DataType erkannt werden ..." source: , https: //www.elastic.co/blog/found-elasticsearch-mapping-introduction. " – Paul

Antwort

0

Was i erreichen wollte ,. Ich benötigte, dass die Indexzuordnungen von externer Schemaquelle gesteuert werden. Hier ist, wie ich erreicht, dass ....

BTW: Es gibt eine zusätzliche Validierung/Verarbeitung, die ich ausgelassen, aber das Skelett Code sollte gehen diejenigen, die erfordern ähnliche Bedürfnisse bekommen ....

I enthalten sind, die folgende ES Abhängigkeit in meiner build.sbt Datei

"org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.3" 

Kommentare willkommen ...

//Just showing the ES stuff 
import org.elasticsearch.hadoop 
import org.elasticsearch.spark._ 


//Declare a schema 
val schemaString = "age:int, name:string,location:string" 

//Fill RDD with dummy data 
val rdd = sc.textFile("/path/to/your/file.csv") 

val seperator = "," //This is seperator in csv 

//Convert schema string above into a struct 
val schema = 
    StructType(
    schemaString.split(",").map(fieldName => 
    StructField(fieldName.split(":")(0), 
    getFieldTypeInSchema(fieldName.split(":")(1)), true))) 

    //map each element of RDD row to RDD with elements 
    val rowRDDx =rdd.map(p => { 
    var list: collection.mutable.Seq[Any] = collection.mutable.Seq.empty[Any] 
    var index = 0 
    var tokens = p.split(seperator) 
    tokens.foreach(value => { 
     var valType = schema.fields(index).dataType 
     var returnVal: Any = null 
     valType match { 
     case IntegerType => returnVal = value.toString.toInt 
     case DoubleType => returnVal = value.toString.toDouble 
     case LongType => returnVal = value.toString.toLong 
     case FloatType => returnVal = value.toString.toFloat 
     case ByteType => returnVal = value.toString.toByte 
     case StringType => returnVal = value.toString 
     case TimestampType => returnVal = value.toString 
     } 
     list = list :+ returnVal 
     index += 1 
     }) 
    Row.fromSeq(list) 
    }) 

//Convert the RDD with elements to a DF , also specify the intended schema 
val df = sqlContext.createDataFrame(rowRDDx, schema) 

//index the DF to ES 
EsSparkSQL.saveToEs(df,"test/doc")