【问题标题】:F#, System.IO.IOException: All pipe instances are busyF#,System.IO.IOException:所有管道实例都忙
【发布时间】:2010-09-08 15:59:38
【问题描述】:

我有一个通过命名管道与 java 应用程序通信的 F# 应用程序。其中 F# 充当服务器,java 充当客户端。除了 F# 偶尔出现“System.IO.IOException: All pipe instances are busy”错误外,该应用程序大部分时间都可以工作。下面是 F# 和 Java 的异常和代码片段的完整堆栈跟踪。感谢您在解决此问题时提供任何帮助

谢谢, 苏达利

完整的堆栈跟踪:

Unhandled Exception: System.IO.IOException: All pipe instances are busy.
   at Microsoft.FSharp.Control.CancellationTokenOps.Start@1143-1.Invoke(Exception e)
   at <StartupCode$FSharp-Core>.$Control.loop@413-38(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.-ctor@473-1.Invoke(Object state)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

F# 代码:

[<DataContract>] 
type Quote = { 
    [<field: DataMember(Name="securityIdentifier") >] 
    RicCode:string
    [<field: DataMember(Name="madeOn") >] 
    MadeOn:DateTime
    [<field: DataMember(Name="closePrice") >] 
    Price:int 
    }

let globalPriceCache = new Dictionary<string, Quote>()

let ParseQuoteString (quoteString:string) = 
    let data = Encoding.Unicode.GetBytes(quoteString)
    let stream = new MemoryStream() 
    stream.Write(data, 0, data.Length); 
    stream.Position <- 0L 
    let ser = Json.DataContractJsonSerializer(typeof<Quote array>) 
    let results:Quote array = ser.ReadObject(stream) :?> Quote array
    results

let RefreshCache quoteList =
    globalPriceCache.Clear() 
    quoteList 
    |> Array.iter(fun result->globalPriceCache.Add(result.RicCode, result)) 

let EstablishConnection() =
    let pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.InOut, 4)
    pipeServer.WaitForConnection()
    try
        Some(new StreamReader(pipeServer))
    with e -> 
        None

let rec MarketPriceCache() =
    match EstablishConnection() with
    |Some(sr) ->
        // Read request from the stream.
        let m_cache = 
            sr.ReadLine()  
            |>  ParseQuoteString  
            |>  RefreshCache

        MarketPriceCache()
    | _ -> () 


[<EntryPoint>]
let main args=
    try
        async { 
            MarketPriceCache() 
        } |> Async.Start

        while true do
            if globalPriceCache.Count > 0 then
    //Business logic
                System.Threading.Thread.Sleep(1000 * 50)
            else

                ignore(logInfo(sprintf "%s" "Price Cache is empty"))
                System.Threading.Thread.Sleep(1000 * 65)

    with e ->
        ignore(logError e.Message)
        ignore(logError e.StackTrace)    
    0

Java 代码:

public void WatchForPrice()
 {
  while (true)
  {
   try 
   {
    Map<String, SecurityQuoteCacheEntry> priceMap = getPriceCache().getCacheMap();
    List<LocalSecurityQuote> localSecurityQuotes = new ArrayList<LocalSecurityQuote>();
    if(priceMap != null)
    {

     Set<String> keySet = priceMap.keySet();
     System.out.println("Key Size: " + keySet.size());
     for(String key : keySet)
     {
      SecurityQuote quote =  priceMap.get(key).getQuote();
      if(quote != null)
      {
       LocalSecurityQuote localSecurityQuote = new LocalSecurityQuote();
       localSecurityQuote.setClosePrice(quote.getClosePrice());
       localSecurityQuote.setMadeOn(quote.getMadeOn());     
       localSecurityQuote.setSecurityIdentifier(key);
       localSecurityQuotes.add(localSecurityQuote);
      }

     }

     JSONSerializer serializer = new JSONSerializer();
     String jsonString = serializer.serialize(localSecurityQuotes);

     // Connect to the pipe
     RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe", "rw");
     if (pipe != null )
     {
      // write to pipe
      pipe.write(jsonString.getBytes());
      pipe.close();

     }
     else
      System.out.println("Pipe not found");
     doPeriodicWait();
    }
    else 
     System.out.println("No Price data found");
   }
   catch (Exception e) 
   {
    e.printStackTrace();
    System.out.println(e.getMessage());
    doPeriodicWait();
   }
  }
 }

【问题讨论】:

    标签: java f# io


    【解决方案1】:

    这是一种预感,但问题可能是您没有关闭管道流阅读器?

    let rec MarketPriceCache() =
    match EstablishConnection() with
    |Some(sr) ->
        // Read request from the stream.        
        try        
            sr.ReadLine()  
            |>  ParseQuoteString  
            |>  RefreshCache   
        finally
            sr.Close()
        MarketPriceCache()
    | _ -> () 
    

    (不需要 m_cache 变量 - 你没有在任何地方使用它)

    【讨论】:

    • 谢谢Mitya,我会试试你的解决方案,让你知道结果
    【解决方案2】:

    您必须在每次创建 NamedPipeServerStream 时处理掉它。在代码中执行此操作的最简单方法是通过在 MarketPriceCache 周围放置 use 语句来处理 StreamReader

    let rec MarketPriceCache() =
        match EstablishConnection() with
        | Some(sr) ->
            // Read request from the stream.
            use reader = sr in
            (
                let m_cache = 
                    reader.ReadLine()  
                    |>  ParseQuoteString  
                    |>  RefreshCache
            )
            MarketPriceCache()
        | _ -> ()
    

    using ... in 的语法是为了防止读者的范围在递归调用 MarketPriceCache 后结束。

    【讨论】:

    • 谢谢罗纳德,我会试试你的解决方案,让你知道结果
    • 实际上,我的解决方案和 Mitya 的解决方案都是正确的。 use 绑定由编译器转换为 try/finally 语句,该语句在 finally 分支中调用 Disposeuse 绑定被认为是更高级的构造。
    猜你喜欢
    • 1970-01-01
    • 2022-07-02
    • 1970-01-01
    • 1970-01-01
    • 2017-09-23
    • 1970-01-01
    • 1970-01-01
    • 2017-09-16
    • 2014-03-25
    相关资源
    最近更新 更多