说到java,就不得不说起多线程的使用,在 java8 的时代,可以使用多线程的方式已经非常多了,这里主要记录一下 ListenableFurure 的使用。

ListenableFurure 是guava库中的一员,主要是扩展原生的Future没有回调函数的诟病,通过ListenableFurure ,可以轻松的为异步方法添加回调、统一处理、链式处理等。

Listenablefuture vs Completablefuture

说道Future,就要提一下Completablefuture,这是java8中提供的一种新类型,他用与Listenablefuture 类似的方式扩展了基础的Future,那么他们有什么异同呢?

首先,和原始的Future比起来,这两者都具有一个优势。即允许给异步一个操作注册一个回调方法,这个方法将会在异步方法完成之后执行。

如果用原始的Future:

1
2
3
ExecutorService executor = ...;
Future f = executor.submit(...);
f.get();

f.get() 会阻塞住线程,直到异步方法结束。

如果使用Listenablefuture,就可以注册一个回调方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
ListenableFuture listenable = service.submit(...);
    Futures.addCallback(listenable, new FutureCallback<Object>() {
                @Override
                public void onSuccess(Object o) {
                    //handle on success
                }

                @Override
                public void onFailure(Throwable throwable) {
                   //handle on failure
                }
            })

如果使用Completablefuture,也可以注册一个回调方法供异步方法完成之后调用。

1
2
3
4
5
6
7
8
CompletableFuture completableFuture = new CompletableFuture();
completableFuture.whenComplete(new BiConsumer() {
	@Override
    public void accept(Object o, Object o2) {
    //handle complete
    }
}); // complete the task
completableFuture.complete(new Object())

不过当然他们也有一些区别,这些区别我们可以放到以后再研究,本篇还是记录一下ListenableFuture的使用。

ListenableFuture的使用

线程池

ListenableFuture 使用的线程池为ListeningExecutorService,可以通过MoreExecutors工具类方便的封装一个原生线程池得到:

1
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

这个线程池也是对ExecutorService的一种实现,可以当做只是对原生线程池的一种封装。

ListenableFuture

假设有一个耗时的方法,所以把他写成一个异步的方法,他的返回值是 string,可是我们希望使用它的封装类 Document,那么,我们可以这样实现

1
2
3
4
5
6
7
8
final ListenableFuture<String> future = //...
 
final ListenableFuture<Document> documentFuture = Futures.transform(future, new Function<String, Document>() {
    @Override
    public Document apply(String contents) {
        return parse(contents);
    }
});

或者也可以将转换方法分离出来,增加可读性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
final Function<String, Document> parseFun = new Function<String, Document>() {
    @Override
    public Document apply(String contents) {
        return parse(contents);
    }
};
 
final ListenableFuture<String> future = //...
 
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);

Futures.transform 方法就相当于注册了一个回调方法,当异步方法执行结束之后,就会调用它。当我们希望继续转换的时候,可以继续使用transform 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
final Function<Document, Double> relevanceFun = new Function<Document, Double>() {
    @Nullable
    @Override
    public Double apply(@Nullable Document input) {
        return calculateRelevance(input);
    }
};
 
final ListenableFuture<String> future = //...
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceFun);

看起来是不是还不错,我们从获取到一个 String 首先转换成Document,然后计算relevance的值,这一切都是异步的。如果使用Future,就需要不断的get和submit,ListenableFuture让这个过程方便了很多。

如果我们有很多Future,也可以轻松的获取到他们执行都结束以后的Future:

1
2
final List<ListenableFuture<Double>> relevanceFutures = //...;
final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);

在最后,可以增加一个callback方法进行处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Futures.addCallback(maxRelevanceFuture, new FutureCallback<List<Double>>() {
    @Override
    public void onSuccess(List<Double> result) {
    	Double result = Collections.max(relevanceList);
        log.debug("Result: {}", result);
    }
 
    @Override
    public void onFailure(Throwable t) {
        log.error("Error :-(", t);
    }
});

这样就获取到了最终的结果,是不是看起来还不错呢。

当使用异步方法处理rest请求的时候,一般需要配合DeferredResult,以便于等待结果处理好之后再返回,防止提前返回结果。