【问题标题】:Prevent multiple threads from altering the same object防止多个线程改变同一个对象
【发布时间】:2020-11-02 22:20:28
【问题描述】:

我目前正在为我的应用程序实施审计功能。我开发了一个 Audit Util(无法附加代码)。这就是它的作用-

我的应用程序监听事件。当事件发布到指定的 Topic 时,我的应用程序会监听它并执行 Listener 的 receiveAndDelegate() 方法。监听器然后通过调用适当的 Handler 类的 handleMessage() 方法,将控制权委托给基于事件 ID 的各种 Handler 之一。

我在这里所做的是,一旦应用程序侦听事件并调用侦听器的receiveAndDelegate() 方法,我就会初始化 Audit 对象并在控制流经侦听器类时捕获各种消息/日志。每当 Listener 类将控制权委托给适当的 Handler 类(通过调用适当的处理程序类的 handleMessage() 方法)时,我在调用 Handler 类的 handleMessage()(称为 extractAndTransform())之前调用不同的 Handler 类的方法并传递Listener 类的审计对象到这个方法。在每个处理程序类的 extractAndTransform() 方法中,我正在创建一个新的审计对象,然后使用 Listener 类的审计对象对其进行初始化。

但这就是问题所在。各种线程同时调用侦听器,并且在从侦听器类到处理程序类的不同调用之间传递的审计对象将被后续调用覆盖。假设接收到请求 REQ1 并调用 Listener 类,然后针对该事件初始化审计。所以在整个过程中,这个审计对象应该有REQ1对应的日志,但事实并非如此。它也在捕获其他请求的日志,一切都变得大杂烩。

简要的类结构

@Component
public class MessageListener {
    @Autowired
    private AuditUtil audit;

    public receiverAndDelegate(Object message) {
        .
        .
        .
        .
        .
        IHandler handler = <logic to get the handler for the event>

        handler.extractAndTransform(audit, (FBBEvent) message);

        handler.handleMessage((FBBEvent) message);
    }
}


public interface IHandler {
    public extractAndTransform(AuditUtil audit, FBBEvent fbbEvent);
    public handlerMessage(FBBEvent fbbEvent);
}

@Component 
public class SomeHandler implements IHandler {
    @Autowired
    private AuditUtil audit;

    public extractAndTransform(AuditUtil audit, FBBEvent fbbEvent) {
        this.audit = someMethodToExtactValuesAndCreateANewAuditObject(audit, fbbEvent);
    }

    public handleMessage(FBBEvent fbbEvent) {
        audit.logEvent("Enetered the method handleMessage");

        try {
            Connection con = <getting a new connection>();
            
            audit.logEvent("Created connection successfully");
        } catch (Exception e) {
            audit.logEvent("Exception while creating connection" + e.toString());
        }
    }
}

【问题讨论】:

    标签: java concurrency


    【解决方案1】:

    解决此问题的两种方法是:

    • 使用互斥锁同步处理程序,以确保一次只能执行一个处理程序实例。
    • 使用队列将 MessageListener 与处理程序分离。

    第一种方法是一种以牺牲吞吐量为代价将正确性引入应用程序的简单方法,这会导致延迟增加:

    @Component
    public class MessageListener {
        @Autowired
        private AuditUtil audit;
    
        public receiverAndDelegate(Object message) {
            .
            .
            .
            .
            .
            IHandler handler = <logic to get the handler for the event>
    
           
            mutex.lock()
            try {
              handler.extractAndTransform(audit, (FBBEvent) message);
    
              handler.handleMessage((FBBEvent) message);
            } finally {
              mutex.unlock()
            }
        }
    }
    

    这确保没有重叠的消息处理程序。现在假设重叠的消息处理程序对您的特定程序无效,您的程序是“正确的”。

    这里的问题是receiverAndDelegate是串行执行的,一次只能处理一条消息!!!

    对此的一种解决方案是使用actor model 并为每个处理程序类型设置一个消息主题/队列。每个消息主题或队列都可以有一个处理程序使用者,它:

    • 拉消息
    • 致电extractAndTransform
    • 致电handleMessage

    因为您只有一个处理程序,所以它确保永远不会有 2 个相同类型的并发处理程序在执行。它还最大化了receiverAndDelegate 的吞吐量。 MessageListener 只需检查处理程序类型并将消息发布到该特定处理程序输入队列。

    【讨论】:

    • 以上两种方案都不行。这是一个非常关键的应用程序,不能容忍延迟。此外,队列/主题已经实现,也无法更改。拥有一个队列和主题(FBB Queue/FBB 主题),并且发布到该主题的每条消息都调用相同的消息监听器(FBBPortalMessageListener)。还有其他解决方案吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多