Executor 框架与线程池


上一篇已经提到,我们需要尽量避免在应用中显式的创建线程,而是使用线程池来提供和管理线程资源,这样可以避免频繁的线程创建导致的资源浪费和性能损耗。总结一下使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

最常见的创建线程池的的方式如下:

  • Executors.newCachedThreadPool():无限线程池。
  • Executors.newFixedThreadPool(nThreads):创建固定大小的线程池。
  • Executors.newSingleThreadExecutor():创建单个线程的线程池。

随便查看 newSingleThreadExecutor 的源码如下可以看到其实是通过 ThreadPoolExecutor 构建,而 ThreadPoolExecutor 则实现了 ExecutorService 接口,ExecutorService 接口则继承了 Executor 接口。换句话说 Java 中的线程池是通过 Executor 框架来实现。下面我们就来详细研究一下这个框架。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Java 异步执行框架 Eexecutor

先来看一下 JDK8 juc 包下的线程池类图:

可以看到核心接口即 Executor、ExecutorService、ScheduledExecutorService 这三个。

Executor

Executor 接口是 Java5 中引入的,在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。

public interface Executor {
    void execute(Runnable command);
}

Executor 接口中只定义了一个方法 execute(Runnable command),该方法用来执行一个提交的 Runable 任务,任务即一个实现了 Runnable 接口的类。这个接口有两个作用

  • 提供一种将”任务提交”与”任务如何运行及线程如何使用和调度”分离开来的机制
  • 避免我们为每个任务这样显示的创建线程: new Thread(new(RunnableTask())).start()

ExecutorService

但是 JDK 并没有提供 Executor 的直接实现供用户使用,而是使用 ExecutorService 接口扩展(继承)了一下 Executor 接口,提供了更丰富的实现多线程的方法,比如增加了 shutDown(),shutDownNow(),invokeAll(),invokeAny() 和 submit() 等方法。

因此我们一般用 ExecutorService 接口来实现和管理多线程。

ScheduledExecutorService

ScheduledExecutorService 扩展了 ExecutorService 接口,增加了 schedule、scheduleAtFixedRate 及 scheduleWithFixedDelay 方法。使用这些方法可以在指定的延时后执行一个 Runnable 或者 Callable 任务,也可以按照指定时间间隔定期执行任务。

下面是使用 ScheduledExecutorService 接口的一个示例,beepForAnHour 方法可以实现在一小时内每隔 10 秒打印一下 “beep”。

import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
    private final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(1);

    public void beepForAnHour() {
      final Runnable beeper = () -> System.out.println("beep");
      final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
      scheduler.schedule(() -> beeperHandle.cancel(true), 60 * 60, TimeUnit.SECONDS);
    }
}

ThreadPoolExecutor 原理

本篇开始的地方介绍了 Executors 中的线程池工厂方法都是通过 ThreadPoolExecutor 类构建不同类型的线程池,因此重点看一下这个类的原理的源码。

/**
 * An ExecutorService that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using  Executors factory methods.
 **/
public class ThreadPoolExecutor extends AbstractExecutorService {
    //...
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    //...
}

//Provides default implementations of {@link ExecutorService} execution methods.
public abstract class AbstractExecutorService implements ExecutorService {
    //...
}

可以看到 JDK 首先提供了一个 ExecutorService 接口的默认实现抽象类 AbstractExecutorService,而 ThreadPoolExecutor 则继承默认实现使用线程池的方式重写了任务执行的方式。

通过上面 ThreadPoolExecutor 的构造方法源码可以看到其中几个核心的参数,详细介绍如下:

  • corePoolSize 线程池的核心线程数,当线程空闲也不会回收的线程数量,当有新任务在execute()方法提交时,会执行以下判断:

    1. 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
    2. 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才创建新的线程去处理任务;
    3. 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去从 workQueue 中取任务并处理;
    4. 如果运行的线程数量大于等于 maximumPoolSize,这时如果 workQueue 已经满了,则通过 handler 所指定的策略来处理任务;
  • maximumPoolSize 线程池最大线程个数

  • keepAliveTime 和 unit 则是超过核心线程数的线程空闲后的存活时间
  • workQueue 用于存放任务的阻塞队列
  • handler 当队列和最大线程池都满了之后的饱和策略,它是 RejectedExecutionHandler 类型的变量。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
    1. AbortPolicy:直接抛出异常,这是默认策略;
    2. CallerRunsPolicy:用调用者所在的线程来执行任务;
    3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4. DiscardPolicy:直接丢弃任务;

再来看看最核心的 excute 方法的实现逻辑:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

代码逻辑转化为流程图如下

注意:在向线程池提交任务时,除了 execute 方法,还可以使用 submit 方法提交一个 Callable 实例,submit 方法会返回一个 Future 对象用于获取返回值。

另外类中定义了线程池的五种运行状态,这几种状态的转化关系如下图所示

如何配置线程池

注意线程池肯定不是越大越好,首先大量线程会占用可观的系统资源,频繁的线程上下文切换对 CPU 来说也是很大的消耗,一般的线程配置可以有以下一些参考准则:

  • IO 密集型任务:由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如 CPU 个数 * 2
  • CPU 密集型任务(大量复杂的运算)应当分配较少的线程,比如 CPU 个数相当的大小

Springboot 中使用线程池

在 Springboot 中一般是结合异步 @Async 注解及 Spring 的 TaskExecutor 来实现。首先在 Spring 中可以使用 @async 注解实现方法的异步调用,其原理就是动态代理标有 @async 注解的类,将原来的普通方法封装成一个 Runnable 或者 Callable 类型放在一个队列里等待线程池异步的领取任务执行。另外 @Async 注解允许我们指定其线程池。因此在 Springboot 中使用线程池变得很简单:

  1. 定义线程池

    @Bean("synEsAsyncExecutor")
    public TaskExecutor getSynEsAsyncExecutor() {
     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
     executor.setCorePoolSize(corePoolSize);// 设置核心线程数
     executor.setMaxPoolSize(maxPoolSize);// 设置最大线程数
     executor.setQueueCapacity(queueCapacity);// 设置队列容量
     executor.setKeepAliveSeconds(keepAliveSeconds);// 设置线程活跃时间(秒)
     executor.setThreadNamePrefix("task-synEs-");
     executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
     executor.initialize();
     return executor;
    }
    
  2. 开启 Spring 对异步的支持

    @Configuration
    @EnableAsync
    public class ThreadPoolConfig implements AsyncConfigurer{
     //...
    }
    
  3. 使用线程池,只需要在 Async 注解上配上线程池实例名称即可

    @Component("commentService")
    @Async("synEsAsyncExecutor")
    public class CommentServiceImpl implements CommentService{
     @Override
     public void hbaseToEsForJobService(UpdateRecordVo updateRecordVo) throws Exception {
         hbaseToEsById(updateRecordVo.getCommentId());
     }
    }
    
  4. 异常处理,实现 AsyncConfigurer 接口的 getAsyncUncaughtExceptionHandler 方法,此 handler 将被用于处理异常

完整的代码示例如下,其中定义了两个线程池分别用于不同的异步任务确保线程池隔离,避免两个任务之间出现线程资源竞争。同时还实现了异步方法异常的统一处理。

@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer{
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolConfig.class);

    @Value("${task-executor.core_pool_size}")
    private int corePoolSize;
    @Value("${task-executor.max_pool_size}")
    private int maxPoolSize;
    @Value("${task-executor.queue-capacity}")
    private int queueCapacity;
    @Value("${task-executor.keep-alive-seconds}")
    private int keepAliveSeconds;

    @Autowired
    private AsyncExceptionHandlerService exceptionHandler;

    /**
     * 聚合任务线程池
     * @return
     */
    @Bean("aggAsyncExecutor")
    public TaskExecutor getAggAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);// 设置核心线程数
        executor.setMaxPoolSize(maxPoolSize);// 设置最大线程数
        executor.setQueueCapacity(queueCapacity);// 设置队列容量
        executor.setKeepAliveSeconds(keepAliveSeconds);// 设置线程活跃时间(秒)
        executor.setThreadNamePrefix("task-agg-");// 设置默认线程名称
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * es同步任务线程池
     * @return
     */
    @Bean("synEsAsyncExecutor")
    public TaskExecutor getSynEsAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);// 设置核心线程数
        executor.setMaxPoolSize(maxPoolSize);// 设置最大线程数
        executor.setQueueCapacity(queueCapacity);// 设置队列容量
        executor.setKeepAliveSeconds(keepAliveSeconds);// 设置线程活跃时间(秒)
        executor.setThreadNamePrefix("task-synEs-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return new MyAsyncExceptionHandler(exceptionHandler);  
    }

    /**
     * 异常任务统一处理
     */
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
        private AsyncExceptionHandlerService exceptionHandler;
        public MyAsyncExceptionHandler(AsyncExceptionHandlerService exceptionHandler) {
            this.exceptionHandler = exceptionHandler;
        }
        @Override  
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {  
            // 打印异常日志,添加ump监控
            String methodName = method.getName();
            LOGGER.error("task: {} execute error for record: {}! errMsg: {}", 
                    methodName, JSON.toJSONString(obj), throwable.getMessage());
            exceptionHandler.handle(methodName, obj[0]);
        }  
    } 

}

注意这里使用的线程池是 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 类,该类实现了 org.springframework.core.task.TaskExecutor 接口(TaskExecutor 继承了 java.util.concurrent.Executor),是 Spring 提供的一种可以使用 JavaConfig 形式配置线程池,并且方便管理和监控线程池的一个类。换句话说 SpringFrameWork 的 ThreadPoolTaskExecutor 是辅助 JDK 的 ThreadPoolExecutor 的工具类,它将属性通过 JavaBeans 的命名规则提供出来,方便进行配置。

更通俗的将 JDK 中都是基于 Executor 接口,而 Spring 中则都是基于 TaskExecutor,但其实 TaskExecutor 继承了 Executor 接口。Spring 的 TaskExecutor 的常用实现类基本都是是基于 Executor 实现类的包装,使其更加方便使用,更好的融入 spring bean 生态。

当然了我们也可以通过 Executors 的工厂方法来创建使用 JDK 的线程池,不过在 Spring 中还是使用 Spring 封装过的 executor 更方便一些。

总结

关于线程池就先整理这些知识点,另外还有一些细节问题后续再补充,比如线程池的监控(通过 ThreadPoolExecutor 的一些方法获取线程池的运行指标)、线程池与 ThreadLocal 同时使用的注意事项(线程服用导致线程变量互串、线程结束没有调用 remove 导致内存泄漏等等)。

参考

Copyright © jverson.com 2018 all right reserved,powered by GitbookFile Modify: 2019-05-26 23:00:06

results matching ""

    No results matching ""