[Posted]: 2022-01-23 19:55:57
[Question]:
I am trying to read data from Azure Event Hubs and store the resulting DataFrame in a MySQL table using Spark Structured Streaming.
Below is my PySpark code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from datetime import datetime as dt
from pyspark.sql import DataFrameWriter
try:
    # Build (or reuse) a local Spark session
    session = SparkSession.builder.master("local").appName("dataingestion")
    spark = session.getOrCreate()
    print("Successfully built Spark session")
except Exception:
    print("Failed to build Spark session")
    raise  # re-raise the original exception with its traceback
startOffset = "-1"
startingEventPosition = {
    "offset": startOffset,
    "seqNo": -1,  # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True,
}
endTime = dt.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")  # UTC, to match the trailing "Z"
endingEventPosition = {
    "offset": None,  # not in use
    "seqNo": -1,  # not in use
    "enqueuedTime": endTime,
    "isInclusive": True,
}
ehreadConf = {}
ehreadConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehreadConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)
connectionString = "eventhub-connection-string"
ehreadConf['eventhubs.connectionString'] = connectionString
try:
    inputStream = spark.readStream.format("eventhubs").options(**ehreadConf).load()
    print("Successfully connected to the event hub")
    print("Is streaming:", inputStream.isStreaming)
    print("Schema of inputStream:")
    inputStream.printSchema()  # prints the schema itself and returns None
except Exception:
    print("Failed to connect to Azure Event Hubs")
    raise  # re-raise the original exception with its traceback
inputStream = inputStream.withColumn("body", inputStream["body"].cast("string"))
server_name = "jdbc:mysql://localhost:3306"
database_name = "eventhub"
jdbcurl = server_name + "/" + database_name
print('%' * 100)
print(jdbcurl)
table_name = "stream_cdr_data"
username = "user"
password = "data@123"
try:
    print("Trying to connect to MySQL")
    # inputStream is the streaming DataFrame read above (originally written as sparkDf)
    inputStream.writeStream \
        .format("jdbc") \
        .outputMode("append") \
        .option("url", jdbcurl) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("checkpointLocation", "./checkpoint") \
        .start().awaitTermination()
    print("Connection to MySQL is successful")
except ValueError as error:
    print("Connector write failed", error)
spark.sparkContext.stop()
spark.stop()
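But I am unable to store this Spark DataFrame in the MySQL table. I get an error saying that the jdbc data source does not support streamed writing:
py4j.protocol.Py4JJavaError: An error occurred while calling o68.start. : java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing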
[Discussion]:
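The error arises because the jdbc data source only implements batch writes. One commonly used workaround, sketched here under assumptions, is to keep the stream but hand each micro-batch to the ordinary batch JDBC writer through `foreachBatch` (available in PySpark since Spark 2.4). The helper names `make_mysql_batch_writer` and `start_mysql_sink` are hypothetical and only for illustration; the URL, table name, credentials, driver class and checkpoint path are the ones from the question.

```python
def make_mysql_batch_writer(jdbc_url, table, user, password,
                            driver="com.mysql.jdbc.Driver"):
    """Return a foreachBatch callback (hypothetical helper, not part of Spark)."""

    def write_batch(batch_df, batch_id):
        # batch_df is a regular (non-streaming) DataFrame holding one micro-batch,
        # so the batch JDBC writer can be used on it.
        (batch_df.write
            .format("jdbc")
            .mode("append")
            .option("url", jdbc_url)
            .option("dbtable", table)
            .option("user", user)
            .option("password", password)
            .option("driver", driver)
            .save())

    return write_batch


def start_mysql_sink(stream_df, jdbc_url, table, user, password,
                     checkpoint="./checkpoint"):
    """Start the streaming query and return it (hypothetical helper)."""
    writer = make_mysql_batch_writer(jdbc_url, table, user, password)
    return (stream_df.writeStream
            .outputMode("append")
            .foreachBatch(writer)  # replaces .format("jdbc") on the stream
            .option("checkpointLocation", checkpoint)
            .start())
```

With the variables from the question, this could be used as `query = start_mysql_sink(inputStream, jdbcurl, table_name, username, password)` followed by `query.awaitTermination()`. The Event Hubs DataFrame carries more columns than `body`, so it may be necessary to select only the columns that exist in the MySQL table before starting the sink.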
Tags: python mysql apache-spark pyspark