【问题标题】:'utf8' codec can't decode byte 0xff in position 0: invalid start byte pySpark Kafka'utf8' 编解码器无法解码位置 0 中的字节 0xff:无效的起始字节 pySpark Kafka
【发布时间】:2018-06-27 23:48:12
【问题描述】:
kafkaStream = KafkaUtils.createStream(
    ssc,
    'zookeeperserver1.sys.net:2181,zookeeperserver2.sys.net:2181,zookeeperserver3.sys.net:2181,zookeeperserver4.sys.net:2181,zookeeperserver5.sys.net:2181,zookeeperserver6.sys.net:2181',
    'spark-streaming23',
    {'topic-name':3})

lines = kafkaStream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()

文件“/usr/lib64/python2.7/encodings/utf_8.py”,第 16 行,在 decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can't decode byte 0xff in位置 0:无效的起始字节

【问题讨论】:

  • 文件“/usr/lib64/python2.7/encodings/utf_8.py”,第 16 行,在 decode 返回 codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can' t 解码位置 0 中的字节 0xff:无效的起始字节
  • 以后请format您的帖子并使用edit链接而不是cmets来添加更多内容。

标签: python-2.7 apache-spark pyspark apache-kafka spark-streaming


【解决方案1】:

看起来键或值不是 UTF-8 编码的。 createStreamcreateDirectStream 都带有两个额外的参数:

  • keyDecoder – 用于解码密钥的函数(默认为 utf8_decoder)
  • valueDecoder – 用于解码值的函数(默认为 utf8_decoder)

如您所见,两者都默认为utf8_decoder。如果

  • 您知道其中一个或两个不是有效的 UTF-8 字符串,您可以提供自己的解码器或仅使用标识函数来获取原始输入:

    KafkaUtils.createStream(
        ssc, ..., keyDecoder=lambda x: x, valueDecoder=lambda x: x
    )
    
  • 如果您怀疑问题出在某些格式错误的条目中,您可以将现有的 decoder 替换为处理解码异常的 decoder。这些线周围的东西应该可以解决问题:

    from pyspark.streaming.kafka import utf8_decoder
    
    def safe_utf8_decode(s):
        try:
            return utf8_decoder(s)
        except UnicodeDecodeError:
            pass 
    

话虽如此,除非您正在寻找更高级的应用程序,而DStreams 仍然无法替代,我宁愿推荐Structured Streaming

【讨论】:

  • 感谢您的快速回答,但如何用其他解码选项替换 UTF 8
猜你喜欢
  • 2017-05-19
  • 2021-12-01
  • 2016-05-13
  • 2014-07-09
  • 2014-04-08
  • 2023-04-02
  • 2020-09-20
  • 2015-02-23
  • 2017-07-09
相关资源
最近更新 更多