注册

CompletableFuture使用与解读

1 前言


jdk8后给出的类,android需要N版本之后才能使用;提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,也提供了转换和组合 CompletableFuture 的方法;


本文会从以下方面来介绍



  • 使用、方法意义以及总结归纳
  • 流程解读

2 使用


从类来看,其实现了CompletionStage接口以及Future接口;futrue的用法就不在这里说了,这里仅仅说明CompletionStage方法以及相关方法用法;


调用整个过程,我把它看成是个流,每次方法生成的CompletableFuture都是一个流节点,每个流有自己的完成结果,其后面的流依赖其完成后才可执行


2.1 流的产生



  • 静态方法

    val ff = CompletableFuture<Int>()
复制代码


  • 数据提供者Supplier

CompletableFuture.supplyAsync {
println("create thread ${Thread.currentThread().name}")
100
}
复制代码


  • 任务事件Runnable

 CompletableFuture.runAsync {
println("create: buy meat and vegetables")
}
复制代码


  • 组合并集任务

 CompletableFuture.allOf(CompletableFuture.runAsync{
println("create: wear shoes")
}, CompletableFuture.runAsync{
println("create: wear dress")
})
复制代码


  • 组合互斥任务

    CompletableFuture.anyOf(CompletableFuture.runAsync{
println("create: read a book")
}, CompletableFuture.runAsync{
println("create: write")
})
复制代码

2.2 流的处理


流的处理方法比较多了,有37个,写代码不方便;完成方法表格如下,表格备注表达了我对这些方法的抽象与理解,看了这个,有助于更好理解下面涉及的东西


CompletableFuture详细方法.png
太多了很难记,也不好理解,下面给出了简略精华版本方法表;通过这些方法,清除明了这个类可以做到什么样得组合变换


CompletableFuture简略方法.png


下面给出几个简单事例



  1. 无组合的变化、消费

CompletableFuture.supplyAsync {
println("create thread ${Thread.currentThread().name}")
100
}.thenApply {
println("map thread ${Thread.currentThread().name}")
it * 10
}.thenAccept {
println("consume $it")
}
复制代码


  1. 组合变化、消费

CompletableFuture.supplyAsync {
10
}.applyToEither(CompletableFuture.supplyAsync {
100
}, Function<Int, Int> {
it * 10 + 3
}).thenCombine(CompletableFuture.supplyAsync{
"Lily"
}, BiFunction<Int, String, Stu> { t, u -> Stu(u, t)}).thenAccept {
println("name ${it.name}, age ${it.age}")
}
复制代码


  1. 异常转换、多次消费

val ff = CompletableFuture<Int>()
ff.handle<Int>{
_, _ -> 10
}.whenComplete{
t, u -> println("first handler $t")
}.whenComplete { t, u -> println("second handler $t")}
ff.obtrudeValue(null)
复制代码

2.3 流结果设置


这里也通过表格方式,有下面几种方法


CompletableFuture结果设置.png


我们通过构造器生成时,需要自己设置值,如下


val ff = CompletableFuture<Int>()
ff.thenApply {
it / 2 + 4
}
ff.complete(16)
复制代码

设置值后,后面的流才会执行


3. 源码解读


CompletableFuture是流的一个节点,内部持有了完成状态以及依赖其的任务节点信息,其内部同样实现了完成态时依赖任务执行处理;


3.1 数据结构


这主要体现这两个成员变量上


    volatile Object result; 
volatile Completion stack;
复制代码


  • result:结果为null,表示未执行;执行结果为空,则设置为静态常量NIL,异常则设置为AltResult实例,正常完成,则表示实际的值; AltResult内容如下

   static final AltResult NIL = new AltResult(null);
static final class AltResult {
final Throwable ex;
AltResult(Throwable x) { this.ex = x; }
}
复制代码


  • stack:链表尾部指针,组成了后进先出的链表结构;是依赖当前完成状态需要执行的任务集合;内容如下,其实现ForkJoinTask,只是为了利用ForkJoinPoo线程池,其最大有点就是解决频繁的异步任务的,很配

    abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next;
abstract CompletableFuture<?> tryFire(int mode);

abstract boolean isLive();
public final void run() { tryFire(ASYNC); }
public final boolean exec() { tryFire(ASYNC); return false; }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}
复制代码

对于stack处理



  • postFire方法: 通知其依赖的节点,进行完成传播;由于没有使用锁,只使用了原子操作,这样可以防止,有些节点加入到依赖集合中,却不能得到执行
  • cleanStack方法:清除失活以及无效的节点
  • postComplete方法:执行stack集合中任务
  • casStack方法:改变队尾操作
  • tryPushStack方法:尝试加入队尾数据
  • pushStack:队尾加入数据

3.2 Completion以及子类


Completion类,抽象类,待执行的任务节点;其内部持有下个流以及流任务执行的逻辑;其继承关系类图如下:


Completion类图.jpg


内部变量


        CompletableFuture<V> dep; 
CompletableFuture<T> src;
CompletableFuture<U> snd
复制代码

dep代表当前操作新成的流节点,src、snd为其依赖的流节点;其中每个类,还有流任务执行的对象:Runable、Function、ConSumer、BiFunction、BiConsumer等


tryFire方法很重要,其持有的转换对象、消费对象代表了需要执行的操作;其实他们对应的tryFire方法内部实际操作,都在CompletableFuture内有对应方法


tryFire方法


很关键的方法,其持有的转换对象、消费对象代表了需要执行的操作;其情况与具体的模式有关,其情况如下



  • SYNC = 0, 同步状态;执行线程为当前方法调用线程或者上个流执行所在线程;同时其可能仅仅是为了启动线程池启动任务
  • ASYNC = 1,异步,表示需要在线程池内执行
  • NESTED = -1,传播模式,表示依赖的流节点已经处于完成状态,正在传递处理

claim方法


线程池任务提交,并且执行有且提交一次


3.3 中间流生成与执行原理


中间流处理,就是CompletionStage声明的方法;其系列处理方法,基本逻辑相同,也就是方法名称不同而已,而由于持有的任务不同而略有不同


3.3.1 thenRun系列


均是通过私有方法uniRunStage进行处理,进行添加时尝试处理的


  private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = newIncompleteFuture();
if (e != null || !d.uniRun(this, f, null)) {
UniRun<T> c = new UniRun<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
复制代码

对于此方法有下面逻辑



  1. 同步执行,且uniRun执行成功,则返回生成流节点
  2. 否则,添加相应Completion子类到等待集合中,并再次尝试执行;和之前提到的postFire结合确保一定能够执行

   final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
Object r; Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
if (result == null) {
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else
try {
if (c != null && !c.claim())
return false;
f.run();
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
复制代码

方法的最后一个参数,当是触发线程池提交任务操作时,需要传入任务实例,否则传入空指;也就是传入空指,代表此方法中直接执行,这时,线程可能为生成流节点方法线程,也可能是上个流节点执行的线程,也可能是线程池创建的线程中(好像等于白说了);这个方法流程如下:




  1. 检验依赖节点执行状态,未完成则结束




  2. 执行异常结束,则设置异常状态,结束




  3. 正常执行结束时,尝试执行当前任务



    • 需要向线程池提交任务,则通过claim方法,进行处理,并返回;提交任务后会执行tryFire方法
    • 不需要向线程池提交任务,执行;若执行成功,有结果直接设置结果,无结果设置NIL值;若是发生已成设置异常



如果调用CompletionStage声明的方法未能立刻执行的,则需要通过依赖的流节点完成后通过postComplete方法进行分发;


    final void postComplete() {
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null;
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
复制代码

tryFire方法,返回空表示流节点任务没有完成,否则表示已完成,继续这个节点的分发;也就是分发时通过tryFire方法去执行依赖节点的任务


        final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniRun(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
复制代码

逻辑如下



  1. 当前任务执行的流节点为空、或者未执行,则返回null,也就是此节点未完成操作
  2. 已经执行成功,则把持有对象全部置空,以便gc;并通过postFire通知其依赖节点进行清理依赖节点集合或者继续传播触发

    final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
if (mode < 0 || a.result == null)
a.cleanStack();
else
a.postComplete();
}
if (result != null && stack != null) {
if (mode < 0)
return this;
else
postComplete();
}
return null;
}
复制代码

主要逻辑




  1. 依赖流节点不为空,且依赖集合不为空



    • 传播模式或者其未完成执行,则进行节点清理
    • 否则,进行传播



  2. 当前流节点执行完毕,且依赖集合不为空



    • 正在处于传播模式,则返回当前对象,继续传播
    • 否则,进行传播处理



整个添加流节点以及执行流程,已经分析完了;那么这个相似处,根据这个例子再来具体的说下:


整个流程:Completion子类(UniRun)以及子类tryFire方法、CompletableFuture中辅助方法(uniRun)以及postFire、postComplete等分发方法


3.3.2 thenRun相似流程系列



  • thenApply系列方法:子类UniApply、辅助方法uniApply
  • thenAccept系列方法:子类UniAccept、辅助方法uniAccept
  • thenCombine系列方法:子类BiApply、辅助方法biApply
  • thenAcceptBoth系列方法:子类BiAccept、辅助方法biAccept
  • runAfterBoth系列方法:子类biRun、辅助方法BiRun
  • applyToEither系列方法:子类orApply、辅助方法OrApply
  • acceptEither系列方法:子类OrAccept、辅助方法orAccept
  • runAfterEither系列方法:子类OrRun、辅助方法orRun
  • handle系列方法:子类UniHandle、辅助方法uniHandle
  • whenComplete系列方法:子类UniWhenComplete、辅助方法uniWhenComplete
  • exceptionally方法:子类UniExceptionally、辅助方法uniExceptionally

uiWhenComplete、uniHandle和uniExceptionally,在异常处理中,因为需要处理异常,而在检测其依赖节点异常时,并不直接退出,而是继续处理


3.3.3 thenCompose系列


这个为何特殊呢,因为它相当于两个任务;



  1. 通过Function<? super T, ? extends CompletionStage>转换流为一个任务
  2. 转换的流执行又是一个任务,其又关联一个流

第一个子类是UniCompose,辅助方法是uniCompose,执行了转换流逻辑,并通过Relay实例把当前加入到转换流执行的依赖集合中;也就是说thenCompose系列方法产生的流,依赖于转换流操作以及转换的流完成


转换的流执行逻辑子类是UniRelay,辅助方法uniRelay


执行逻辑并没有区别;relay表示接力,也就是,其传递上个流节点结果即可


4 小结


CompletableFuture这个类,我觉得异步编程,还是需要一定的功底,它并没有把相应操作等封装的很到位,37个方法组合使用,可以达到不同的效果;


技术变化都很快,但基础技术、理论知识永远都是那些;作者希望在余后的生活中,对常用技术点进行基础知识分享;如果你觉得文章写的不错,请给与关注和点赞;如果文章存在错误,也请多多指教!

">
">
">
">


作者:众少成多积小致巨
链接:https://juejin.cn/post/6956585105875795998
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册