【问题标题】:Stream one STDIN over websocket to multiple clients通过 websocket 将一个 STDIN 流式传输到多个客户端
【发布时间】:2017-09-14 09:44:54
【问题描述】:

目前我在 Linux 中有“streamgenerator”,它将数据输出到 stdout/namedpipe 等。我在网上发现 websocketd 有点接近我的需要,但问题是它为每个连接的客户端生成新进程。
我正在寻找的是将相同的数据(将数据生成到标准输出的单个进程)流式传输到多个客户端。

$ websocketd --port=8080 ./streamgenerator  

将为每个与 websocket 的新连接创建新的流生成器​​进程。我想要的是一个流生成器实例,并为所有客户端复制它的输出。

有什么简单的方法吗?我现在唯一想到的就是编写 C 程序,它将 STDIN 带入大小为 X 的缓冲区,并且每个客户端都有指向该缓冲区的指针(指向客户端能够读取的位置)......如果客户端连接他将开始只获取新数据......如果客户端太慢......他的“READ”指针将超出缓冲区他的连接将被丢弃,因为他无法跟上。

我的问题是有什么办法不开发这个工具?首先,我认为管道连接到命名管道,然后从中读取 websocketd ......但这当然行不通,因为 websocket 的第一个客户端将读取数据并将它们丢弃......

【问题讨论】:

  • 从 STDOUT 流式传输数据不会危及第一条消息的完整性吗?即,如果在 STDOUT 输出 JSON 对象的中间发起连接,则只会发送 JSON 的最后一部分......还是我遗漏了什么?
  • 这是我将在客户端 Javascript 处理的事情...连接后丢弃字节,直到您看到下一条消息开始...您可以将其想象为视频流...。随时连接只是等待下一个 iframe...但是在这种情况下与视频无关...

标签: linux unix websocket


【解决方案1】:

根据websocketd source code,它似乎使用基于进程的服务器(每个连接一个新进程)....我不是 GO 程序员,所以我不确定我是否正确阅读,但README 似乎表示相同的概念。

因此...

问题是它为每个连接的客户端生成新进程

这是无法避免的。该设计意味着新连接与它们的 STDIN 和 STDOUT 本质上是分叉的。无法从新连接中访问流式传输到原始 STDIN 的数据...

...因此不能选择使用单个 streamgeneratorwebsocketd

我现在唯一想到的就是写 C 程序......

我认为这可能是避免多进程设计的唯一方法。

您不必为此使用 C。您可能可以同样轻松地使用 Ruby 或 node.js(它们可能更容易编写,而您会为方便而付出代价)。

我的问题是有什么办法不开发这个工具?

我不这么认为。

但是,facil.io 可以让创作一个使用其原生 Pub/Sub API 广播数据的 C Web 套接字工具变得相当简单(允许您在未来使用 Redis 轻松扩展)...作者,我有偏见。

编辑

这是一个简短的 Ruby 脚本,它将数据从管道广播到任何连接的 Web 套接字连接(数据由行分隔)。

文件script.rb

#!/usr/bin/env ruby
require 'iodine'

class Example
    def self.call(env)
      if env['upgrade.websocket?'.freeze] && env["HTTP_UPGRADE".freeze] =~ /websocket/i.freeze
        env['upgrade.websocket'.freeze] = Example.new
        return [0,{}, []] # It's possible to set cookies for the response.
      end
      [404, {"Content-Length" => "12"}, ["Bad Request."] ]
    end 

    def on_open
        subscribe channel: :stream, force: :text
    end
    def on_message data
        close # are we expecting any messages?
    end
    def on_close
        # nothing to do
    end
    def on_shutdown
        # server is going away, notify client.
    end
end

Iodine::Rack.app = Example

# remove these two lines for automatice, core related, detection
Iodine.processes = 1;
Iodine.threads = 1;

# initialize the Redis engine for each iodine process.
require 'uri'
if ENV["REDIS_URL"]
  uri = URI(ENV["REDIS_URL"])
  Iodine.default_pubsub = Iodine::PubSub::RedisEngine.new(uri.host, uri.port, 0, uri.password)
else
  puts "* No Redis, it's okay, pub/sub will still run on the whole process cluster."
end

# Create the loop that reads from ARGF (the pipe)
# defer threading because we might fork the main server
root_pid = Process.pid
Iodine.run do 
    puts "Starting to listen to pipe"
    if(root_pid == Process.pid)
        Thread.new do
            ARGF.each_line do |s|
                Iodine.publish channel: :stream, message: s
                puts "read:", s
            end
        end
    end
end

# start iodine
Iodine.start

您可以通过以下方式从终端使用它:

$ streamgenerator | ruby script.rb

这只是一个肮脏的例子,但它说明了这可能是多么容易。

哦,它需要the iodine gem,这是 fail.io 到 Ruby(也是我的)的端口。

编辑 2

我添加了一些代码,允许您将 Redis 与我提供的示例代码一起使用,并将发布限制为单个进程。

Redis 引擎是 facil.io 的原生引擎(它是 C 语言,带有一个用于碘的 Ruby 桥),您可以使用它来发送命令以及 Pub/Sub。

如果您使用 Redis 在多台机器上进行扩展,我会考虑将脚本拆分为发布者脚本和服务器应用程序。

另外,如果您使用多台机器,则只需要 Redis。如果您使用 Iodine.processes = 8 运行 Iodine,则 pub/sub 引擎仍然可以工作。

Iodine 还有很多其他功能,如果您需要的话,例如静态文件服务等。

您还可以将整个东西打包到一个中间件中,并使其成为现有 Rails/Sintara/Rack 项目的一部分,使用 iodine 作为服务器(用于 Websocket 支持)。

...

至于:

  $ ./streamgenerator | pub --channel "redisstream"
  $ websocketd --port=8080 sub --channel "redisstream"

听起来这会缓解这个问题,尽管我认为websocketd 仍然会为每个连接打开一个新进程,它比事件“反应器模式”使用更多的资源,例如 nginx 等服务器使用的资源(和碘,乘客,美洲狮和其他一堆)。

【讨论】:

  • 这是迄今为止最好的解决方案。比编写自定义 C 代码更容易,它是在我使用的 Ruby 中&& 喜欢的。搜索您的建议,我发现 Redis 有 Pub/Sub 命令行工具。因此我应该能够做这样的事情: $ ./streamgenerator | pub --channel "redisstream" 和 websoketd 端:$websocketd --port=8080 sub --channel "redisstream" 我最大的问题是它是用 haskell 编写的,这对我来说是黑匣子:(
  • @underhood - 我在示例代码中添加了可选的 Redis pub/sub。只有在使用多台机器时才需要 Redis。它实际上只是几行代码。 Iodine 还提供了一些命令行参数(例如p 用于port 等)。如果您喜欢我的回答,请随时投票 ;-)
【解决方案2】:

我不明白这个问题。这不应该做的伎俩:

streamgenerator | tee fifo1 | tee fifo2 | tee fifo3

【讨论】:

  • 是的,这是正确的方向......但是如何将它连接到 websocketd?每次客户端连接时如何创建“三通拆分”?我如何理解 websocketd 是它执行作为参数给出的命令并通过 websocket 转发其 STDOUT。问题是它对每个客户端都这样做......我需要流生成器只运行一次,因为资源很重。注意 websocketd,因为 websocket 服务器不是固定的……它只是我找到的最接近的解决方案,不需要实现自定义工具。
猜你喜欢
  • 2012-04-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-21
  • 2014-12-21
  • 2023-03-02
  • 2018-07-15
相关资源
最近更新 更多