9

Ich habe eine große Datenmenge in einer BigQuery-Tabelle gespeichert und möchte es in eine pypark RDD für ETL-Datenverarbeitung laden.BigQuery Connector für pyspark über Hadoop Input Format Beispiel

I erkennen, dass die BigQuery Hadoop Input/Output-Format

https://cloud.google.com/hadoop/writing-with-bigquery-connector

und pyspark sollte diese Schnittstelle verwenden, um in der Lage unterstützen eine RDD unter Verwendung des Verfahrens "newAPIHadoopRDD" zu erstellen.

http://spark.apache.org/docs/latest/api/python/pyspark.html

Leider ist die Dokumentation an beiden Enden scheint knapp und geht über mein Wissen über Hadoop/Zünd-/BigQuery. Gibt es jemanden, der herausgefunden hat, wie man das macht?

Antwort

3

Google hat jetzt eine example zur Verwendung des BigQuery-Connectors mit Spark.

Es scheint ein Problem mit der GsonBigQueryInputFormat zu sein, aber ich habe eine einfache Shakespeare-Wortzählung Beispiel Arbeits

import json 
import pyspark 
sc = pyspark.SparkContext() 

hadoopConf=sc._jsc.hadoopConfiguration() 
hadoopConf.get("fs.gs.system.bucket") 

conf = {"mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>", "mapred.bq.input.project.id": "publicdata", "mapred.bq.input.dataset.id":"samples", "mapred.bq.input.table.id": "shakespeare" } 

tableData = sc.newAPIHadoopRDD("com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", "org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject", conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"], int(x["word_count"]))).reduceByKey(lambda x,y: x+y) 
print tableData.take(10) 
+0

Hallo, ja. Ich hatte es eigentlich schon selbst herausgefunden. Sie haben jedoch nur den Connector für scala gepostet. Ich werde die Lösung mit pyspark einsetzen, wie ich Zeit habe. –