吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 2817|回复: 16
收起左侧

[Java 原创] Java8并发编程CompletableFuturean

  [复制链接]
livre 发表于 2023-3-23 09:34
CompletableFuture
CompletableFuture是Java 8 才被引入的一个非常有用的用于异步编程的类。
CompletableFuture是异步执行的,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。

创建 CompletableFuture
[Java] 纯文本查看 复制代码
// 有返回结果
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 有返回结果,使用自定义线程池(推荐)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

// 无返回结果

static CompletableFuture<Void> runAsync(Runnable runnable);
// 无返回结果,使用自定义线程池(推荐)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);


处理异步结算的结果
  • thenApply()
  • thenAccept()
  • thenRun()
  • whenComplete()

[Java] 纯文本查看 复制代码
// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

//使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}
CompletableFuture.completedFuture("hello!")
        .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice!

CompletableFuture.completedFuture("hello!")
        .thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("hello!"));//hello!



异常处理
[Java] 纯文本查看 复制代码
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(defaultExecutor(), fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}
CompletableFuture<String> future
        = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("Computation error!");
    }
    return "hello!";
}).handle((res, ex) -> {
    // res 代表返回的结果
    // ex 的类型为 Throwable ,代表抛出的异常
    return res != null ? res : "world!";
});
assertEquals("world!", future.get());
或者使用exceptionally处理异常
CompletableFuture<String> future
        = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("Computation error!");
    }
    return "hello!";
}).exceptionally(ex -> {
    System.out.println(ex.toString());// CompletionException
    return "world!";
});
assertEquals("world!", future.get());




并行运行多个 CompletableFuture
[Java] 纯文本查看 复制代码
CompletableFuture<Void> task1 =
  CompletableFuture.supplyAsync(()->{
    //自定义业务操作
  });
......
CompletableFuture<Void> task6 =
  CompletableFuture.supplyAsync(()->{
    //自定义业务操作
  });
......
 CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);

  try {
    headerFuture.join();
  } catch (Exception ex) {
    ......
  }
System.out.println("all done. ");












带返回值
[Java] 纯文本查看 复制代码
List<MyEntity> list = new ArrayList<>();
CompletableFuture<String> myQuery = CompletableFuture.supplyAsync(() -> this.getCourseConsumedCancelList(), updateQueryThreadPoolExecutor);
myQuery.thenAccept(list::addAll).exceptionally((e) -> {
    logger.error("");
    return null;
});
CompletableFuture<Void> allQueryFuture = CompletableFuture.allOf(
        myQuery);
allQueryFuture.join();
Map<String, MyEntity> pinganMap = new HashMap<>();
CompletableFuture<Map<String, MyEntity>> firstQuery = CompletableFuture.supplyAsync(() -> 
        this.getPinganRefundRecordStatistics());
firstQuery.thenAccept(pinganMap::putAll).exceptionally((e) -> {
    e.printStackTrace();
    throw  new RuntimeException(e.getMessage());
});


不带返回值
[Java] 纯文本查看 复制代码
CompletableFuture<Void> MyQuery1 = CompletableFuture.runAsync(() ->
{
});
MyQuery1.exceptionally((e) -> {
    logger.error("[] 异常");
    e.printStackTrace();
    throw new RuntimeException(e.getMessage());
});
 

CompletableFuture<Void> MyQuery2 = CompletableFuture.runAsync(() ->
{ 
});
MyQuery2.exceptionally((e) -> {
    logger.error("[] 异常");
    e.printStackTrace();
    throw new RuntimeException(e.getMessage());
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(
                MyQuery1, MyQuery2);
allFuture.join();



并行计算例子:
[Java] 纯文本查看 复制代码
@Service
public class QueryService {
    
   @Autowired
   private TaoBaoService taoBaoService;
   @Autowired
   private JingDongService jingDongService;
   @Autowired
   @Qualifier("orderProductBatchRefundTaskExecutor")
   private ThreadPoolExecutor threadPoolExecutor;

   /**
    * 异步并行查询
    * [url=home.php?mod=space&uid=155549]@Return[/url] 结果
    */
   public Map<String, Object> queryAsync() {
       // 结果集合
      Map<String, Object> map = new HashMap<>();
      // 1、查询淘宝价格
      CompletableFuture<String> taoBaoQuery = CompletableFuture.supplyAsync(()-> {
                taoBaoService.getResult();
      });
      taoBaoQuery.thenAccept((result) -> {
         log.info("查询淘宝价格结果:{}", result);
         map.put("taoBaoResult", result);
      }).exceptionally((e) -> {
         log.error("查询淘宝价格异常: {}", e.getMessage(), e);
         map.put("taoBaoResult", "");
         return null;
      });
      // 2、查询京东价格(可以指定线程池threadPoolExecutor)
      CompletableFuture<String> jingDongQuery = CompletableFuture.supplyAsync(()-> {
                jingDongService.getResult();
      },threadPoolExecutor);
      jingDongQuery.thenAccept((result) -> {
         log.info("查询京东价格结果:{}", result);
         map.put("jingDongResult", result);
      }).exceptionally((e) -> {
         log.error("查询京东价格异常: {}", e.getMessage(), e);
         map.put("jingDongResult", "");
         return null;
      });

      // 3、allOf-两个查询必须都完成
      CompletableFuture<Void> allQuery = CompletableFuture.allOf(taoBaoQuery, jingDongQuery);
      
      CompletableFuture<Map<String, Object>> future = allQuery.thenApply((result) -> {
         log.info("------------------ 全部查询都完成 ------------------ ");
         return map;
      }).exceptionally((e) -> {
         log.error(e.getMessage(), e);
         return null;
      });
      // 获取异步方法返回值
      // get()-内部抛出了异常需手动处理; join()-内部处理了异常无需手动处理。
      future.join();
      return map;
   }
}

免费评分

参与人数 3吾爱币 +9 热心值 +2 收起 理由
jianghuai + 1 + 1 我很赞同!
jojogao + 1 我很赞同!
wushaominkk + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

yypisco 发表于 2023-4-13 19:19
forkjoin有线程安全问题,确实不敢随便用
z39506284305 发表于 2023-3-26 21:43
有个疑惑,  不咋会java,  听说java的多线程算是个难题, 若是担心多线程现在或将来出现BUG,   大概要怎么办? 少写少错?  不写不错?  还是只能老老实实学透多线程的相关知识?
binstc 发表于 2023-3-23 14:44
jojogao 发表于 2023-3-23 19:33
大佬什么时候出一个java多线程的解说.想学习一下
头像被屏蔽
jianghuai 发表于 2023-3-24 09:07
提示: 作者被禁止或删除 内容自动屏蔽
siwowuxie 发表于 2023-3-30 09:43
学了并发编程,没咋用,项目比较low只能用用自定义线程池做异步。
beyondchampion 发表于 2023-3-30 10:01
学习了,支持!!!
jcookiechen 发表于 2023-3-30 10:54
学习了 ,支持
∫護着妳佉遠方 发表于 2023-4-14 20:19
z39506284305 发表于 2023-3-26 21:43
有个疑惑,  不咋会java,  听说java的多线程算是个难题, 若是担心多线程现在或将来出现BUG,   大概要怎么办? ...

Java的多线程学习起来理论其实很简单,就是代码封装度很低,写起来巨难受,而且对于一般工作来说,多线程方面用的很少很少,简单学习一下即可
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-1-7 19:51

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表