【Question Title】: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;\nJoin Inner
【Posted】: 2019-12-07 14:11:18
【Question】:

I want to join two streams, but I get the following error and I don't know how to fix it:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;\nJoin Inner

df_stream = spark.readStream.schema(schema_clicks).option("ignoreChanges", True).option("header", True).format("csv").load("s3://mybucket/*.csv")
display(df_stream.select("SendID", "EventType", "EventDate"))

I want to join df1 and df2:

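# Imports used by the snippets below (note that this max is the Spark SQL function, not the Python builtin)
from pyspark.sql.functions import col, expr, max, unix_timestamp
from pyspark.sql.types import TimestampType

df1 = df_stream \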
              .withColumn('timestamp', unix_timestamp(col('EventDate'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType())) \
              .select(col("SendID"), col("timestamp"), col("EventType")) \
              .withColumnRenamed("SendID", "SendID_update") \
              .withColumnRenamed("timestamp", "timestamp_update") \
              .withWatermark("timestamp_update", "1 minutes")

df2 = df_stream \
              .withColumn('timestamp', unix_timestamp(col('EventDate'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType())) \
              .withWatermark("timestamp", "1 minutes") \
              .groupBy(col("SendID")) \
              .agg(max(col('timestamp')).alias("timestamp")) \
              .orderBy('timestamp', ascending=False)

join = df2.alias("A").join(df1.alias("B"),  expr(
      "A.SendID = B.SendID_update" +
        " AND " +
        "B.timestamp_update >= A.timestamp " +
        " AND " +
        "B.timestamp_update <= A.timestamp + interval 1 hour"))

Finally, when I write the result in append mode:

join \
.writeStream \
.outputMode("Append") \
.option("checkpointLocation", "s3://checkpointjoin_delta")  \
.format("delta")  \
.table("test_join")

I get the error above.

AnalysisException Traceback (most recent call last) in ()
----> 1 join.writeStream.outputMode("Append").option("checkpointLocation", "s3://checkpointjoin_delta").format("delta").table("test_join")

/databricks/spark/python/pyspark/sql/streaming.py in table(self, tableName)
   1137         """
   1138         if isinstance(tableName, basestring):
-> 1139             return self._sq(self._jwrite.table(tableName))
   1140         else:
   1141             raise TypeError("tableName can only be a string")

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                 e.java_exception.getStackTrace()))

【Discussion】:

    Tags: python apache-spark spark-structured-streaming


    【Solution 1】:

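    The problem is the .groupBy: it needs to include the timestamp column. In append mode, an aggregation can only emit a group once the watermark guarantees that the group will no longer change, so the watermarked event-time column has to be part of the grouping key. For example: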

    df2 = df_stream \
                  .withColumn('timestamp', unix_timestamp(col('EventDate'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType())) \
                  .withWatermark("timestamp", "1 minutes") \
                  .groupBy(col("SendID"), "timestamp") \
                  .agg(max(col('timestamp')).alias("timestamp")) \
                  .orderBy('timestamp', ascending=False)
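
To illustrate why the grouping key matters, here is a minimal pure-Python sketch of the idea. It is a toy model, not Spark's implementation: the names `window_start` and `simulate_append` are hypothetical, the grouping uses a tumbling window over the event time (one common way of putting event time into the key), and the timing is simplified (closed groups are emitted at the end of the same batch that advanced the watermark). A group keyed only by SendID could always receive a later event that raises its max, so there would never be a point at which appending its row is safe. Once the key includes an event-time window, the watermark can close the group.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts, size):
    """Floor ts to the start of its tumbling window of length `size`."""
    return EPOCH + ((ts - EPOCH) // size) * size


def simulate_append(batches, size, delay):
    """Toy model of append-mode output for max(event_time) per (key, window).

    batches: list of micro-batches, each a list of (key, event_time) tuples.
    Returns one list of finalized rows (key, window_start, max_event_time)
    per micro-batch.
    """
    state = {}        # (key, window start) -> max event time seen so far
    max_event = None  # largest event time seen across all batches
    watermark = None  # max_event - delay, once any event has been seen
    emitted = []
    for batch in batches:
        for key, ts in batch:
            start = window_start(ts, size)
            if watermark is not None and start + size <= watermark:
                continue  # late event: its window has already been closed
            group = (key, start)
            state[group] = max(state.get(group, ts), ts)
            max_event = ts if max_event is None else max(max_event, ts)
        if max_event is not None:
            watermark = max_event - delay
        # A group is final once its window end is at or below the watermark.
        closed = []
        if watermark is not None:
            closed = sorted(g for g in state if g[1] + size <= watermark)
        emitted.append([(k, s, state.pop((k, s))) for k, s in closed])
    return emitted


if __name__ == "__main__":
    t0 = datetime(2019, 12, 7, 12, 0, 0)
    demo = [
        [("a", t0 + timedelta(seconds=10)), ("a", t0 + timedelta(seconds=40))],
        [("a", t0 + timedelta(seconds=150))],
    ]
    for rows in simulate_append(demo, timedelta(minutes=1), timedelta(minutes=1)):
        print(rows)
```

In this model the first batch emits nothing, because the watermark has not yet passed the end of any window. A row is appended only after a later event pushes the watermark past the end of the window.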
    

    【Comments】:

    • Exactly what I was looking for. I had missed adding the timestamp to the .groupBy.