【问题标题】:Using named pipes with bash - Problem with data loss使用带有 bash 的命名管道 - 数据丢失问题
【发布时间】:2011-05-16 11:54:56
【问题描述】:

在网上做了一些搜索,找到了使用命名管道的简单“教程”。但是,当我对后台作业执行任何操作时,我似乎会丢失很多数据。

[[编辑:找到了一个更简单的解决方案,请参阅对帖子的回复。所以我提出的问题现在是学术性的——以防万一有人想要一个工作服务器]]

使用 Ubuntu 10.04 和 Linux 2.6.32-25-generic #45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux

GNU bash,版本 4.1.5(1)-release (x86_64-pc-linux-gnu)。

我的 bash 函数是:

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      fi
    fi
  done
}

我在后台运行:

> jqs&
[1] 5336

现在我喂它:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

输出不一致。 我经常没有得到所有成功的回声。 我收到的新文本回声最多与成功回声一样多,有时更少。

如果我从“提要”中删除“&”,它似乎可以工作,但在读取输出之前我会被阻止。因此我想让子进程被阻塞,而不是主进程。

目的是编写一个简单的作业控制脚本,这样我最多可以并行运行 10 个作业并将其余作业排队等待以后处理,但可靠地知道它们确实在运行。

下面是完整的工作经理:

function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo $@ >$pipe
}

打电话

jq <name> <max processes> <command>
jq abc 2 sleep 20

将启动一个进程。 那部分工作正常。开始第二个,很好。 一个接一个似乎工作正常。 但是在循环中开始 10 似乎会丢失系统,就像上面更简单的示例一样。

任何关于我可以做些什么来解决这种明显的 IPC 数据丢失的提示将不胜感激。

问候, 阿兰。

【问题讨论】:

标签: linux bash named-pipes data-loss


【解决方案1】:

你的问题是if下面的声明:

while true
do
    if read txt <"$pipe"
    ....
done

发生的情况是,您的作业队列服务器在每次循环时都会打开和关闭管道。这意味着一些客户端在尝试写入管道时会收到“管道损坏”错误 - 也就是说,在写入器打开管道后,管道的读取器会消失。

要解决此问题,请更改服务器中的循环,为整个循环打开管道一次:

while true
do
    if read txt
    ....
done < "$pipe"

这样做,管道打开一次并保持打开状态。

您需要注意循环内运行的内容,因为循环内的所有处理都将标准输入附加到命名管道。您需要确保从其他地方重定向循环内所有进程的标准输入,否则它们可能会消耗管道中的数据。

编辑:现在的问题是,当最后一个客户端关闭管道时,您在读取时收到 EOF,您可以使用 jilles 方法复制文件描述符,或者您可以确保您也是客户端并保持管道的写入端打开:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

这将保持管道的写入端在 fd 3 上打开。同样的警告适用于这个文件描述符和标准输入。您将需要关闭它,以便任何子进程都不会继承它。它可能没有标准输入那么重要,但它会更干净。

【讨论】:

  • 哇,很好的答案。说得通。谢谢。马上试一试。
  • 好的,现在你解决了关键问题,我还有一个问题:你如何让读取等待输入?我将在下面用示例代码进一步回复自己。
  • @asoundmove:我已经用阅读时 EOF 的解决方案更新了答案。
  • 这种方法避免了依赖于非POSIX的行为,但在某些情况下可能是有原因的。
【解决方案2】:

正如在其他答案中所说,您需要始终保持先进先出,以避免丢失数据。

但是,一旦在打开 fifo 后所有写入者都离开(所以有写入者),读取立即返回(poll() 返回POLLHUP)。清除此状态的唯一方法是重新打开 fifo。

POSIX 没有为此提供解决方案,但至少 Linux 和 FreeBSD 提供了解决方案:如果读取开始失败,请再次打开 fifo,同时保持原始描述符打开。这是因为在 Linux 和 FreeBSD 中,“挂起”状态是特定打开文件描述的本地状态,而在 POSIX 中,它是 fifo 的全局状态。

这可以在这样的 shell 脚本中完成:

while :; do
    exec 3<tmp/testfifo
    exec 4<&-
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3
    exec 3<&-
done

【讨论】:

  • 在 Bash 中,而不是 { ... read ... } &lt;&amp;3,您可以使用 read -u 3 从指定的文件描述符编号而不是 0 中读取。
  • @ephemient read -u 3 xread x &lt;&amp;3 有什么优势?
  • 哇,这行得通!你能解释一下为什么我不能使用 fd 1 而不是 3 吗?它第一次工作,但随后失败。我将发布单独的评论以完整显示最新的脚本。
  • @jilles:据我所知 read x
  • @jilles:优点是很明显(无需寻找与while ...; do 匹配的doneread 正在从 FD 3 读取,而其余的 do ...; done body 没有重定向 FD 0。
【解决方案3】:

仅针对那些可能感兴趣的人,[[re-edited]] 关注 camh 和 jilles 的 cmets,这里有两个新版本的测试服务器脚本。

现在两个版本都可以正常工作。

camh 的管道管理版本:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

jille 的管道管理版本:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}

感谢大家的帮助。

【讨论】:

  • 你无法捕捉到 SIGKILL。没有意义尝试。另外,请参阅我上次的编辑,了解不需要复制文件描述符的更简单方法。
  • 好的。经过测试,您当然是对的。感谢camh的指点。
【解决方案4】:

就像 camh 和丹尼斯威廉姆森所说的不要打破管道。

现在我有更小的例子,直接在命令行上:

服务器:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s;
      then echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

客户:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

可以将关键行替换为:

    (echo "Test-$i" > tst-fifo&);

发送到管道的所有客户端数据都会被读取,尽管使用选项二的客户端可能需要在读取所有数据之前启动服务器几次。

但是虽然读取等待管道中的数据开始,但一旦数据被推送,它会永远读取空字符串。

有什么办法可以阻止这种情况?

再次感谢您提供任何见解。

【讨论】:

    【解决方案5】:

    一方面,问题比我想象的要严重: 现在,在我的更复杂的示例 (jq_manage) 中似乎存在这样一种情况,即从管道中一遍又一遍地读取相同的数据(即使没有新数据写入其中)。

    另一方面,我找到了一个简单的解决方案(根据丹尼斯的评论进行了编辑):

    function jqn    # compute the number of jobs running in that group
    {
      __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
    }
    
    function jq
    {
      __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
      __jmax__="$1";    shift   # maximum of job numbers to run concurrently
    
      jqn
      while (($__jqty__ '>=' $__jmax__))
      do
        sleep 1
        jqn
      done
    
      eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &"
    }
    

    像魅力一样工作。 不涉及套接字或管道。 很简单。

    【讨论】:

    • 没有理由导出__jqty__(或任何原始导出)。为什么要直接向/dev/null 回音?为什么使用eval?为什么不直接做$@&amp;?没有必要引用&gt;=。我同意 camh 的回答。
    • 这一切都归结为读取和过滤 ps 的输出。回显到 /dev/null 因为我实际上并不想要输出,我只想要 'ps' 输出中的正确字符串。 eval 也是如此,否则 ps 显示变量名,而不是扩展变量,eval 进行扩展。我以前从未使用过 ((...)) ,所以感谢您指出我不需要引号,我只是举了一个我在某处读过的示例,也感谢导出,它是前一个的遗留物具有子流程并需要导出的更复杂的脚本。
    • 对不起,我指的是“工作”,而不是“ps”
    【解决方案6】:

    最多并行运行 10 个作业并将其余作业排队等待以后处理,但可靠地知道它们确实在运行

    您可以使用 GNU Parallel 来做到这一点。您将不需要此脚本。

    http://www.gnu.org/software/parallel/man.html#options

    您可以设置 max-procs “作业槽数。最多并行运行 N 个作业。”有一个选项可以设置您要使用的 CPU 内核数。您可以将已执行作业的列表保存到日志文件中,但这是一个测试版功能。

    【讨论】:

      猜你喜欢
      • 2021-08-28
      • 2015-04-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-04
      • 2016-03-30
      • 2010-12-18
      • 2012-04-15
      相关资源
      最近更新 更多