Error when using global variables in a Spark project

In a PySpark project that consists of several .py files, there is a file named settings.py in which all global variables are declared.
# settings.py
def prepareMyList():
    return ['35', '19', '10', '25']

def setGlobal():
    global ageList
    ageList = prepareMyList()
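(For context, an illustrative check, not part of the project: ageList only becomes an attribute of the settings module after setGlobal() has run in the same Python process.)

# Illustrative only: module attribute lifecycle within a single process
import settings
print(hasattr(settings, 'ageList'))   # False: not defined yet
settings.setGlobal()
print(hasattr(settings, 'ageList'))   # True: created by the global assignment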
Another file, utils.py, contains the filter method.
# utils.py
import settings

def returnIfTrue(row):
    # Returns the row (truthy) when its second field is in ageList,
    # otherwise None (falsy), so it works as a filter predicate.
    if row[1] in settings.ageList:
        return row
filtering.py runs the filtering on the RDD, using the method from utils.py.
# filtering.py
import utils

def doFiltering(fileRDD):
    filteredRDD = fileRDD.filter(utils.returnIfTrue)
    return filteredRDD
main.py is as follows.
# main.py
from pyspark import SparkContext
import settings
import filtering

sc = SparkContext()
settings.setGlobal()

rawRDD = sc.textFile("/path/to/Data/")
splittedRDD = rawRDD.map(lambda l: l.split(","))
filteredRDD = filtering.doFiltering(splittedRDD)

for row in filteredRDD.collect():
    print(row)
When the project is run, it throws the error AttributeError: 'module' object has no attribute 'ageList':
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "utils.py", line 6, in returnIfTrue
    if row[1] in settings.ageList:
AttributeError: 'module' object has no attribute 'ageList'
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
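The likely cause (my reading, not stated in the question): settings.setGlobal() runs only in the driver process. When Spark ships utils.returnIfTrue to the executors, each worker re-imports the settings module from scratch and never calls setGlobal(), so settings.ageList does not exist there. A minimal sketch of one common workaround, using a broadcast variable instead of a module-level global (broadcastAges and the inlined filter lambda are illustrative changes, not from the project):

# main.py (sketch): ship the list to the executors explicitly
from pyspark import SparkContext
import settings

sc = SparkContext()

# Build the list in the driver and broadcast it; every executor
# gets a read-only copy through broadcastAges.value.
broadcastAges = sc.broadcast(settings.prepareMyList())

rawRDD = sc.textFile("/path/to/Data/")
splittedRDD = rawRDD.map(lambda l: l.split(","))

# The lambda closes over the broadcast handle, so the task carries
# everything it needs instead of relying on worker-side module state.
filteredRDD = splittedRDD.filter(lambda row: row[1] in broadcastAges.value)

for row in filteredRDD.collect():
    print(row)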
But have you declared ageList? For example:

ageList = None

def setGlobal():
    global ageList
    ageList = prepareMyList()
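A module-level default like ageList = None would make the attribute exist on the executors, but setGlobal() would still run only in the driver, so the filter would then fail with a TypeError (argument of type 'NoneType' is not iterable) instead of the AttributeError. A sketch that sidesteps module globals entirely by passing the list into doFiltering (the extra parameter is an illustrative change, not from the project):

# filtering.py (sketch): take the list as an explicit argument
def doFiltering(fileRDD, ageList):
    def returnIfTrue(row):
        # The closure captures ageList, so Spark pickles the list
        # together with the function and ships both to the executors.
        return row[1] in ageList
    return fileRDD.filter(returnIfTrue)

# main.py (sketch): pass the list built in the driver
# filteredRDD = filtering.doFiltering(splittedRDD, settings.prepareMyList())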