Sie sollten sqlContext.cacheTable("table_name")
verwenden, um es zwischenzuspeichern, oder alternativ CACHE TABLE table_name
SQL-Abfrage verwenden.
Hier ist ein Beispiel. Ich habe diese Datei auf HDFS bekam:
1|Alex|[email protected]
2|Paul|[email protected]
3|John|[email protected]
Dann wird der Code in PySpark:
people = sc.textFile('hdfs://sparkdemo:8020/people.txt')
people_t = people.map(lambda x: x.split('|')).map(lambda x: Row(id=x[0], name=x[1], email=x[2]))
tbl = sqlContext.inferSchema(people_t)
tbl.registerTempTable('people')
Jetzt haben wir einen Tisch und kann es abfragen:
sqlContext.sql('select * from people').collect()
es zu beharren, Wir haben 3 Optionen:
# 1st - using SQL
sqlContext.sql('CACHE TABLE people').collect()
# 2nd - using SQLContext
sqlContext.cacheTable('people')
sqlContext.sql('select count(*) from people').collect()
# 3rd - using Spark cache underlying RDD
tbl.cache()
sqlContext.sql('select count(*) from people').collect()
1. und 2. Optionen a sind bevorzugt, da sie die Daten in einer optimierten In-Memory-Spaltenformat cachen würde, während 3. es wie jede andere RDD in Reihe orientiert cachen würde
Also zurück zu Ihrer Frage geht, ist hier eine mögliche Lösung:
output = sqlContext.sql("SELECT * From people")
output.registerTempTable('people2')
sqlContext.cacheTable('people2')
sqlContext.sql("SELECT count(*) From people2").collect()
"Spark SQL kann Tabellen mit einem speicherinternen Spaltenformat zwischenspeichern, indem es spark.catalog.cacheTable (" tableName ") oder dataFrame.cache() aufruft." (https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory). Die Dokumentation scheint zu implizieren, dass die erste, zweite und dritte Option gleichwertig sind. – asmaier