【发布时间】:2020-10-11 10:33:08
【问题描述】:
我会尽量保持简单。 我定期从 kafka 生产者那里读取一些数据并使用 Spark 结构化流输出以下内容
我有这样的输出数据:
+------------------------------------------+-------------------+--------------+-----------------+
|window |timestamp |Online_Emp |Available_Emp |
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:27|1 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:41|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:29|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:20|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:23|2 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:52|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:08|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:12|1 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:02|1 |1 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:11|1 |0 |
+------------------------------------------+-------------------+--------------+-----------------+
我希望它像这样输出:
Time Online_Emp Available_Emp
2017-01-01 00:00:00 52 23
2017-01-01 00:01:00 58 19
2017-01-01 00:02:00 65 28
所以基本上它会计算每分钟在线的员工人数(通过他们唯一的司机 ID)并显示有多少可用。
请注意,一个特定的雇员 ID 可能会在一分钟内在
available和on_duty之间变化,我们需要在该分钟结束前得到最终统计
卡夫卡产品
_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
json.dumps(x).encode('utf-8'))
# schedule.every(1).minutes.do(_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) ) )
with open(filepath, 'r', encoding="utf16") as f:
for item in json_lines.reader(f):
dataDict.update({'timeStamp':item['timestamp'],
'emp_id':item['emp_id'],
'on_duty':item['on_duty']})
_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) )
sleep(1)
Spark 结构化流媒体
schema = StructType([ \
StructField("timeStamp", LongType()), \
StructField("emp_id", LongType()), \
StructField("on_duty", LongType())])
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe","emp_dstream")\
.option("startingOffsets", "earliest")\
.load()\
.selectExpr("CAST(value AS STRING)")\
.select(F.from_json(F.col("value"), schema).alias("value"))\
.select(F.col("value.*"))\
.withColumn("timestamp",F.from_unixtime(F.col("timestamp") / 1000))\
.groupBy(F.window(F.col("timestamp"), "1 minutes"), F.col("timestamp"))\
.agg(F.count(F.col("timeStamp")).alias("total_employees"),F.collect_list(F.col("on_duty")).alias("on_duty"),F.sum(F.when(F.col("on_duty") == 0, F.lit(1)).otherwise(F.lit(0))).alias("not_on_duty"))\
.writeStream\
.format("console")\
.outputMode("complete")\
.option("truncate", "false")\
.start()\
.awaitTermination()
如何获得所需的输出?
将不胜感激任何提示或帮助!
【问题讨论】:
-
将此 .outputMode("complete") 更改为 .outputMode("update") 并检查您是否获得异常输出??
-
在单独的表格中显示新输出,但仍然很乱。我们的想法是将相同的分钟时间框架组合成一个结果(例如,16:00 - 16:01 只有一个条目)
-
在 10:01 时,Kafka 有 10 条记录可用。spark 将读取这些记录,并将在那一分钟聚合。如果您在 10:10 获得相同的时间戳数据,则此数据将被视为新数据,它将落入另一批...因此在您的最终数据中将有多个记录..检查火花流中的窗口函数的更多信息..
-
谢谢。是的,我明白这一点,但我试图在一排的同一分钟内将所有内容都存储起来。也许如果我将时间戳更改为忽略秒?可以这样吗?
-
检查还有一个窗口函数,它需要 3 个参数。最后一个参数来定义您的数据在当前组或批次中考虑的延迟时间。
标签: python apache-spark pyspark apache-kafka apache-spark-sql