2016-08-01 11 views
0

Ich fange mit Apache Funke an. Ich habe eine Anforderung, ein JSON-Protokoll in eine abgeflachte Metriken zu konvertieren, kann auch als eine einfache CSV betrachtet werden.Erstellen einer aggregierten Metriken aus JSON-Logs in Apache Funke

Für z.

"orderId":1, 
    "orderData": { 
    "customerId": 123, 
    "orders": [ 
    { 
     "itemCount": 2, 
     "items": [ 
     { 
      "quantity": 1, 
      "price": 315 
     }, 
     { 
      "quantity": 2, 
      "price": 300 
     }, 

     ] 
    } 
    ] 
} 

Dies kann als ein einzelnes json log in Betracht gezogen werden, möchte ich dies in,

orderId,customerId,totalValue,units 
    1 , 123 , 915 , 3 

Ich wollte durch sparkSQL Dokumentation

konvertieren und es halten, einzelne Werte wie „wählen bekommen können orderId, orderData.customerId vom Auftrag "aber ich bin nicht sicher, wie man die Summe aller Preise und Maßeinheiten erhält.

Was sollte die beste Vorgehensweise sein, um dies mit apache spark zu erreichen?

+0

kippen wir wie Dataframe df = sqlContext.read tun() json. ("/ Pfad/zu/Datei"). toDF(); df.registerTempTable ("df"); df.printSchema(); und danach aggregieren durch sql? –

+0

Durch SQL kann ich einzelne Elemente erhalten, aber nicht sicher über orders.items, wie kann ich Aggregate auf diesem ausführen? Ich denke, dass es nur als JSON-Wert kommen wird, korrigiere mich bitte, wenn mir etwas fehlt. – fireants

+0

können Sie [this] (http://xinhstechblog.blogspot.in/2015/06/reading-json-data-in-spark-dataframes.html) und [nested json] (http: // xinhstechblog .blogspot.in/2016/05/lesen-json-verschachtelte-array-in-spark.html) –

Antwort

1

Versuche:

>>> from pyspark.sql.functions import * 
>>> doc = {"orderData": {"orders": [{"items": [{"quantity": 1, "price": 315}, {"quantity": 2, "price": 300}], "itemCount": 2}], "customerId": 123}, "orderId": 1} 
>>> df = sqlContext.read.json(sc.parallelize([doc])) 
>>> df.select("orderId", "orderData.customerId", explode("orderData.orders").alias("order")) \ 
... .withColumn("item", explode("order.items")) \ 
... .groupBy("orderId", "customerId") \ 
... .agg(sum("item.quantity"), sum(col("item.quantity") * col("item.price"))) 
+0

Vielen Dank für die Arbeitslogik, ich werde versuchen, es in Java und posten es hier für andere. – fireants

0

Für die Menschen, die für eine Java-Lösung der oben suchen, benutzen Sie bitte wie folgt vor:

SparkSession spark = SparkSession 
      .builder() 
      .config(conf) 
      .getOrCreate(); 

    SQLContext sqlContext = new SQLContext(spark); 

    Dataset<Row> orders = sqlContext.read().json("order.json"); 
    Dataset<Row> newOrders = orders.select(
      col("orderId"), 
      col("orderData.customerId"), 
      explode(col("orderData.orders")).alias("order")) 
      .withColumn("item",explode(col("order.items"))) 
      .groupBy(col("orderId"),col("customerId")) 
      .agg(sum(col("item.quantity")),sum(col("item.price"))); 
    newOrders.show();