【问题标题】:Apache Spark can't calculate average and write result it in Apache KafkaApache Spark 无法计算平均值并将结果写入 Apache Kafka
【发布时间】:2021-08-03 01:58:06
【问题描述】:

我有一个模拟温度传感器,可以将数据发送到 Apache Kafka 主题(我使用的是 Confluent 平台 6.1.0),我需要使用 Apache Spark 读取这个主题(我使用的是 3.1.1 版)和我需要在 5 分钟的窗口内计算平均温度(模拟传感器无延迟地发送数据..但我想考虑 60 分钟的延迟,因此我使用 withWatermark)。最后我想把这个平均值写在另一个 Kafka 主题上。 在我的代码中,它似乎不是根据温度数据计算的平均值,因为我有这个结果:


Batch: 0
-------------------------------------------
+--------------------+--------------+-----------+------------------+
|              window|avg(partition)|avg(offset)|avg(timestampType)|
+--------------------+--------------+-----------+------------------+
|{2021-05-12 10:29...|           0.0|      134.5|               0.0|
|{2021-05-12 12:45...|           0.0|      175.5|               0.0|
|{2021-05-12 11:55...|           0.0|      167.0|               0.0|
|{2021-05-12 12:46...|           0.0|      178.5|               0.0|
|{2021-05-12 10:33...|           0.0|      156.5|               0.0|
|{2021-05-12 11:40...|           0.0|      160.0|               0.0|
|{2021-05-12 10:27...|           0.0|      122.5|               0.0|
|{2021-05-12 10:31...|           0.0|      146.5|               0.0|
|{2021-05-12 13:01...|           0.0|      181.5|               0.0|
|{2021-05-12 10:18...|           0.0|       80.0|               0.0|
|{2021-05-12 10:32...|           0.0|      152.5|               0.0|
|{2021-05-12 10:17...|           0.0|       37.5|               0.0|
|{2021-05-12 10:22...|           0.0|       94.0|               0.0|
|{2021-05-12 11:41...|           0.0|      164.0|               0.0|
|{2021-05-12 10:23...|           0.0|       98.5|               0.0|
|{2021-05-12 10:16...|           0.0|        3.5|               0.0|
|{2021-05-12 10:25...|           0.0|      110.5|               0.0|
|{2021-05-12 10:30...|           0.0|      140.5|               0.0|
|{2021-05-12 10:28...|           0.0|      128.5|               0.0|
|{2021-05-12 11:56...|           0.0|      171.5|               0.0|
+--------------------+--------------+-----------+------------------+
only showing top 20 rows

它不会在 Kafka 上写入数据,因为我收到错误消息:

pyspark.sql.utils.AnalysisException: cannot resolve '`partition`' given input columns: [avg(offset), avg(partition), avg(timestampType), window]; line 1 pos 4;
'Project [unresolvedalias('avg('partition), Some(org.apache.spark.sql.Column$$Lambda$1698/0x0000000840d45840@5117787c)), unresolvedalias(cast('key as string), None), unresolvedalias(cast('value as string), None)]
+- Aggregate [window#35], [window#35 AS window#21, avg(cast(partition#10 as bigint)) AS avg(partition)#32, avg(offset#11L) AS avg(offset)#33, avg(cast(timestampType#13 as bigint)) AS avg(timestampType)#34]
+- Filter isnotnull(timestamp#12)
+- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0) + 60000000), LongType, TimestampType)) AS window#35, key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12-T60000ms, timestampType#13]
+- EventTimeWatermark timestamp#12: timestamp, 1 minutes
+- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@72bee61d, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@390238cf, [startingOffsets=earliest, kafka.bootstrap.servers=localhost:9092, subscribe=temperature], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5fbd93c5,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> temperature, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

这是我的代码(Kafka 生产者和 Spark 阅读器):

#Kafka producer
from confluent_kafka import Producer, KafkaError
import json
import ccloud_lib

import random
from random import randrange
import datetime
import time


if __name__ == '__main__':

    # Read arguments and configurations and initialize
    args = ccloud_lib.parse_args()
    config_file = args.config_file
    topic = args.topic
    conf = ccloud_lib.read_ccloud_config(config_file)

    # Create Producer instance
    producer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
    producer = Producer(producer_conf)

    # Create topic if needed
    ccloud_lib.create_topic(conf, topic)

    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or
    # permanently failed delivery (after retries).
    def acked(err, msg):
        global delivered_records
        """Delivery report handler called on
        successful or failed delivery of message
        """
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

   
    guid_base = "0-ZZZ12345678-"
    formato = "urn:example:sensor:temp"
    temp_data = {}

    while 1:
        rand_num = str(round(random.randrange(0, 9), 2)) + str(round(random.randrange(0, 9), 2))
        temp_init_weight = round(random.uniform(-5, 5), 2)
        temp_delta = round(random.uniform(10, 20), 2)

        guid = guid_base + rand_num

        temperature = temp_delta
        today = datetime.datetime.today()
        datestr = today.isoformat()

        data = {'eventTime': datestr, 'temperatura': temperature} # ok
        time.sleep(10)

        record_key = "temperatura"
        ##record_value = json.dumps(temp_data)
        record_value = json.dumps(data)
        print("\n")

        producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
        producer.poll(0)
        record_value = ''

    producer.flush()

    print("{} messages were produced to topic {}!".format(delivered_records, topic))

火花阅读器

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
 
# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('TemperatureStreamApp')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())
 
sc = spark.sparkContext
 
# Create stream dataframe setting kafka server, topic and offset option
df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") # kafka server
  .option("subscribe", "temperature") # topic
  .option("startingOffsets", "earliest") # start from beginning
  .load())
 
windowedAvg = df\
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
    window(df.timestamp, "5 minutes", "60 minutes")).avg()  
 
query = windowedAvg\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .option('truncate', 'true')\
        .start()
 
# write on kafka topic avgtemperature
qk = (windowedAvg \
        .selectExpr("avg(partition)", "CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("checkpointLocation", "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream") \
        .option("topic", "avgtemperature")  \
        .outputMode("complete") \
        .start())
        
query.awaitTermination()

如何计算温度列的平均值并将结果保存在 Kafa 主题 avgtemperature 中? 谢谢。

【问题讨论】:

  • 建议的答案对您有用吗?

标签: python apache-spark pyspark apache-kafka


【解决方案1】:

根据您分享的尝试,有几点需要注意。在尝试处理来自 kafka 流的数据之前,您应该提取值并转换为您选择的数据类型。由于您提交的是 json 值,因此我使用架构 temperature_schema 来提取 eventTimetemperature 并将它们转换为流数据帧中的列。

您添加了一个水印很好,但是timestamp 列在您的流数据框中不存在,因此我继续使用您的eventTime,它被转换为timestamp

我还对您的 kafka 接收器的输出进行了修改。您收到错误是因为 kafka 接收器需要一组特定的列名:

Column Type
key (optional) string or binary
value (required) string or binary
headers (optional) array
topic (*optional) string
partition (optional) int

工作和运行 kafka 作业时的其他文档可用here。请务必根据您的 spark 版本参考文档。

更新了 Kafka 流作业


from pyspark.sql import functions as F
from pyspark.sql import types as T

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") # kafka server
  .option("subscribe", "temperature") # topic
  .option("startingOffsets", "earliest") # start from beginning
  .load())

# create schema for temperature
temperature_schema = T.StructType([
    T.StructField("eventTime",T.StringType(),True),
    T.StructField("temperatura",T.FloatType(),True),
])

# extract temperature data and ensure `eventTime` is timestamp
df = (
    df.selectExpr("CAST(value as string)")
      .select(F.from_json(F.col("value"),temperature_schema).alias("json_value"))
      .selectExpr("json_value.*") # gives us a dataframe with columns (eventTime,temperatura)
      .select(
          F.expr("CAST(eventTime as timestamp)").alias("eventTime"),
          F.col("temperatura")
      )
      
)

# when using window you will get a range or value resembling [start,end]. 
# I have chosen the `start` for this example

windowedAvg = ( 
    df.withWatermark("eventTime", "5 minutes") 
      .groupBy(window(F.col("eventTime"), "5 minutes", "60 minutes").alias('eventTimeWindow'))
      .agg(F.avg("temperatura").alias("avgtemperature")) 
      .select(
          F.col("eventTimeWindow.start").alias("eventTime"),
          F.col("avgtemperature")
      )
)

# continue with your code to write to your various streams
query = windowedAvg\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .option('truncate', 'true')\
        .start()


# write on kafka topic avgtemperature
# here i've chosen as an example to use the eventTime as the key and the value to be the avgtemperature
qk = (windowedAvg 
        .select(
            F.expr("CAST(eventTime AS STRING)").alias("key"),
            F.expr("CAST(avgtemperature AS STRING)").alias("value")
        )
        .writeStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers", "localhost:9092") 
        .option("checkpointLocation", "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream") 
        .option("topic", "avgtemperature")  
        .outputMode("complete") 
        .start())
        
query.awaitTermination()

【讨论】:

    猜你喜欢
    • 2014-09-01
    • 1970-01-01
    • 2020-09-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多