【问题标题】:How to process an asynchronous queue from within the main UI thread?如何从主 UI 线程中处理异步队列?
【发布时间】:2016-07-19 10:48:08
【问题描述】:

我正在设计两个组件,它们异步接收自定义类 (TMELogMessage) 的对象并将它们存储在线程安全的内部容器中。第一个组件是非可视的(TMEFileLogger),应该将这些对象的一些信息写入日志文件(不足为奇)。第二个组件 (TMELogGrid) 是一个可视化的 FMX.Grid 后代,它应该在 UI 中可视化来自这些对象的一些信息。但我认为他们对这些对象所做的事情无关紧要。

我面临的问题是这些组件实际上并不知道这些对象何时会在其内部容器中排队,因此它们必须自己检查容器以查看是否有任何新对象需要处理,处理它们并从队列中删除它们。理想情况下,我希望在应用程序不太忙时以类似于操作更新的方式完成此操作,以免使 UI 陷入困境。

一个组件挂钩像 Application.OnIdle 这样的事件处理程序显然是错误的......我也许可以订阅 TIdleMessage,但我不确定这是一个好主意,因为我已经读过一些应用程序永远不会闲置。使用内部计时器似乎有点过时了。我也许可以使用低优先级线程来轮询队列,然后仅当我找到要处理的对象时才与 UI 同步。不过我没有其他想法。

在 Delphi + 多平台 FireMonkey 中执行此操作的正确方法是什么?

【问题讨论】:

  • 我倾向于为此使用某种惰性更新模式。无论收到多少项目,仅每 n 毫秒触发一次更新事件。是的,您需要一个计时器,它会在延迟后触发事件
  • 某些应用程序永远不会空闲 - 如果发生这种情况,则 UI 线程被阻塞,用户无法再与应用程序通信,这是应用程序的设计失败。
  • "一个组件挂钩一个像 Application.OnIdle 这样的事件处理程序显然是错误的..." - 你可以使用 TApplicationEvents 组件。它允许为相同的TApplication 事件分配多个处理程序。 “我也许可以订阅 TIdleMessage”——这当然是一种选择。或者你可以让你的容器使用TThread.Queue()在新对象被推入容器时在主线程上安排任务。

标签: delphi firemonkey


【解决方案1】:

我不喜欢回答自己的问题,但我希望能回答这个问题,因为它可能对其他人有所帮助。虽然 Deltics 的回答很有用,但这不是我决定采用的方式。我遵循了 Remy 评论中的建议,并将所有内容封装在组件和表单可以使用的消息处理类中。所以 TMEFileLogger 和 TMELogGrid 现在都使用这个新的 TMEMessageHandler 类的实例。

这里有一些接口代码来解释我做了什么。请记住,这是对 rtl System.Messaging 单元的替代和增强。 rtl 消息传递系统的问题在于它只提供发送同步消息。我想要一个更丰富的界面。这是我的消息管理器的样子:

  TMEMessageManager = Class
    ...
  Public
    ...
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal; Const ADispose: Boolean = True); Inline;
    Procedure PostDelayedMessage(Const Sender: TObject; AMessage: TMessage; Const DelayMSec: Cardinal; Const ADispose: Boolean = True); Inline;
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope; Const ADispose: Boolean = True); Inline;
    Procedure PostMessage(Const Sender: TObject; AMessage: TMessage; Const ADispose: Boolean = True); Inline;
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope; Const ADispose: Boolean = True); Inline;
    Procedure SendMessage(Const Sender: TObject; AMessage: TMessage; Const ADispose: Boolean = True); Inline;

    Function Subscribe(Const AMessageClass: TClass; Const AReceiver: IMEEnvelopeReceiver): Integer; Overload;
    Function Subscribe(Const AMessageClass: TClass; Const AMethod: TMessageMethod): Integer; Overload; Deprecated 'Use TMEMessageManager.Subscribe(AMessageClass, AReceiver)';
    Function Subscribe(Const AMessageClass: TClass; Const AProcedure: TMessageProcedure): Integer; Overload; Deprecated 'Use TMEMessageManager.Subscribe(AMessageClass, AReceiver)';

    Procedure Unsubscribe(Const AMessageClass: TClass; ID: Integer; Const Immediate: Boolean = False); Overload;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AReceiver: IMEEnvelopeReceiver; Const Immediate: Boolean = False); Overload;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AMethod: TMessageMethod; Const Immediate: Boolean = False); Overload; Deprecated;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AProcedure: TMessageProcedure; Const Immediate: Boolean = False); Overload; Deprecated;
    ...
  End;

TMEMessageEnvelope 是消息的包装器,定义为:

Type
  TMEMessageEnvelope = Class(TMEPersistent)
  Public
    ...
    Property Infos: TMEMessageInfos Read FInfos;
    Property Sender: TObject Read FSender;
    Property Msg: TMessage Read FMsg;
  End;

通过信封接收器订阅的接收器将同时接收同步和异步消息。这是首选的订阅方法。通过对象方法或过程订阅的接收者将只接收同步消息。这是为了与 RTL 消息传递系统兼容而维护的,但已弃用。

问题在于 RTL 消息无法按原样发布。订阅的消费者只需提供一个过程或对象方法来立即消费消息。要发布消息以便稍后可以异步使用它,它需要被包装和排队。这样发送者与接收者是隔离的。所以实际上...通过首先将它们包裹在信封中来发布(立即或延迟)消息。

以下是此消息传递系统中包含的基本接口:

Type

  IMEClonableMessage = Interface(IInterface)
    ['{45B223E2-DCA8-4E42-9847-6A3FCC910891}']
    Function Clone: TMessage;
  End;

  IMEMessageSender = Interface(IInterface)
    ['{99AFDC4A-CE30-41A3-9AA5-D49F2F1106BD}']
    Procedure PostDelayedMessage(const M: TMessage; Const DelayMSec: Cardinal);
    Procedure PostMessage(Const M: TMessage);
    Procedure SendMessage(Const M: TMessage);
  End;

  IMEEnvelopeSender = Interface(IInterface)
    ['{C3AEC52C-A558-40AB-B07B-3000ECDB9C0C}']
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal);
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope);
  End;

  IMEEnvelopeReceiver = Interface(IInterface)
    ['{7D464713-2F25-4666-AAF8-757AF07688C3}']
    Procedure ClearEnvelopes;
    Procedure ProcessEnvelope;
    Procedure ProcessEnvelopes;
    Function QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
    Procedure ReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure Subscribe(Const AMessageClass: TClass);
    Procedure Unsubscribe(Const AMessageClass: TClass);
  End;

IMEClonableMessage接口用于克隆消息...异步消息必须克隆...因为如果同一条消息有很多订阅者,每个订阅者会在不同的时间接收和消费该消息,所以最好每个有自己的消息副本。

我认为其他界面是不言自明的。

最后是 TMEMessageHandler 类:

  TMEMessageHandler = Class(TMEPersistent, IMEMessageSender, IMEEnvelopeSender, IMEEnvelopeReceiver)
    /// <summary>Basic thread-safe class that can send and receive synchronous and asynchronous messages and envelopes.</summary>
  Private
    FLock:                 TObject;
    FMessageManager:       TMEMessageManager;
    FSubscriptions:        TDictionary<TClass, Integer>;
    FEnvelopes:            TObjectList<TMEMessageEnvelope>;
    FOnReceiveEnvelope:    TReceiveEnvelopeEvent;
    FAutoProcessEnvelopes: Boolean;
    Procedure _Lock;
    Procedure _Unlock;
    Procedure ClearSubscriptions;
    Function GetMessageManager: TMEMessageManager;
    Procedure SetAutoProcessEnvelopes(Const Value: Boolean);
    Procedure SetMessageManager(Const Value: TMEMessageManager);
  Protected
    Function QueryInterface(Const IID: TGuid; Out Obj): HResult; Stdcall;
    Function _AddRef: Integer; Stdcall;
    Function _Release: Integer; Stdcall;
    Procedure DoReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal);
    Procedure PostDelayedMessage(Const M: TMessage; Const DelayMSec: Cardinal);
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure PostMessage(Const M: TMessage);
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure SendMessage(Const M: TMessage);
    Function QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
    Procedure ReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
  Public
    Constructor Create; Override;
    Destructor Destroy; Override;
    Procedure ClearEnvelopes;
    Procedure ProcessEnvelope;
    Procedure ProcessEnvelopes;
    Procedure Subscribe(Const AMessageClass: TClass);
    Procedure Unsubscribe(Const AMessageClass: TClass);
    Property AutoProcessEnvelopes: Boolean Read FAutoProcessEnvelopes Write SetAutoProcessEnvelopes Default True;
    Property MessageManager: TMEMessageManager Read GetMessageManager Write SetMessageManager;
    Property OnReceiveEnvelope: TReceiveEnvelopeEvent Read FOnReceiveEnvelope Write FOnReceiveEnvelope;
  End;

这一切是如何运作的

TMEMessageHandler 立即将任何订阅和取消订阅调用委托给 MessageManager。它将始终以 IMEEnvelopeReceiver 的形式提供自己的订阅。它会在其内部字典中跟踪订阅,以便在取消订阅时更加高效。

它还立即将任何调用委托给 Send、Post 和 PostDelayed 方法。 TMEMessageManager:

  • 向订阅的过程发送消息(作为 RTL)
  • 向订阅的对象方法发送消息(作为 RTL)
  • 通过调用订阅的接收者向订阅的接收者发送信封 ReceiveEnvelope 方法
  • 将信封(和信封包装的消息)发布到订阅者 接收者通过使用克隆副本调用他们的 QeueEnvelope 方法 信封
  • 将延迟的信封(和信封包装的消息)发布到订阅者 接收者首先在内部轻量级线程中排队 (TMEDelayedEnvelopeDeliverer) 本身具有消息管理器 在延迟过去后交付它们

作为接收者,TMEMessageHandler 通过简单地委托给 OnReceiveEnvelope 事件处理程序来实现 ReceiveEnvelope。

发送的信封由 QueueEnvelope 方法接收,该方法将信封添加到其线程安全队列中,然后,但仅当 AutoProcessEnvelopes 为 True 时,才使用主线程的 Queue 调用其自己的 ProcessEnvelope 方法(如 Remy 的建议):

Function TMEMessageHandler.QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
Begin
  _Lock;
  Try
    FEnvelopes.Add(Envelope);
    Result := FEnvelopes.Count;
  Finally
    _Unlock;
  End;
  If AutoProcessEnvelopes Then
    TThread.Queue(Nil,
      Procedure
      Begin
        ProcessEnvelope;
      End);
End;

ProcessEnvelope 方法从线程安全队列中提取信封,调用 ReceiveEnvelope 方法(与同步消息的消息管理器调用的方法相同),然后释放信封(请记住,这是仅为此接收者复制的副本) :

Procedure TMEMessageHandler.ProcessEnvelope;
Var
  E: TMEMessageEnvelope;
Begin
  If FEnvelopes.Count > 0 Then Begin
    _Lock;
    Try
      E := FEnvelopes.Extract(FEnvelopes[0]);
    Finally
      _Unlock;
    End;
    E.UpdateInfo(mieReceived);
    ReceiveEnvelope(E);
    E.Free;
  End;
End;

ProcessEnvelopes 方法只是根据需要多次调用前者来清空异步消息队列:

Procedure TMEMessageHandler.ProcessEnvelopes;
Begin
  While FEnvelopes.Count > 0 Do
    ProcessEnvelope;
End;

TMEMessageHandler是如何使用的

将 TMELogMessage 定义为 IMEClonableMessage 来处理要记录的信息,TMEFileLogger 和其他组件的最小实现如下所示:

Type
  TMEFileLogger = Class(TMEComponent)
  Private
    ...
    FMessageHandler:    TMEMessagehandler;
  Protected
    ...
    Procedure ReceiveLogEnvelope(Const Envelope: TMEMessageEnvelope);
    Property MessageHandler: TMEMessagehandler Read FMessageHandler;
  Public
    Constructor Create(AOwner: TComponent); Override;
    Destructor Destroy; Override;
    ...
  End;

Constructor TMEFileLogger.Create(AOwner: TComponent);
Begin
  Inherited;
  ...
  FMessageHandler                  := TMEMessagehandler.Create;
  MessageHandler.OnReceiveEnvelope := ReceiveLogEnvelope;
  MessageHandler.Subscribe(TMELogMessage);
End;

Destructor TMEFileLogger.Destroy;
Begin
  MessageHandler.Unsubscribe(TMELogMessage);
  MessageHandler.ProcessEnvelopes;
  FreeAndNil(FMessageHandler);
  ...
  Inherited;
End;

Procedure TMEFileLogger.ReceiveLogEnvelope(Const Envelope: TMEMessageEnvelope);
Begin
  If Envelope.Msg Is TMELogMessage Then
    With Envelope.Msg As TMELogMessage Do
      ... something useful ...
End;

抱歉,这篇文章很长,但我希望这对其他人有用。

【讨论】:

    【解决方案2】:

    队列实现通常实现应用程序代码可以等待的事件(OS synchronization object,而不是 VCL“事件”)。该事件被设置/触发/触发/但是您想在将项目添加到空队列时考虑它(或者,如果在“批量”中添加多个项目,则在它们全部添加之后。精确的模式可能各不相同)。如果您的情况下的队列是您自己的实现,那么我会考虑在您的实现中添加这样的机制。

    为避免阻塞 UI,应用程序代码创建了一个轻量级线程,其唯一目的是等待该队列事件,将队列中的项目出列到 UI 线程安全容器中,然后通知 UI 线程有项目待处理。监控线程然后继续等待事件表明队列中还有 更多 个项目。

    在 VCL 应用程序中,监控线程通知 UI 的机制可能是一个幼稚的 Synchronized 过程,或者(我建议)发布到负责 UI 处理项目的某种形式的基于消息的通知。

    注意:队列监控线程通常还负责处理应用程序/UI 不再关心处理项目(例如正在关闭)并监听“取消”的情况或“终止”事件,它向线程发出信号以使项目出队但丢弃它们(或以此时适合应用程序需要的任何方式处理它们)然后终止(即,监视线程退出)。

    【讨论】:

      猜你喜欢
      • 2013-09-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-10-23
      • 2021-02-26
      • 1970-01-01
      • 2017-10-24
      • 1970-01-01
      相关资源
      最近更新 更多