【Question Title】: Twitter + Apache Kafka + Spark Structured Streaming doesn't work
【Posted】: 2020-10-29 08:11:15
【Question】:

I want to run some sample code from GitHub (https://github.com/kaantas/spark-twitter-sentiment-analysis). I followed these steps:

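  1. Started the ZooKeeper server (zkserver).
  2. Started Kafka 2.5.0 (I'm also using Apache Spark 3.0.0 and JDK 8).
  3. Started tweeetlistener.py (the tweets start streaming and I can see them in the cmd window).
  4. Opened twitter_topic_avg_sentiment_val.py in Spyder and ran it, but it only shows the output below.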

Note: I don't know anything about JARs. If I need to use external JARs, please explain how. Thanks a lot!

```
Traceback (most recent call last):
  File "C:\Users\merha\Desktop\spark-twitter-sentiment-analysis-master\twitter_topic_avg_sentiment_val.py", line 40, in <module>
    query.awaitTermination()
  File "C:\Anaconda3\lib\site-packages\pyspark\sql\streaming.py", line 103, in awaitTermination
    return self._jsq.awaitTermination()
  File "C:\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Anaconda3\lib\site-packages\pyspark\sql\utils.py", line 137, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from

StreamingQueryException: org/apache/spark/kafka010/KafkaConfigUpdater
=== Streaming Query ===
Identifier: [id = f5dd9cb5-fcea-42ec-a20e-93a2ad233e1f, runId = 6cffdd89-3792-4500-a508-e4abc76425fb]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: INITIALIZING
Thread State: RUNNABLE
```

Tweet listener (Kafka producer):

```python
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener
import json
import twitter_config
import pykafka
from afinn import Afinn


class TweetListener(StreamListener):
    def __init__(self):
        super().__init__()
        # Connect to the local Kafka broker and get a producer for topic "twitter3"
        self.client = pykafka.KafkaClient("localhost:9092")
        self.producer = self.client.topics[bytes('twitter3', 'ascii')].get_producer()

    def on_data(self, data):
        try:
            json_data = json.loads(data)

            # Keep only the tweet text and its AFINN sentiment score
            json_send_data = {
                'text': json_data['text'],
                'senti_val': afinn.score(json_data['text']),
            }

            print(json_send_data['text'], " >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ", json_send_data['senti_val'])

            # json.dumps escapes non-ASCII characters, so ASCII encoding is safe
            self.producer.produce(bytes(json.dumps(json_send_data), 'ascii'))
            return True
        except KeyError:
            # Stream messages without 'text' (e.g. delete notices) are skipped
            return True

    def on_error(self, status):
        print(status)
        return True


consumer_key = "xxxxxxxxxx"
consumer_secret = "xxxxxxxxxxx"
access_token = "xxxxxxxxxxxx"
access_secret = "xxxxxxxxxx"

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)

# create AFINN object for sentiment analysis
afinn = Afinn()

twitter_stream = Stream(auth, TweetListener())
twitter_stream.filter(languages=['en'], track=["big data"])
```
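
The message-building part of `on_data` can be pulled out into a small pure function, which makes it easy to check that each Kafka message matches the `text`/`senti_val` schema the Spark job parses. This is a minimal sketch: `build_payload` is a hypothetical helper name, and `score` is assumed to be any callable such as `Afinn().score`.

```python
import json
from typing import Callable, Optional


def build_payload(raw_tweet: str, score: Callable[[str], float]) -> Optional[bytes]:
    """Turn one raw tweet (a JSON string) into the bytes sent to Kafka.

    Returns None for stream messages without a 'text' field, mirroring the
    KeyError branch in TweetListener.on_data.
    """
    tweet = json.loads(raw_tweet)
    if "text" not in tweet:
        return None
    # Store the score as a float to match the DoubleType field in the Spark schema
    payload = {"text": tweet["text"], "senti_val": float(score(tweet["text"]))}
    # json.dumps escapes non-ASCII characters by default, so ASCII encoding is safe
    return json.dumps(payload).encode("ascii")
```

Inside the listener this would become `payload = build_payload(data, afinn.score)` followed by `self.producer.produce(payload)` when `payload` is not `None`.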

Spark job (twitter_topic_avg_sentiment_val.py):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, from_json, udf
from pyspark.sql.types import DoubleType, StringType, StructField, StructType


def fun(avg_senti_val):
    # Map the average sentiment score to a label; a null average (None) raises TypeError
    try:
        if avg_senti_val < 0:
            return 'NEGATIVE'
        elif avg_senti_val == 0:
            return 'NEUTRAL'
        else:
            return 'POSITIVE'
    except TypeError:
        return 'NEUTRAL'


if __name__ == "__main__":

    schema = StructType([
        StructField("text", StringType(), True),
        StructField("senti_val", DoubleType(), True)
    ])

    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # Read topic "twitter3" from the local Kafka broker (requires the Kafka connector package)
    kafka_df = (spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "twitter3")
                .option("startingOffsets", "earliest")
                .load())

    kafka_df_string = kafka_df.selectExpr("CAST(value AS STRING)")

    # Parse each JSON message into text / senti_val columns
    tweets_table = kafka_df_string.select(from_json(col("value"), schema).alias("data")).select("data.*")
    sum_val_table = tweets_table.select(avg('senti_val').alias('avg_senti_val'))

    # udf = USER DEFINED FUNCTION
    udf_avg_to_status = udf(fun, StringType())

    # average of senti_val column to status column
    new_df = sum_val_table.withColumn("status", udf_avg_to_status("avg_senti_val"))

    # Note: this prints the raw Kafka values (kafka_df_string), not new_df
    query = kafka_df_string.writeStream.format("console").option("truncate", "false").start()

    query.awaitTermination()
```
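
Once the Kafka package loads, note that the job computes `new_df` but writes `kafka_df_string`. `new_df` is an aggregation over the whole stream, and Spark does not allow append output mode for a streaming aggregation without a watermark, so writing it to the console needs `outputMode("complete")` (or `"update"`). Below is a sketch of that variant, assuming the same broker, topic and schema; `sentiment_status` and `main` are hypothetical names, and `main()` is deliberately not called at import time.

```python
from typing import Optional


def sentiment_status(avg_senti_val: Optional[float]) -> str:
    """Map the running average score to a label (same rules as fun() in the question)."""
    if avg_senti_val is None:  # avg() yields null before any rows arrive
        return "NEUTRAL"
    if avg_senti_val < 0:
        return "NEGATIVE"
    if avg_senti_val == 0:
        return "NEUTRAL"
    return "POSITIVE"


def main() -> None:
    # Imported here so sentiment_status can be used without a Spark install
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import avg, col, from_json, udf
    from pyspark.sql.types import DoubleType, StringType, StructField, StructType

    schema = StructType([
        StructField("text", StringType(), True),
        StructField("senti_val", DoubleType(), True),
    ])
    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    tweets = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "twitter3")
        .option("startingOffsets", "earliest")
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )
    to_status = udf(sentiment_status, StringType())
    new_df = (
        tweets.select(avg("senti_val").alias("avg_senti_val"))
        .withColumn("status", to_status(col("avg_senti_val")))
    )

    # A global aggregation cannot be written in append mode without a watermark
    query = (
        new_df.writeStream.outputMode("complete")
        .format("console")
        .option("truncate", "false")
        .start()
    )
    query.awaitTermination()
```

To run it as a script, call `main()` from an `if __name__ == "__main__":` block at the bottom of the file.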

【Discussion】:

Tags: python apache-spark apache-kafka streaming


【Answer 1】:

Did you submit the Spark job with the Kafka package in its configuration? See the third line:

```
spark-submit --master yarn --deploy-mode cluster \
    --py-files "${PY_ZIP}" \
    --packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1" \
```
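
When the job is started from an IDE such as Spyder rather than with spark-submit, the same dependency can be requested through the `spark.jars.packages` configuration before the session is created. Spark then resolves the package and its transitive dependencies from Maven, including `spark-token-provider-kafka-0-10`, which contains `org.apache.spark.kafka010.KafkaConfigUpdater`. This is a minimal sketch assuming Spark 3.0.0 built with Scala 2.12 (the connector version should match the installed Spark version); `kafka_package` and `build_spark_session` are hypothetical helper names.

```python
def kafka_package(spark_version: str = "3.0.0", scala_version: str = "2.12") -> str:
    """Maven coordinate of the Structured Streaming Kafka source for a given Spark build."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def build_spark_session(app_name: str = "TwitterSentimentAnalysis"):
    """Create a SparkSession that downloads the Kafka connector when the JVM starts.

    spark.jars.packages only takes effect when the JVM is launched, so this must
    run before any other SparkSession exists in the process (e.g. restart the
    Spyder kernel first).
    """
    from pyspark.sql import SparkSession  # imported lazily; requires pyspark

    return (
        SparkSession.builder.appName(app_name)
        .config("spark.jars.packages", kafka_package())
        .getOrCreate()
    )
```

On the command line the equivalent is `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 twitter_topic_avg_sentiment_val.py`.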


【Answer 2】:

After I downloaded the spark-token-provider-kafka-0-10 JAR file and copied it into Spark's jars folder (or added it to SPARK_CLASSPATH), my problem was solved.
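
If you take the manual route, copying a single JAR may not be enough: as far as I know, the Kafka source in Spark 3.0 also needs `spark-sql-kafka-0-10`, `kafka-clients` and `commons-pool2` on the classpath. This sketch lists which of those artifacts are absent from a jars folder (for example `%SPARK_HOME%\jars`, or the `jars` directory inside a pip-installed `pyspark`). `missing_kafka_jars` is a hypothetical helper, and the prefix list is an assumption based on the Spark 3.0 connector's dependencies.

```python
from pathlib import Path
from typing import Iterable, List

# Artifact name prefixes assumed to be required by the Spark 3.0 Kafka source
REQUIRED_PREFIXES = (
    "spark-sql-kafka-0-10_",
    "spark-token-provider-kafka-0-10_",
    "kafka-clients-",
    "commons-pool2-",
)


def missing_kafka_jars(jars_dir: str, required: Iterable[str] = REQUIRED_PREFIXES) -> List[str]:
    """Return the required prefixes that no *.jar file in jars_dir starts with."""
    names = [p.name for p in Path(jars_dir).glob("*.jar")]
    return [prefix for prefix in required if not any(n.startswith(prefix) for n in names)]
```

For example, `print(missing_kafka_jars(os.path.join(os.environ["SPARK_HOME"], "jars")))` prints an empty list when all four artifacts are present.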

【Comments】:

• I tried your solution, but the problem persists.