CompleteService使用场景
当我们需要批量任务处理,但是并不关心任务完成的先后顺序,我们异步的提交任务,等待有任务执行完成之后然后对该完成结果处理,如此循环直到该批量任务完成.
我们遵循异步处理完成后的操作原则时,谁先完成收割谁.
基于集合Future遍历处理
针对这种场景,我们很可能会想到把所有的异步任务收集到一个集合,然后遍历这个集合(Future),调用future.get()获取处理结果,进行后续操作,然后我们就会写出如下代码
1 | ExecutorService pool = Executors.newFixedThreadPool(5); |
执行这段代码,会发现执行结果并没有按照我们所预期的来
1 | 聚合完成 |
可见,上面的执行结果并不是我们想要的,明显cc元素的执行比较耗时,但是我们的处理结果却是按照循环遍历的顺序来的,原因如下:
从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。
基于CompletionService完成并行聚合
1 | ExecutorService pool = Executors.newFixedThreadPool(5); |
执行会发现这种结果才是我们真正要的
1 | 聚合 |
我们能得到想要的结果,是因为CompleteService中内部维护着一个BlockingQueue.原理如下:
CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。
CompleteService中take()和poll()的区别
查看CompleteService的接口定义
1 | public interface CompletionService<V> { |
从接口中我们可以看到CompletionService中定义的summit相关的方法用来载入线程体(分别处理实现Callable或Runable的线程), poll()和take()用来获取返回结果集.
关于poll()和take()的区别
poll()是非阻塞的,若目前无结果,返回一个null,线程继续运行不阻塞。take()是阻塞的,若当前无结果,则线程阻塞,直到产生一个结果
示例poll()和take()的区别
1 | ExecutorService pool = Executors.newFixedThreadPool(5); |
程序运行结果
1 | 聚合完成 |