【问题标题】:How to use MDC with parallelStream in Java and logback如何在 Java 和 logback 中将 MDC 与 parallelStream 一起使用
【发布时间】:2019-05-21 08:39:08
【问题描述】:

我需要记录请求的一些属性,例如请求 id 和语言环境,但是在使用 parallelStream 时,MDC 的 ThreadLocal 似乎丢失了信息。

我分析了创建parallelStream时在线程之间传递MDC上下文的解决方案,但它看起来很脏,我也有很多parallelStream的用法。

还有其他方法吗?

谢谢

【问题讨论】:

    标签: java multithreading logging logback mdc


    【解决方案1】:

    我找到的唯一解决方案是将上下文复制到流外部的最终变量中,并将其应用于每次迭代:

    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    Stream.iterate(0, i -> i + 1).parallel()
        .peek(i -> MDC.setContextMap(contextMap))
        // ...logic...
        // in case you're using a filter, you need to use a predicate and combine it with a clear step:
        filter(yourPredicate.or(i -> {
                    MDC.clear();
                    return false;
                }))
        // clear right before terminal operation
        .peek(i -> MDC.clear())
        .findFirst();
    
    // since the initial thread is also used within the stream and the context is cleared there, 
    // we need to set it again to its initial state
    MDC.setContextMap(contextMap);    
    

    该解决方案的成本是 1) 每 100 次迭代需要几微秒和 2) 可读性较差,但两者都是可以接受的:

    1. 这是比较 IntStream.range(0, 100).parallel().sum() (=baseline) 与使用该 MDC 复制逻辑的相同流的基准:
    Benchmark               Mode  Cnt   Score   Error   Units
    MDC_CopyTest.baseline  thrpt    5   0,038 ± 0,005  ops/us
    MDC_CopyTest.withMdc   thrpt    5   0,024 ± 0,001  ops/us
    MDC_CopyTest.baseline   avgt    5  28,239 ± 1,308   us/op
    MDC_CopyTest.withMdc    avgt    5  40,178 ± 0,761   us/op
    
    1. 为了提高可读性,可以将它包装到一个小的帮助器类中:
    public class MDCCopyHelper {
        private Map<String, String> contextMap = MDC.getCopyOfContextMap();
    
        public void set(Object... any) {
            MDC.setContextMap(contextMap);
        }
    
        public void clear(Object... any) {
            MDC.clear();
        }
    
        public boolean clearAndFail() {
            MDC.clear();
            return false;
        }
    }
    

    流式代码看起来更好一点:

    MDCCopyHelper mdcHelper = new MDCCopyHelper();
    Optional<Integer> findFirst = Stream.iterate(0, i -> i + 1)
            .parallel()
            .peek(mdcHelper::set)
            // ...logic...
            // filters predicates should be combined with clear step
            .filter(yourPredicate.or(mdcHelper::clearAndFail))
            // before terminal call:
            .peek(mdcHelper::clear)
            .findFirst();
    mdcHelper.set();
    

    【讨论】:

    • 逻辑内部抛出异常怎么办?它不会让 MDC 变脏吗?您是否设法提出了一个考虑异常的解决方案?
    • 不,我不知道如何处理。流在异常处理方面很糟糕。
    【解决方案2】:

    我的解决方案是包装那些功能接口。类似于静态代理模式。
    例如

    public static void main(String[] args) {
        System.err.println(Thread.currentThread().getName());
        String traceId = "100";
        MDC.put("id", traceId);
        System.err.println("------------------------");
        Stream.of(1, 2, 3, 4)
              .parallel()
              .forEach((num -> {
                  System.err.println(Thread.currentThread().getName()+" "+ traceId.equals(MDC.get("id")));
              }));
        System.err.println("------------------------");
        Stream.of(1, 2, 3, 4)
              .parallel()
              // the key is the TraceableConsumer
              .forEach(new TraceableConsumer<>(num -> {
                  System.err.println(Thread.currentThread().getName() + " " + traceId.equals(MDC.get("id")));
              }));
    }
    

    public class TraceableConsumer<T> extends MDCTraceable implements Consumer<T> {
    
        private final Consumer<T> target;
    
        public TraceableConsumer(Consumer<T> target) {
            this.target = target;
        }
    
        @Override
        public void accept(T t) {
            setMDC();
            target.accept(t);
        }
    }
    

    public abstract class MDCTraceable {
    
        private final Long id;
    
        private final Long userId;
    
        public MDCTraceable() {
            id = Optional.ofNullable(MDC.get("id")).map(Long::parseLong).orElse(0L);
            userId = Optional.ofNullable(MDC.get("userId")).map(Long::parseLong).orElse(0L);
        }
    
        public void setMDC(){
            MDC.put("id", id.toString());
            MDC.put("userId", userId.toString());
        }
    
        public void cleanMDC(){
            MDC.clear();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-08-29
      • 2020-04-10
      • 2016-01-23
      • 2013-10-06
      • 1970-01-01
      • 1970-01-01
      • 2021-07-12
      • 2023-03-26
      相关资源
      最近更新 更多