【Question title】: Unhandled error in Deferred? Using UNIX socket with Twisted
【Posted】: 2017-10-31 16:05:44
【Question】:

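I am completely new to network programming and event-driven programming. Even so, I managed to implement a publish-subscribe scheme over a TCP connection between my machine (the server) and a client machine used for testing (from the command line). What I actually need, though, is to use UNIX sockets with Twisted.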

When I run the code, I get the following error:

Unhandled error in Deferred

Here is my pub_sub.py code:

"""
a networking implementation of PubSub using Twisted.
=============
PubSub Server
=============
A PubSub server listens for subscription requests and publish commands, and, when
published to, sends data to subscribers. All incoming and outgoing requests are
encoded in JSON.
A Subscribe request looks like this:
    {
        "command": "subscribe",
        "topic": "hello"
    }
A Publish request looks like this:
    {
        "command": "publish",
        "topic": "hello",
        "data": {
            "world": "WORLD"
        }
    }
When the server receives a Publish request, it will send the 'data' object to all
subscribers of 'topic'.
"""

import argparse
import json
import logging

from collections import defaultdict

from twisted.internet import reactor
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.internet.endpoints import UNIXClientEndpoint, UNIXServerEndpoint, \
                                       connectProtocol
from twisted.internet.protocol import Protocol, Factory


class PubSubProtocol(Protocol):

    def __init__(self, topics):
        self.topics = topics
        self.subscribed_topic = None

    def connectionLost(self, reason):
        print("Connection lost: {}".format(reason))
        if self.subscribed_topic:
            self.topics[self.subscribed_topic].remove(self)

    def dataReceived(self, data):
        print("Data received: {}".format(data))
        try:
            request = json.loads(data)
        except ValueError:
            logging.debug("ValueError on decoding incoming data. "
                          "Data: {}".format(data), exc_info=True)
            self.transport.loseConnection()
            return

        if request['command'] == 'subscribe':
            self.handle_subscribe(request['topic'])
        elif request['command'] == 'publish':
            self.handle_publish(request['topic'], request['data'])

    def handle_subscribe(self, topic):
        print("Subscribed to topic: {}".format(topic))
        self.topics[topic].add(self)
        self.subscribed_topic = topic

    def handle_publish(self, topic, data):
        # transport.write() expects bytes, so encode the JSON text once here.
        request = json.dumps(data).encode('utf-8')

        for protocol in self.topics[topic]:
            protocol.transport.write(request)
        print("Publish sent for topic: {}".format(topic))


class PubSubFactory(Factory):

    def __init__(self):
        self.topics = defaultdict(set)

    def buildProtocol(self, addr):
        return PubSubProtocol(self.topics)


class PublisherProtocol(Protocol):
    """
    Publish protocol for sending data to client, i.e. front-end web GUI.
    """
    def __init__(self, topic, **kwargs):
        self.topic = topic
        self.kwargs = kwargs

    def connectionMade(self):
        request = json.dumps({
            'command': 'publish',
            'topic': self.topic,
            'data': self.kwargs,
        })

        # transport.write() expects bytes, not text.
        self.transport.write(request.encode('utf-8'))
        self.transport.loseConnection()


class SubscriberProtocol(Protocol):
    """
    Subscriber protocol for client sending a request to subscribe to a specific
    topic.
    """
    def __init__(self, topic, callback):
        self.topic = topic
        self.callback = callback

    def connectionMade(self):
        request = json.dumps({
            'command': 'subscribe',
            'topic': self.topic,
        })

        # transport.write() expects bytes, not text.
        self.transport.write(request.encode('utf-8'))

    def dataReceived(self, data):
        kwargs = json.loads(data)

        self.callback(**kwargs)


class PubSub(object):

    def __init__(self, path='./.sock'):
        self.path = FilePath(path)
        self.reactor = reactor

    def _make_connection(self, protocol):
        endpoint = UNIXClientEndpoint(reactor, self.path)
        connection = connectProtocol(endpoint, protocol)

    def subscribe(self, topic, callback):
        """
        Subscribe 'callback' callable to 'topic'.
        """
        sub = SubscriberProtocol(topic, callback)
        self._make_connection(sub)

    def publish(self, topic, **kwargs):
        """
        Publish 'kwargs' to 'topic', calling all callables subscribed to 'topic'
        with the arguments specified in '**kwargs'.
        """
        pub = PublisherProtocol(topic, **kwargs)
        self._make_connection(pub)

    def run(self):
        """
        Convenience method to start the Twisted event loop.
        """
        self.reactor.run()


def main():
    path = FilePath("./.sock")
    endpoint = UNIXServerEndpoint(reactor, path)
    endpoint.listen(PubSubFactory())

    reactor.run()


if __name__ == '__main__':
    main()
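
The request format described in the module docstring can be tried out without Twisted at all. The following is a minimal sketch using only the standard library; the helper names `make_subscribe`, `make_publish`, and `parse_request` are hypothetical and do not appear in pub_sub.py. It encodes each request to UTF-8 bytes, since that is what a transport's `write()` expects on Python 3.

```python
import json


def make_subscribe(topic):
    """Encode a Subscribe request as UTF-8 JSON bytes (hypothetical helper)."""
    return json.dumps({"command": "subscribe", "topic": topic}).encode("utf-8")


def make_publish(topic, **data):
    """Encode a Publish request as UTF-8 JSON bytes (hypothetical helper)."""
    request = {"command": "publish", "topic": topic, "data": data}
    return json.dumps(request).encode("utf-8")


def parse_request(raw):
    """Decode raw bytes into (command, topic, data).

    Returns None when the bytes are not valid JSON or a required key is
    missing, instead of raising.
    """
    try:
        request = json.loads(raw.decode("utf-8"))
        return request["command"], request["topic"], request.get("data")
    except (ValueError, KeyError, TypeError):
        return None


if __name__ == "__main__":
    print(parse_request(make_subscribe("hello")))
    print(parse_request(make_publish("hello", world="WORLD")))
    print(parse_request(b"not json"))
```

Note that this sketch, like `dataReceived` in the code above, assumes each request arrives as one complete chunk; a stream socket does not guarantee that.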

If I am doing anything wrong, any help would be greatly appreciated.

Thanks,

Brian

【Comments】:

    Tags: python twisted unix-socket twisted.internet


    【Solution 1】:

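    It looks like you are running your software on Windows. Alas, UNIX sockets are not available on Windows. If you want to use UNIX sockets, you need a more POSIX-like environment such as Linux, *BSD, or OS X.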
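
One way to confirm this before involving Twisted is to ask the standard library directly. The following is a minimal sketch, not part of the original answer; the function names `unix_sockets_supported` and `try_bind_unix_socket` are hypothetical. It checks whether the running Python exposes `socket.AF_UNIX` and then attempts a real bind in a temporary directory, returning a readable reason on failure.

```python
import os
import socket
import tempfile


def unix_sockets_supported():
    """Return True if this Python build exposes AF_UNIX sockets."""
    return hasattr(socket, "AF_UNIX")


def try_bind_unix_socket(path):
    """Try to bind and listen on a UNIX socket at `path`.

    Returns (True, None) on success or (False, message) on failure.
    The socket file is removed again if this function created it.
    """
    if not unix_sockets_supported():
        return False, "AF_UNIX is not available on this platform"
    server = None
    bound = False
    try:
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        server.bind(path)
        bound = True
        server.listen(1)
        return True, None
    except OSError as exc:
        return False, str(exc)
    finally:
        if server is not None:
            server.close()
        if bound and os.path.exists(path):
            os.remove(path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ok, reason = try_bind_unix_socket(os.path.join(tmp, "pubsub.sock"))
        print("UNIX sockets usable:", ok, reason or "")
```

On the Twisted side, the bare "Unhandled error in Deferred" message most likely appears because the Deferred returned by `endpoint.listen(...)` in `main()` has no errback attached. Adding one, for example `endpoint.listen(PubSubFactory()).addErrback(log.err)`, should make the underlying failure visible in the log instead.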

    【Comments】:

    • Thanks! I realized this last night and ran the same code on my Linux machine, where everything worked. Thanks for your help :)