2025-10-11 start:
简单的异步调用:
CompletableFuture.runAsync(() -> testMapper.find(testDTOS));
end
使用java线程池调用接口,做并发,提高执行效率。
相关代码,如下:
int concurrentCount = 20; //设置并发数量
if (familyList.size() < concurrentCount) { // 请求人数的并发数量小于20时,使用请求人数的并发数
concurrentCount = familyList.size();
}
ExecutorService executorService = Executors.newWorkStealingPool(concurrentCount); //创建一个抢占式的线程池,由jdk8提供
List<Callable<Map<String,Object>>> jobList = Lists.newArrayList();
for (int i = 0; i < familyList.size(); i++) {
//使用线程池批量调用相关接口
jobList.add(new Callable<Map<String, Object>>() {
@Override
public Map<String, Object> call() throws Exception {
Map<String,Object> res = Maps.newHashMap();
// 调用相关接口
res.put("return_code",response.getReturn_code());
res.put("failMsg",response.getReturn_msg());
res.put("failPhone",phone);
long t2_ = System.currentTimeMillis();
return res;
}
});
}
int errors = 0; // 调用相关接口失败的数量
StringBuilder errmsg = new StringBuilder();
try {
List<Future<Map<String, Object>>> futures = executorService.invokeAll(jobList);
executorService.shutdown();
for(Future<Map<String,Object>> job:futures){
Map<String,Object> res = job.get();
if(!"0".equals(res.get("return_code"))){
errmsg.append("手机号[")
.append(res.get("failPhone"))
.append("]原因:")
.append(res.get("failMsg"));
errors++;
}
}
} catch (InterruptedException e) {
logger.error("线程池调用异常"+e.getMessage(),e);
} catch (ExecutionException e) {
logger.error("线程池调用异常"+e.getMessage(),e);
}
Java 五种线程池,JDK1.8新增newWorkStealingPool
2023-04-19 start
一个多线程调用思路:
public List<ResultVo> function(List<Item> list) {
List<ResultVo> result = new Arraylist<>();
if (CollectionUtils.isEmpty(list)){
return list;
}
List<List<ResultVo>> a = Lists.partition(list, 5);
int size = a.size();
CountDownLatch cdl = new CountDownLatch(size);
// 排序并过滤
for (List<PlayRoomVo> p : a) {
try {
// for循环操作
} catch (Exception e){
//todo log
} finally {
cdl.countDown();
}
}
try {
cdl.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//todo log
e.printStackTrace();
}
return result;
}
优化后:
public List<ResultVo> function(List<Item> list) {
List<ResultVo> result = new ArrayList<>();
if (CollectionUtils.isEmpty(list)) {
return result; // 返回空列表
}
// 将输入列表分区,每个子列表最多包含 5 个元素
List<List<Item>> partitions = Lists.partition(list, 5);
int size = partitions.size();
CountDownLatch cdl = new CountDownLatch(size);
// 并发处理每个分区
for (List<Item> partition : partitions) {
new Thread(() -> {
try {
// 在这里实现具体的处理逻辑,例如排序、过滤等
// 将处理结果添加到 result 中
} catch (Exception e) {
// 记录日志
} finally {
cdl.countDown(); // 确保 CountDownLatch 计数减一
}
}).start();
}
// 等待所有任务完成,最多等待 3 秒
try {
cdl.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// 记录日志
e.printStackTrace();
}
return result;
}
改进建议
-
线程池:直接创建新线程(
new Thread())的方式不够高效,建议使用线程池(如ExecutorService)来管理线程。 -
线程安全:
result是一个共享资源,多线程并发修改时可能会导致数据不一致。建议使用线程安全的集合(如Collections.synchronizedList())或通过加锁来保证线程安全。 -
超时处理:如果任务在 3 秒内未完成,
cdl.await()会超时,但已启动的线程会继续执行。需要根据业务需求决定是否取消未完成的任务。
end
2025-02-25 start:
设置多少线程数量通常根据应用的类型: IO密集型、CPU密集型。
CPU密集型(CPU-bound)
CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好得多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写、I/O (硬盘/内存), I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。
在多重程序系统中,大部分时间用来做计算、逻辑判断等CPU动作的程序称之为CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程中绝大部分时间用在三角函数和开根号的计算,便是属于CPU bound的程序。
CPU bound的程序一般而言CPU占用率很高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等到I/O的时间。
IO密集型(I/O bound)
IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O(硬盘/内存)的读写操作,此时CPU Loading并不高。
I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。
第一种公式:
IO密集型通常设置为 2n+1,其中n为CPU核数。
CPU密集型通常设置为n+1。
在实际的开发中,并不会按照上面的公式进行设置。那么在实际开发中,我们如何给一个线程池设置合适的线程呢?
第二种公式:
其实对于IO密集型类型的应用,网上还有一个公式: 线程数 = CPU核心数 / ( 1 - 阻塞系数)。 引入了阻塞系数的概念,一般为0.8 ~ 0.9.
实际经验
在我们的业务开发中,基本上都是IO密集型,因为往往都会去操作数据库,访问redis、es等存储型组件,涉及到磁盘IO、网络IO。 对于纯计算类场景就属于CPU密集型。
IO密集型,可以考虑多设置一些线程,主要目的是可以增加IO的并发度,CPU密集型不宜过多线程,因为会造成线程切换,反而损耗性能。
一个4C8G的机器如果按照2n+1的公式,线程数设置为9个,但在我们实践过程中发现如果增大线程数量,会显著提高消息的处理能力,说明2n+1对于业务场景来说,并不太合适。
如果套用 线程数 = CPU核心数 / ( 1 - 阻塞系数), 阻塞系数取0.8,线程数为20. 阻塞系数取0.9, 大概线程数40, 20个线程数我觉得可以。
那我们怎么判断需要增加更多线程呢? 其实可以用jstack命令查看一下进程的线程数, 如果发现线程池中大部分线程都处于等待获取任务,则说明线程够用,如果大部分线程都处于运行状态,可以继续适当调高线程数量。
如果我们发现数据库的操作耗时比较多,此时可以继续提高阻塞系数,从而增大线程数量。
end
2025-03-19 start:
结合上面来进行并发查询
/**
* @author haijin
* @description: 并发查询工具类
* @date 2025/3/19 11:37
*/
public class ConcurrentQueryUtil {
/**
* 并发查询时间片数据
*
* @param timeRanges 时间片列表
* @param queryFunction 查询函数
* @return 查询结果列表
*/
public static <T> List<T> concurrentQuery(List<Map<String,String>> timeRanges, QueryFunction<T> queryFunction) {
// 线程安全的集合,用于存储查询结果
List<T> resultList = new CopyOnWriteArrayList<>();
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(timeRanges.size());
// 使用 CountDownLatch 等待所有任务完成
CountDownLatch latch = new CountDownLatch(timeRanges.size());
// 提交查询任务
for (Map<String,String> timeRange : timeRanges) {
executorService.submit(() -> {
try {
// 执行查询
T result = queryFunction.query(timeRange.get("startTime"), timeRange.get("endTime"));
resultList.add(result);
} finally {
latch.countDown();
}
});
}
// 等待所有任务完成
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw MsbException.logic("查询任务被中断【{}】",e.getMessage());
}
// 关闭线程池
executorService.shutdown();
return resultList;
}
/**
* 查询函数接口
*/
@FunctionalInterface
public interface QueryFunction<T> {
T query(String startTime, String endTime);
}
}
使用:(splitTimeRange方法可以查看http://www.haijin.xyz/article/338)
List<Map<String, String>> splitTimeRange = LogEntityUtil.splitTimeRange(faultAnalyzeParamVo.getStartTime(), faultAnalyzeParamVo.getEndTime());
List<GatewayAccessLogVo> resultList = ConcurrentQueryUtil.concurrentQuery(timeRanges, (startTime, endTime) -> {
// 调用 DAO 层查询方法
return gateWayAccessLogDao.getGatewayAccessLogVo(startTime, endTime, null, null);
});
// 打印结果
resultList.forEach(System.out::println);
end
2025-03-26 start:
异步线程查询
ExecutorService executor = Executors.newFixedThreadPool(3);
String finalUrl = url;
String finalStartTime = startTime;
String finalEndTime = endTime;
CompletableFuture<Void> allFuture = CompletableFuture.runAsync(() ->
setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, null), executor);
CompletableFuture<Void> aFuture = CompletableFuture.runAsync(() ->
setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, "a"), executor);
CompletableFuture<Void> bFuture = CompletableFuture.runAsync(() ->
setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, "b"), executor);
// 等待所有任务完成
CompletableFuture.allOf(allFuture, aFuture, bFuture).join();
executor.shutdown();
end
2025-04-24 start:
// 自定义线程池
final ExecutorService executor = Executors.newFixedThreadPool(2);
// 1. 并行执行两个查询任务
CompletableFuture<Map<Long, List<Test1Po>>> futureTest1 = CompletableFuture.supplyAsync(
() -> dTest1Repository.findByTest1IdIn(dTest1Ids)
.stream()
.collect(Collectors.groupingBy(Test1::getTest1Id)),
executor
);
CompletableFuture<Map<Long, List<Test2Po>>> futureTest2 = CompletableFuture.supplyAsync(
() -> dTest2Repository.findByIdIn(dTest2Ids)
.stream()
.collect(Collectors.groupingBy(Test2Po::getId)),
executor
);
// 2. 阻塞主线程,等待两个 Future 完成并提取结果
Map<Long, List<Test1Po>> sTest1Map = futureTest1.join();
Map<Long, List<Test2>> dTest2Map = futureTest2.join();
end
2026-01-15 start:
需求:通过一个数组的资源id集合查询某一个视频接口,视频接口每次查询一个,通过并提供查询的效率工具类如下:
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 并发查询工具类
*
* @author system
*/
@Slf4j
public class ConcurrentQueryUtil {
/**
* 并发执行查询任务
*
* @param <T> 查询结果类型
* @param <P> 查询参数类型
* @param params 查询参数列表
* @param queryFunction 查询函数
* @param timeoutSeconds 超时时间(秒)
* @param maxThreadCount 最大线程数
* @return 查询结果列表
*/
public static <T, P> List<T> concurrentQuery(
List<P> params,
QueryFunction<T, P> queryFunction,
int timeoutSeconds,
int maxThreadCount) {
if (CollectionUtils.isEmpty(params)) {
return new ArrayList<>();
}
// 线程安全的结果集合
List<T> resultList = Collections.synchronizedList(new ArrayList<>());
// 计算线程数:取参数数量和最大线程数的较小值
int threadCount = Math.min(params.size(), maxThreadCount);
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(params.size());
try {
// 提交查询任务
for (P param : params) {
executorService.submit(() -> {
try {
T result = queryFunction.query(param);
if (result != null) {
resultList.add(result);
}
} catch (Exception e) {
log.error("并发查询异常,参数: {}", param, e);
} finally {
latch.countDown();
}
});
}
// 等待所有任务完成
boolean completed = latch.await(timeoutSeconds, TimeUnit.SECONDS);
if (!completed) {
log.warn("并发查询超时,部分查询可能未完成,超时时间: {}秒", timeoutSeconds);
}
} catch (InterruptedException e) {
log.error("并发查询被中断", e);
Thread.currentThread().interrupt();
} finally {
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
return resultList;
}
/**
* 查询函数接口
*
* @param <T> 返回结果类型
* @param <P> 参数类型
*/
@FunctionalInterface
public interface QueryFunction<T, P> {
/**
* 执行查询
*
* @param param 查询参数
* @return 查询结果,如果查询失败或不需要记录,可返回null
*/
T query(P param);
}
}
调用:
// 构建查询参数列表
List<VideoQueryParam> queryParams = new ArrayList<>();
for (String resourceId : validResourceIds) {
queryParams.add(new VideoQueryParam(resourceId, type));
}
// 使用并发查询工具类执行查询
List<VideoListResponse.VideoInfo> videoList = ConcurrentQueryUtil.concurrentQuery(
queryParams,
this::queryVideoInfo,
30,
10
);
/**
* 查询视频信息
*
* @param param 查询参数
* @return 视频信息,查询失败返回null
*/
private VideoListResponse.VideoInfo queryVideoInfo(VideoQueryParam param) {
VideoQueryResponse videoResponse = imusicVideoService.queryVideo(param.getResourceId(), param.getType());
if (videoResponse == null || !"0000".equals(videoResponse.getCode())) {
log.warn("视频查询失败,resourceId: {}, code: {}, message: {}",
param.getResourceId(),
videoResponse != null ? videoResponse.getCode() : "null",
videoResponse != null ? videoResponse.getMessage() : "null");
return null;
}
VideoQueryResponse.VideoData data = videoResponse.getData();
if (data == null) {
return null;
}
return VideoInfoWrapper.convertToVideoInfo(data);
}
/**
* 视频查询参数
*/
@Data
@AllArgsConstructor
private static class VideoQueryParam {
/**
* 资源ID
*/
private String resourceId;
/**
* 视频类型
*/
private ImusicVideoTypeEnum type;
}
end
2026-02-06 start:
自定义线程池:
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* 异步任务线程池配置类
* 自定义ThreadPoolTaskExecutor替代Spring默认异步线程池,避免默认线程池核心参数不合理导致的性能问题
*/
@Configuration
@EnableAsync
public class AsyncConfig {
/**
* 自定义异步线程池Bean
* 命名为asyncExecutor,配合@Async("asyncExecutor")指定使用此线程池
* @return 自定义的线程池执行器
*/
@Bean("asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:CPU核心数*2,保证核心任务的并行执行,充分利用CPU资源
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
executor.setCorePoolSize(corePoolSize);
// 最大线程数:CPU核心数*4,核心线程忙且队列满时,最多创建的线程数,防止线程过多导致上下文切换频繁
int maxPoolSize = Runtime.getRuntime().availableProcessors() * 4;
executor.setMaxPoolSize(maxPoolSize);
// 任务队列容量:1000,核心线程满时,任务进入阻塞队列等待,避免频繁创建非核心线程
executor.setQueueCapacity(1000);
// 非核心线程空闲存活时间:60秒,空闲超时后销毁,减少资源占用
executor.setKeepAliveSeconds(60);
// 线程名称前缀:async-task-,便于日志排查(如async-task-1、async-task-2)
executor.setThreadNamePrefix("async-task-");
// 拒绝策略:CallerRunsPolicy,队列满+最大线程数满时,由**调用者线程**执行任务
// 避免任务直接丢弃,适合对任务不允许丢失的业务场景
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程池(ThreadPoolTaskExecutor必须显式调用,否则不生效)
executor.initialize();
return executor;
}
}
关键配置说明(理解每一步的意义)
| 配置项 | 取值 | 设计思路 |
|---|---|---|
核心线程数corePoolSize |
CPU 核心数 * 2 | 核心线程会一直存活(即使空闲),适配 CPU 密集型 + IO 密集型混合异步任务,充分利用 CPU |
最大线程数maxPoolSize |
CPU 核心数 * 4 | 非核心线程仅在队列满时创建,上限为 4 倍核心数,避免线程过多导致 CPU 上下文切换开销 |
队列容量queueCapacity |
1000 | 采用有界队列(必须有界,避免无界队列撑爆内存),缓冲待执行的异步任务 |
空闲存活时间keepAliveSeconds |
60 秒 | 非核心线程空闲超 60 秒销毁,减少内存和线程调度资源占用 |
| 线程名称前缀 | async-task- | 日志中可快速识别异步任务线程,便于问题排查(如定位慢异步任务、线程死锁) |
| 拒绝策略 | CallerRunsPolicy | 任务触发拒绝时,由调用方线程同步执行(如 Controller 调用异步方法则由 Tomcat 线程执行),无任务丢失 |
| 显式初始化 | executor.initialize() | ThreadPoolTaskExecutor 是 Spring 封装的线程池,必须显式调用初始化才会创建线程池 |
配套使用方式(核心!否则线程池不生效)
@Async会使用内置的简易线程池,必须指定自定义线程池名称,才能使用上面的配置:import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncTaskService {
// 显式指定使用名为asyncExecutor的线程池
@Async("asyncExecutor")
public void doAsyncTask() {
// 你的异步业务逻辑(如耗时的文件处理、远程调用、数据统计等)
System.out.println("异步任务执行,线程名称:" + Thread.currentThread().getName());
}
}
-
CallerRunsPolicy是业务无任务丢失要求的首选,其他常见拒绝策略适合场景:AbortPolicy(默认):直接抛出RejectedExecutionException,适合需要感知任务拒绝的场景DiscardPolicy:直接丢弃最新任务,无异常,适合非核心异步任务DiscardOldestPolicy:丢弃队列中最旧的任务,执行最新任务,适合实时性要求高的场景
-
参数调优参考
- IO 密集型任务(如远程调用、数据库查询、文件读写):核心 / 最大线程数可适当提高(如 CPU4/CPU8),因为线程大部分时间在等待 IO
- CPU 密集型任务(如数据计算、加密解密):核心 / 最大线程数接近 CPU 核心数即可(如 CPU1/CPU2),避免上下文切换
- 队列容量:根据业务 QPS 调整,不宜过大(否则任务等待时间过长),也不宜过小(否则频繁创建非核心线程)
end