风离不摆烂学习日志Day16 Java 异步线程池 处理策略

四种基本拒绝策略:
1.AbortPolicy 默认 – 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
2.CallerRunsPolicy 常用 – 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
3.DiscardOldestPolicy – 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
4.DiscardPolicy – 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

拒绝策略生效原理

新任务到达线程池时

每当有新的任务到线程池时,

  • 第一步:先判断线程池中当前线程数量是否达到了corePoolSize,若未达到,则新建线程运行此任务,且任务结束后将该线程保留在线程池中,不做销毁处理,若当前线程数量已达到corePoolSize,则进入下一步;
  • 第二步:判断工作队列(workQueue)是否已满,未满则将新的任务提交到工作队列中,满了则进入下一步;
  • 第三步:判断线程池中的线程数量是否达到了maxuPoolSize,如果未达到,则新建一个工作线程来执行这个任务,如果达到了则使用饱和策略来处理这个任务。注意: 在线程池中的线程数量超过corePoolSize时,每当有线程的空闲时间超过了keepAliveTime,这个线程就会被终止。直到线程池中线程的数量不大于corePoolSize为止。
  1. 核心线程数:m,最大线程数: n(一般 n > m),线程池队列最大容量: j,请求进入线程总数: s;

  2. 线程进场后(s<m),线程会直接启动直到启动的线程数达到核心线程数(s=m);

  3. 线程继续进场(m<s<m+j),线程开始排队,此时启动的线程数不会增加直到队列饱和(s=m+j);

  4. 线程继续进场(m+j<s<n+j),每进场一个线程,该线程就会启动;直到启动的线程达到最大线程数(s=n+j);

  5. 线程继续进场(s>n+j),此时触发拒绝策略;

落地思路分析

异步处理需要考虑可能出现的问题最大有六点

  • 合理控制处理线程的数量 处理线程的数量不要超过CPU核数 等于是最好的 核心线程数量设置为最大线程数的3/4 保证留余
  • 找到线程池队列的最大容量 尽量减少触发拒绝策略
  • 选择合适的线程池队列(有界队列 无界队列 同步移交队列
  • 系统并发量 当采用CallerRunsPolicy 策略的时候 触发的时候 被拒绝的会被交由主线程来处理 并发部分进行优化,优化的核心就是思想就是:控制并发请求的流量
  • 避免异步之间的依赖关系造成死锁 即互相等待(这种情况建议是不使用异步 具体场景具体分析)
  • 要获取异步函数的返回值可以使用 Future,但是Future 的get方法是阻塞的,使用时需要注意 等所有异步任务结束之后再调用获取方法

针对上述四点 我们可以优化上一篇文章的代码 来解决上述问题

首先查资料可得知

IO密集型(某大厂实践经验) 核心线程数 = CPU核数 / (1-阻塞系数)

CPU密集型:核心线程数 = CPU核数 + 1

=================================================================================

(1)、CPU密集型

CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading 很高。

在多重程序系统中,大部分时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部分时间用在三角函数和开根号的计算,便是属于CPU bound的程序。

CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。

(2)、IO密集型

IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。

I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。

二来 关于队列的选取

1、无界队列

队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列作为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。

阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而博主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。

当然这种队列,maximumPoolSize 的值也就无效了。

当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。

这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

2、有界队列

当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。

常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。

PriorityBlockingQueue中的优先级由任务的Comparator决定。

使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

3、同步移交队列

如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。

SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。

只有在使用无界线程池或者有饱和策略时才建议使用该队列。

上一篇文章代码改进

原: ExecutorUtils

package top.flya.utils;


import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import top.flya.dao.entity.PublicUser;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程工具类 ExecutorUtils
 * @author flya
 * @date 2019/12/18
 */

@Slf4j
public class ExecutorUtils {

    private static int corePoolSize = 10; // 线程池维护线程的最少数量

    private static int maxPoolSize = 100; // 线程池维护线程的最大数量

    private static int queueCapacity = 100; // 缓存队列

    private static Long keepAliveTime = 1L;  //设置线程池维护线程所允许的空闲时间 1L

    public  static volatile ThreadPoolExecutor threadPoolExecutorInstance = null; // 线程池

    // 单例模式 懒汉式: 线程安全 但是效率低 为了效率使用双重检查锁
    public ExecutorUtils() {
    }


    /**
     * 自定义线程池初始参数
     * @param corePoolSite 线程池维护线程的最少数量
     * @param maxPoolSite 线程池维护线程的最大数量
     * @param queueCapacity 缓存队列
     * @param keepAliveTime 设置线程池维护线程所允许的空闲时间
     */
    public static void initialize(int corePoolSite, int maxPoolSite, int queueCapacity, long keepAliveTime) {
        ExecutorUtils.corePoolSize = corePoolSite;
        ExecutorUtils.maxPoolSize = maxPoolSite;
        ExecutorUtils.queueCapacity = queueCapacity;
        ExecutorUtils.keepAliveTime = keepAliveTime;
    }

    /**
     * 获取线程池 初始化
     * @return
     */
    public static ThreadPoolExecutor getThreadPoolExecutorInstance() {
        if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {
            synchronized (ExecutorUtils.class) {
                if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {// 双重检查
                    log.info("初始化线程池");
                    threadPoolExecutorInstance = new ThreadPoolExecutor(corePoolSize,
                            maxPoolSize, keepAliveTime,
                            java.util.concurrent.TimeUnit.SECONDS,
                            new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueCapacity),
                            new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); //.ThreadPoolExecutor.CallerRunsPolicy() 的意思是:如果线程池已经满了,那么就由调用者所在的线程来执行任务
                }
            }
        }
        return threadPoolExecutorInstance;
    }

    public static void batchExec(CompletableFuture[] completableFutures)
    {
        CompletableFuture.allOf(completableFutures).join();
        //TODO 任务执行完毕后的处理
    }


}

核心改进点为自动获取配置的核心线程数

package top.flya.utils;


import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import top.flya.dao.entity.PublicUser;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程工具类 ExecutorUtils
 * @author flya
 * @date 2019/12/18
 */

@Slf4j
public class ExecutorUtils {
    private static final int N_CPU = Runtime.getRuntime().availableProcessors();

    private static int corePoolSize = (int) (0.75*N_CPU); // 线程池维护线程的最少数量 0.75*N_CPU

    private static int maxPoolSize = N_CPU; // 线程池维护线程的最大数量 设置为cpu核数

    private static int queueCapacity = 300; // 缓存队列

    private static Long keepAliveTime = 1L;  //设置线程池维护线程所允许的空闲时间 1L

    public  static volatile ThreadPoolExecutor threadPoolExecutorInstance = null; // 线程池

    /**
     * 并发处理的大小,防止触发风控系统的流控 我们商城系统没做限制
     */
    private static final int batchSize = 100;

    // 单例模式 懒汉式: 线程安全 但是效率低 为了效率使用双重检查锁
    public ExecutorUtils() {
    }


    /**
     * 自定义线程池初始参数
     * @param corePoolSite 线程池维护线程的最少数量
     * @param maxPoolSite 线程池维护线程的最大数量
     * @param queueCapacity 缓存队列
     * @param keepAliveTime 设置线程池维护线程所允许的空闲时间
     */
    public static void initialize(int corePoolSite, int maxPoolSite, int queueCapacity, long keepAliveTime) {
        ExecutorUtils.corePoolSize = corePoolSite;
        ExecutorUtils.maxPoolSize = maxPoolSite;
        ExecutorUtils.queueCapacity = queueCapacity;
        ExecutorUtils.keepAliveTime = keepAliveTime;
    }

    /**
     * 获取线程池 初始化
     * @return
     */
    public static ThreadPoolExecutor getThreadPoolExecutorInstance() {
        if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {
            synchronized (ExecutorUtils.class) {
                if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {// 双重检查
                    log.info("doProcess with cpu numbers: {}, batchSize: {}", N_CPU, batchSize);
                    threadPoolExecutorInstance = new ThreadPoolExecutor(corePoolSize,
                            maxPoolSize, keepAliveTime,
                            java.util.concurrent.TimeUnit.SECONDS,
                            new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueCapacity),
                            new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); //.ThreadPoolExecutor.CallerRunsPolicy() 的意思是:如果线程池已经满了,那么就由调用者所在的线程来执行任务
                }
            }
        }
        return threadPoolExecutorInstance;
    }

    public static void batchExec(CompletableFuture[] completableFutures)
    {
        CompletableFuture.allOf(completableFutures).join();
        //TODO 任务执行完毕后的处理
    }


}