【发布时间】:2019-05-21 08:39:08
【问题描述】:
我需要记录请求的一些属性,例如请求 id 和语言环境,但是在使用 parallelStream 时,MDC 的 ThreadLocal 似乎丢失了信息。
我分析了创建parallelStream时在线程之间传递MDC上下文的解决方案,但它看起来很脏,我也有很多parallelStream的用法。
还有其他方法吗?
谢谢
【问题讨论】:
标签: java multithreading logging logback mdc
我需要记录请求的一些属性,例如请求 id 和语言环境,但是在使用 parallelStream 时,MDC 的 ThreadLocal 似乎丢失了信息。
我分析了创建parallelStream时在线程之间传递MDC上下文的解决方案,但它看起来很脏,我也有很多parallelStream的用法。
还有其他方法吗?
谢谢
【问题讨论】:
标签: java multithreading logging logback mdc
我找到的唯一解决方案是将上下文复制到流外部的最终变量中,并将其应用于每次迭代:
Map<String, String> contextMap = MDC.getCopyOfContextMap();
Stream.iterate(0, i -> i + 1).parallel()
.peek(i -> MDC.setContextMap(contextMap))
// ...logic...
// in case you're using a filter, you need to use a predicate and combine it with a clear step:
filter(yourPredicate.or(i -> {
MDC.clear();
return false;
}))
// clear right before terminal operation
.peek(i -> MDC.clear())
.findFirst();
// since the initial thread is also used within the stream and the context is cleared there,
// we need to set it again to its initial state
MDC.setContextMap(contextMap);
该解决方案的成本是 1) 每 100 次迭代需要几微秒和 2) 可读性较差,但两者都是可以接受的:
IntStream.range(0, 100).parallel().sum() (=baseline) 与使用该 MDC 复制逻辑的相同流的基准:Benchmark Mode Cnt Score Error Units
MDC_CopyTest.baseline thrpt 5 0,038 ± 0,005 ops/us
MDC_CopyTest.withMdc thrpt 5 0,024 ± 0,001 ops/us
MDC_CopyTest.baseline avgt 5 28,239 ± 1,308 us/op
MDC_CopyTest.withMdc avgt 5 40,178 ± 0,761 us/op
public class MDCCopyHelper {
private Map<String, String> contextMap = MDC.getCopyOfContextMap();
public void set(Object... any) {
MDC.setContextMap(contextMap);
}
public void clear(Object... any) {
MDC.clear();
}
public boolean clearAndFail() {
MDC.clear();
return false;
}
}
流式代码看起来更好一点:
MDCCopyHelper mdcHelper = new MDCCopyHelper();
Optional<Integer> findFirst = Stream.iterate(0, i -> i + 1)
.parallel()
.peek(mdcHelper::set)
// ...logic...
// filters predicates should be combined with clear step
.filter(yourPredicate.or(mdcHelper::clearAndFail))
// before terminal call:
.peek(mdcHelper::clear)
.findFirst();
mdcHelper.set();
【讨论】:
我的解决方案是包装那些功能接口。类似于静态代理模式。
例如
public static void main(String[] args) {
System.err.println(Thread.currentThread().getName());
String traceId = "100";
MDC.put("id", traceId);
System.err.println("------------------------");
Stream.of(1, 2, 3, 4)
.parallel()
.forEach((num -> {
System.err.println(Thread.currentThread().getName()+" "+ traceId.equals(MDC.get("id")));
}));
System.err.println("------------------------");
Stream.of(1, 2, 3, 4)
.parallel()
// the key is the TraceableConsumer
.forEach(new TraceableConsumer<>(num -> {
System.err.println(Thread.currentThread().getName() + " " + traceId.equals(MDC.get("id")));
}));
}
public class TraceableConsumer<T> extends MDCTraceable implements Consumer<T> {
private final Consumer<T> target;
public TraceableConsumer(Consumer<T> target) {
this.target = target;
}
@Override
public void accept(T t) {
setMDC();
target.accept(t);
}
}
public abstract class MDCTraceable {
private final Long id;
private final Long userId;
public MDCTraceable() {
id = Optional.ofNullable(MDC.get("id")).map(Long::parseLong).orElse(0L);
userId = Optional.ofNullable(MDC.get("userId")).map(Long::parseLong).orElse(0L);
}
public void setMDC(){
MDC.put("id", id.toString());
MDC.put("userId", userId.toString());
}
public void cleanMDC(){
MDC.clear();
}
}
【讨论】: