Meine Anwendung wurde mit MongoDB als Plattform erstellt. Eine Sammlung in DB hat ein riesiges Datenvolumen und hat sich für apache spark entschieden, um analytische Daten durch Berechnung zu erhalten und zu generieren. Ich habe Spark Connector for MongoDB konfiguriert, um mit MongoDB zu kommunizieren. Ich muss MongoDB-Sammlung unter Verwendung pyspark abfragen und einen Datenrahmen aufbauen, der resultset der mongodb Frage besteht. Bitte schlagen Sie mir eine geeignete Lösung vor.Wie erstellt Spark Datenrahmen mit gefilterten Datensätzen aus MongoDB?
Antwort
Sie können wie so die Daten direkt in einen Datenrahmen laden:
# Create the dataframe
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/mydb.mycoll").load()
# Filter the data via the api
df.filter(people.age > 30)
# Filter via sql
df.registerTempTable("people")
over_thirty = sqlContext.sql("SELECT name, age FROM people WHERE age > 30")
Für weitere Informationen, um den Mongo Spark-Anschluss Python API Abschnitt oder die introduction.py sehen. Die SQL-Abfragen werden übersetzt und an den Connector zurückgegeben, sodass die Daten in MongoDB abgefragt werden können, bevor sie an den Funke-Cluster gesendet werden.
Sie können auch Ihre eigenen bieten aggregation pipeline auf die Sammlung anzuwenden, bevor die Ergebnisse in Spark-Rückkehr:
dfr = sqlContext.read.option("pipeline", "[{ $match: { name: { $exists: true } } }]")
df = dfr.option("uri", ...).format("com.mongodb.spark.sql.DefaultSource").load()
Danke @Ross .Aber ich direkt Filter in Datenbankabfrage anwenden müssen stattdessen Filter über Datenrahmen der Anwendung –
das wird es in einer Abfrage auf der Sammlung übersetzen, der Stecker wird dann Rückkehr die gefilterten Ergebnisse – Ross
Erarbeiten Sie es bitte durch ein Code-Snippet –
Warum versuchen Sie nicht die [Stratio Stecker] (https://github.com/Stratio/ Spark-MongoDB)? Dieser Anschluss gibt Ihnen ein Datenframe direkt zurück –
@JohnZeng Das Folgende ist Code-Snippet, das ich mit Stratio-Connector implementiert habe. df = sqlContext.read.format ('com.stratio.datasource.mongodb') .optionen (host = 'localhost: 27017', datenbank = 'mydb', collection = 'mycoll'). load() –
Könnten Sie bitte bearbeiten Ihre Frage und fügen Sie das Snippet ein? Ich denke, Sie haben bereits einen Datenrahmen erhalten, nachdem Sie dies angerufen haben. Ich bin verwirrt, was Sie jetzt wollen, weil Ihre Frage mit dem MongoDB-Connector verknüpft ist. –