【问题标题】:pyspark.sql.utils.AnalysisException: 'Event time must be defined on a window or a timestamp, but timestamp is of type stringpyspark.sql.utils.AnalysisException: '必须在窗口或时间戳上定义事件时间,但时间戳是字符串类型
【发布时间】:2020-04-09 12:46:59
【问题描述】:

我使用 pyspark 编写了一个结构化的流媒体演示,但它出错了。我使用kafka作为流数据源,代码如下:

def produce():
    p = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda x: json.dumps(x, ensure_ascii=False).encode("utf-8"))

    for i in range(100):
        p.send(topic="test", value={"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "word": "spark" + str(random.randint(1, 3))})
        time.sleep(0.5)
    p.flush()

火花流代码:

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .option("deserializer", lambda x: json.loads(x.decode("utf-8"))) \
    .load()
df = df.select(F.get_json_object(df.value.cast("string"), "$.timestamp").alias("timestamp"),
               F.get_json_object(df.value.cast("string"), "$.word").alias("word"))

df = df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(
    F.window("timestamp", "10 seconds")
).count()
df = df.select(df.window.start.cast("string").alias("start"), df.window.end.cast("string").alias("end"),
          "count")
q = df.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .option("checkpointLocation", "hdfs://127.0.0.1:9000/offsets_d") \
    .start()
q.awaitTermination()

它引发了pyspark.sql.utils.AnalysisException: 'Event time must be defined on a window or a timestamp, but timestamp is of type string;;\nEventTimeWatermark timestamp#21: string, interval 5 seconds\n+- Project [get_json_object(cast(value#8 as string), $.timestamp) AS timestamp#21, get_json_object(cast(value#8 as string), $.word) AS word#22]\n +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@435b0386, kafka, Map(deserializer -> <function <lambda> at 0x7fca0be971e0>, subscribe -> test, kafka.bootstrap.servers -> localhost:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3546fa34,kafka,List(),None,List(),None,Map(deserializer -> <function <lambda> at 0x7fca0be971e0>, subscribe -> test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n

按照官方文档,不知道哪里错了。

【问题讨论】:

    标签: pyspark apache-kafka spark-structured-streaming


    【解决方案1】:

    这部分看起来会导致异常:

    df = df \
        .withWatermark("timestamp", "5 seconds") \
        .groupBy(
        F.window("timestamp", "10 seconds")
    ).count()
    

    尝试按照official documentation中给出的示例进行操作

    windowedCounts = words \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(words.timestamp, "10 minutes", "5 minutes"),
            words.word) \
        .count()
    

    并确保您的timestamp 列是时间戳类型而不是字符串

    【讨论】:

      猜你喜欢
      • 2019-07-07
      • 2019-05-22
      • 1970-01-01
      • 1970-01-01
      • 2020-11-22
      • 1970-01-01
      • 2020-09-18
      • 2016-07-10
      相关资源
      最近更新 更多