【问题标题】:Python Twisted TCP application - How to prevent incoming message loss by blocking processPython Twisted TCP 应用程序 - 如何通过阻塞进程来防止传入消息丢失
【发布时间】:2013-08-10 12:36:47
【问题描述】:

我有 10 条消息/秒(总活动)来自 40 个客户端的 TCP。 我需要接收每条消息并执行 5 秒的过程(查找 Web 服务,执行一些 DB 查询,最后将结果写入 DB)。

如何区分来自缓慢的 5 秒进程的消息?此外,我可能会在已经为该客户端处理消息的同时收到来自客户端的另一条消息。我永远不想丢失消息。

【问题讨论】:

  • 是什么让您认为您正在丢失消息?如果您使用 TCP,则在操作系统级别存在排队。 TCP不是面向消息的协议,它是面向流的。字节只是堆积在缓冲区中,直到您的应用程序读取它们。您的应用程序不会遗漏任何内容。
  • 埃里克你说得很好。我会测试一下。我让服务器休眠并发送 150 条消息。然后当它醒来时,我会检查他们是否到达。回答您的问题:客户抱怨消息“可能”丢失。我需要改变一些事情,这样我们才能知道真正发生了什么。
  • 您是否实现了应用程序的客户端部分?如果是这种情况,检查“丢失”消息应该相对容易。
  • 好问题。我无法控制客户质量。他们被赋予了一个简单的文本协议来遵循。他们自己实施。第二个目标是找出哪些客户端没有按预期工作。 (应用程序所有者将订购重建)。我已经启动了一个小测试服务器,它通过一系列测试运行客户端。我还启动了一个以理想方式工作的虚拟客户端。

标签: python multithreading multiprocessing twisted zeromq


【解决方案1】:

有了 Twisted,答案就是做你想做的事:

from twisted.python.log import err
from twisted.internet.protocol import Protocol

class YourProtocol(Protocol):
    ...
    def messageReceived(self, message):
        d = lookupWebService(message)
        d.addCallback(queryDatabase)
        d.addCallback(saveResults)
        d.addErrback(err, "Servicing %r failed" % (message,))

您可以在twisted.web.client 中找到用于与 Web 服务交互的 API(假设“Web 服务”是您使用 HTTP 客户端与之交谈的东西)。您可以在twisted.enterprise.adbapi 中找到用于与某些 SQL 数据库服务器交互的 API。您可以通过谷歌搜索找到用于与其他类型数据库交互的 API。

【讨论】:

  • 看起来不错。请问d到底是什么? JP 我想开始协议之外的任务。我只希望协议将消息传递给队列。然后我可以从队列中做类似上面的事情。我不想在协议中启动回调的原因是我想从队列的中心点轻松监控所有任务的情况。
  • dDeferred。我不确定你在说什么“队列”。我没有使用队列,因为我不需要。
  • 谢谢你,JP。我正在调查。
【解决方案2】:

使用Divide and Conqueur并行分配任务。

许多说明这种方法的 Python 示例,请在此处阅读:

您还可以使用 ROUTER/DEALER 代理分发任务。到达代理的消息被公平排队并在下游工作人员之间分发,没有回话;这种方法可能更适合您的需求。

【讨论】:

  • 谢谢拉菲安。目前我被困在这里做什么:while True: msg = subscribe.recv()。例如,如果我为我正在做的事情设置了一个路由器/经销商——也许是 OTT——我将如何控制工作人员的数量以匹配传入的工作负载?
猜你喜欢
  • 2023-03-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-12-26
  • 2012-12-30
  • 1970-01-01
  • 2012-09-10
  • 1970-01-01
相关资源
最近更新 更多