您好,欢迎访问代理记账网站
  • 价格透明
  • 信息保密
  • 进度掌控
  • 售后无忧

一个由java.util.ConcurrentModificationException引起的血案

线上跑的一个flink任务突然报了异常,话不多,先把异常发出来

最近在做一个flink项目时遇到了一个线上报错,话不多说,先上代码,其中用到的第三方封装的flink sdk隐去了包前缀,不会影响分析。

Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1388)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at com.jd.uifs.operators.JimDBBatchSinkFunction.process(JimDBBatchSinkFunction.java:84)
    at xxx.flink.sdk.batchsink2.AbstractBatchSinkFunction$BatchProcessRunnable.run(AbstractBatchSinkFunction.java:218)
    at xxx.flink.sdk.batchsink2.AbstractBatchSinkFunction.processBatch(AbstractBatchSinkFunction.java:190)
    at xxx.flink.sdk.batchsink2.AbstractBatchSinkFunction.lambda$resetBatch$0(AbstractBatchSinkFunction.java:173)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    
    

根据错误的提示,找到BatchSinkFunction的84行,为了方便大家了解上下文,把整段代码贴出来,同时注明84行所在位置

@Override
public void process(List<Tuple3<String, String,String>> inputs, IBatchProcessCallBack callBack) {
  
    if(inputs.size() == 0 ) {
        return;
    }
    Map<String,Map<String,Tuple2<Long,Integer>>> map1 = inputs.stream()
84行所在位置            .collect(Collectors.toMap(
                    tuple3 -> (tuple3.f0)
                    ,tuple3 -> {
                        Long timeStamp = LocalDateTime.parse(tuple3.f2.substring(0,14), formatter).toInstant(ZoneOffset.of("+8")).getEpochSecond();
                        Map<String,Tuple2<Long,Integer>> map2 = new HashMap<String,Tuple2<Long,Integer>>();
                        map2.put(tuple3.f1,new Tuple2<>(timeStamp,1));
                        return map2;
                    }
                    ,(v1,v2) -> {
                       v1.forEach((key,value)->{
                           if(v2.containsKey(key)) {
                               if (v2.get(key).f0 > value.f0) {
                                   Tuple2<Long,Integer> newTuple2 = new Tuple2<>(v2.get(key).f0,v2.get(key).f1 + value.f1);                                 
                                   v2.put(key,newTuple2);
                               } else {
                                   Tuple2<Long,Integer> newTuple2 = new Tuple2<>(value.f0,v2.get(key).f1 + value.f1);
                                   v2.put(key,newTuple2);                             
                               }
                           } else {
                               v2.put(key, value);
                           
                           }
                       });
                       return v2;
                     
                    }));

从代码中可以看到,84行其实就是输入的list在stream后进行的collect操作,由于异常提示是个ConcurrentModificationException,又由于是个flink任务,很自然的想到难道是由于flink并发修改了list里面的值引起的异常?顺着这个想法看调用栈的底部

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

发现是flink sdk在这里使用了线程池,一步步向上看,

xxx.flink.sdk.batchsink2.AbstractBatchSinkFunction$BatchProcessRunnable.run(AbstractBatchSinkFunction.java:218)
    at xxx.flink.sdk.batchsink2.AbstractBatchSinkFunction.processBatch(AbstractBatchSinkFunction.java:190)
    at xxx.flink.sdk.batchsink2.AbstractBatchSinkFunction.lambda$resetBatch$0(AbstractBatchSinkFunction.java:173)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

怎么样,这些是不是很熟悉,即使不看源码也能大概分析出是线程池的task run方法调用了flink sdk的AbstractBatchSinkFunction 的process 方法,process方法的入参是一个List容器,因为list容器是个非线程安全的容器,那么在对list容器同时进行修改和访问就会出现这种异常,分析到这里感觉是有一定道理,那到底是不是由于这个引起的呢,继续向下分析,定位到list.stream 后的collect代码,根据错误提示查看collect源码

我在使用stream的时候并没有设置并行度,所以代码走498行else分支,继续看evaluate方法,

 

大意就是使用Terminal Opration预估pipeline的结果,同样,没设置并行度,走

terminalOp.evaluateSequential方法,

方法说明是使用指定的piplineHelper对操作进行顺序预估,继续查看该接口方法实现类的方法

该抽象类是一个TerminalOp(Terminal Opration),它会预估一个 stream pipline 并将结果发送到AccumulatingSink来执行reduce操作,继续看708行wrapAndCopyInto,查看方法说明

将由PiplineHelper描述的管道数据流的各个阶段应用于Spliterator,并将结果发送到sink。查看下

Spliterator的说明 An object for traversing and partitioning elements of a source. The source * of elements covered by a Spliterator could be, for example, an array, a * {@link Collection}, an IO channel, or a generator function.(一个对source进行遍历和分片的对象,source可以是一个collection,一个io channel,或者一个function )。查看Sink的说明

An extension of {@link Consumer} used to conduct values through the stages of a stream pipeline, with additional methods to manage size information,control flow, etc。

Sink是 Consumer接口的扩展,用于在流管道的各个阶段进行值运算,并带有其他方法来管理大小信息,控制流等。

继续看wrapAndCopyInto的实现

472行,首先是对sink进行了包装,然后根据方法名称的理解是将Spliterator 拷贝到了Sink,那是怎么拷贝的呢,继续深入,

看481,482,483行,Sink接口有对481和482行的解释,

Before calling the {@code accept()} method on a {@code Sink} for the first time, you must first call the {@code begin()} method to inform it that data is coming (optionally informing the sink how much data is coming), and after all data has been sent, you must call the {@code end()} method.  After calling {@code end()}, you should not call {@code accept()} without again calling {@code begin()}. 

翻译之后就是在Sink上调用 accept()方法之前 ,您必须首先调用 begin()方法通知它有数据要发送(可选地通知sink有多少数据要发送),并且在发送完所有数据后,您必须调用  end()方法。 调用end()后,应在再次调用accept()之前调用begin()。

所以481 和483是为accept服务的,不用深究。再看482,马上就离胜利不远了,因为这个ConcurrentModificationException就是由该方法引起的,进入该方法定义

顺序的在当前线程内对每个剩余元素执行给定的action,直到所有元素都已处理或操作引发异常。 如果Spliterator 是有顺序的,则按遇到顺序执行操作。 该操作引发的异常将返回给调用方。根据错误提示,继续找到ArrayListSpliterator对应该方法的实现

终于,找到错误的根了,有没有眼泪哗哗的,赶紧分析。到1372行为止,都是创建一些临时变量,和数组,这里lst被赋值为我们的数据源即输入数组,看1373行,如果hi = fence小于0,fence是什么,转到定义

先看fence后面的注释,直到被使用之前一直都是-1,之后被赋值为最后一个索引,fence的中文意思是篱笆,护栏,那这就好理解了,fence就是数组边界。这里我们看到上面的一大段英文注释,通常这种英文注释里面都有宝贵信息,没办法,硬着头皮啃,一句句翻译。

如果ArrayList是不可变的或结构上不可变的(不允许添加,删除等),我们可以使用Arrays.spliterator来实现spliterator(这里的spliterator其实就是迭代器,大概意思就是如果ArrayList不允许修改,那么我们直接用Arrays提供的迭代器来迭代就可以)。否则,我们需要在遍历期间检测到了尽可能多的冲突(即在遍历期间检查是否被同时读写),同时又不能牺牲太多性能。为了达到这个目的,这里使用了modCounts这个变量。但这并不能保证一定会检测到并发冲突,有时又会对线程内的冲突过于保守,但这可以检测到足够的问题,值得在实践中使用。为此,我们做了两件事

(1)在需要将最新的点提交到我们正在检查的状态时我们才会惰性初始化fence和; 这能提高精度。

(2)我们仅在forEach末尾执行一次ConcurrentModificationException检查。

当使用forEach(与迭代器相对)时,我们通常只能在action后而非action前检测冲突。进一步的CME触发检查适用于所有其他情况,例如null 值或者数组size太小,这可能仅是由于冲突而引起的。这允许forEach的内部循环运行而无需任何进一步的检查,并简化了lambda表达式的解析方式。尽管这确实需要进行大量检查,但请注意,通常情况下,除了在list.stream().forEach(a)内部,其他地方并不需要检查冲突。 

.....拗口的翻译后发现没有太多有用的信息,唯一有用的信息点就是modCounts比较重要,但是又没说明modCounts的具体含义,转到modCounts的定义,又是一大堆说明:

modCounts是list被structurally modified的次数,structurally modified 就是改变了list的size。换句话说,如果list被structurally modified了,则进行中的iterations(迭代)将会产生错误的结果。

这个字段用于迭代器和列表迭代器的iterator和listIterator方法,如果这个字段值被意外的修改了,则iterator或者listIterator的add,next,remove,previous,set将会抛出ConcurrentModificationException异常。对于迭代期间的并发修改,这提供了fail-fast的行为,而不是不确定的行为。如果List的子类要提供这种fail-fast的行为,需要在所有对list进行了structurally modified的方法里对modCounts+1,全局搜索了下modCounts,发现add,remove等所有对list size有影响的方法确实modCounts都+1.(果真注释诚不欺我)。

再继续看forEachRemaining的代码,1373行,如果fence小于0,变量mc和hi分别被设置为modCounts和size。再看1378,mc被设置为expectedModCount,那fence和ex是在哪里被初始化的呢,转到这两个值的定义。

发现是Spliterator的构造器和getFence中被初始化了,分析一下getFence的代码,发现,如果fence< 0,即fence还没有被用到,那么在list为null的情况下fence会被置为0,否则,expectedModCount 被置为modCount,fence被置为list的size,也就是说,expectedModCount 其实也是modCount的初始值。回到forEachRemaining

回想我们的stream.collect方法,在第一次进入的时候,fence一定是-1,所以一定是走的1374和1375这个分支,即将mc会被赋值为原始的modCount,hi会被赋值为数组的size大小。接着转到1379,这里其实就是做了个循环遍历,对数组的每个元素都执行accept方法。在循环结束后检查mc是不是还是和modCount相等,如果相等,直接返回,否则,抛出ConcurrentModificationException 异常,那么问题根源找到了,一定是在forEachRemaining 遍历数组元素的时候,原始数组发生了扩容或者缩容操作,改变了数组的size,导致modCount发生变化,从而导致了mc和modCount不相等。错误的原因已经找到了,那到底是代码的哪里修改了inputs呢?回顾以上代码,发现唯一有修改的地方是Collectors.toMap的第三个参数,即当两个key发生冲突时传入的一个冲突解决函数,这个函数里对v2进行了修改,但这个修改并不是对原始inputs的修改,从我们分析的逻辑上说不通。为了验证不是这个修改的问题,我本地创建了一个线程池,在线程池的excute里重现该操作,具体代码如下

 

List<String> inputs = new ArrayList<>();
for(int i = 0;i < 100;i++) {
    inputs.add("test" + i);
}

for(int i = 50;i < 150;i++) {
    inputs.add("test" + i);
}



ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new Runnable() {
    @Override
    public void run() {
        Map<String,Map<String,String>> pinMap = inputs.stream().collect(Collectors.toMap(str->str,str-> {
            Map<String,String> valueMap = new HashMap<>();
            valueMap.put(str,str + "value");
            return valueMap;
        },(v1,v2) ->{
            v1.forEach((key,value) -> {
                if(v2.containsKey(key)) {
                    v2.put(key,"new value 1");
                    //System.out.println("new value 1");
                } else {
                    v2.put(key,value);
                    //System.out.println("value");
                }
            });
            return v2;
        }));


    }
});


executor.execute(new Runnable() {
    @Override
    public void run() {
        Map<String,Map<String,String>> pinMap = inputs.stream().collect(Collectors.toMap(str->str,str-> {
            Map<String,String> valueMap = new HashMap<>();
            valueMap.put(str,str + "value");
            return valueMap;
        },(v1,v2) ->{
            v1.forEach((key,value) -> {
                if(v2.containsKey(key)) {
                    v2.put(key,"new value 1");
                    //System.out.println("new value 1");
                } else {
                    v2.put(key,value);
                    //System.out.println("value");
                }
            });
            return v2;
        }));
    }
});

经过多次运行,发现并没有出现ConcurrentModificationException异常,说明不是这里的问题。那么唯一可能的原因就是我所使用的flink sdk 在我使用process方法处理数据的时候又对inputs进行了添加元素的操作,但是因为是偶发,不是每次都出现。所以只是可能性大,不是100%。

最后写到这里,得出的结论是:

List使用了stream.collect遍历元素的时候,如果另一个线程同时对该list进行了扩容或者缩容操作,一定会引发ConcurrentModificationException异常,这点通过本地的多线程测试也可以验证。

如果各位网友大神还有什么其他的观点,欢迎留言区留言,批评指正


分享:

低价透明

统一报价,无隐形消费

金牌服务

一对一专属顾问7*24小时金牌服务

信息保密

个人信息安全有保障

售后无忧

服务出问题客服经理全程跟进