java中线程池的使用

我爱海鲸 2025-04-24 18:09:23 暂无标签

简介java做并发操作,调用接口

使用java线程池调用接口,做并发,提高执行效率。

相关代码,如下:

        int concurrentCount = 20; //设置并发数量

        if (familyList.size() < concurrentCount) { // 请求人数的并发数量小于20时,使用请求人数的并发数

            concurrentCount = familyList.size();

        }

        ExecutorService executorService = Executors.newWorkStealingPool(concurrentCount);  //创建一个抢占式的线程池,由jdk8提供

        List<Callable<Map<String,Object>>> jobList = Lists.newArrayList();

 for (int i = 0; i < familyList.size(); i++) {

            //使用线程池批量调用相关接口

            jobList.add(new Callable<Map<String, Object>>() {

                @Override

                public Map<String, Object> call() throws Exception {

                    Map<String,Object> res = Maps.newHashMap();

                     // 调用相关接口

                    res.put("return_code",response.getReturn_code());

                    res.put("failMsg",response.getReturn_msg());

                    res.put("failPhone",phone);

                    long t2_ = System.currentTimeMillis();

                    return res;

                }

            });

        }

 

  int errors = 0; // 调用相关接口失败的数量

        StringBuilder errmsg = new StringBuilder();

        try {

            List<Future<Map<String, Object>>> futures = executorService.invokeAll(jobList);

            executorService.shutdown(); 

            for(Future<Map<String,Object>> job:futures){

                Map<String,Object> res = job.get();

                if(!"0".equals(res.get("return_code"))){

                    errmsg.append("手机号[")

                            .append(res.get("failPhone"))

                            .append("]原因:")

                            .append(res.get("failMsg"));

                    errors++;

                }

            }

        } catch (InterruptedException e) {

            logger.error("线程池调用异常"+e.getMessage(),e);

        } catch (ExecutionException e) {

            logger.error("线程池调用异常"+e.getMessage(),e);

        }

Java 五种线程池,JDK1.8新增newWorkStealingPool

2023-04-19 start

一个多线程调用思路:

 public List<ResultVo> function(List<Item> list) {
        List<ResultVo> result = new Arraylist<>();
        
        if (CollectionUtils.isEmpty(list)){
            return list;
        }
        
        List<List<ResultVo>> a = Lists.partition(list, 5);
        
        int size = a.size();
        CountDownLatch cdl = new CountDownLatch(size);
        
        // 排序并过滤
        for (List<PlayRoomVo> p : a) {
            try {
							// for循环操作
            } catch (Exception e){
                //todo log
            } finally {
                cdl.countDown();
            }
        }

        try {
            cdl.await(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            //todo log
            e.printStackTrace();
        }

        return result;
    }

优化后:

public List<ResultVo> function(List<Item> list) {
    List<ResultVo> result = new ArrayList<>();

    if (CollectionUtils.isEmpty(list)) {
        return result; // 返回空列表
    }

    // 将输入列表分区,每个子列表最多包含 5 个元素
    List<List<Item>> partitions = Lists.partition(list, 5);

    int size = partitions.size();
    CountDownLatch cdl = new CountDownLatch(size);

    // 并发处理每个分区
    for (List<Item> partition : partitions) {
        new Thread(() -> {
            try {
                // 在这里实现具体的处理逻辑,例如排序、过滤等
                // 将处理结果添加到 result 中
            } catch (Exception e) {
                // 记录日志
            } finally {
                cdl.countDown(); // 确保 CountDownLatch 计数减一
            }
        }).start();
    }

    // 等待所有任务完成,最多等待 3 秒
    try {
        cdl.await(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // 记录日志
        e.printStackTrace();
    }

    return result;
}

改进建议

  • 线程池:直接创建新线程(new Thread())的方式不够高效,建议使用线程池(如 ExecutorService)来管理线程。

  • 线程安全result 是一个共享资源,多线程并发修改时可能会导致数据不一致。建议使用线程安全的集合(如 Collections.synchronizedList())或通过加锁来保证线程安全。

  • 超时处理:如果任务在 3 秒内未完成,cdl.await() 会超时,但已启动的线程会继续执行。需要根据业务需求决定是否取消未完成的任务。

end

2025-02-25 start:

设置多少线程数量通常根据应用的类型: IO密集型、CPU密集型。

CPU密集型(CPU-bound)

CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好得多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写、I/O (硬盘/内存), I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。

在多重程序系统中,大部分时间用来做计算、逻辑判断等CPU动作的程序称之为CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程中绝大部分时间用在三角函数和开根号的计算,便是属于CPU bound的程序。

CPU bound的程序一般而言CPU占用率很高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等到I/O的时间。

IO密集型(I/O bound)

IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O(硬盘/内存)的读写操作,此时CPU Loading并不高。

I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。

第一种公式:

IO密集型通常设置为 2n+1,其中n为CPU核数。
CPU密集型通常设置为n+1。
在实际的开发中,并不会按照上面的公式进行设置。那么在实际开发中,我们如何给一个线程池设置合适的线程呢?

第二种公式:

其实对于IO密集型类型的应用,网上还有一个公式: 线程数 = CPU核心数 / ( 1 - 阻塞系数)。 引入了阻塞系数的概念,一般为0.8 ~ 0.9.

实际经验

在我们的业务开发中,基本上都是IO密集型,因为往往都会去操作数据库,访问redis、es等存储型组件,涉及到磁盘IO、网络IO。 对于纯计算类场景就属于CPU密集型。

IO密集型,可以考虑多设置一些线程,主要目的是可以增加IO的并发度,CPU密集型不宜过多线程,因为会造成线程切换,反而损耗性能。

一个4C8G的机器如果按照2n+1的公式,线程数设置为9个,但在我们实践过程中发现如果增大线程数量,会显著提高消息的处理能力,说明2n+1对于业务场景来说,并不太合适。

如果套用 线程数 = CPU核心数 / ( 1 - 阻塞系数), 阻塞系数取0.8,线程数为20.  阻塞系数取0.9, 大概线程数40, 20个线程数我觉得可以。

那我们怎么判断需要增加更多线程呢? 其实可以用jstack命令查看一下进程的线程数, 如果发现线程池中大部分线程都处于等待获取任务,则说明线程够用,如果大部分线程都处于运行状态,可以继续适当调高线程数量。

如果我们发现数据库的操作耗时比较多,此时可以继续提高阻塞系数,从而增大线程数量。

end

2025-03-19 start:

结合上面来进行并发查询

/**
 * @author haijin
 * @description: 并发查询工具类
 * @date 2025/3/19 11:37
 */
public class ConcurrentQueryUtil {

    /**
     * 并发查询时间片数据
     *
     * @param timeRanges 时间片列表
     * @param queryFunction 查询函数
     * @return 查询结果列表
     */
    public static <T> List<T> concurrentQuery(List<Map<String,String>> timeRanges, QueryFunction<T> queryFunction) {
        // 线程安全的集合,用于存储查询结果
        List<T> resultList = new CopyOnWriteArrayList<>();

        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(timeRanges.size());

        // 使用 CountDownLatch 等待所有任务完成
        CountDownLatch latch = new CountDownLatch(timeRanges.size());

        // 提交查询任务
        for (Map<String,String> timeRange : timeRanges) {
            executorService.submit(() -> {
                try {
                    // 执行查询
                    T result = queryFunction.query(timeRange.get("startTime"), timeRange.get("endTime"));
                    resultList.add(result);
                } finally {
                    latch.countDown();
                }
            });
        }
        // 等待所有任务完成
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw MsbException.logic("查询任务被中断【{}】",e.getMessage());
        }
        // 关闭线程池
        executorService.shutdown();
        return resultList;
    }

    /**
     * 查询函数接口
     */
    @FunctionalInterface
    public interface QueryFunction<T> {
        T query(String startTime, String endTime);
    }

}

使用:(splitTimeRange方法可以查看http://www.haijin.xyz/article/338)

        List<Map<String, String>> splitTimeRange = LogEntityUtil.splitTimeRange(faultAnalyzeParamVo.getStartTime(), faultAnalyzeParamVo.getEndTime());

      List<GatewayAccessLogVo> resultList = ConcurrentQueryUtil.concurrentQuery(timeRanges, (startTime, endTime) -> {
            // 调用 DAO 层查询方法
            return gateWayAccessLogDao.getGatewayAccessLogVo(startTime, endTime, null, null);
        });

        // 打印结果
        resultList.forEach(System.out::println);

end

2025-03-26 start:

异步线程查询

        ExecutorService executor = Executors.newFixedThreadPool(3);

        String finalUrl = url;
        String finalStartTime = startTime;
        String finalEndTime = endTime;
        CompletableFuture<Void> allFuture = CompletableFuture.runAsync(() ->
                setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, null), executor);

        CompletableFuture<Void> aFuture = CompletableFuture.runAsync(() ->
                setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, "a"), executor);

        CompletableFuture<Void> bFuture = CompletableFuture.runAsync(() ->
                setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, "b"), executor);

        // 等待所有任务完成
        CompletableFuture.allOf(allFuture, aFuture, bFuture).join();

        executor.shutdown();

end

2025-04-24 start:

        // 自定义线程池
        final ExecutorService executor = Executors.newFixedThreadPool(2);       
 // 1. 并行执行两个查询任务
        CompletableFuture<Map<Long, List<Test1Po>>> futureTest1 = CompletableFuture.supplyAsync(
                () -> dTest1Repository.findByTest1IdIn(dTest1Ids)
                        .stream()
                        .collect(Collectors.groupingBy(Test1::getTest1Id)),
                executor
        );

        CompletableFuture<Map<Long, List<Test2Po>>> futureTest2 = CompletableFuture.supplyAsync(
                () -> dTest2Repository.findByIdIn(dTest2Ids)
                        .stream()
                        .collect(Collectors.groupingBy(Test2Po::getId)),
                executor
        );

        // 2. 阻塞主线程,等待两个 Future 完成并提取结果
        Map<Long, List<Test1Po>> sTest1Map = futureTest1.join();
        Map<Long, List<Test2>> dTest2Map = futureTest2.join();

end

你好:我的2025