【Question title】: How to launch spark-job.py with spark-submit after the Kafka producer has started
【Posted】: 2020-11-14 21:00:16
【Question description】:

I have three files: a Kafka producer.py, a consumer.py, and spark-job.py. I don't know how to launch the Spark file so that it processes the stream of data produced through Kafka.

  • Start the ZooKeeper server in the first terminal:

    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

  • Then start the Kafka server in a second, separate terminal:

    .\bin\windows\kafka-server-start.bat .\config\server.properties

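Then, in two more separate terminals, I start producer.py and consumer.py.

The Kafka producer file just generates a dictionary of data:

{branch, currency, amount}

and produces it to the Kafka cluster every 5 seconds or so.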

from json import dumps
from time import sleep
from numpy.random import choice, randint
from kafka import KafkaProducer

def get_random_value():
    new_dict = {}
    branch_list = ["Almaty", "Astana", "Taraz", "Semei"]
    currency_list = ["KZT", "RUB", "GBP", "USD"]

    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(1, 100)
    new_dict['branch'] = choice(branch_list)
    # print(new_dict)
    return new_dict


if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'),
                             compression_type='gzip')
    topic_name = 'transaction'

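    try:
        while True:
            # Send a batch of 100 random records, then pause for 5 seconds.
            for _ in range(100):
                data = get_random_value()
                try:
                    message = producer.send(topic=topic_name, value=data)
                    # Block until the broker acknowledges the record.
                    record_data = message.get(timeout=10)
                    print('data: {}, offset: {}'.format(data, record_data.offset))
                except Exception as e:
                    print(e)
                finally:
                    producer.flush()
            sleep(5)
    except KeyboardInterrupt:
        print('stopping producer')
    finally:
        # close() was unreachable after the infinite loop in the first version.
        producer.close()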

The consumer just prints that dictionary:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('transaction',bootstrap_servers=['127.0.0.1:9092'])

print("start consuming")
for message in consumer:
    aa = json.loads(message.value.decode())
    print("currency: %s, amount: %d, branch: %s" %(aa['currency'], aa['amount'], aa['branch']))

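The producer and the consumer both work and print to their terminals at the same time.

But once these two are running, I need to start the Spark job.

spark-job.py listens on localhost:9092 (where Kafka is also located) and simply writes the incoming data to a database.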

import sys
import os
import shutil

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils
import json

outputPath = 'C:/Users/Admin/Downloads/madi_kafka/logs/checkpoints01'

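def get_sql_query():
    # The temp view exposes city/currency/amount (see process()), and an
    # aggregate such as sum() needs a matching GROUP BY.
    strSQL = ('select from_unixtime(unix_timestamp()) as curr_time, '
              't.city as city, t.currency as currency, sum(t.amount) as amount '
              'from exchanges_stream t group by t.city, t.currency')
    return strSQL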
# -------------------------------------------------
# Lazily instantiated global instance of SparkSession
# -------------------------------------------------
def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']
# -------------------------------------------------
# What I want to do per each RDD...
# -------------------------------------------------
def process(time, rdd):
    print("===========-----> %s <-----===========" % str(time))
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        rowRdd = rdd.map(lambda w: Row(city=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))

        testDataFrame = spark.createDataFrame(rowRdd)

        testDataFrame.createOrReplaceTempView("exchanges_stream")

        sql_query = get_sql_query()
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://xxx") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
            
            print('DB write successful!')
        except Exception as e:
            print("-->Error with DB working!", e)

    except Exception as e:
        print("--> Error!", e)


# -------------------------------------------------
# General function
# -------------------------------------------------
def createContext():
    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")

    ssc = StreamingContext(sc, 10)#  2

    broker_list, topic = sys.argv[1:]

    try:
        directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                          [topic],
                                                          {"metadata.broker.list": broker_list})
    except:
        raise ConnectionError("Kafka error: Connection refused: \
                            broker_list={} topic={}".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)

    return ssc

if __name__ == "__main__":

    if len(sys.argv) != 3:
        # The first argument is the Kafka broker list (host:port), not ZooKeeper.
        print("Usage: spark_job.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(-1)

    print("--> Creating new context")
    if os.path.exists(outputPath):
        # Pass the variable, not the literal string 'outputPath'.
        shutil.rmtree(outputPath)

    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
    ssc.start()
    ssc.awaitTermination()

I don't know how to launch spark-job.py.

While the producer keeps generating messages, I tried to launch

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

which gives:

Exception in thread "main" org.apache.spark.SparkException: Cannot load main class from JAR org.postgresql:postgresql:9.4.1207 with URI org.postgresql. Please specify a class through --class.

If I try to launch this command:

python.exe .\spark_job.py 127.0.0.1:2181 transaction

it does start and creates a new context, but it still cannot find some files:

--> Creating new context
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/07/25 06:12:46 WARN Checkpoint: Checkpoint directory C:/Users/Admin/Downloads/madi_kafka/logs/checkpoints01 does not exist

________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.6 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.4.6.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

________________________________________________________________________________________________


Traceback (most recent call last):
  File ".\spark_job.py", line 88, in createContext
    {"metadata.broker.list": broker_list})
  File "C:\python37\lib\site-packages\pyspark\streaming\kafka.py", line 138, in createDirectStream
    helper = KafkaUtils._get_helper(ssc._sc)
  File "C:\python37\lib\site-packages\pyspark\streaming\kafka.py", line 217, in _get_helper
    return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
TypeError: 'JavaPackage' object is not callable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\spark_job.py", line 114, in <module>
    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
  File "C:\python37\lib\site-packages\pyspark\streaming\context.py", line 107, in getOrCreate
    ssc = setupFunc()
  File ".\spark_job.py", line 114, in <lambda>
    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
  File ".\spark_job.py", line 91, in createContext
    broker_list={} topic={}".format(broker_list, topic))
ConnectionError: Kafka error: Connection refused:                             broker_list=127.0.0.1:2181 topic=transaction

【Discussion】:

    Tags: python apache-spark apache-kafka


    【Solution 1】:

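    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 is correct

    assuming

    1. You are using Spark 2.0.2 with Scala 2.11 installed (both old and outdated). Besides, Spark Structured Streaming is the new hotness (packaged as spark-sql-kafka) ... Worth pointing out: your error output indicates that you have Spark 2.4.6, where Spark Streaming's Kafka 0-8 integration is deprecated, and the SQL-Kafka package would save you the trouble of converting RDDs to DataFrames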
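
The Structured Streaming route mentioned above could look roughly like the sketch below. It assumes Spark 2.4.6 built for Scala 2.11 (as the question's error output suggests) and the `transaction` topic and JSON fields from the producer. The names `kafka_sql_package`, `build_branch_totals` and `run` are illustrative only, and the job needs the `spark-sql-kafka-0-10` package on its classpath.

```python
def kafka_sql_package(spark_version="2.4.6", scala_version="2.11"):
    """Maven coordinate of the Structured Streaming Kafka source (versions are assumptions)."""
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(scala_version, spark_version)


def build_branch_totals(spark, brokers="127.0.0.1:9092", topic="transaction"):
    """Return a streaming DataFrame with the summed amount per branch and currency."""
    # Imported lazily so the helper above can be used without a Spark installation.
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    schema = StructType([
        StructField("branch", StringType()),
        StructField("currency", StringType()),
        StructField("amount", IntegerType()),
    ])
    raw = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", brokers)
           .option("subscribe", topic)
           .load())
    # Kafka delivers the value as bytes; cast it to a string and parse the JSON payload.
    parsed = raw.select(from_json(col("value").cast("string"), schema).alias("t")).select("t.*")
    return parsed.groupBy("branch", "currency").sum("amount")


def run():
    """Print the running totals to the console until the query is stopped."""
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("StructuredKafkaTransaction").getOrCreate()
    query = (build_branch_totals(spark)
             .writeStream
             .outputMode("complete")
             .format("console")
             .start())
    query.awaitTermination()
```

Calling `run()` from a script submitted with `--packages` set to the value of `kafka_sql_package()` would start the query; no manual RDD-to-DataFrame conversion is involved.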

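    The first error is related to the missing Postgres class. As mentioned earlier, I strongly recommend not using Spark when Kafka Connect exists for this very purpose, but to fix it you need to add the Postgres JAR to the packages list (or, more precisely, to the Spark classpath)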
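
One plausible reading of the first error is that the package list reached spark-submit as two separate arguments (for instance because the `\` line continuation is not honored by Windows shells), so the Postgres coordinate was taken for the application JAR. The sketch below builds the argument list in Python so that `--packages` is a single comma-separated value without whitespace. The helper name `build_spark_submit` is hypothetical, and the Kafka package version assumes Spark 2.4.6 with Scala 2.11.

```python
def build_spark_submit(script, script_args, packages):
    """Return an argv list in which --packages is one comma-joined token."""
    coords = ",".join(p.strip() for p in packages)
    return ["spark-submit", "--packages", coords, script] + list(script_args)


PACKAGES = [
    # Assumed to match the installed Spark 2.4.6 / Scala 2.11.
    "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6",
    # Coordinate taken from the question.
    "org.postgresql:postgresql:9.4.1207",
]

cmd = build_spark_submit("spark_job.py", ["localhost:9092", "transaction"], PACKAGES)
print(" ".join(cmd))
```

The printed line can be pasted into a terminal as a single command, or the list can be handed to `subprocess.run(cmd, check=True)`; on Windows the executable may need to be spelled `spark-submit.cmd`.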

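    The second error occurs because the --packages argument is now missing; when you run the script with plain python, you can supply it through an environment variable named PYSPARK_SUBMIT_ARGS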

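    import os

    # Set this before the SparkContext is created; the value has to end with "pyspark-shell".
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages <kafka> pyspark-shell"

    sc = get_context()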
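
Filled in for the setup in the question, the variable could be built as in the sketch below. It assumes Spark 2.4.6 with Scala 2.11, and `submit_args` is an illustrative helper, not part of PySpark. The trailing `pyspark-shell` token is what PySpark expects as the primary resource.

```python
import os


def submit_args(packages):
    """Build a PYSPARK_SUBMIT_ARGS value; it has to end with 'pyspark-shell'."""
    return "--packages {} pyspark-shell".format(",".join(packages))


# Set this before the first SparkContext is created: the JVM is launched
# at that point and the variable is read then.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args([
    "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6",  # assumed versions
    "org.postgresql:postgresql:9.4.1207",                     # from the question
])
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

Note also that the direct stream expects the Kafka broker address (127.0.0.1:9092 in the question), not the ZooKeeper port 2181 that was passed in the plain python run.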
    

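    【Comments】:

    • Thank you. Yes, I have Spark 2.4.6 and Scala version 2.11.12, and I got it working with this long command: spark-submit --jars spark-streaming-kafka-assembly_2.11-1.6.3.jar --class spark-streaming-kafka-0-8_2.11-2.4.6.jar ./spark_job.py 127.0.0.1:9092 transaction
    • I installed those JAR files, but now I get a different error; that is a separate question, though.
    • Again, that 1.6.3 is not correct. Please use the correct Spark version for all Spark-related JARs
    • Thanks for pointing that out. If you look at this link on Maven, mvnrepository.com/artifact/org.apache.spark/…, 1.6.3 is the latest available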