理解Java-CompleteService应用

CompleteService使用场景

当我们需要批量任务处理,但是并不关心任务完成的先后顺序,我们异步的提交任务,等待有任务执行完成之后然后对该完成结果处理,如此循环直到该批量任务完成.
我们遵循异步处理完成后的操作原则时,谁先完成收割谁.

基于集合Future遍历处理

针对这种场景,我们很可能会想到把所有的异步任务收集到一个集合,然后遍历这个集合(Future),调用future.get()获取处理结果,进行后续操作,然后我们就会写出如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ExecutorService pool = Executors.newFixedThreadPool(5);
final List<String> dList = Arrays.asList("aa", "bb", "cc", "dd", "ee");
List<Future> fList= new ArrayList<Future>();
for(int i=0; i<dList.size(); i++){
final int tmp = i;
Future future = pool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
if (tmp == 2) {
Thread.sleep(3000);
}
Thread.sleep(1000);
return "线程" + Thread.currentThread().getName() + "处理数据元素list(" + + tmp +") = " + dList.get(tmp) ;
}
});
fList.add(future);
}
System.out.println("聚合完成");
for (int i = 0; i < fList.size(); i++) {
System.out.println(fList.get(i).get());
}

执行这段代码,会发现执行结果并没有按照我们所预期的来

1
2
3
4
5
6
聚合完成
线程pool-1-thread-1处理数据元素list(0) = aa
线程pool-1-thread-2处理数据元素list(1) = bb
线程pool-1-thread-3处理数据元素list(2) = cc
线程pool-1-thread-4处理数据元素list(3) = dd
线程pool-1-thread-5处理数据元素list(4) = ee

可见,上面的执行结果并不是我们想要的,明显cc元素的执行比较耗时,但是我们的处理结果却是按照循环遍历的顺序来的,原因如下:

从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。

基于CompletionService完成并行聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<Object> cs = new ExecutorCompletionService<Object>(pool);
final List<String> dList = Arrays.asList("aa", "bb", "cc", "dd", "ee");
for(int i=0; i<dList.size(); i++){
final int tmp = i;
cs.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
if (tmp == 2) {
Thread.sleep(3000);
}
Thread.sleep(1000);
return "线程" + Thread.currentThread().getName() + "处理数据元素list(" + + tmp +") = " + dList.get(tmp);
}
});
}
System.out.println("聚合完成");
for (int i = 0; i < dList.size(); i++) {
System.out.println(cs.take().get());
}

执行会发现这种结果才是我们真正要的

1
2
3
4
5
6
7
聚合
完成
线程pool-1-thread-2处理数据元素list(1) = bb
线程pool-1-thread-1处理数据元素list(0) = aa
线程pool-1-thread-4处理数据元素list(3) = dd
线程pool-1-thread-5处理数据元素list(4) = ee
线程pool-1-thread-3处理数据元素list(2) = cc

我们能得到想要的结果,是因为CompleteService中内部维护着一个BlockingQueue.原理如下:

CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。

CompleteService中take()和poll()的区别

查看CompleteService的接口定义

1
2
3
4
5
6
7
8
9
10
11
12
public interface CompletionService<V> {

Future<V> submit(Callable<V> task);

Future<V> submit(Runnable task, V result);

Future<V> take() throws InterruptedException;

Future<V> poll();

Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

从接口中我们可以看到CompletionService中定义的summit相关的方法用来载入线程体(分别处理实现Callable或Runable的线程), poll()和take()用来获取返回结果集.

关于poll()和take()的区别

poll()是非阻塞的,若目前无结果,返回一个null,线程继续运行不阻塞。take()是阻塞的,若当前无结果,则线程阻塞,直到产生一个结果

示例poll()和take()的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<Object> cs = new ExecutorCompletionService<Object>(pool);
final List<String> dList = Arrays.asList("aa", "bb", "cc", "dd", "ee");
for(int i=0; i<dList.size(); i++){
final int tmp = i;
cs.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
if (tmp == 2) {
Thread.sleep(3000);
}
Thread.sleep(1000);
return "线程" + Thread.currentThread().getName() + "处理数据元素list(" + + tmp +") = " + dList.get(tmp);
}
});
}
System.out.println("聚合完成");
AtomicInteger index = new AtomicInteger(0);
while(index.get()<dList.size()) {
Future<Object> f = cs.poll();
if(f == null) {
System.out.println("没发现有完成的任务");
}else {
System.out.println(f.get());
index.incrementAndGet();
}
Thread.sleep(500);
}

程序运行结果

1
2
3
4
5
6
7
8
9
10
聚合完成
没发现有完成的任务
没发现有完成的任务
线程pool-1-thread-1处理数据元素list(0) = aa
线程pool-1-thread-4处理数据元素list(3) = dd
线程pool-1-thread-2处理数据元素list(1) = bb
线程pool-1-thread-5处理数据元素list(4) = ee
没发现有完成的任务
没发现有完成的任务
线程pool-1-thread-3处理数据元素list(2) = cc