使用java线程池调用接口,做并发,提高执行效率。
相关代码,如下:
int concurrentCount = 20; //设置并发数量
if (familyList.size() < concurrentCount) { // 请求人数的并发数量小于20时,使用请求人数的并发数
concurrentCount = familyList.size();
}
ExecutorService executorService = Executors.newWorkStealingPool(concurrentCount); //创建一个抢占式的线程池,由jdk8提供
List<Callable<Map<String,Object>>> jobList = Lists.newArrayList();
for (int i = 0; i < familyList.size(); i++) {
//使用线程池批量调用相关接口
jobList.add(new Callable<Map<String, Object>>() {
@Override
public Map<String, Object> call() throws Exception {
Map<String,Object> res = Maps.newHashMap();
// 调用相关接口
res.put("return_code",response.getReturn_code());
res.put("failMsg",response.getReturn_msg());
res.put("failPhone",phone);
long t2_ = System.currentTimeMillis();
return res;
}
});
}
int errors = 0; // 调用相关接口失败的数量
StringBuilder errmsg = new StringBuilder();
try {
List<Future<Map<String, Object>>> futures = executorService.invokeAll(jobList);
executorService.shutdown();
for(Future<Map<String,Object>> job:futures){
Map<String,Object> res = job.get();
if(!"0".equals(res.get("return_code"))){
errmsg.append("手机号[")
.append(res.get("failPhone"))
.append("]原因:")
.append(res.get("failMsg"));
errors++;
}
}
} catch (InterruptedException e) {
logger.error("线程池调用异常"+e.getMessage(),e);
} catch (ExecutionException e) {
logger.error("线程池调用异常"+e.getMessage(),e);
}
Java 五种线程池,JDK1.8新增newWorkStealingPool
2023-04-19 start
一个多线程调用思路:
public List<ResultVo> function(List<Item> list) {
List<ResultVo> result = new Arraylist<>();
if (CollectionUtils.isEmpty(list)){
return list;
}
List<List<ResultVo>> a = Lists.partition(list, 5);
int size = a.size();
CountDownLatch cdl = new CountDownLatch(size);
// 排序并过滤
for (List<PlayRoomVo> p : a) {
try {
// for循环操作
} catch (Exception e){
//todo log
} finally {
cdl.countDown();
}
}
try {
cdl.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//todo log
e.printStackTrace();
}
return result;
}
优化后:
public List<ResultVo> function(List<Item> list) {
List<ResultVo> result = new ArrayList<>();
if (CollectionUtils.isEmpty(list)) {
return result; // 返回空列表
}
// 将输入列表分区,每个子列表最多包含 5 个元素
List<List<Item>> partitions = Lists.partition(list, 5);
int size = partitions.size();
CountDownLatch cdl = new CountDownLatch(size);
// 并发处理每个分区
for (List<Item> partition : partitions) {
new Thread(() -> {
try {
// 在这里实现具体的处理逻辑,例如排序、过滤等
// 将处理结果添加到 result 中
} catch (Exception e) {
// 记录日志
} finally {
cdl.countDown(); // 确保 CountDownLatch 计数减一
}
}).start();
}
// 等待所有任务完成,最多等待 3 秒
try {
cdl.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// 记录日志
e.printStackTrace();
}
return result;
}
改进建议
-
线程池:直接创建新线程(
new Thread()
)的方式不够高效,建议使用线程池(如ExecutorService
)来管理线程。 -
线程安全:
result
是一个共享资源,多线程并发修改时可能会导致数据不一致。建议使用线程安全的集合(如Collections.synchronizedList()
)或通过加锁来保证线程安全。 -
超时处理:如果任务在 3 秒内未完成,
cdl.await()
会超时,但已启动的线程会继续执行。需要根据业务需求决定是否取消未完成的任务。
end
2025-02-25 start:
设置多少线程数量通常根据应用的类型: IO密集型、CPU密集型。
CPU密集型(CPU-bound)
CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好得多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写、I/O (硬盘/内存), I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。
在多重程序系统中,大部分时间用来做计算、逻辑判断等CPU动作的程序称之为CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程中绝大部分时间用在三角函数和开根号的计算,便是属于CPU bound的程序。
CPU bound的程序一般而言CPU占用率很高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等到I/O的时间。
IO密集型(I/O bound)
IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O(硬盘/内存)的读写操作,此时CPU Loading并不高。
I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。
第一种公式:
IO密集型通常设置为 2n+1,其中n为CPU核数。
CPU密集型通常设置为n+1。
在实际的开发中,并不会按照上面的公式进行设置。那么在实际开发中,我们如何给一个线程池设置合适的线程呢?
第二种公式:
其实对于IO密集型类型的应用,网上还有一个公式: 线程数 = CPU核心数 / ( 1 - 阻塞系数)。 引入了阻塞系数的概念,一般为0.8 ~ 0.9.
实际经验
在我们的业务开发中,基本上都是IO密集型,因为往往都会去操作数据库,访问redis、es等存储型组件,涉及到磁盘IO、网络IO。 对于纯计算类场景就属于CPU密集型。
IO密集型,可以考虑多设置一些线程,主要目的是可以增加IO的并发度,CPU密集型不宜过多线程,因为会造成线程切换,反而损耗性能。
一个4C8G的机器如果按照2n+1的公式,线程数设置为9个,但在我们实践过程中发现如果增大线程数量,会显著提高消息的处理能力,说明2n+1对于业务场景来说,并不太合适。
如果套用 线程数 = CPU核心数 / ( 1 - 阻塞系数), 阻塞系数取0.8,线程数为20. 阻塞系数取0.9, 大概线程数40, 20个线程数我觉得可以。
那我们怎么判断需要增加更多线程呢? 其实可以用jstack命令查看一下进程的线程数, 如果发现线程池中大部分线程都处于等待获取任务,则说明线程够用,如果大部分线程都处于运行状态,可以继续适当调高线程数量。
如果我们发现数据库的操作耗时比较多,此时可以继续提高阻塞系数,从而增大线程数量。
end
2025-03-19 start:
结合上面来进行并发查询
/**
* @author haijin
* @description: 并发查询工具类
* @date 2025/3/19 11:37
*/
public class ConcurrentQueryUtil {
/**
* 并发查询时间片数据
*
* @param timeRanges 时间片列表
* @param queryFunction 查询函数
* @return 查询结果列表
*/
public static <T> List<T> concurrentQuery(List<Map<String,String>> timeRanges, QueryFunction<T> queryFunction) {
// 线程安全的集合,用于存储查询结果
List<T> resultList = new CopyOnWriteArrayList<>();
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(timeRanges.size());
// 使用 CountDownLatch 等待所有任务完成
CountDownLatch latch = new CountDownLatch(timeRanges.size());
// 提交查询任务
for (Map<String,String> timeRange : timeRanges) {
executorService.submit(() -> {
try {
// 执行查询
T result = queryFunction.query(timeRange.get("startTime"), timeRange.get("endTime"));
resultList.add(result);
} finally {
latch.countDown();
}
});
}
// 等待所有任务完成
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw MsbException.logic("查询任务被中断【{}】",e.getMessage());
}
// 关闭线程池
executorService.shutdown();
return resultList;
}
/**
* 查询函数接口
*/
@FunctionalInterface
public interface QueryFunction<T> {
T query(String startTime, String endTime);
}
}
使用:(splitTimeRange方法可以查看http://www.haijin.xyz/article/338)
List<Map<String, String>> splitTimeRange = LogEntityUtil.splitTimeRange(faultAnalyzeParamVo.getStartTime(), faultAnalyzeParamVo.getEndTime());
List<GatewayAccessLogVo> resultList = ConcurrentQueryUtil.concurrentQuery(timeRanges, (startTime, endTime) -> {
// 调用 DAO 层查询方法
return gateWayAccessLogDao.getGatewayAccessLogVo(startTime, endTime, null, null);
});
// 打印结果
resultList.forEach(System.out::println);
end
2025-03-26 start:
异步线程查询
ExecutorService executor = Executors.newFixedThreadPool(3);
String finalUrl = url;
String finalStartTime = startTime;
String finalEndTime = endTime;
CompletableFuture<Void> allFuture = CompletableFuture.runAsync(() ->
setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, null), executor);
CompletableFuture<Void> aFuture = CompletableFuture.runAsync(() ->
setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, "a"), executor);
CompletableFuture<Void> bFuture = CompletableFuture.runAsync(() ->
setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, "b"), executor);
// 等待所有任务完成
CompletableFuture.allOf(allFuture, aFuture, bFuture).join();
executor.shutdown();
end
2025-04-24 start:
// 自定义线程池
final ExecutorService executor = Executors.newFixedThreadPool(2);
// 1. 并行执行两个查询任务
CompletableFuture<Map<Long, List<Test1Po>>> futureTest1 = CompletableFuture.supplyAsync(
() -> dTest1Repository.findByTest1IdIn(dTest1Ids)
.stream()
.collect(Collectors.groupingBy(Test1::getTest1Id)),
executor
);
CompletableFuture<Map<Long, List<Test2Po>>> futureTest2 = CompletableFuture.supplyAsync(
() -> dTest2Repository.findByIdIn(dTest2Ids)
.stream()
.collect(Collectors.groupingBy(Test2Po::getId)),
executor
);
// 2. 阻塞主线程,等待两个 Future 完成并提取结果
Map<Long, List<Test1Po>> sTest1Map = futureTest1.join();
Map<Long, List<Test2>> dTest2Map = futureTest2.join();
end