Java8并发编程CompletableFuturean
CompletableFutureCompletableFuture是Java 8 才被引入的一个非常有用的用于异步编程的类。
CompletableFuture是异步执行的,默认线程池是ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。
创建 CompletableFuture
// 有返回结果
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 有返回结果,使用自定义线程池(推荐)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
// 无返回结果
static CompletableFuture<Void> runAsync(Runnable runnable);
// 无返回结果,使用自定义线程池(推荐)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
处理异步结算的结果
[*]thenApply()
[*]thenAccept()
[*]thenRun()
[*]whenComplete()
// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice!
CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("hello!"));//hello!
异常处理
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Computation error!");
}
return "hello!";
}).handle((res, ex) -> {
// res 代表返回的结果
// ex 的类型为 Throwable ,代表抛出的异常
return res != null ? res : "world!";
});
assertEquals("world!", future.get());
或者使用exceptionally处理异常
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Computation error!");
}
return "hello!";
}).exceptionally(ex -> {
System.out.println(ex.toString());// CompletionException
return "world!";
});
assertEquals("world!", future.get());
并行运行多个 CompletableFuture
CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);
try {
headerFuture.join();
} catch (Exception ex) {
......
}
System.out.println("all done. ");
带返回值
List<MyEntity> list = new ArrayList<>();
CompletableFuture<String> myQuery = CompletableFuture.supplyAsync(() -> this.getCourseConsumedCancelList(), updateQueryThreadPoolExecutor);
myQuery.thenAccept(list::addAll).exceptionally((e) -> {
logger.error("");
return null;
});
CompletableFuture<Void> allQueryFuture = CompletableFuture.allOf(
myQuery);
allQueryFuture.join();
Map<String, MyEntity> pinganMap = new HashMap<>();
CompletableFuture<Map<String, MyEntity>> firstQuery = CompletableFuture.supplyAsync(() ->
this.getPinganRefundRecordStatistics());
firstQuery.thenAccept(pinganMap::putAll).exceptionally((e) -> {
e.printStackTrace();
thrownew RuntimeException(e.getMessage());
});
不带返回值
CompletableFuture<Void> MyQuery1 = CompletableFuture.runAsync(() ->
{
});
MyQuery1.exceptionally((e) -> {
logger.error("[] 异常");
e.printStackTrace();
throw new RuntimeException(e.getMessage());
});
CompletableFuture<Void> MyQuery2 = CompletableFuture.runAsync(() ->
{
});
MyQuery2.exceptionally((e) -> {
logger.error("[] 异常");
e.printStackTrace();
throw new RuntimeException(e.getMessage());
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(
MyQuery1, MyQuery2);
allFuture.join();
并行计算例子:
@Service
public class QueryService {
@Autowired
private TaoBaoService taoBaoService;
@Autowired
private JingDongService jingDongService;
@Autowired
@Qualifier("orderProductBatchRefundTaskExecutor")
private ThreadPoolExecutor threadPoolExecutor;
/**
* 异步并行查询
* @Return 结果
*/
public Map<String, Object> queryAsync() {
// 结果集合
Map<String, Object> map = new HashMap<>();
// 1、查询淘宝价格
CompletableFuture<String> taoBaoQuery = CompletableFuture.supplyAsync(()-> {
taoBaoService.getResult();
});
taoBaoQuery.thenAccept((result) -> {
log.info("查询淘宝价格结果:{}", result);
map.put("taoBaoResult", result);
}).exceptionally((e) -> {
log.error("查询淘宝价格异常: {}", e.getMessage(), e);
map.put("taoBaoResult", "");
return null;
});
// 2、查询京东价格(可以指定线程池threadPoolExecutor)
CompletableFuture<String> jingDongQuery = CompletableFuture.supplyAsync(()-> {
jingDongService.getResult();
},threadPoolExecutor);
jingDongQuery.thenAccept((result) -> {
log.info("查询京东价格结果:{}", result);
map.put("jingDongResult", result);
}).exceptionally((e) -> {
log.error("查询京东价格异常: {}", e.getMessage(), e);
map.put("jingDongResult", "");
return null;
});
// 3、allOf-两个查询必须都完成
CompletableFuture<Void> allQuery = CompletableFuture.allOf(taoBaoQuery, jingDongQuery);
CompletableFuture<Map<String, Object>> future = allQuery.thenApply((result) -> {
log.info("------------------ 全部查询都完成 ------------------ ");
return map;
}).exceptionally((e) -> {
log.error(e.getMessage(), e);
return null;
});
// 获取异步方法返回值
// get()-内部抛出了异常需手动处理; join()-内部处理了异常无需手动处理。
future.join();
return map;
}
}
forkjoin有线程安全问题,确实不敢随便用 有个疑惑,不咋会java,听说java的多线程算是个难题, 若是担心多线程现在或将来出现BUG, 大概要怎么办? 少写少错?不写不错?还是只能老老实实学透多线程的相关知识? 学习了,希望能再详细些 大佬什么时候出一个java多线程的解说.想学习一下 学了并发编程,没咋用,项目比较low只能用用自定义线程池做异步。 学习了,支持!!! 学习了 ,支持 z39506284305 发表于 2023-3-26 21:43
有个疑惑,不咋会java,听说java的多线程算是个难题, 若是担心多线程现在或将来出现BUG, 大概要怎么办? ...
Java的多线程学习起来理论其实很简单,就是代码封装度很低,写起来巨难受,而且对于一般工作来说,多线程方面用的很少很少,简单学习一下即可
页:
[1]
2