【Question title】:Camel MDC Logback Stale Info Under Volume
【Posted】:2016-01-18 02:25:29
【Question description】:

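We have a high-load Apache Camel application that uses logback/MDC to log information. We are finding that some of the MDC information is stale on threads, as the logback documentation forewarns. I found this SO question that addresses the problem: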

How to use MDC with thread pools?

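How should we apply this to our Camel application to avoid stale information? Is there a simple way to globally replace the default ThreadPoolExecutor with a custom variant like the one suggested in the linked question? I see you can do it for the pools themselves, but I have not seen any examples for the executor. Keep in mind that our application is very large and processes a high volume of orders every day; I would like the change to affect the existing application as little as possible.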
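
To make the stale-data symptom concrete, here is a minimal stand-alone sketch. It does not use SLF4J or Camel at all: the `ORDER_ID` thread-local is a hypothetical stand-in for an MDC entry, and the class and method names are invented for illustration. It only shows that a value left behind by one task is still visible to the next task that runs on the same pooled thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StaleContextDemo {

    // Hypothetical stand-in for one MDC entry: a plain per-thread value.
    static final ThreadLocal<String> ORDER_ID = new ThreadLocal<>();

    /** Runs two tasks on a single pooled thread and returns what the second task sees. */
    static String valueSeenBySecondTask() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // Task 1 sets a value on the worker thread and never clears it.
            Runnable setAndForget = () -> ORDER_ID.set("order-1");
            pool.submit(setAndForget).get();

            // Task 2 sets nothing itself, but runs on the same worker thread.
            Callable<String> readOnly = () -> ORDER_ID.get();
            return pool.submit(readOnly).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The second task reports the value left behind by the first one.
        System.out.println("second task sees: " + valueSeenBySecondTask());
    }
}
```

With a single worker thread the second task reports `order-1`, which is the kind of leftover value that shows up as stale MDC data in log lines.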

【Question discussion】:

    Tags: apache-camel logback mdc


    【Solution 1】:

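    I figured it out and wanted to post what I did in case it helps others. Note that I am using JDK 6 / Camel 2.13.2.

    • Camel has a DefaultExecutorServiceManager, which uses a DefaultThreadPoolFactory. I extended the default factory as MdcThreadPoolFactory.

    • DefaultThreadPoolFactory has methods that create RejectableThreadPoolExecutors and RejectableScheduledThreadPoolExecutors. I extended both into Mdc* versions that override execute() to wrap the Runnable and pass the MDC information between threads (as specified in the link in my original question).

    • I created a bean instance of MdcThreadPoolFactory in my application configuration; Camel picks it up automatically and uses it in the ExecutorServiceManager.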

    MdcThreadPoolExecutor:

    package com.mypackage.concurrent;

    import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
    import org.slf4j.MDC;

    import java.util.Map;
    import java.util.concurrent.*;

    /**
     * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
     * <p/>
     * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
     * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
     * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that sets MDC data appropriately before each task.
     * <p/>
     * Created by broda20.
     * Date: 10/29/15
     */
    public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor {

        // Snapshot of the submitting thread's MDC; typed as Map<String, String>
        // to match the generic MDC API of recent SLF4J versions.
        private Map<String, String> getContextForTask() {
            return MDC.getCopyOfContextMap();
        }

        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }

        /**
         * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
         * all delegate to this.
         */
        @Override
        public void execute(Runnable command) {
            super.execute(wrap(command, getContextForTask()));
        }

        /**
         * Wraps a task so that it runs with the given MDC context and restores the
         * worker thread's previous context afterwards.
         */
        public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return new Runnable() {
                @Override
                public void run() {
                    // Remember whatever the worker thread had before this task.
                    Map<String, String> previous = MDC.getCopyOfContextMap();
                    if (context == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(context);
                    }
                    try {
                        runnable.run();
                    } finally {
                        // Always restore, so nothing from this task leaks into the next one.
                        if (previous == null) {
                            MDC.clear();
                        } else {
                            MDC.setContextMap(previous);
                        }
                    }
                }
            };
        }
    }
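
The capture-and-restore idea in the class above can be tried without Camel or SLF4J on the classpath. The following is a sketch under stated assumptions, not the code from this answer: `CONTEXT` is a hypothetical thread-local map standing in for MDC, and `PropagatingExecutor` and `demo()` are names invented for the example. It extends the plain JDK `ThreadPoolExecutor` rather than Camel's `RejectableThreadPoolExecutor`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ContextPropagationSketch {

    // Hypothetical stand-in for the MDC map (one map per thread).
    static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

    static class PropagatingExecutor extends ThreadPoolExecutor {
        PropagatingExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        public void execute(final Runnable command) {
            // Capture a copy of the context on the submitting thread.
            Map<String, String> current = CONTEXT.get();
            final Map<String, String> captured = current == null ? null : new HashMap<>(current);
            super.execute(() -> {
                // Install the captured context on the worker, then restore what was there.
                Map<String, String> previous = CONTEXT.get();
                CONTEXT.set(captured);
                try {
                    command.run();
                } finally {
                    CONTEXT.set(previous);
                }
            });
        }
    }

    /** Returns the "orderId" seen by two tasks: one submitted with a context, one without. */
    static String[] demo() {
        PropagatingExecutor pool = new PropagatingExecutor(1);
        try {
            Callable<String> readOrderId = () -> {
                Map<String, String> ctx = CONTEXT.get();
                return ctx == null ? null : ctx.get("orderId");
            };
            CONTEXT.set(Collections.singletonMap("orderId", "A"));
            String first = pool.submit(readOrderId).get();

            CONTEXT.remove(); // the submitting thread now has no context
            String second = pool.submit(readOrderId).get();
            return new String[] { first, second };
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println(seen[0] + " / " + seen[1]);
    }
}
```

The first task sees `A` and the second sees no value, even though both run on the same worker thread. Restoring the previous context in `finally` also matters when the rejection handler is `CallerRunsPolicy`, because the wrapped task may then run on the submitting thread itself.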
    

    MdcScheduledThreadPoolExecutor:

    package com.mypackage.concurrent;

    import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
    import org.slf4j.MDC;

    import java.util.Map;
    import java.util.concurrent.*;

    /**
     * A SLF4J MDC-compatible {@link ScheduledThreadPoolExecutor}.
     * <p/>
     * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
     * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
     * thread pool. This is a drop-in replacement for {@link ScheduledThreadPoolExecutor} that sets MDC data appropriately before each task.
     * <p/>
     * Created by broda20.
     * Date: 10/29/15
     */
    public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor {

        // Snapshot of the submitting thread's MDC; typed as Map<String, String>
        // to match the generic MDC API of recent SLF4J versions.
        private Map<String, String> getContextForTask() {
            return MDC.getCopyOfContextMap();
        }

        public MdcScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
            super(corePoolSize, threadFactory);
        }

        public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
            super(corePoolSize, handler);
        }

        public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, threadFactory, handler);
        }

        /**
         * Tasks passed to {@code execute()} will have MDC injected. Note that in
         * {@code ScheduledThreadPoolExecutor}, {@code submit()} and the {@code schedule*()} methods do not
         * delegate to {@code execute()}, so tasks submitted through them are not wrapped by this override.
         */
        @Override
        public void execute(Runnable command) {
            super.execute(wrap(command, getContextForTask()));
        }

        /**
         * Wraps a task so that it runs with the given MDC context and restores the
         * worker thread's previous context afterwards.
         */
        public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return new Runnable() {
                @Override
                public void run() {
                    // Remember whatever the worker thread had before this task.
                    Map<String, String> previous = MDC.getCopyOfContextMap();
                    if (context == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(context);
                    }
                    try {
                        runnable.run();
                    } finally {
                        // Always restore, so nothing from this task leaks into the next one.
                        if (previous == null) {
                            MDC.clear();
                        } else {
                            MDC.setContextMap(previous);
                        }
                    }
                }
            };
        }
    }
    

    MdcThreadPoolFactory:

    package com.mypackage.concurrent;

    import org.apache.camel.impl.DefaultThreadPoolFactory;
    import org.apache.camel.spi.ThreadPoolProfile;
    import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
    import org.slf4j.MDC;
    
    import java.util.Map;
    import java.util.concurrent.*;
    
    public class MdcThreadPoolFactory extends DefaultThreadPoolFactory {
    
        @SuppressWarnings("unchecked")
        private Map<String, Object> getContextForTask() {
            return MDC.getCopyOfContextMap();
        }
    
    
        public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut,
                                             RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException {

            // the core pool size must be 0 or higher
            if (corePoolSize < 0) {
                throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize);
            }

            // validate max >= core
            if (maxPoolSize < corePoolSize) {
                throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
            }

            BlockingQueue<Runnable> workQueue;
            if (corePoolSize == 0 && maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
                // and force 1 as pool size to be able to create the thread pool by the JDK
                corePoolSize = 1;
                maxPoolSize = 1;
            } else if (maxQueueSize <= 0) {
                // use a synchronous queue for direct-handover (no tasks stored on the queue)
                workQueue = new SynchronousQueue<Runnable>();
            } else {
                // bounded task queue to store tasks on the queue
                workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
            }

            ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
            answer.setThreadFactory(threadFactory);
            answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }
            answer.setRejectedExecutionHandler(rejectedExecutionHandler);
            return answer;
        }

        @Override
        public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
            RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
            if (rejectedExecutionHandler == null) {
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
            }

            ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
            //JDK7: answer.setRemoveOnCancelPolicy(true);

            // need to wrap the thread pool in a sized to guard against the problem that the
            // JDK created thread pool has an unbounded queue (see class javadoc), which mean
            // we could potentially keep adding tasks, and run out of memory.
            if (profile.getMaxPoolSize() > 0) {
                return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
            } else {
                return answer;
            }
        }
    }
    

    Finally, the bean instance:

    <bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/>
    

    【Discussion】:

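    • To get this working in Camel 2.16.3 for the new threads requested by org.apache.camel.util.component.AbstractApiProducer.process(Exchange, AsyncCallback), I also had to override java.util.concurrent.ScheduledThreadPoolExecutor.submit(Runnable).
    • I subsequently changed it to override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit), which both submit() and execute() delegate to (at least in JDK 8). I think this would make a good contribution to Camel core. If I find time to work on that, would you be able to assign copyright to Apache (or whatever licensing work is needed)?
    • I am totally fine with that. Anything that makes Camel better in the long run :)
    • @PaulM - Did this make it into Camel core? I have a similar problem with the CXF component in asynchronous mode, where different MDC data is logged after a fault is raised, and I wonder whether this solution would help. Thanks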
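
The point raised in the comments, that `submit(Runnable)` on a `ScheduledThreadPoolExecutor` bypasses `execute()` but goes through `schedule(Runnable, long, TimeUnit)`, can be checked with a small JDK-only sketch. The class and field names below are invented for illustration, and the counters only record which overridden method each call passes through. In a real MDC variant the wrapping would presumably go where the `viaSchedule` counter is incremented.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduleOverrideSketch {

    /** Counts how many tasks pass through execute() and through schedule(Runnable, ...). */
    static class CountingScheduledExecutor extends ScheduledThreadPoolExecutor {
        final AtomicInteger viaExecute = new AtomicInteger();
        final AtomicInteger viaSchedule = new AtomicInteger();

        CountingScheduledExecutor() {
            super(1);
        }

        @Override
        public void execute(Runnable command) {
            viaExecute.incrementAndGet();
            super.execute(command);
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            // An MDC-aware variant would wrap 'command' here before delegating.
            viaSchedule.incrementAndGet();
            return super.schedule(command, delay, unit);
        }
    }

    /** Returns {calls seen by execute(), calls seen by schedule()} after one submit and one execute. */
    static int[] counts() {
        CountingScheduledExecutor pool = new CountingScheduledExecutor();
        try {
            Runnable noop = () -> { };
            pool.submit(noop).get(); // goes to schedule(), not to execute()
            pool.execute(noop);      // goes to execute(), which delegates to schedule()
            return new int[] { pool.viaExecute.get(), pool.viaSchedule.get() };
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] c = counts();
        System.out.println("execute=" + c[0] + ", schedule=" + c[1]);
    }
}
```

On the OpenJDK versions I am aware of, this reports one call through `execute()` and two through `schedule()`. Note that `submit(Callable)`, `schedule(Callable, ...)`, `scheduleAtFixedRate()` and `scheduleWithFixedDelay()` take separate paths, so a complete solution would need to cover those as well.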