CompletableFuture 异步编排(Completable future asynchronous orchestration)

CompletableFuture 异步编排

  • CompletableFuture 异步编排1、创建异步对象2、完成时回调3、完成时处理4、线程串行化方法5、两个任务组合(both)5.1 ps6、两个任务组合(either)7、多任务组合7.1 ps
  • 1、创建异步对象
  • 2、完成时回调
  • 3、完成时处理
  • 4、线程串行化方法
  • 5、两个任务组合(both)5.1 ps
  • 5.1 ps
  • 6、两个任务组合(either)
  • 7、多任务组合7.1 ps
  • 7.1 ps

查询商品详情的业务比较复杂,有的数据还需要远程调用

// 获取sku的基本信息 0.5s
// 获取sku的图片信息 0.5s
// 获取sku的促销信息 1s
// 获取所有spu的销售属性 1s
// 获取规格参数组以及组下规格参数 1.5s
// spu详情 1s

假如获取商品详情页的每个查询,都需要如下标注时间来完成,服务器返回数据每次都需要5.5s,显然是不能接受的

如果多线程同时完成这6步操作,也许只需要1.5秒即可响应完成

CompletableFuture介绍

CompletableFuture介绍

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

我们看到继承至Future,可以获取到异步执行结果。

1、创建异步对象

CompletableFuture提供了四个静态方法来创建对象

CompletableFuture提供了四个静态方法来创建对象

// 异步执行,无需返回
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 指定线程池,异步执行,无需返回
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
// 异步执行,有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 指定线程池,异步执行,有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

代码示例

代码示例

package com.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 如何创建异步对象
     * // 异步执行,无需返回
     * public static CompletableFuture<Void> runAsync(Runnable runnable)
     * // 指定线程池,异步执行,无需返回
     * public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
     * // 异步执行,有返回值
     * public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
     * // 指定线程池,异步执行,有返回值
     * public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
     */
    public static void main(String[] args) throws Exception {
        runAsync(); // 异步执行,无需返回值,主线程无需等待结果返回
        runAsyncWithExecutor();

        CompletableFuture<Integer> future = supplyAsync(); // 异步执行,有返回值,主线程需等待结果返回
        System.out.println("supplyAsync返回结果:" + future.get());

        CompletableFuture<Integer> execFuture = supplyAsyncWithExecutor();
        System.out.println("supplyAsync返回结果:" + execFuture.get());

    }

    /**
     * 异步执行,无需返回
     */
    static void runAsync() {
        System.out.println("main……start……");
        CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        });
        System.out.println("main……end……");
    }

    /**
     * 异步执行,无需返回,用线程池
     */
    static void runAsyncWithExecutor() {
        System.out.println("main……start……");
        CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }, executorService);
        System.out.println("main……end……");
    }


    /**
     * 异步执行,有返回值
     */
    static CompletableFuture<Integer> supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }
}

2、完成时回调

CompletableFuture提供了四个,感知或处理结果和异常的方法

CompletableFuture提供了四个,感知或处理结果和异常的方法

// 处理正常和异常结果,无返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
// 另开启一个线程,处理正常和异常结果,无返回值
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) 
// 另开启一个线程池中的线程,处理正常和异常结果,无返回值
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
// 处理异常情况,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

代码示例

代码示例

package com.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);
    /**
     * 2、结果和异常处理
     * // 处理正常和异常结果
     * public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
     * // 另开启一个线程,处理正常和异常结果
     * public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
     * // 另开启一个线程池中的线程,处理正常和异常结果
     * public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
     * // 处理异常情况
     * public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> whenCompleteFuture = whenComplete();
        System.out.println("whenCompleted返回结果:" + whenCompleteFuture.get());
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
						// 故障制造异常,则返回结果为 10
            // int i = 10 / 0;
            // 正常,则返回结果为 5
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture<Integer> whenComplete() throws Exception {
        return supplyAsyncWithExecutor()
          // 感知异常
          .whenComplete((resultData, exception) -> {
            System.out.println("执行supplyAsync后,调用whenComplete返回的数据:" + resultData + ",异常:" + exception);
            // 处理异常情况
        }).exceptionally(throwable -> 10);
    }
}

3、完成时处理

CompletableFuture提供了handle方法是另一种处理结果的方式

CompletableFuture提供了handle方法是另一种处理结果的方式

// 处理上一次结果,有返回值
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
// 新开线程处理上一次结果,有返回值
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
// 新拿取线程池中线程处理上一次结果,有返回值
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

代码示例

代码示例

package com.atguigu.gulimail.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 3、完成时处理
     * // 处理上一次结果,有返回值
     * public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
     * // 新开线程处理上一次结果,有返回值
     * public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
     * // 新拿取线程池中线程处理上一次结果,有返回值
     * public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> handleFuture = handle();
        System.out.println("handleFuture返回结果:" + handleFuture.get());

    }

  

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,可处理返回值
     */
    static CompletableFuture<Integer> handle() throws Exception {
        return supplyAsyncWithExecutor().handle((resultData, exception) -> {
            System.out.println("执行supplyAsync后,调用whenComplete返回的数据:" + resultData + ",异常:" + exception);
            if (resultData == null) {
                return resultData * 2;
            }
            if (exception != null) {
                return 0;
            }
            return resultData;
        });
    }
}

4、线程串行化方法

CompletableFuture提供了一系列的串行化方法

CompletableFuture提供了一系列的串行化方法

// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值
public CompletableFuture<Void> thenRun(Runnable action)
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,新开线程  
public CompletableFuture<Void> thenRunAsync(Runnable action)
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,线程池中新开线程
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)

// 消费一个线程结果,不返回信息
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
// 消费一个线程结果,不返回信息,新开线程
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// 消费一个线程结果,不返回信息,线程池中新开线程
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)

// 消费一个线程结果,返回信息
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
// 消费一个线程结果,返回信息,新开线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// 消费一个线程结果,返回信息,线程池中新开线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

代码示例

代码示例

package com.atguigu.gulimail.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 4、线程串行化方法
     */
    public static void main(String[] args) throws Exception {

        CompletableFuture<Integer> handleFuture = handle();
        // 这些方法请自己尝试测验
        thenRun();
        thenRunAsync();
        thenRunAsyncWithExec();
        thenAccept();
        thenAcceptAsync();
        thenAcceptAsyncWithExec();
        CompletableFuture<String> applyFuture = thenApplyAsync();
        applyFuture = thenApply();
        applyFuture = thenApplyAsyncWithExec();
        Thread.sleep(50000);
    }

   
            System.out.println("运行结果:" + i);
        }, executorService);
        System.out.println("main……end……");
    }


    /**
     * 异步执行,有返回值
     */
    static CompletableFuture<Integer> supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

   
    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRun() throws Exception {
        supplyAsync().thenRun(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        });
    }

    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,新开线程(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRunAsync() throws Exception {
        supplyAsync().thenRunAsync(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        });
    }

    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,线程池中新开线程(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRunAsyncWithExec() throws Exception {
        supplyAsync().thenRunAsync(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        }, executorService);
    }

    /**
     * 消费一个线程结果,不返回信息
     */
    static void thenAccept() throws Exception {
        supplyAsync().thenAccept(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        });
    }

    /**
     * 消费一个线程结果,不返回信息,线程池中新开线程
     */
    static void thenAcceptAsync() throws Exception {
        supplyAsync().thenAcceptAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        });
    }

    /**
     * 消费一个线程结果,不返回信息,新开线程
     */
    static void thenAcceptAsyncWithExec() throws Exception {
        supplyAsync().thenAcceptAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        }, executorService);
    }

    /**
     * 消费一个线程结果,返回信息
     * @return
     */
    static CompletableFuture<String> thenApply() throws Exception {
        return supplyAsync().thenApply(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        });
    }

    /**
     * 消费一个线程结果,返回信息,新开线程
     */
    static CompletableFuture<String> thenApplyAsync() throws Exception {
        return supplyAsync().thenApplyAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        });
    }

    /**
     * 消费一个线程结果,返回信息,线程池中新开线程
     * @return
     */
    static CompletableFuture<String> thenApplyAsyncWithExec() throws Exception {
        return supplyAsync().thenApplyAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        }, executorService);
    }
}

5、两个任务组合(both)

CompletableFuture提供both组合模式–两个任务必须都完成,触发改任务

CompletableFuture提供both组合模式–两个任务必须都完成,触发改任务

// 调用者任务与参数任务执行完成后,触发action任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
// 调用者任务与参数任务执行完成后,新开一个线程触发action任务
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)
// 调用者任务与参数任务执行完成后,线程池新开一个线程触发action任务
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)

// 消费两个父任务执行结果,触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
// 消费两个父任务执行结果,新开一个线程触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
// 消费两个父任务执行结果,线程池新开一个线程触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)

// 处理两个父任务结果,触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
// 处理两个父任务结果,新开一个线程触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
// 处理两个父任务结果,线程池新开一个线程触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)

5.1 ps

值得注意的是,两个任务必然是并行执行的!

关于CompletionStage<?>究竟是什么呢?

我们会发现,CompletableFuture 实现了CompletionStage,也就是说,我们需要在方法里面再传一个任务,与调用者一起组成两个任务,都完成后,执行后续操作

代码示例

public static void main(String[] args) throws Exception {
        runAfterBoth();
        runAfterBothAsync();
        runAfterBothAsyncWithExec();
        
        thenAcceptBoth();
        thenAcceptBothAsync();
        thenAcceptBothAsyncWithExec();
        
        CompletableFuture<String> thenCombineFuture = thenCombine();
        thenCombineFuture = thenCombineAsync();
        thenCombineFuture = thenCombineAsyncWithExec();
        System.out.println(thenCombineFuture.get());
        Thread.sleep(50000);
 }

		/**
     * 异步执行,有返回值
     */
    static CompletableFuture<Integer> supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                System.out.println("在此等待中……");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

/**
 * 两个父任务执行完毕,触发action任务
 *
 * @return
 */
static void runAfterBoth() throws Exception {
    supplyAsync().runAfterBoth(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    });
}

/**
 * 两个父任务执行完毕,新开一个线程触发action任务
 *
 * @return
 */
static void runAfterBothAsync() throws Exception {
    supplyAsync().runAfterBothAsync(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    });
}

/**
 * 两个父任务执行完毕,线程池新开一个线程触发action任务
 *
 * @return
 */
static void runAfterBothAsyncWithExec() throws Exception {
    supplyAsync().runAfterBothAsync(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    }, executorService);
}

/**
     * 消费两个父任务执行结果,触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBoth() throws Exception {
        supplyAsync().thenAcceptBoth(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        });
    }

    /**
     * 消费两个父任务执行结果,新开一个线程触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBothAsync() throws Exception {
        supplyAsync().thenAcceptBothAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        });
    }

    /**
     * 消费两个父任务执行结果,线程池新开一个线程触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBothAsyncWithExec() throws Exception {
        supplyAsync().thenAcceptBothAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        }, executorService);
    }

		/**
     * 处理两个父任务结果,触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture<String> thenCombine() throws Exception {
         return supplyAsync().thenCombine(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombine";
        });
    }

    /**
     * 处理两个父任务结果,新开一个线程触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture<String> thenCombineAsync() throws Exception {
        return supplyAsync().thenCombineAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombineAsync";
        });
    }

    /**
     * 处理两个父任务结果,线程池新开一个线程触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture<String> thenCombineAsyncWithExec() throws Exception {
        return supplyAsync().thenCombineAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombineAsyncWithExec";
        }, executorService);
    }

6、两个任务组合(either)

CompletableFuture提供either组合模式–两个任务只要完成一个,触发改任务

CompletableFuture提供either组合模式–两个任务只要完成一个,触发改任务

与both有异曲同工之妙,照葫芦画瓢,这里不过多阐述

// 两个父任务结果只要返回一个,触发子任务,无返回结果
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
// 两个父任务结果只要返回一个,新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
// 两个父任务结果只要返回一个,线程池新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

// 两个父任务结果只要返回一个,消费其结果,触发子任务,无返回结果
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
// 两个父任务结果只要返回一个,消费其结果,新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
// 两个父任务结果只要返回一个,消费其结果,线程池新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
  
// 两个父任务结果只要返回一个,触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个父任务结果只要返回一个,新开线程触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个父任务结果只要返回一个,线程池新开线程触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

7、多任务组合

CompletableFuture提供了多任务组合模式(allOff、anyOff)

CompletableFuture提供了多任务组合模式(allOff、anyOff)

// 执行完任何一个任务后,返回其结果,有返回值
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
// 等待所有任务执行完毕,返回空值
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

代码示例

public static void main(String[] args) throws Exception {
        allOf();
        CompletableFuture<Object> objectCompletableFuture = anyOf();
        System.out.println(objectCompletableFuture.get());
  			// 主线程等待运行
        Thread.sleep(10000);
    }

		/**
     * 异步执行,有返回值
     */
    static CompletableFuture<Integer> supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                System.out.println("在此等待中……");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

		/**
     * 执行完任何一个任务后,返回其结果,有返回值
     *
     * @return
     */
    static CompletableFuture<Object> anyOf() throws Exception {
        return CompletableFuture.anyOf(supplyAsync(), supplyAsync(), supplyAsync());
    }

    /**
     * 等待所有任务执行完毕,返回空值
     *
     * @return
     */
    static CompletableFuture<Void> allOf() throws Exception {
        return CompletableFuture.allOf(supplyAsync(), supplyAsync(), supplyAsync());
    }

7.1 ps

值得注意的是,无论是anyOf还是allOf,最后所有的线程任务都会执行完毕!

总结篇:

  • run相关的方法,通常用来做下一步操作
  • accept相关的方法,通常用来消费结果,无返回值
  • supply、apply、combine相关的方法是有返回值的
  • handle方法用于处理正常和异常结果

相信大家应该都学以致用了吧!

————————

CompletableFuture 异步编排

  • Completable future asynchronous orchestration 1. Creating asynchronous objects 2. Callback on completion 3. Processing on completion 4. Thread serialization method 5. Both 5.1 PS6. Either 7. Multitask 7.1 PS
  • 1. Create asynchronous object
  • 2. Callback on completion
  • 3. Process on completion
  • 4. Thread serialization method
  • 5. Two task combination (both) 5.1 PS
  • 5.1 ps
  • 6. Two task combination (either)
  • 7. Multitasking combination 7.1 PS
  • 7.1 ps

The business of querying commodity details is complex, and some data needs to be called remotely

// 获取sku的基本信息 0.5s
// 获取sku的图片信息 0.5s
// 获取sku的促销信息 1s
// 获取所有spu的销售属性 1s
// 获取规格参数组以及组下规格参数 1.5s
// spu详情 1s

If each query to obtain the product details page needs to be completed with the following marked time, the data returned by the server needs 5.5s each time, which is obviously unacceptable

If multiple threads complete these six operations at the same time, it may take only 1.5 seconds to complete the response

CompletableFuture介绍

CompletableFuture介绍

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}

We can see that the asynchronous execution results can be obtained by inheriting to future.

1. Create asynchronous object

Completable future provides four static methods to create objects

Completable future provides four static methods to create objects

// 异步执行,无需返回
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 指定线程池,异步执行,无需返回
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
// 异步执行,有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 指定线程池,异步执行,有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

< strong > code example < / strong >

< strong > code example < / strong >

package com.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 如何创建异步对象
     * // 异步执行,无需返回
     * public static CompletableFuture<Void> runAsync(Runnable runnable)
     * // 指定线程池,异步执行,无需返回
     * public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
     * // 异步执行,有返回值
     * public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
     * // 指定线程池,异步执行,有返回值
     * public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
     */
    public static void main(String[] args) throws Exception {
        runAsync(); // 异步执行,无需返回值,主线程无需等待结果返回
        runAsyncWithExecutor();

        CompletableFuture<Integer> future = supplyAsync(); // 异步执行,有返回值,主线程需等待结果返回
        System.out.println("supplyAsync返回结果:" + future.get());

        CompletableFuture<Integer> execFuture = supplyAsyncWithExecutor();
        System.out.println("supplyAsync返回结果:" + execFuture.get());

    }

    /**
     * 异步执行,无需返回
     */
    static void runAsync() {
        System.out.println("main……start……");
        CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        });
        System.out.println("main……end……");
    }

    /**
     * 异步执行,无需返回,用线程池
     */
    static void runAsyncWithExecutor() {
        System.out.println("main……start……");
        CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }, executorService);
        System.out.println("main……end……");
    }


    /**
     * 异步执行,有返回值
     */
    static CompletableFuture<Integer> supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }
}

2. Callback on completion

Completable future provides four ways to sense or handle results and exceptions

Completable future provides four ways to sense or handle results and exceptions

// 处理正常和异常结果,无返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
// 另开启一个线程,处理正常和异常结果,无返回值
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) 
// 另开启一个线程池中的线程,处理正常和异常结果,无返回值
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
// 处理异常情况,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

< strong > code example < / strong >

< strong > code example < / strong >

package com.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);
    /**
     * 2、结果和异常处理
     * // 处理正常和异常结果
     * public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
     * // 另开启一个线程,处理正常和异常结果
     * public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
     * // 另开启一个线程池中的线程,处理正常和异常结果
     * public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
     * // 处理异常情况
     * public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> whenCompleteFuture = whenComplete();
        System.out.println("whenCompleted返回结果:" + whenCompleteFuture.get());
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
						// 故障制造异常,则返回结果为 10
            // int i = 10 / 0;
            // 正常,则返回结果为 5
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture<Integer> whenComplete() throws Exception {
        return supplyAsyncWithExecutor()
          // 感知异常
          .whenComplete((resultData, exception) -> {
            System.out.println("执行supplyAsync后,调用whenComplete返回的数据:" + resultData + ",异常:" + exception);
            // 处理异常情况
        }).exceptionally(throwable -> 10);
    }
}

3. Process on completion

Completable future provides the handle method as another way to process the results

Completable future provides the handle method as another way to process the results

// 处理上一次结果,有返回值
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
// 新开线程处理上一次结果,有返回值
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
// 新拿取线程池中线程处理上一次结果,有返回值
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

< strong > code example < / strong >

< strong > code example < / strong >

package com.atguigu.gulimail.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 3、完成时处理
     * // 处理上一次结果,有返回值
     * public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
     * // 新开线程处理上一次结果,有返回值
     * public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
     * // 新拿取线程池中线程处理上一次结果,有返回值
     * public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
     */
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> handleFuture = handle();
        System.out.println("handleFuture返回结果:" + handleFuture.get());

    }

  

    /**
     * 异步执行,有返回值,
     */
    static CompletableFuture<Integer> supplyAsyncWithExecutor() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        });
    }

    /**
     * 异步执行,可处理返回值
     */
    static CompletableFuture<Integer> handle() throws Exception {
        return supplyAsyncWithExecutor().handle((resultData, exception) -> {
            System.out.println("执行supplyAsync后,调用whenComplete返回的数据:" + resultData + ",异常:" + exception);
            if (resultData == null) {
                return resultData * 2;
            }
            if (exception != null) {
                return 0;
            }
            return resultData;
        });
    }
}

4. Thread serialization method

Completable future provides a series of serialization methods

Completable future provides a series of serialization methods

// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值
public CompletableFuture<Void> thenRun(Runnable action)
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,新开线程  
public CompletableFuture<Void> thenRunAsync(Runnable action)
// 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,线程池中新开线程
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)

// 消费一个线程结果,不返回信息
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
// 消费一个线程结果,不返回信息,新开线程
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// 消费一个线程结果,不返回信息,线程池中新开线程
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)

// 消费一个线程结果,返回信息
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
// 消费一个线程结果,返回信息,新开线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// 消费一个线程结果,返回信息,线程池中新开线程
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

< strong > code example < / strong >

< strong > code example < / strong >

package com.atguigu.gulimail.test.controller.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lishanbiao
 * @Date 2021/11/23
 */
public class CompletableFutureTest {
    static ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 4、线程串行化方法
     */
    public static void main(String[] args) throws Exception {

        CompletableFuture<Integer> handleFuture = handle();
        // 这些方法请自己尝试测验
        thenRun();
        thenRunAsync();
        thenRunAsyncWithExec();
        thenAccept();
        thenAcceptAsync();
        thenAcceptAsyncWithExec();
        CompletableFuture<String> applyFuture = thenApplyAsync();
        applyFuture = thenApply();
        applyFuture = thenApplyAsyncWithExec();
        Thread.sleep(50000);
    }

   
            System.out.println("运行结果:" + i);
        }, executorService);
        System.out.println("main……end……");
    }


    /**
     * 异步执行,有返回值
     */
    static CompletableFuture<Integer> supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

   
    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRun() throws Exception {
        supplyAsync().thenRun(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        });
    }

    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,新开线程(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRunAsync() throws Exception {
        supplyAsync().thenRunAsync(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        });
    }

    /**
     * 不能获取到上一步执行结果,但需要等待上一个任务执行完成,无返回值,线程池中新开线程(等待线程处理完之后,新开另一个线程执行其他任务)
     */
    static void thenRunAsyncWithExec() throws Exception {
        supplyAsync().thenRunAsync(() -> {
            System.out.println("我是上一个异步操作执行完后的处理……");
        }, executorService);
    }

    /**
     * 消费一个线程结果,不返回信息
     */
    static void thenAccept() throws Exception {
        supplyAsync().thenAccept(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        });
    }

    /**
     * 消费一个线程结果,不返回信息,线程池中新开线程
     */
    static void thenAcceptAsync() throws Exception {
        supplyAsync().thenAcceptAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        });
    }

    /**
     * 消费一个线程结果,不返回信息,新开线程
     */
    static void thenAcceptAsyncWithExec() throws Exception {
        supplyAsync().thenAcceptAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
        }, executorService);
    }

    /**
     * 消费一个线程结果,返回信息
     * @return
     */
    static CompletableFuture<String> thenApply() throws Exception {
        return supplyAsync().thenApply(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        });
    }

    /**
     * 消费一个线程结果,返回信息,新开线程
     */
    static CompletableFuture<String> thenApplyAsync() throws Exception {
        return supplyAsync().thenApplyAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        });
    }

    /**
     * 消费一个线程结果,返回信息,线程池中新开线程
     * @return
     */
    static CompletableFuture<String> thenApplyAsyncWithExec() throws Exception {
        return supplyAsync().thenApplyAsync(res -> {
            System.out.println("上一个线程返回的结果:" + res);
            return "我是一个apply";
        }, executorService);
    }
}

5. Two task combination (both)

Completable future provides both combination mode — both tasks must be completed to trigger the task change

Completable future provides both combination mode — both tasks must be completed to trigger the task change

// 调用者任务与参数任务执行完成后,触发action任务
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
// 调用者任务与参数任务执行完成后,新开一个线程触发action任务
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)
// 调用者任务与参数任务执行完成后,线程池新开一个线程触发action任务
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)

// 消费两个父任务执行结果,触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
// 消费两个父任务执行结果,新开一个线程触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
// 消费两个父任务执行结果,线程池新开一个线程触发action任务,无返回值
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)

// 处理两个父任务结果,触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
// 处理两个父任务结果,新开一个线程触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
// 处理两个父任务结果,线程池新开一个线程触发子任务并返回结果
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)

5.1 ps

< strong > it is worth noting that < / strong > the two tasks must be executed in parallel!

< strong > about completionstage & lt& gt; What is it

We will find that completablefuture implements completionstage, that is, we need to pass another task in the method to form two tasks together with the caller. After both tasks are completed, we can perform subsequent operations

< strong > code example < / strong >

public static void main(String[] args) throws Exception {
        runAfterBoth();
        runAfterBothAsync();
        runAfterBothAsyncWithExec();
        
        thenAcceptBoth();
        thenAcceptBothAsync();
        thenAcceptBothAsyncWithExec();
        
        CompletableFuture<String> thenCombineFuture = thenCombine();
        thenCombineFuture = thenCombineAsync();
        thenCombineFuture = thenCombineAsyncWithExec();
        System.out.println(thenCombineFuture.get());
        Thread.sleep(50000);
 }

		/**
     * 异步执行,有返回值
     */
    static CompletableFuture<Integer> supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                System.out.println("在此等待中……");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

/**
 * 两个父任务执行完毕,触发action任务
 *
 * @return
 */
static void runAfterBoth() throws Exception {
    supplyAsync().runAfterBoth(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    });
}

/**
 * 两个父任务执行完毕,新开一个线程触发action任务
 *
 * @return
 */
static void runAfterBothAsync() throws Exception {
    supplyAsync().runAfterBothAsync(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    });
}

/**
 * 两个父任务执行完毕,线程池新开一个线程触发action任务
 *
 * @return
 */
static void runAfterBothAsyncWithExec() throws Exception {
    supplyAsync().runAfterBothAsync(supplyAsync(), () -> {
        System.out.println("runAfterBoth任务执行完成");
    }, executorService);
}

/**
     * 消费两个父任务执行结果,触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBoth() throws Exception {
        supplyAsync().thenAcceptBoth(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        });
    }

    /**
     * 消费两个父任务执行结果,新开一个线程触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBothAsync() throws Exception {
        supplyAsync().thenAcceptBothAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        });
    }

    /**
     * 消费两个父任务执行结果,线程池新开一个线程触发action任务,无返回值
     *
     * @return
     */
    static void thenAcceptBothAsyncWithExec() throws Exception {
        supplyAsync().thenAcceptBothAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
        }, executorService);
    }

		/**
     * 处理两个父任务结果,触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture<String> thenCombine() throws Exception {
         return supplyAsync().thenCombine(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombine";
        });
    }

    /**
     * 处理两个父任务结果,新开一个线程触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture<String> thenCombineAsync() throws Exception {
        return supplyAsync().thenCombineAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombineAsync";
        });
    }

    /**
     * 处理两个父任务结果,线程池新开一个线程触发子任务并返回结果
     *
     * @return
     */
    static CompletableFuture<String> thenCombineAsyncWithExec() throws Exception {
        return supplyAsync().thenCombineAsync(supplyAsync(), (firstResult, secondResult) -> {
            System.out.println("thenAcceptBoth-任务执行完成");
            return "thenCombineAsyncWithExec";
        }, executorService);
    }

6. Two task combination (either)

Completable future provides either combination mode — if only one of two tasks is completed, the task will be triggered

Completable future provides either combination mode — if only one of two tasks is completed, the task will be triggered

It is similar to both. Draw a gourd and a gourd. Here’s just a little more

// 两个父任务结果只要返回一个,触发子任务,无返回结果
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
// 两个父任务结果只要返回一个,新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
// 两个父任务结果只要返回一个,线程池新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)

// 两个父任务结果只要返回一个,消费其结果,触发子任务,无返回结果
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
// 两个父任务结果只要返回一个,消费其结果,新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
// 两个父任务结果只要返回一个,消费其结果,线程池新开一个线程触发子任务,无返回结果
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
  
// 两个父任务结果只要返回一个,触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个父任务结果只要返回一个,新开线程触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
// 两个父任务结果只要返回一个,线程池新开线程触发子任务处理其结果,并返回
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

7. Multi task combination

Completable future provides multi task combination mode (alloff, anyoff)

Completable future provides multi task combination mode (alloff, anyoff)

// 执行完任何一个任务后,返回其结果,有返回值
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
// 等待所有任务执行完毕,返回空值
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

< strong > code example < / strong >

public static void main(String[] args) throws Exception {
        allOf();
        CompletableFuture<Object> objectCompletableFuture = anyOf();
        System.out.println(objectCompletableFuture.get());
  			// 主线程等待运行
        Thread.sleep(10000);
    }

		/**
     * 异步执行,有返回值
     */
    static CompletableFuture<Integer> supplyAsync() throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            try {
                System.out.println("在此等待中……");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("运行结果:" + i);
            return i;
        });
    }

		/**
     * 执行完任何一个任务后,返回其结果,有返回值
     *
     * @return
     */
    static CompletableFuture<Object> anyOf() throws Exception {
        return CompletableFuture.anyOf(supplyAsync(), supplyAsync(), supplyAsync());
    }

    /**
     * 等待所有任务执行完毕,返回空值
     *
     * @return
     */
    static CompletableFuture<Void> allOf() throws Exception {
        return CompletableFuture.allOf(supplyAsync(), supplyAsync(), supplyAsync());
    }

7.1 ps

< strong > it is worth noting that, < / strong > whether anyof or allof, all thread tasks will be executed at last!

< strong > summary: < / strong >

  • Run related methods are usually used for the next operation
  • Accept related methods are usually used to consume results without return value
  • supply、apply、combine相关的方法是有返回值的
  • The handle method is used to handle normal and abnormal results

I believe everyone should apply what they have learned!