【问题标题】:ERROR:SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063错误:SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063
【发布时间】:2017-05-31 16:20:34
【问题描述】:

我目前正在使用 ASN 1 解码器。我将从生产者那里获得一个十六进制十进制代码,我将在消费者中收集它。 然后,我将十六进制代码转换为 RDD,然后将十六进制值 RDD 传递给具有同一类 Decode_Module 的另一个函数,并将使用 python asn1 解码器解码十六进制数据并将其返回并打印。 我不明白我的代码有什么问题。我也已经在工作节点中安装了我的 asn1 解析器依赖项。 我在 lambda 表达式或其他东西中调用的方式有任何问题。

我的错误:异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063

请帮帮我,谢谢

我的代码:

class telco_cn:

 def __init__(self,sc):
    self.sc = sc
    print ('in init function')
    logging.info('eneterd into init function')

 def decode_module(self,msg):
        try:
            logging.info('Entered into generate module')
            ### Providing input for module we need to load
            load_module(config_values['load_module'])
            ### Providing Value for Type of Decoding
            ASN1.ASN1Obj.CODEC = config_values['PER_DECODER']
            ### Providing Input for Align/UnAlign
            PER.VARIANT = config_values['PER_ALIGNED']
            ### Providing Input for pdu load
            pdu = GLOBAL.TYPE[config_values['pdu_load']]
            ### Providing Hex value to buf
            buf = '{}'.format(msg).decode('hex')
            return val
        except Exception as e:
            logging.debug('error in decode_module function %s' %str(e))


 def consumer_input(self,sc,k_topic):
            logging.info('entered into consumer input');print(k_topic)
            consumer = KafkaConsumer(ip and other values given)
            consumer.subscribe(k_topic)
            for msg in consumer:
                print(msg.value);
                a = sc.parallelize([msg.value])
                d = a.map(lambda x: self.decode_module(x)).collect()
                print d

if __name__ == "__main__":
    logging.info('Entered into main')
    conf = SparkConf()
    conf.setAppName('telco_consumer')
    conf.setMaster('yarn-client')
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)
    cn = telco_cn(sc)
    cn.consumer_input(sc,config_values['kafka_topic'])

【问题讨论】:

    标签: python python-2.7 lambda pyspark pyasn1


    【解决方案1】:

    这是因为self.decode_module 包含 SparkContext 的实例。

    要修复您的代码,您可以使用@staticmethod:

    class telco_cn:
        def __init__(self, sc):
            self.sc = sc
    
        @staticmethod
        def decode_module(msg):
            return msg
    
        def consumer_input(self, sc, k_topic):
            a = sc.parallelize(list('abcd'))
            d = a.map(lambda x: telco_cn.decode_module(x)).collect()
            print d
    
    
    if __name__ == "__main__":
        conf = SparkConf()
        sc = SparkContext(conf=conf)
        cn = telco_cn(sc)
        cn.consumer_input(sc, '')
    

    更多信息:

    http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

    【讨论】:

    • 谢谢你的回答。你能解释一下如果我们在那里使用静态方法会发生什么吗?如果我们在那里不使用静态方法。
    • 当我尝试上面的代码时,它返回给我这个错误 TypeError: 'JavaPackage' object is not callable
    【解决方案2】:

    您不能在 lambda 表达式中引用实例方法 (self.decode_module),因为它的实例对象包含 SparkContext 引用。

    发生这种情况是因为 PySpark 内部会尝试 Pickle 发送给其工作人员的所有内容。因此,当您说它应该在节点内执行 self.decode_module() 时,PySpark 会尝试腌制整个 (self) 对象(包含对 spark 上下文的引用)。

    要解决此问题,您只需从 telco_cn 类中删除 SparkContext 引用并使用不同的方法,例如在调用类实例之前使用 SparkContext(如 Zhangs 的回答所建议的那样)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-04-28
      • 1970-01-01
      • 2017-06-30
      • 1970-01-01
      • 2014-02-13
      • 2019-06-05
      相关资源
      最近更新 更多