java中线程池的使用

我爱海鲸 2026-02-06 11:51:31 暂无标签

简介java做并发操作,调用接口、异步

2025-10-11 start:

简单的异步调用:

        CompletableFuture.runAsync(() -> testMapper.find(testDTOS));

end

使用java线程池调用接口,做并发,提高执行效率。

相关代码,如下:

        int concurrentCount = 20; //设置并发数量

        if (familyList.size() < concurrentCount) { // 请求人数的并发数量小于20时,使用请求人数的并发数

            concurrentCount = familyList.size();

        }

        ExecutorService executorService = Executors.newWorkStealingPool(concurrentCount);  //创建一个抢占式的线程池,由jdk8提供

        List<Callable<Map<String,Object>>> jobList = Lists.newArrayList();

 for (int i = 0; i < familyList.size(); i++) {

            //使用线程池批量调用相关接口

            jobList.add(new Callable<Map<String, Object>>() {

                @Override

                public Map<String, Object> call() throws Exception {

                    Map<String,Object> res = Maps.newHashMap();

                     // 调用相关接口

                    res.put("return_code",response.getReturn_code());

                    res.put("failMsg",response.getReturn_msg());

                    res.put("failPhone",phone);

                    long t2_ = System.currentTimeMillis();

                    return res;

                }

            });

        }

 

  int errors = 0; // 调用相关接口失败的数量

        StringBuilder errmsg = new StringBuilder();

        try {

            List<Future<Map<String, Object>>> futures = executorService.invokeAll(jobList);

            executorService.shutdown(); 

            for(Future<Map<String,Object>> job:futures){

                Map<String,Object> res = job.get();

                if(!"0".equals(res.get("return_code"))){

                    errmsg.append("手机号[")

                            .append(res.get("failPhone"))

                            .append("]原因:")

                            .append(res.get("failMsg"));

                    errors++;

                }

            }

        } catch (InterruptedException e) {

            logger.error("线程池调用异常"+e.getMessage(),e);

        } catch (ExecutionException e) {

            logger.error("线程池调用异常"+e.getMessage(),e);

        }

Java 五种线程池,JDK1.8新增newWorkStealingPool

2023-04-19 start

一个多线程调用思路:

 public List<ResultVo> function(List<Item> list) {
        List<ResultVo> result = new Arraylist<>();
        
        if (CollectionUtils.isEmpty(list)){
            return list;
        }
        
        List<List<ResultVo>> a = Lists.partition(list, 5);
        
        int size = a.size();
        CountDownLatch cdl = new CountDownLatch(size);
        
        // 排序并过滤
        for (List<PlayRoomVo> p : a) {
            try {
							// for循环操作
            } catch (Exception e){
                //todo log
            } finally {
                cdl.countDown();
            }
        }

        try {
            cdl.await(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            //todo log
            e.printStackTrace();
        }

        return result;
    }

优化后:

public List<ResultVo> function(List<Item> list) {
    List<ResultVo> result = new ArrayList<>();

    if (CollectionUtils.isEmpty(list)) {
        return result; // 返回空列表
    }

    // 将输入列表分区,每个子列表最多包含 5 个元素
    List<List<Item>> partitions = Lists.partition(list, 5);

    int size = partitions.size();
    CountDownLatch cdl = new CountDownLatch(size);

    // 并发处理每个分区
    for (List<Item> partition : partitions) {
        new Thread(() -> {
            try {
                // 在这里实现具体的处理逻辑,例如排序、过滤等
                // 将处理结果添加到 result 中
            } catch (Exception e) {
                // 记录日志
            } finally {
                cdl.countDown(); // 确保 CountDownLatch 计数减一
            }
        }).start();
    }

    // 等待所有任务完成,最多等待 3 秒
    try {
        cdl.await(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // 记录日志
        e.printStackTrace();
    }

    return result;
}

改进建议

  • 线程池:直接创建新线程(new Thread())的方式不够高效,建议使用线程池(如 ExecutorService)来管理线程。

  • 线程安全result 是一个共享资源,多线程并发修改时可能会导致数据不一致。建议使用线程安全的集合(如 Collections.synchronizedList())或通过加锁来保证线程安全。

  • 超时处理:如果任务在 3 秒内未完成,cdl.await() 会超时,但已启动的线程会继续执行。需要根据业务需求决定是否取消未完成的任务。

end

2025-02-25 start:

设置多少线程数量通常根据应用的类型: IO密集型、CPU密集型。

CPU密集型(CPU-bound)

CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好得多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写、I/O (硬盘/内存), I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。

在多重程序系统中,大部分时间用来做计算、逻辑判断等CPU动作的程序称之为CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程中绝大部分时间用在三角函数和开根号的计算,便是属于CPU bound的程序。

CPU bound的程序一般而言CPU占用率很高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等到I/O的时间。

IO密集型(I/O bound)

IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O(硬盘/内存)的读写操作,此时CPU Loading并不高。

I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。

第一种公式:

IO密集型通常设置为 2n+1,其中n为CPU核数。
CPU密集型通常设置为n+1。
在实际的开发中,并不会按照上面的公式进行设置。那么在实际开发中,我们如何给一个线程池设置合适的线程呢?

第二种公式:

其实对于IO密集型类型的应用,网上还有一个公式: 线程数 = CPU核心数 / ( 1 - 阻塞系数)。 引入了阻塞系数的概念,一般为0.8 ~ 0.9.

实际经验

在我们的业务开发中,基本上都是IO密集型,因为往往都会去操作数据库,访问redis、es等存储型组件,涉及到磁盘IO、网络IO。 对于纯计算类场景就属于CPU密集型。

IO密集型,可以考虑多设置一些线程,主要目的是可以增加IO的并发度,CPU密集型不宜过多线程,因为会造成线程切换,反而损耗性能。

一个4C8G的机器如果按照2n+1的公式,线程数设置为9个,但在我们实践过程中发现如果增大线程数量,会显著提高消息的处理能力,说明2n+1对于业务场景来说,并不太合适。

如果套用 线程数 = CPU核心数 / ( 1 - 阻塞系数), 阻塞系数取0.8,线程数为20.  阻塞系数取0.9, 大概线程数40, 20个线程数我觉得可以。

那我们怎么判断需要增加更多线程呢? 其实可以用jstack命令查看一下进程的线程数, 如果发现线程池中大部分线程都处于等待获取任务,则说明线程够用,如果大部分线程都处于运行状态,可以继续适当调高线程数量。

如果我们发现数据库的操作耗时比较多,此时可以继续提高阻塞系数,从而增大线程数量。

end

2025-03-19 start:

结合上面来进行并发查询

/**
 * @author haijin
 * @description: 并发查询工具类
 * @date 2025/3/19 11:37
 */
public class ConcurrentQueryUtil {

    /**
     * 并发查询时间片数据
     *
     * @param timeRanges 时间片列表
     * @param queryFunction 查询函数
     * @return 查询结果列表
     */
    public static <T> List<T> concurrentQuery(List<Map<String,String>> timeRanges, QueryFunction<T> queryFunction) {
        // 线程安全的集合,用于存储查询结果
        List<T> resultList = new CopyOnWriteArrayList<>();

        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(timeRanges.size());

        // 使用 CountDownLatch 等待所有任务完成
        CountDownLatch latch = new CountDownLatch(timeRanges.size());

        // 提交查询任务
        for (Map<String,String> timeRange : timeRanges) {
            executorService.submit(() -> {
                try {
                    // 执行查询
                    T result = queryFunction.query(timeRange.get("startTime"), timeRange.get("endTime"));
                    resultList.add(result);
                } finally {
                    latch.countDown();
                }
            });
        }
        // 等待所有任务完成
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw MsbException.logic("查询任务被中断【{}】",e.getMessage());
        }
        // 关闭线程池
        executorService.shutdown();
        return resultList;
    }

    /**
     * 查询函数接口
     */
    @FunctionalInterface
    public interface QueryFunction<T> {
        T query(String startTime, String endTime);
    }

}

使用:(splitTimeRange方法可以查看http://www.haijin.xyz/article/338)

        List<Map<String, String>> splitTimeRange = LogEntityUtil.splitTimeRange(faultAnalyzeParamVo.getStartTime(), faultAnalyzeParamVo.getEndTime());

      List<GatewayAccessLogVo> resultList = ConcurrentQueryUtil.concurrentQuery(timeRanges, (startTime, endTime) -> {
            // 调用 DAO 层查询方法
            return gateWayAccessLogDao.getGatewayAccessLogVo(startTime, endTime, null, null);
        });

        // 打印结果
        resultList.forEach(System.out::println);

end

2025-03-26 start:

异步线程查询

        ExecutorService executor = Executors.newFixedThreadPool(3);

        String finalUrl = url;
        String finalStartTime = startTime;
        String finalEndTime = endTime;
        CompletableFuture<Void> allFuture = CompletableFuture.runAsync(() ->
                setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, null), executor);

        CompletableFuture<Void> aFuture = CompletableFuture.runAsync(() ->
                setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, "a"), executor);

        CompletableFuture<Void> bFuture = CompletableFuture.runAsync(() ->
                setEsLogData(esLogDataVo, finalUrl, method, finalStartTime, finalEndTime, "b"), executor);

        // 等待所有任务完成
        CompletableFuture.allOf(allFuture, aFuture, bFuture).join();

        executor.shutdown();

end

2025-04-24 start:

        // 自定义线程池
        final ExecutorService executor = Executors.newFixedThreadPool(2);       
 // 1. 并行执行两个查询任务
        CompletableFuture<Map<Long, List<Test1Po>>> futureTest1 = CompletableFuture.supplyAsync(
                () -> dTest1Repository.findByTest1IdIn(dTest1Ids)
                        .stream()
                        .collect(Collectors.groupingBy(Test1::getTest1Id)),
                executor
        );

        CompletableFuture<Map<Long, List<Test2Po>>> futureTest2 = CompletableFuture.supplyAsync(
                () -> dTest2Repository.findByIdIn(dTest2Ids)
                        .stream()
                        .collect(Collectors.groupingBy(Test2Po::getId)),
                executor
        );

        // 2. 阻塞主线程,等待两个 Future 完成并提取结果
        Map<Long, List<Test1Po>> sTest1Map = futureTest1.join();
        Map<Long, List<Test2>> dTest2Map = futureTest2.join();

end

2026-01-15 start:

需求:通过一个数组的资源id集合查询某一个视频接口,视频接口每次查询一个,通过并提供查询的效率工具类如下:

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 并发查询工具类
 * 
 * @author system
 */
@Slf4j
public class ConcurrentQueryUtil {

    /**
     * 并发执行查询任务
     * 
     * @param <T> 查询结果类型
     * @param <P> 查询参数类型
     * @param params 查询参数列表
     * @param queryFunction 查询函数
     * @param timeoutSeconds 超时时间(秒)
     * @param maxThreadCount 最大线程数
     * @return 查询结果列表
     */
    public static <T, P> List<T> concurrentQuery(
            List<P> params,
            QueryFunction<T, P> queryFunction,
            int timeoutSeconds,
            int maxThreadCount) {
        
        if (CollectionUtils.isEmpty(params)) {
            return new ArrayList<>();
        }
        
        // 线程安全的结果集合
        List<T> resultList = Collections.synchronizedList(new ArrayList<>());
        
        // 计算线程数:取参数数量和最大线程数的较小值
        int threadCount = Math.min(params.size(), maxThreadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(params.size());
        
        try {
            // 提交查询任务
            for (P param : params) {
                executorService.submit(() -> {
                    try {
                        T result = queryFunction.query(param);
                        if (result != null) {
                            resultList.add(result);
                        }
                    } catch (Exception e) {
                        log.error("并发查询异常,参数: {}", param, e);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            
            // 等待所有任务完成
            boolean completed = latch.await(timeoutSeconds, TimeUnit.SECONDS);
            if (!completed) {
                log.warn("并发查询超时,部分查询可能未完成,超时时间: {}秒", timeoutSeconds);
            }
        } catch (InterruptedException e) {
            log.error("并发查询被中断", e);
            Thread.currentThread().interrupt();
        } finally {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        
        return resultList;
    }

    /**
     * 查询函数接口
     * 
     * @param <T> 返回结果类型
     * @param <P> 参数类型
     */
    @FunctionalInterface
    public interface QueryFunction<T, P> {
        /**
         * 执行查询
         * 
         * @param param 查询参数
         * @return 查询结果,如果查询失败或不需要记录,可返回null
         */
        T query(P param);
    }
}

调用:

      // 构建查询参数列表
        List<VideoQueryParam> queryParams = new ArrayList<>();
        for (String resourceId : validResourceIds) {
            queryParams.add(new VideoQueryParam(resourceId, type));
        }
        
        // 使用并发查询工具类执行查询
        List<VideoListResponse.VideoInfo> videoList = ConcurrentQueryUtil.concurrentQuery(
                queryParams,
                this::queryVideoInfo,
                30,
                10
        );


/**
     * 查询视频信息
     * 
     * @param param 查询参数
     * @return 视频信息,查询失败返回null
     */
    private VideoListResponse.VideoInfo queryVideoInfo(VideoQueryParam param) {
        VideoQueryResponse videoResponse = imusicVideoService.queryVideo(param.getResourceId(), param.getType());
        
        if (videoResponse == null || !"0000".equals(videoResponse.getCode())) {
            log.warn("视频查询失败,resourceId: {}, code: {}, message: {}", 
                    param.getResourceId(), 
                    videoResponse != null ? videoResponse.getCode() : "null",
                    videoResponse != null ? videoResponse.getMessage() : "null");
            return null;
        }
        
        VideoQueryResponse.VideoData data = videoResponse.getData();
        if (data == null) {
            return null;
        }
        
        return VideoInfoWrapper.convertToVideoInfo(data);
    }

    /**
     * 视频查询参数
     */
    @Data
    @AllArgsConstructor
    private static class VideoQueryParam {
        /**
         * 资源ID
         */
        private String resourceId;
        
        /**
         * 视频类型
         */
        private ImusicVideoTypeEnum type;
    }

end

2026-02-06 start:

自定义线程池:

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 异步任务线程池配置类
 * 自定义ThreadPoolTaskExecutor替代Spring默认异步线程池,避免默认线程池核心参数不合理导致的性能问题
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    /**
     * 自定义异步线程池Bean
     * 命名为asyncExecutor,配合@Async("asyncExecutor")指定使用此线程池
     * @return 自定义的线程池执行器
     */
    @Bean("asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 核心线程数:CPU核心数*2,保证核心任务的并行执行,充分利用CPU资源
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        executor.setCorePoolSize(corePoolSize);

        // 最大线程数:CPU核心数*4,核心线程忙且队列满时,最多创建的线程数,防止线程过多导致上下文切换频繁
        int maxPoolSize = Runtime.getRuntime().availableProcessors() * 4;
        executor.setMaxPoolSize(maxPoolSize);

        // 任务队列容量:1000,核心线程满时,任务进入阻塞队列等待,避免频繁创建非核心线程
        executor.setQueueCapacity(1000);

        // 非核心线程空闲存活时间:60秒,空闲超时后销毁,减少资源占用
        executor.setKeepAliveSeconds(60);

        // 线程名称前缀:async-task-,便于日志排查(如async-task-1、async-task-2)
        executor.setThreadNamePrefix("async-task-");

        // 拒绝策略:CallerRunsPolicy,队列满+最大线程数满时,由**调用者线程**执行任务
        // 避免任务直接丢弃,适合对任务不允许丢失的业务场景
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 初始化线程池(ThreadPoolTaskExecutor必须显式调用,否则不生效)
        executor.initialize();

        return executor;
    }
}

关键配置说明(理解每一步的意义)

 
配置项 取值 设计思路
核心线程数corePoolSize CPU 核心数 * 2 核心线程会一直存活(即使空闲),适配 CPU 密集型 + IO 密集型混合异步任务,充分利用 CPU
最大线程数maxPoolSize CPU 核心数 * 4 非核心线程仅在队列满时创建,上限为 4 倍核心数,避免线程过多导致 CPU 上下文切换开销
队列容量queueCapacity 1000 采用有界队列(必须有界,避免无界队列撑爆内存),缓冲待执行的异步任务
空闲存活时间keepAliveSeconds 60 秒 非核心线程空闲超 60 秒销毁,减少内存和线程调度资源占用
线程名称前缀 async-task- 日志中可快速识别异步任务线程,便于问题排查(如定位慢异步任务、线程死锁)
拒绝策略 CallerRunsPolicy 任务触发拒绝时,由调用方线程同步执行(如 Controller 调用异步方法则由 Tomcat 线程执行),无任务丢失
显式初始化 executor.initialize() ThreadPoolTaskExecutor 是 Spring 封装的线程池,必须显式调用初始化才会创建线程池

配套使用方式(核心!否则线程池不生效)

 
Spring 默认的@Async会使用内置的简易线程池,必须指定自定义线程池名称,才能使用上面的配置:
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncTaskService {

    // 显式指定使用名为asyncExecutor的线程池
    @Async("asyncExecutor")
    public void doAsyncTask() {
        // 你的异步业务逻辑(如耗时的文件处理、远程调用、数据统计等)
        System.out.println("异步任务执行,线程名称:" + Thread.currentThread().getName());
    }
}
  • CallerRunsPolicy业务无任务丢失要求的首选,其他常见拒绝策略适合场景:
     
    • AbortPolicy(默认):直接抛出RejectedExecutionException,适合需要感知任务拒绝的场景
    • DiscardPolicy:直接丢弃最新任务,无异常,适合非核心异步任务
    • DiscardOldestPolicy:丢弃队列中最旧的任务,执行最新任务,适合实时性要求高的场景
     
  • 参数调优参考
     
    • IO 密集型任务(如远程调用、数据库查询、文件读写):核心 / 最大线程数可适当提高(如 CPU4/CPU8),因为线程大部分时间在等待 IO
    • CPU 密集型任务(如数据计算、加密解密):核心 / 最大线程数接近 CPU 核心数即可(如 CPU1/CPU2),避免上下文切换
    • 队列容量:根据业务 QPS 调整,不宜过大(否则任务等待时间过长),也不宜过小(否则频繁创建非核心线程)

end

你好:我的2025