【问题标题】:C# StreamReader.ReadLine returning null before end of streamC# StreamReader.ReadLine 在流结束之前返回 null
【发布时间】:2016-08-31 18:07:43
【问题描述】:

我正在使用SSH.NET 库使用inotifywait 命令在远程Linux 服务器上实现文件系统观察程序。本质上它是一个包装器:

ssh myhost "inotifywait -m -e close_write --format '%:e %f' /dropzone"

该命令将打印出来(到 STDOUT):

CLOSE_WRITE:CLOSE foo
CLOSE_WRITE:CLOSE bar
CLOSE_WRITE:CLOSE baz

这很简单,可以解析并转化为事件。无论如何,我的 c# 代码本质上是:

        var privateKeyFile = new PrivateKeyFile(identity);
        var client = new SshClient(hostname, username, privateKeyFile);

        SshCommand command = null;
        IAsyncResult result = null;
        try
        {
            client.Connect();
            command = client.CreateCommand("inotifywait -m -e close_write --format '%:e %f' " + dropZone);
            result = command.BeginExecute();

            Console.WriteLine("Watching for events");
            var reader = new StreamReader(command.OutputStream);
            string line = null;
            while ((line = reader.ReadLine()) != null)
            {
                Console.WriteLine(line);
            }
            Console.WriteLine("Reached end of stream");
        }
        finally
        {
            if (client != null)
            {
                Console.WriteLine("Close the connection");
                client.Dispose();
            }
        }

        Console.WriteLine("Press enter to quit");
        Console.ReadLine();

在写入单个文件后运行它会产生以下输出:

Watching for events
CLOSE_WRITE:CLOSE baz
Reached end of stream
Close the connection
Press enter to quit

Watching for events 立即出现并等待第一个文件被写入(阻塞等待,正如我对StreamReader 所期望的那样)。然而,下一个 ReadLine,而不是另一个阻塞等待,返回 null(表示流结束),即使命令仍在愉快地运行。我知道我可以这样改变循环:

            while (!result.IsCompleted)
            {
                line = reader.ReadLine();
                if (line != null)
                {
                    Console.WriteLine(line);
                }
            }

结果:

Watching for events
CLOSE_WRITE:CLOSE baz
CLOSE_WRITE:CLOSE bar
CLOSE_WRITE:CLOSE foo
...

如所愿,但它摆脱了阻塞等待新输入,这意味着循环不断旋转(显然不希望......)

你能解释一下这种行为吗?对另一种方法有什么建议吗?

---- 更新----

该库看起来正在迁移到 github 并进行更新。我已提交this issue 以尝试解决此问题。

【问题讨论】:

  • 如何摆脱阻塞等待?您使用相同的阻塞 reader.ReadLine。
  • @Evk,我不明白你在说什么......是的,我使用的是相同的reader.ReadLine,但是,据我了解StreamReader 调用ReadLine 应该阻止如果当前没有要读取的数据,并且您还没有到达流的末尾。由于此命令永远不会到达流的末尾,因此每次调用都应该阻塞,直到出现新消息。至少这是期望的行为......
  • 所以你验证了在一个 reader.ReadLine 返回 null 之后,下一个不会阻塞并立即再次返回 null?我的意思是您是否验证了“循环不断旋转”的假设?
  • @Evk,是的,我确实验证了这一点。 (事实上​​,这就是为什么我将 Console.WriteLine(line) 包装在 if 语句中以防止打印所有空值)。
  • 发生这种情况时 reader.EndOfStream 是否也返回 true?我看到你只检查 result.IsCompleted。

标签: c# streamreader ssh.net


【解决方案1】:

观察到行为的原因是PipeStream 类。它像字节队列一样工作。当您从PipeStream 读取字节时,实际上是在将它们出列,因此流长度会减少。当您读取所有字节时,流长度变为 0。这意味着在您读取第一“行”(实际上可以是多行,只是数据的第一部分)之后 - 流的长度为 0,因此有效结束。下一次读取将在没有阻塞的情况下返回,直到下一部分数据到达(如果有)。

不幸的是,这些流似乎不适用于您的情况 - 它们旨在执行命令,接收一个结果并完成。如果您想读取连续的数据流(例如您的案例或例如“tail -f”结果 - 您唯一的选择似乎是在读取之间回退到Thread.Sleep,至少在快速搜索之后我没有找到任何替代。

更新:仍然可以通过一些反思来实现您想要的结果。 Undelying 频道具有 DataReceived 事件,您可以使用该事件在新数据可用时获得通知。下面的代码应该可以解决问题(注意这是一个草图,所以要小心):

    static void Main(string[] args) {
        var privateKeyFile = new PrivateKeyFile(@"somefile");
        using (var client = new SshClient("somehost", "someuser", privateKeyFile)) {                
            client.Connect();
            var command = client.CreateCommand("tail -f /tmp/test.txt");

            var result = command.BeginExecute();
            var channelField = command.GetType().GetField("_channel", BindingFlags.Instance | BindingFlags.NonPublic);
            var channel = channelField.GetValue(command);
            var receivedEvent = channel.GetType().GetEvent("DataReceived", BindingFlags.Instance | BindingFlags.Public);
            Console.WriteLine("Watching for events");
            using (var handler = new ReceivedHandler()) {
                // add event handler here
                receivedEvent.AddEventHandler(channel, Delegate.CreateDelegate(receivedEvent.EventHandlerType, handler, handler.GetType().GetMethod("OnReceive")));
                while (true) {
                    // wait on both command completion and our custom wait handle. This is blocking call
                    WaitHandle.WaitAny(new[] {result.AsyncWaitHandle, handler.Signal});
                    // if done - break
                    if (result.IsCompleted)
                        break;
                    var line = handler.ReadLine();
                    Console.WriteLine(line);
                }
            }                                
            Console.WriteLine("Reached end of stream");                
            Console.ReadKey();
        }

    }

    public class ReceivedHandler : IDisposable {
        private readonly AutoResetEvent _signal;
        private readonly StringBuilder _buffer = new StringBuilder();
        public ReceivedHandler() {
            _signal = new AutoResetEvent(false);
        }

        public void OnReceive(object sender, EventArgs e) {
            var dataProp = e.GetType().GetProperty("Data", BindingFlags.Instance | BindingFlags.Public);
            var rawData = (byte[])dataProp.GetValue(e);
            var data = Encoding.ASCII.GetString(rawData);
            lock (_buffer) {
                // append to buffer for reader to consume
                _buffer.Append(data);
            }
            // notify reader
            Signal.Set();
        }

        public AutoResetEvent Signal => _signal;

        public string ReadLine() {
            lock (_buffer) {
                // cleanup buffer
                var result = _buffer.ToString();
                _buffer.Clear();
                return result;
            }
        }

        public void Dispose() {
            _signal.Dispose();
        }
    }

当然,最好联系这个库的开发者并解释问题,也许他们可以添加缺失的行为。

【讨论】:

  • 我只想指出,我已经对那个特定的库进行了一些尝试,我可以说它非常敏感,所以它可能不是寻找原因的最佳位置,因为有些东西只是有小故障......我并没有过多地责怪开发人员,因为整个 SSH 事情在多个层面上都有点敏感,但只是说,这可能是你找到一个可行的解决方案而忘记其他应该的解决方案的情况工作,但他们没有......我在那些没有弄清楚原因的情况下花了太多时间:)
  • @Evk,该更新看起来很有希望。无论如何,我一直在寻找一种进入频道的方法。明天我会试一试并更新这个问题......并同意让开发人员看看是最好的,但该项目的最后一个版本是在 2013 年......仍然似乎是最好的 ssh 实现C#...
【解决方案2】:

@Evk 的回答是正确的,PipeStream 是罪魁祸首。 PipeStream 的另一个问题是,如果您尝试读取的字节数超过可用字节,它将阻塞。出于性能原因,阻塞应该是PipeStream消费者 的工作。我使用 SSH.NET 执行SshCommand 并异步读取标准输出/错误。我解决这些问题的方法是写信给中介MemoryStream,然后使用StreamReader 等标准机制。这是从PipeStream 阅读的更通用的答案:

public class SshCommandStreamReader : IDisposable
{
    private readonly Stream stream;
    private readonly MemoryStream intermediateStream;
    private readonly StreamReader reader;

    public SshCommandOutputReader(Stream stream)
    {
        this.stream = stream;
        this.intermediateStream = new MemoryStream();
        this.reader = new StreamReader(intermediateStream, Encoding.UTF8);
    }

    private int FlushToIntermediateStream()
    {
        var length = stream.Length;

        if (length == 0)
        {
            return 0;
        }

        // IMPORTANT: Do NOT read with a count higher than the stream length (which is typical of reading
        // from streams). The streams for SshCommand are implemented by PipeStream (an internal class to
        // SSH.NET). Reading more than the current length causes it to *block* until data is available.
        // If the stream is flushed when reading, it does not block. It is not reliable to flush and then
        // read because there is a possible race condition where a write might occur between flushing and
        // reading (writing resets the flag that it was flushed). The only reliable solution to prevent
        // blocking when reading is to always read the current length rather than an arbitrary buffer size.
        var intermediateOutputBuffer = new byte[length];
        var bytesRead = stream.Read(intermediateOutputBuffer, 0, intermediateOutputBuffer.Length);
        intermediateStream.Write(intermediateOutputBuffer, 0, bytesRead);
        return bytesRead;
    }

    public string Read()
    {
        var bytesFlushed = FlushToIntermediateStream();

        // Allow reading the newly flushed bytes.
        intermediateStream.Position -= bytesFlushed;

        // Minor optimization since this may be called in a tight loop.
        if (intermediateStream.Position == intermediateStream.Length)
        {
            return null;
        }
        else
        {
            var result = reader.ReadToEnd();
            return result;
        }
    }

    public void Dispose()
    {
        reader.Dispose();
        intermediateStream.Dispose();
    }
}

然后使用它:

using (var command = client.CreateCommand("your command text"))
{
    var cmdAsyncResult = command.BeginExecute();

    using (var standardOutputReader = new SshCommandStreamReader(command.OutputStream))
    {
        while (!cmdAsyncResult.IsCompleted)
        {
            var result = standardOutputReader.Read();
            if (!String.IsNullOrEmpty(result))
            {
                Console.Write(result);
            }

            // Or what ever mechanism you'd like to use to prevent CPU thrashing.
            Thread.Sleep(1);
        }

        // This must be done *after* the loop and *before* EndExecute() so that any extra output
        // is captured (e.g. the loop never ran because the command was so fast).
        var resultFinal = standardOutputReader.Read();
        if (!String.IsNullOrEmpty(resultFinal))
        {
            Console.Write(resultFinal);
        }
    }

    command.EndExecute(cmdAsyncResult);
}

您应该能够修改此示例以从标准错误中读取(通过ExtendedOutputStream),并将其更改为逐行读取特定于您的应用程序。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-10-25
    • 1970-01-01
    • 2011-01-11
    • 1970-01-01
    • 1970-01-01
    • 2015-04-15
    • 1970-01-01
    • 2012-05-28
    相关资源
    最近更新 更多