线程池

Posted on By xqw

线程池

public interface Executor
public interface ExecutorService extends Executor
public abstract class AbstractExecutorService implements ExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService 
public ThreadPoolExecutor(int corePoolSize,//线程池中正常线程数
                          int maximumPoolSize,//最大线程数
                          long keepAliveTime,//超过正常线程数的线程存活时间
                          TimeUnit unit,//keepAliveTime的单位
                          BlockingQueue<Runnable> workQueue,//任务队列
                          ThreadFactory threadFactory,//线程工厂
                          RejectedExecutionHandler handler)//拒绝策略

有任务->
实际线程数 < corePoolSize:创建线程
实际线程数 > corePoolSize:(
队列没满:放入队列
队列满了:(
总线程数 < maximumPoolSize:创建新线程执行
总线程数 > maximumPoolSize:拒绝策略
))

队列:
直接提交的队列(容量为0,永远都是:队列满了):SynchronousQueue
有界的任务队列(容量为常数):ArrayBlockingQueue
无界的任务队列(容量为正无穷):LinkedBlockingQueue
优先任务队列(PriorityBlockingQueue,其他的队列为先进先出,这个可以有自己的优先顺序)

拒绝策略:

  1. CallerRunsPolicy 该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
  2. AbortPolicy 该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
  3. DiscardPolicy 该策略下,直接丢弃任务,什么都不做。
  4. DiscardOldestPolicy 该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
Executors 工厂
//固定数量的线程池
public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
//只有一个线程的线程池
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
//有线程复用就用,没有就创建,队列为0,线程总数无穷大
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
//
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
}

测试代码:

public class ThreadPoolExecutorTest {
	public static void main(String[] args) {
		ExecutorService es = new ThreadPoolExecutor(5, 
				5, 
				0L, 
				TimeUnit.MILLISECONDS, 
				new SynchronousQueue<Runnable>(),
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						Thread th =new Thread(r);
						System.out.println("create"+th);
						return th;
					}
				}
				);
		for (int i = 0; i < 5; i++) {
			es.submit(new Runnable() {
				@Override
				public void run() {
					System.out.println("Runnable");
				}});
		}
	}
}

常用api

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

shutdown()和shutdownNow()是用来关闭线程池的。

还有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。