[Posted]: 2020-11-30 22:11:40
[Question]:
I wrote some sample code that connects to a Kafka broker, reads data from a topic, and sinks it into a SnappyData table.
import logging
import sys
import time

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.sql.snappy import SnappySession
from pyspark.rdd import RDD
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, explode, split


def main(snappy):
    logger = logging.getLogger('py4j')
    logger.info("My test info statement")

    sns = snappy.newSession()

    # Read the Kafka topic as a streaming DataFrame.
    df = sns \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "10.0.0.4:9092") \
        .option("subscribe", "test_import3") \
        .option("failOnDataLoss", "false") \
        .option("startingOffsets", "latest") \
        .load()

    # Kafka delivers key and value as binary; cast both to strings.
    bdf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # Write each micro-batch to the SnappyData table every 30 seconds.
    streamingQuery = bdf \
        .writeStream \
        .format("snappysink") \
        .queryName("Devices3") \
        .trigger(processingTime="30 seconds") \
        .option("tablename", "devices2") \
        .option("checkpointLocation", "/tmp") \
        .start()

    streamingQuery.awaitTermination()


if __name__ == "__main__":
    # Note: sc holds a SparkSession here, not a SparkContext.
    sc = SparkSession.builder.master("local[*]").appName("test").config("snappydata.connection", "10.0.0.4:1527").getOrCreate()
    snc = SnappySession(sc)
    main(snc)
I submit it with the following command:
/opt/snappydata/bin/spark-submit --master spark://10.0.0.4:1527 /path_to/file.py --conf snappydata.connection=10.0.0.4:1527
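Everything works: data is read from the Kafka topic and written to the SnappyData table. What I don't understand is why this streaming query does not show up in the SnappyData dashboard UI. After submitting the PySpark code from the console, I see that a new Spark Master UI has started.
How can I connect to SnappyData's internal Spark Master from PySpark?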
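One detail of the spark-submit command above may be relevant: spark-submit treats everything after the application file as arguments to the application itself, so the trailing `--conf snappydata.connection=10.0.0.4:1527` is most likely handed to file.py rather than applied as Spark configuration. Separately, a master set in code (here `.master("local[*]")`) takes precedence over the `--master` flag. The following is a minimal sketch of assembling the command with all options placed before the script; `build_submit_command` is a hypothetical helper, and the paths and addresses are copied unchanged from the question, without any claim that this master URL is the right one for a SnappyData cluster.

```python
import shlex


def build_submit_command(spark_submit, master, app_file, conf=None, app_args=()):
    """Build a spark-submit argument list with every option before the app file.

    Hypothetical helper for illustration only; it is not part of Spark or SnappyData.
    """
    cmd = [spark_submit, "--master", master]
    # Each --conf key=value pair must precede the application file,
    # otherwise spark-submit forwards it to the application as a plain argument.
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app_file)
    # Anything after the application file is passed to the script itself.
    cmd += list(app_args)
    return cmd


if __name__ == "__main__":
    command = build_submit_command(
        "/opt/snappydata/bin/spark-submit",
        "spark://10.0.0.4:1527",   # value taken from the question, not verified
        "/path_to/file.py",
        conf={"snappydata.connection": "10.0.0.4:1527"},
    )
    print(shlex.join(command))
```

With the options in this order, the `snappydata.connection` setting reaches Spark as configuration instead of ending up in `sys.argv` of the script.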
[Comments]:
Tags: apache-spark pyspark snappydata