livre 发表于 2023-3-23 09:34

Java8并发编程CompletableFuturean

CompletableFuture
CompletableFuture是Java 8 才被引入的一个非常有用的用于异步编程的类。
CompletableFuture是异步执行的,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。

创建 CompletableFuture
// 有返回结果
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 有返回结果,使用自定义线程池(推荐)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

// 无返回结果

static CompletableFuture<Void> runAsync(Runnable runnable);
// 无返回结果,使用自定义线程池(推荐)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

处理异步结算的结果

[*]thenApply()
[*]thenAccept()
[*]thenRun()
[*]whenComplete()

// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

//使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}
CompletableFuture.completedFuture("hello!")
      .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice!

CompletableFuture.completedFuture("hello!")
      .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("hello!"));//hello!


异常处理
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(defaultExecutor(), fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}
CompletableFuture<String> future
      = CompletableFuture.supplyAsync(() -> {
    if (true) {
      throw new RuntimeException("Computation error!");
    }
    return "hello!";
}).handle((res, ex) -> {
    // res 代表返回的结果
    // ex 的类型为 Throwable ,代表抛出的异常
    return res != null ? res : "world!";
});
assertEquals("world!", future.get());
或者使用exceptionally处理异常
CompletableFuture<String> future
      = CompletableFuture.supplyAsync(() -> {
    if (true) {
      throw new RuntimeException("Computation error!");
    }
    return "hello!";
}).exceptionally(ex -> {
    System.out.println(ex.toString());// CompletionException
    return "world!";
});
assertEquals("world!", future.get());



并行运行多个 CompletableFuture
CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
    //自定义业务操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
    //自定义业务操作
});
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);

try {
    headerFuture.join();
} catch (Exception ex) {
    ......
}
System.out.println("all done. ");











带返回值
List<MyEntity> list = new ArrayList<>();
CompletableFuture<String> myQuery = CompletableFuture.supplyAsync(() -> this.getCourseConsumedCancelList(), updateQueryThreadPoolExecutor);
myQuery.thenAccept(list::addAll).exceptionally((e) -> {
    logger.error("");
    return null;
});
CompletableFuture<Void> allQueryFuture = CompletableFuture.allOf(
      myQuery);
allQueryFuture.join();
Map<String, MyEntity> pinganMap = new HashMap<>();
CompletableFuture<Map<String, MyEntity>> firstQuery = CompletableFuture.supplyAsync(() ->
      this.getPinganRefundRecordStatistics());
firstQuery.thenAccept(pinganMap::putAll).exceptionally((e) -> {
    e.printStackTrace();
    thrownew RuntimeException(e.getMessage());
});

不带返回值
CompletableFuture<Void> MyQuery1 = CompletableFuture.runAsync(() ->
{
});
MyQuery1.exceptionally((e) -> {
    logger.error("[] 异常");
    e.printStackTrace();
    throw new RuntimeException(e.getMessage());
});


CompletableFuture<Void> MyQuery2 = CompletableFuture.runAsync(() ->
{
});
MyQuery2.exceptionally((e) -> {
    logger.error("[] 异常");
    e.printStackTrace();
    throw new RuntimeException(e.getMessage());
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(
                MyQuery1, MyQuery2);
allFuture.join();


并行计算例子:
@Service
public class QueryService {
   
   @Autowired
   private TaoBaoService taoBaoService;
   @Autowired
   private JingDongService jingDongService;
   @Autowired
   @Qualifier("orderProductBatchRefundTaskExecutor")
   private ThreadPoolExecutor threadPoolExecutor;

   /**
    * 异步并行查询
    * @Return 结果
    */
   public Map<String, Object> queryAsync() {
       // 结果集合
      Map<String, Object> map = new HashMap<>();
      // 1、查询淘宝价格
      CompletableFuture<String> taoBaoQuery = CompletableFuture.supplyAsync(()-> {
                taoBaoService.getResult();
      });
      taoBaoQuery.thenAccept((result) -> {
         log.info("查询淘宝价格结果:{}", result);
         map.put("taoBaoResult", result);
      }).exceptionally((e) -> {
         log.error("查询淘宝价格异常: {}", e.getMessage(), e);
         map.put("taoBaoResult", "");
         return null;
      });
      // 2、查询京东价格(可以指定线程池threadPoolExecutor)
      CompletableFuture<String> jingDongQuery = CompletableFuture.supplyAsync(()-> {
                jingDongService.getResult();
      },threadPoolExecutor);
      jingDongQuery.thenAccept((result) -> {
         log.info("查询京东价格结果:{}", result);
         map.put("jingDongResult", result);
      }).exceptionally((e) -> {
         log.error("查询京东价格异常: {}", e.getMessage(), e);
         map.put("jingDongResult", "");
         return null;
      });

      // 3、allOf-两个查询必须都完成
      CompletableFuture<Void> allQuery = CompletableFuture.allOf(taoBaoQuery, jingDongQuery);
      
      CompletableFuture<Map<String, Object>> future = allQuery.thenApply((result) -> {
         log.info("------------------ 全部查询都完成 ------------------ ");
         return map;
      }).exceptionally((e) -> {
         log.error(e.getMessage(), e);
         return null;
      });
      // 获取异步方法返回值
      // get()-内部抛出了异常需手动处理; join()-内部处理了异常无需手动处理。
      future.join();
      return map;
   }
}

yypisco 发表于 2023-4-13 19:19

forkjoin有线程安全问题,确实不敢随便用

z39506284305 发表于 2023-3-26 21:43

有个疑惑,不咋会java,听说java的多线程算是个难题, 若是担心多线程现在或将来出现BUG,   大概要怎么办? 少写少错?不写不错?还是只能老老实实学透多线程的相关知识?

binstc 发表于 2023-3-23 14:44

学习了,希望能再详细些

jojogao 发表于 2023-3-23 19:33

大佬什么时候出一个java多线程的解说.想学习一下

jianghuai 发表于 2023-3-24 09:07

siwowuxie 发表于 2023-3-30 09:43

学了并发编程,没咋用,项目比较low只能用用自定义线程池做异步。

beyondchampion 发表于 2023-3-30 10:01

学习了,支持!!!

jcookiechen 发表于 2023-3-30 10:54

学习了 ,支持

∫護着妳佉遠方 发表于 2023-4-14 20:19

z39506284305 发表于 2023-3-26 21:43
有个疑惑,不咋会java,听说java的多线程算是个难题, 若是担心多线程现在或将来出现BUG,   大概要怎么办? ...

Java的多线程学习起来理论其实很简单,就是代码封装度很低,写起来巨难受,而且对于一般工作来说,多线程方面用的很少很少,简单学习一下即可
页: [1] 2
查看完整版本: Java8并发编程CompletableFuturean