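【Posted】: 2021-08-03 01:58:06
【Question】:
I have a simulated temperature sensor that sends data to an Apache Kafka topic (I am using Confluent Platform 6.1.0). I need to read this topic with Apache Spark (version 3.1.1) and compute the average temperature over 5-minute windows. The simulated sensor sends data without delay, but I want to allow for up to 60 minutes of lateness, so I use withWatermark. Finally, I want to write this average to another Kafka topic. In my code, the average does not seem to be computed from the temperature data, because I get this result: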
Batch: 0
-------------------------------------------
+--------------------+--------------+-----------+------------------+
| window|avg(partition)|avg(offset)|avg(timestampType)|
+--------------------+--------------+-----------+------------------+
|{2021-05-12 10:29...| 0.0| 134.5| 0.0|
|{2021-05-12 12:45...| 0.0| 175.5| 0.0|
|{2021-05-12 11:55...| 0.0| 167.0| 0.0|
|{2021-05-12 12:46...| 0.0| 178.5| 0.0|
|{2021-05-12 10:33...| 0.0| 156.5| 0.0|
|{2021-05-12 11:40...| 0.0| 160.0| 0.0|
|{2021-05-12 10:27...| 0.0| 122.5| 0.0|
|{2021-05-12 10:31...| 0.0| 146.5| 0.0|
|{2021-05-12 13:01...| 0.0| 181.5| 0.0|
|{2021-05-12 10:18...| 0.0| 80.0| 0.0|
|{2021-05-12 10:32...| 0.0| 152.5| 0.0|
|{2021-05-12 10:17...| 0.0| 37.5| 0.0|
|{2021-05-12 10:22...| 0.0| 94.0| 0.0|
|{2021-05-12 11:41...| 0.0| 164.0| 0.0|
|{2021-05-12 10:23...| 0.0| 98.5| 0.0|
|{2021-05-12 10:16...| 0.0| 3.5| 0.0|
|{2021-05-12 10:25...| 0.0| 110.5| 0.0|
|{2021-05-12 10:30...| 0.0| 140.5| 0.0|
|{2021-05-12 10:28...| 0.0| 128.5| 0.0|
|{2021-05-12 11:56...| 0.0| 171.5| 0.0|
+--------------------+--------------+-----------+------------------+
only showing top 20 rows
It also does not write any data to Kafka, because I get this error:
pyspark.sql.utils.AnalysisException: cannot resolve '`partition`' given input columns: [avg(offset), avg(partition), avg(timestampType), window]; line 1 pos 4;
'Project [unresolvedalias('avg('partition), Some(org.apache.spark.sql.Column$$Lambda$1698/0x0000000840d45840@5117787c)), unresolvedalias(cast('key as string), None), unresolvedalias(cast('value as string), None)]
+- Aggregate [window#35], [window#35 AS window#21, avg(cast(partition#10 as bigint)) AS avg(partition)#32, avg(offset#11L) AS avg(offset)#33, avg(cast(timestampType#13 as bigint)) AS avg(timestampType)#34]
+- Filter isnotnull(timestamp#12)
+- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#12, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0) + 60000000), LongType, TimestampType)) AS window#35, key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12-T60000ms, timestampType#13]
+- EventTimeWatermark timestamp#12: timestamp, 1 minutes
+- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@72bee61d, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@390238cf, [startingOffsets=earliest, kafka.bootstrap.servers=localhost:9092, subscribe=temperature], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5fbd93c5,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> temperature, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
Here is my code (Kafka producer and Spark reader):
# Kafka producer
from confluent_kafka import Producer, KafkaError
import json
import ccloud_lib
import random
from random import randrange
import datetime
import time

if __name__ == '__main__':
    # Read arguments and configurations and initialize
    args = ccloud_lib.parse_args()
    config_file = args.config_file
    topic = args.topic
    conf = ccloud_lib.read_ccloud_config(config_file)

    # Create Producer instance
    producer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
    producer = Producer(producer_conf)

    # Create topic if needed
    ccloud_lib.create_topic(conf, topic)

    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or
    # permanently failed delivery (after retries).
    def acked(err, msg):
        """Delivery report handler called on
        successful or failed delivery of message
        """
        global delivered_records
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

    guid_base = "0-ZZZ12345678-"
    formato = "urn:example:sensor:temp"
    temp_data = {}

    while 1:
        rand_num = str(round(random.randrange(0, 9), 2)) + str(round(random.randrange(0, 9), 2))
        temp_init_weight = round(random.uniform(-5, 5), 2)
        temp_delta = round(random.uniform(10, 20), 2)
        guid = guid_base + rand_num
        temperature = temp_delta
        today = datetime.datetime.today()
        datestr = today.isoformat()
        data = {'eventTime': datestr, 'temperatura': temperature}  # ok
        time.sleep(10)
        record_key = "temperatura"
        ##record_value = json.dumps(temp_data)
        record_value = json.dumps(data)
        print("\n")
        producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
        producer.poll(0)
        record_value = ''

    producer.flush()
    print("{} messages were produced to topic {}!".format(delivered_records, topic))
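The producer above emits one JSON document every 10 seconds, with an ISO-formatted `eventTime` and a `temperatura` reading. As a reference for what the streaming job is expected to produce, the per-window average can be sketched in plain Python. This is only an illustration for checking results, not part of the original code: the helper names `window_start` and `windowed_average` are made up here, and timestamps are treated as naive datetimes with no time zone handling.

```python
import json
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # tumbling window size from the question
EPOCH = datetime(1970, 1, 1)


def window_start(ts, size=WINDOW):
    """Floor a timestamp to the start of the tumbling window containing it."""
    return EPOCH + ((ts - EPOCH) // size) * size


def windowed_average(messages, size=WINDOW):
    """Average 'temperatura' per tumbling window, keyed by window start.

    `messages` is an iterable of JSON strings shaped like the producer's
    record_value: {"eventTime": "<ISO timestamp>", "temperatura": <float>}.
    """
    totals = defaultdict(lambda: [0.0, 0])  # window start -> [sum, count]
    for raw in messages:
        record = json.loads(raw)
        start = window_start(datetime.fromisoformat(record["eventTime"]), size)
        totals[start][0] += record["temperatura"]
        totals[start][1] += 1
    return {start: total / count for start, (total, count) in sorted(totals.items())}
```

Feeding a handful of captured messages through `windowed_average` gives a small table of window starts and averages that can be compared against the console output of the Spark job.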
# Spark reader
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StringType

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('TemperatureStreamApp')
         # Add kafka package
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())
sc = spark.sparkContext

# Create stream dataframe setting kafka server, topic and offset option
df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  # kafka server
      .option("subscribe", "temperature")  # topic
      .option("startingOffsets", "earliest")  # start from beginning
      .load())

windowedAvg = df\
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window(df.timestamp, "5 minutes", "60 minutes")).avg()

query = windowedAvg\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'true')\
    .start()

# write on kafka topic avgtemperature
qk = (windowedAvg \
      .selectExpr("avg(partition)", "CAST(key AS STRING)", "CAST(value AS STRING)") \
      .writeStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("checkpointLocation", "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream") \
      .option("topic", "avgtemperature") \
      .outputMode("complete") \
      .start())

query.awaitTermination()
How can I compute the average of the temperature column and save the result to the Kafka topic avgtemperature? Thanks.
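One possible direction, offered as a sketch under assumptions rather than a verified fix. The Kafka source exposes only `key`, `value`, `topic`, `partition`, `offset`, `timestamp` and `timestampType`, and calling `.avg()` with no arguments averages every numeric column, which matches the `avg(partition)`, `avg(offset)` and `avg(timestampType)` columns in the output above. After the aggregation, `key` and `value` no longer exist, which is consistent with the `AnalysisException`. The sketch below parses the JSON payload first, averages `temperatura` per 5-minute window on `eventTime`, and builds a `value` column for the Kafka sink. It assumes the message format shown in the producer; the column name `avg_temperatura`, the output JSON layout, and the checkpoint path are illustrative choices, not taken from the original post. Note also that the lateness tolerance belongs in `withWatermark`, while the third argument of `window` is the slide interval, which Spark expects to be no larger than the window length.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, from_json, struct, to_json, window
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

spark = (SparkSession.builder
         .master("local[*]")
         .appName("TemperatureStreamApp")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .getOrCreate())

# Schema of the JSON written by the producer (assumed from the producer code).
schema = StructType([
    StructField("eventTime", StringType()),
    StructField("temperatura", DoubleType()),
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "temperature")
       .option("startingOffsets", "earliest")
       .load())

# Kafka delivers `value` as bytes: cast to string, parse the JSON, and turn
# the ISO string into a timestamp usable as event time.
readings = (raw
            .select(from_json(col("value").cast("string"), schema).alias("r"))
            .select(col("r.eventTime").cast("timestamp").alias("eventTime"),
                    col("r.temperatura").alias("temperatura")))

# Accept events up to 60 minutes late; average per 5-minute tumbling window.
windowed_avg = (readings
                .withWatermark("eventTime", "60 minutes")
                .groupBy(window(col("eventTime"), "5 minutes"))
                .agg(avg("temperatura").alias("avg_temperatura")))  # illustrative name

# The Kafka sink needs a `value` column (and optionally `key`).
out = windowed_avg.select(
    col("window.start").cast("string").alias("key"),
    to_json(struct(col("window.start").alias("start"),
                   col("window.end").alias("end"),
                   col("avg_temperatura"))).alias("value"))

query = (out.writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "avgtemperature")
         .option("checkpointLocation", "/tmp/avgtemperature-checkpoint")  # hypothetical path
         .outputMode("update")
         .start())

query.awaitTermination()
```

With `update` output mode, a window's average is re-emitted whenever it changes; `append` would instead emit each window once, after the watermark has passed its end.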
【Comments】:
-
Did the suggested answer work for you?
Tags: python apache-spark pyspark apache-kafka