【Posted】: 2016-04-06 08:24:35
【Question】:
In a PySpark project made up of several .py files, a file named settings.py declares all the global variables.
# settings.py
def prepareMyList():
    return ['35','19','10','25']

def setGlobal():
    global ageList
    ageList = prepareMyList()
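A minimal, Spark-free sketch of the module-state behaviour involved, assuming (an assumption, not something stated in the question) that each PySpark executor's Python worker imports settings afresh instead of receiving the driver's module object. The point is that `ageList` becomes an attribute of `settings` only in the interpreter where `setGlobal()` actually ran. `SETTINGS_SOURCE` and `fresh_settings_module` are hypothetical names; they build two independent module objects from the same source to stand in for two separate processes.

```python
import types

# Hypothetical stand-in for the contents of settings.py from the question.
SETTINGS_SOURCE = """
def prepareMyList():
    return ['35', '19', '10', '25']

def setGlobal():
    global ageList
    ageList = prepareMyList()
"""


def fresh_settings_module():
    """Create a brand-new module object from SETTINGS_SOURCE.

    Each call simulates a separate interpreter importing settings.py:
    the module-level code runs, but setGlobal() is never called.
    """
    module = types.ModuleType("settings")
    exec(SETTINGS_SOURCE, module.__dict__)
    return module


driver_settings = fresh_settings_module()
driver_settings.setGlobal()  # main.py calls this, but only on the driver

worker_settings = fresh_settings_module()  # a fresh import somewhere else

print(hasattr(driver_settings, "ageList"))  # True
print(hasattr(worker_settings, "ageList"))  # False
```

Because `global ageList` writes into the module's own `__dict__`, the value exists only in the module object whose `setGlobal()` was called.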
Another file, utils.py, contains the filtering method.
# utils.py
import settings

def returnIfTrue(row):
    if row[1] in settings.ageList:
        return row
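`returnIfTrue` returns the row on a match and falls through to an implicit `None` otherwise; `RDD.filter` keeps an element when the function's result is truthy, so a predicate returning a plain `bool` expresses the same test. The sketch below avoids reading module-level state at call time by closing over the list instead. It assumes that PySpark serializes such a closure (through its bundled cloudpickle) together with the captured values; `make_age_filter` and the sample rows are hypothetical, not part of the question's code.

```python
def make_age_filter(age_list):
    """Hypothetical helper: build a predicate that carries its own copy of
    the allowed ages instead of reading settings.ageList at call time."""
    allowed = frozenset(age_list)  # O(1) average membership test

    def is_allowed(row):
        # row is a list produced by line.split(","); column 1 holds the age
        return row[1] in allowed

    return is_allowed


keep = make_age_filter(['35', '19', '10', '25'])
rows = [["a", "35"], ["b", "40"], ["c", "10"]]  # illustrative rows only
print([row for row in rows if keep(row)])  # [['a', '35'], ['c', '10']]
```

On the driver, after `settings.setGlobal()` has run, this would presumably be used as `fileRDD.filter(make_age_filter(settings.ageList))`, so the list is captured when the predicate is built rather than looked up on the executor.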
filtering.py uses the method from utils.py to filter the RDD.
# filtering.py
import utils

def doFiltering(fileRDD):
    filteredRDD = fileRDD.filter(utils.returnIfTrue)
    return filteredRDD
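Before `fileRDD.filter(utils.returnIfTrue)` can run on the executors, the function has to be serialized and shipped to them. Python's standard `pickle` serializes a module-level function by reference, meaning just its module name and function name, with none of the module's runtime state. As far as I know, PySpark's cloudpickle also pickles functions from importable modules by reference; treat that as an assumption. The illustration below uses `json.dumps` purely as an example of an importable module-level function, since the question's `utils` module is not available here.

```python
import json
import pickle

# With protocol 2, a module-level function is written as a GLOBAL opcode:
# the module name and the function name, nothing else.
payload = pickle.dumps(json.dumps, protocol=2)

print(b"json\ndumps\n" in payload)  # True: only "json" + "dumps" are stored
print(pickle.loads(payload) is json.dumps)  # True: resolved by name lookup
```

On the receiving side the name is resolved by importing the module, which is why any value set by calling a function in another process does not come along.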
main.py looks like this:
# main.py
from pyspark import SparkContext
import settings
import filtering

sc = SparkContext()
settings.setGlobal()

rawRDD = sc.textFile("/path/to/Data/")
splittedRDD = rawRDD.map(lambda l: l.split(","))
filteredRDD = filtering.doFiltering(splittedRDD)

for row in filteredRDD.collect():
    print row
Running the project throws AttributeError: 'module' object has no attribute 'ageList':
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "utils.py", line 6, in returnIfTrue
    if row[1] in settings.ageList:
AttributeError: 'module' object has no attribute 'ageList'
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
【Discussion】:
- But did you declare `ageList` at all? For example: `ageList = None` at module level, then `def setGlobal(): global ageList; ageList = prepareMyList()`
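A Spark-free sketch of what the comment's suggestion (a module-level `ageList = None`) would change, assuming an executor imports settings without ever calling `setGlobal()`: the attribute lookup then succeeds, but the membership test from `returnIfTrue` fails with a `TypeError` instead. `SETTINGS_WITH_DEFAULT` and the in-memory module construction are hypothetical stand-ins for a fresh import of settings.py.

```python
import types

# Hypothetical settings.py with the module-level default suggested in the comment.
SETTINGS_WITH_DEFAULT = """
ageList = None

def prepareMyList():
    return ['35', '19', '10', '25']

def setGlobal():
    global ageList
    ageList = prepareMyList()
"""

# Simulate a fresh import in which setGlobal() is never called.
settings = types.ModuleType("settings")
exec(SETTINGS_WITH_DEFAULT, settings.__dict__)

row = ["a", "35"]
error = None
try:
    row[1] in settings.ageList  # the same check as in utils.returnIfTrue
except TypeError as exc:
    error = exc  # 'in' on None is not allowed

print(type(error).__name__)  # TypeError
```

So the default removes the `AttributeError`, but the executor still sees `None` rather than the list built on the driver.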
Tags: python-2.7 apache-spark pyspark rdd