【Question Title】: Why can't I run multiple SparkContexts at once?
【Posted】: 2018-12-08 19:05:47
【Question】:

I'm using Python 2 and Spark. I followed the instructions at https://github.com/Ruthvicp/CS5590_BigDataProgramming/wiki/Lab-Assignment-4----Spark-MLlib-classification-algorithms,-word-count-on-twitter-streaming on how to count words in a Twitter stream. I have two files. The first is TSWordCount:

import os
# Set these before findspark.init(), which reads SPARK_HOME to locate Spark
os.environ["SPARK_HOME"] = "C:\\spark-2.3.1-bin-hadoop2.7\\spark-2.3.1-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "C:\\winutils\\"

import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import desc
from collections import namedtuple

def main():
    sc = SparkContext(appName="Countwords1234")
    wordcount = {}
    ssc = StreamingContext(sc, 5)  # 5-second micro-batches
    # Read newline-delimited text from the TwitterListener socket server
    lines = ssc.socketTextStream("localhost", 5678)
    fields = ("word", "count")
    Tweet = namedtuple('Tweet', fields)
    # lines = socket_stream.window(20)
    # Split each line into words, pair each word with 1, then sum per word
    counts = lines.flatMap(lambda text: text.split(" "))\
        .map(lambda x: (x, 1))\
        .reduceByKey(lambda a, b: a + b).map(lambda rec: Tweet(rec[0], rec[1]))
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
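To make the per-batch logic in main() concrete, here is a plain-Python sketch (no Spark needed) of what one 5-second micro-batch computes: flatMap splits each line on single spaces, map pairs each word with 1, and reduceByKey sums the 1s per word. `count_words` is a hypothetical helper, and `Counter` stands in for reduceByKey; Spark performs the same aggregation across partitions and does not guarantee output order, hence the sort when printing.

```python
from collections import Counter, namedtuple

# Same field names as in main(); the typename itself is arbitrary.
Tweet = namedtuple("Tweet", ("word", "count"))

def count_words(batch_lines):
    """Hypothetical plain-Python equivalent of one micro-batch:
    flatMap(split on " ") -> map(word -> (word, 1)) -> reduceByKey(+)."""
    # flatMap: one flat list of words; split(" ") keeps '' for repeated spaces
    words = [w for line in batch_lines for w in line.split(" ")]
    counts = Counter()
    for w in words:  # map each word to (w, 1) and sum the 1s per key
        counts[w] += 1
    return [Tweet(w, c) for w, c in counts.items()]

batch = ["fifa world cup", "fifa final"]
print(sorted(count_words(batch)))
```

Note that, exactly like `text.split(" ")` in main(), consecutive spaces produce empty-string "words", which are counted too.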

When I run this file, it succeeds and the output is "Listening to port 5678". My second file is TwitterListener:

import findspark
findspark.init()
import pyspark
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
import time

consumer_key = '30f****'
consumer_secret = 'smu7B******'
access_token = '153*******'
access_secret = 'QIizsB***'


auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)

class TweetsListener(StreamListener):

    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, data):
        try:
            msg = json.loads(data)
            print(msg['text'].encode('utf-8'))
            # socketTextStream splits records on "\n", so terminate each tweet
            self.client_socket.sendall(msg['text'].encode('utf-8') + b'\n')
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        print(status)
        return True


def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    twitter_stream = Stream(auth, TweetsListener(c_socket))
    twitter_stream.filter(track=['fifa'])


if __name__ == "__main__":
    s = socket.socket()  # Create a TCP socket
    host = "localhost"  # Accept connections from this machine only
    port = 5678  # Must match the port passed to ssc.socketTextStream()
    s.bind((host, port))  # Bind to the port
    print("Listening on port: %s" % str(port))
    s.listen(5)  # Start listening (backlog of 5 pending connections)
    c, addr = s.accept()  # Block until Spark's socket receiver connects
    print("Received request from: " + str(addr))
    time.sleep(5)
    sendData(c)

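As you can see, TwitterListener listens on localhost:5678. In TSWordCount I call SparkContext(appName=""); I assumed the name should be the name of my Twitter app, so I put Countwords1234 there. Then I connect to the port with ssc.socketTextStream("localhost", 5678). But when I run TSWordCount, I get the error: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*]) created by

I searched for the error and found suggested fixes such as calling sc.stop(), so I added it after ssc.awaitTermination(), but that didn't work. What should I do now?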
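For reference, the PySpark documentation describes socketTextStream as receiving bytes over TCP and interpreting them as UTF-8 encoded, "\n"-delimited lines, so whatever writes to the socket has to end each tweet with a newline for it to arrive as its own record. The Python 3 sketch below uses `socket.socketpair()` in place of the real listener and Spark receiver; `send_tweet` and `read_records` are hypothetical helpers, and `read_records` only approximates the receiver's line splitting.

```python
import socket

def send_tweet(sock, text):
    # Terminate each record with "\n"; sendall() retries until every byte is sent,
    # whereas a single send() may write only part of the buffer.
    sock.sendall(text.encode("utf-8") + b"\n")

def read_records(sock):
    # Rough stand-in for the receiving side: decode UTF-8 and split on "\n".
    with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
        return [line.rstrip("\n") for line in reader]

writer, receiver = socket.socketpair()
send_tweet(writer, "fifa world cup")
send_tweet(writer, "¡fifa final!")
writer.close()  # EOF lets the reader's loop finish
print(read_records(receiver))
receiver.close()
```

Without the trailing newline, consecutive tweets would run together into one line, and the last one would not be emitted until a newline eventually arrived.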

【Comments】:

    Tags: python apache-spark pyspark jupyter-notebook


    【Answer 1】:

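    I found the answer. I replaced sc =SparkContext(appName="Countwords1234") with sc = SparkContext.getOrCreate(), and everything works. I still don't really understand why, but the end result is what matters, haha.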
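A likely explanation: Spark allows only one active SparkContext per driver process, and the existing context named in the error (app=PySparkShell, master=local[*]) suggests the Jupyter kernel was launched through pyspark, which had already created one. Calling the SparkContext constructor again therefore fails, while SparkContext.getOrCreate() returns the already-active context instead of building a new one. The sketch below is a toy model of that rule, not PySpark itself: `ToyContext` and `get_or_create` are hypothetical names, and the real getOrCreate takes an optional SparkConf rather than an app name.

```python
class ToyContext:
    """Hypothetical stand-in modelling the one-active-context rule.
    Not pyspark.SparkContext; it only mimics the behaviour described above."""
    _active = None  # the single active context in this "driver process"

    def __init__(self, app_name):
        if ToyContext._active is not None:
            raise ValueError(
                "Cannot run multiple SparkContexts at once; existing "
                "SparkContext(app=%s)" % ToyContext._active.app_name)
        self.app_name = app_name
        ToyContext._active = self

    @classmethod
    def get_or_create(cls, app_name="default"):
        # Reuse the active context if one exists; its original app_name is kept.
        if cls._active is None:
            cls(app_name)
        return cls._active

    def stop(self):
        # Stopping frees the slot so a new context can be constructed.
        if ToyContext._active is self:
            ToyContext._active = None

shell_sc = ToyContext("PySparkShell")  # what the pyspark-launched kernel did
try:
    ToyContext("Countwords1234")       # the question's constructor call
except ValueError as e:
    print(e)
sc = ToyContext.get_or_create("Countwords1234")
print(sc is shell_sc, sc.app_name)
```

Two consequences carry over to the real notebook. First, the stream runs under the shell's existing context, so the Countwords1234 name is not applied; appName is the Spark application name shown in the Spark web UI and is unrelated to the Twitter app. Second, if a custom name matters, stopping the existing context before constructing a new one should work, whereas sc.stop() placed after ssc.awaitTermination() cannot help, because the constructor has already failed by then.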

    【Comments】:
