博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
强大的CompletableFuture
阅读量:5083 次
发布时间:2019-06-13

本文共 7316 字,大约阅读时间需要 24 分钟。

1197846-20190718212847156-925839215.jpg

引子

为了让程序更加高效,让CPU最大效率的工作,我们会采用异步编程。首先想到的是开启一个新的线程去做某项工作。再进一步,为了让新线程可以返回一个值,告诉主线程事情做完了,于是乎Future粉墨登场。然而Future提供的方式是主线程主动问询新线程,要是有个回调函数就爽了。所以,为了满足Future的某些遗憾,强大的CompletableFuture随着Java8一起来了。

Future

传统多线程的却让程序更加高效,毕竟是异步,可以让CPU充分工作,但这仅限于新开的线程无需你的主线程再费心了。比如你开启的新线程仅仅是为了计算1+...+n再打印结果。有时候你需要子线程返回计算结果,在主线程中进行进一步计算,就需要Future了。

看下面这个例子,主线程计算2+4+6+8+10;子线程计算1+3+5+7+9;最后需要在主线程中将两部分结果再相加。

public class OddNumber implements Callable
{ @Override public Integer call() throws Exception { Thread.sleep(3000); int result = 1 + 3 + 5 + 7 + 9; return result; }}
public class FutureTest {    public static void main(String[] args) {        ExecutorService executor = Executors.newCachedThreadPool();        OddNumber oddNumber = new OddNumber();        Future
future = executor.submit(oddNumber); long startTime = System.currentTimeMillis(); int evenNumber = 2 + 4 + 6 + 8 + 10; try { Thread.sleep(1000); System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒"); int oddNumberResult = future.get();//这时间会被阻塞 System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult)); System.out.println("1.开始了:"+ (System.currentTimeMillis()-startTime) +"秒"); } catch (Exception e) { System.out.println(e); } }}输出结果:0.开始了:1001秒1+2+...+9+10=551.开始了:3002秒

看一下Future接口,只有五个方法比较简单

//取消任务,如果已经完成或者已经取消,就返回失败boolean cancel(boolean mayInterruptIfRunning);//查看任务是否取消boolean isCancelled();//查看任务是否完成boolean isDone();//刚才用到了,查看结果,任务未完成就一直阻塞V get() throws InterruptedException, ExecutionException;//同上,但是加了一个过期时间,防止长时间阻塞,主线程也做不了事情V get(long timeout, TimeUnit unit)    throws InterruptedException, ExecutionException, TimeoutException;

CompletableFuture

上面的看到Future的五个方法,不是很丰富,既然我们的主线程叫做main,就应该以我为主,我更希望子线程做完了事情主动通知我。为此,Java8带来了CompletableFuture,一个Future的实现类。其实CompletableFuture最迷人的地方并不是极大丰富了Future的功能,而是完美结合了Java8流的新特性。

实现回调,自动后续操作

提前说一下CompletableFuture实现回调的方法(之一):thenAccept()

public CompletableFuture
thenAccept(Consumer
action) { return uniAcceptStage(null, action); }

参数有个Consumer,用到了Java8新特性,行为参数化,就是参数不一定是基本类型或者类,也可使是函数(行为),或者说一个方法(接口)。

public class OddNumberPlus implements Supplier
{ @Override public Integer get() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return 1+3+5+7+9; }}
public class CompletableFutureTest {    public static void main(String[] args) {        long startTime = System.currentTimeMillis();        final int evenNumber = 2 + 4 + 6 + 8 + 10;        CompletableFuture
oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus()); try { Thread.sleep(1000); System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒"); //看这里,实现回调 oddNumber.thenAccept(oddNumberResult-> { System.out.println("1.开始了:"+ (System.currentTimeMillis()-startTime) +"秒"); System.out.println("此时计算结果为:"+(evenNumber+oddNumberResult)); }); oddNumber.get(); } catch (Exception e) { System.out.println(e); } }}输出结果:0.开始了:1006秒1.开始了:3006秒此时计算结果为:55

值得一提的是,本例中并没有显示的创建任务连接池,程序会默认选择一个任务连接池ForkJoinPool.commonPool()

private static final Executor asyncPool = useCommonPool ?        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool始自JDK7,叫做分支/合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法。极大的提高效率。当然,你也可以自己指定连接池。

CompletableFuture合并

Java8的确丰富了Future实现,CompletableFuture有很多方法可供大家使用,但是但从上面的例子来看,其实CompletableFuture能做的功能,貌似Future。毕竟你CompletableFuture用get()这个方法的时候还不是阻塞了,我Future蛮可以自己拿到返回值,再手动执行一些操作嘛(虽说这样main方法一定很不爽)。那么接下来的事情,Future做起来就十分麻烦了。假设我们main方法只做奇数合集加上偶数合集这一个操作,提前算这两个合集的操作异步交给两个子线程,我们需要怎么做呢?没错,开启两个线程,等到两个线程都计算结束的时候,我们进行最后的相加,问题在于,你怎么知道那个子线程最后结束的呢?(貌似可以做个轮询,不定的调用isDone()这个方法...)丰富的CompletableFuture功能为我们提供了一个方法,用于等待两个子线程都结束了,再进行相加操作:

//asyncPool就是上面提到的默认线程池ForkJoinPool    public 
CompletableFuture
thenCombineAsync( CompletionStage
other, BiFunction
fn) { return biApplyStage(asyncPool, other, fn); }

看个例子:

public class OddCombine implements Supplier
{ @Override public Integer get() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return 1+3+5+7+9; }}
public class EvenCombine implements Supplier
{ @Override public Integer get() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 2+4+6+8+10; }}
public class CompletableCombineTest {    public static void main(String[] args) throws Exception{        CompletableFuture
oddNumber = CompletableFuture.supplyAsync(new OddCombine()); CompletableFuture
evenNumber = CompletableFuture.supplyAsync(new EvenCombine()); long startTime = System.currentTimeMillis(); CompletableFuture
resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{ return odd + even; }); System.out.println(resultFuturn.get()); System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒"); }}输出结果:550.开始了:3000秒

这边模拟一个睡1秒,一个睡3秒,但是真正的网络请求时间是不定的。是不是很爽,最爽的还不是现象,而是以上操作已经利用了Java8流的概念。

两个子线程还不够,那么还有anyOff()函数,可以承受多个CompletableFuture,会等待所有任务都完成。

public static CompletableFuture
allOf(CompletableFuture
... cfs) { return andTree(cfs, 0, cfs.length - 1); }

与它长的很像的,有个方法,是当第一个执行结束的时候,就结束,后面任务不再等了,可以看作充分条件。

public static CompletableFuture anyOf(CompletableFuture
... cfs) { return orTree(cfs, 0, cfs.length - 1); }

在上面那个例子的基础上,把OddNumberPlus类时间调长一点:

public class OddNumberPlus implements Supplier
{ @Override public Integer get() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return 1+3+5+7+9; }}
public class CompletableCombineTest {    public static void main(String[] args) throws Exception{        CompletableFuture
oddNumber = CompletableFuture.supplyAsync(new OddCombine()); CompletableFuture
evenNumber = CompletableFuture.supplyAsync(new EvenCombine()); CompletableFuture
testNumber = CompletableFuture.supplyAsync(new OddNumberPlus()); long startTime = System.currentTimeMillis(); CompletableFuture
resultFuturn = CompletableFuture.anyOf(oddNumber,evenNumber,testNumber); System.out.println(resultFuturn.get()); System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒"); }}输出结果:300.开始了:1000秒

小结

CompletableFuture的方法其实还有很多,常用的比如说runAsync(),类似于supplyAsync(),只是没有返回值;除了thenApply()可以加回调函数以外,还有thenApply();还有注入runAfterBoth()、runAfterEither(),这些见名知意。还有很多,可以点开CompletableFuture这个类的源码仔细看一看。见微知著,透过CompletableFuture,更加感觉到Java8的强大,强大的流概念、行为参数化、高效的并行理念等等,不仅让Java写起来更爽,还不断丰富Java整个生态。Java一直在进步,所以没有被时代淘汰,我们Javaer也可以继续职业生涯,感谢Java,一起进步。

转载于:https://www.cnblogs.com/pjjlt/p/11210120.html

你可能感兴趣的文章
linux程序设计---序
查看>>
【字符串入门专题1】hdu3613 【一个悲伤的exkmp】
查看>>
C# Linq获取两个List或数组的差集交集
查看>>
HDU 4635 Strongly connected
查看>>
ASP.NET/C#获取文章中图片的地址
查看>>
Spring MVC 入门(二)
查看>>
格式化输出数字和时间
查看>>
页面中公用的全选按钮,单选按钮组件的编写
查看>>
java笔记--用ThreadLocal管理线程,Callable<V>接口实现有返回值的线程
查看>>
BZOJ 1047 HAOI2007 理想的正方形 单调队列
查看>>
各种语言推断是否是手机设备
查看>>
这个看起来有点简单!--------实验吧
查看>>
PHP count down
查看>>
JVM参数调优:Eclipse启动实践
查看>>
(旧笔记搬家)struts.xml中单独页面跳转的配置
查看>>
不定期周末福利:数据结构与算法学习书单
查看>>
strlen函数
查看>>
python的列表与shell的数组
查看>>
关于TFS2010使用常见问题
查看>>
软件工程团队作业3
查看>>