1、问题背景
首先我们支持Spark使用HiveUDF的实现,在某个任务中,executor GC时间普遍非常长,而Hive同样逻辑就没有问题。
选取GC过长的一个executor,dump下内存,发现UDFJson对象非常大:
于是推测其中存在内存泄漏。
2、问题排查
本文代码为社区Spark master分支a834dba120
在解析SQL时我们可以看到注册UDF函数入口
Analyzer.scala:
发现函数注册提供qualifiedName以及不覆盖已注册的函数
SessionCatalog.scala:
跟进发现做了一些函数存在性和可加载校验:
HiveSessionCatalog中重写了makeFunctionExpression函数,其中分为了5中不同的case,这边new出来的HiveFunctionWrapper实例在一个executor中会被共享
HiveSessionCatalog.scala:
看一下被共享的HiveFunctionWrapper构造函数和createFunction函数,传入UDF函数名,默认instance为null:
查看instance是否已经有缓存,如果没有才会去反射new一个:
OK,在清楚了Spark UDF加载过程后,再来看下get_json_object的Hive实现
UDFJson.java:
我们可以发现有很多的HashCache,实际上就是加了淘汰机制的LinkedHashMap,看下实现:
而LinkedHashMap中在增加Node时会判断并触发淘汰机制,
LinkedHashMap.java:
很显然这不是线程安全的,在多个线程同时remove head时,有可能remove同一个head,因此会导致被淘汰的元素不断被漏掉,从而发生内存泄漏。
3、解决方案
在权衡了加锁、去除缓存和修改UDF函数创建逻辑后,选择了去除该UDF中的HashCache。