目录:
线程池参数解析
线程的生命周期
Synchorized错误&正确使用方式
Future使用
CountDownLatch使用
1. 线程池参数解析
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:线程池的基本大小,即在没有任务需要执行的时候线程池的大小, 并且只有在工作队列满了的情况下才会创建超出这个数量的线程**。这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。再考虑到keepAliveTime和allowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize。
maximumPoolSize:线程池的最大大小,当workQueue为无界队列时,此参数无效
keepAliveTime: 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize,如果allowCoreThreadTimeout=true,则会直到线程数量=0
unit: keepAliveTime的单位
workQueue: 用来保存等待被执行的任务的阻塞队列
threadFactory: 创建线程的工厂,通过自定义的线程工程可以给每个新建的线程设置一个具有意义的别名。默认使用DefaultThreadFactory。
handler: 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,则必须采用想用的策略来处理。有一下四种策略
AbortPolicy:直接抛出异常,默认策略
CallerRunsPolicy:用调用者所在的线程来执行任务
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
DiscardPolicy:直接丢弃任务
新提交一个任务时的处理流程:
1、如果当前线程池的线程数还没有达到基本大小(poolSize < corePoolSize),无论是否有空闲的线程新增一个线程处理新提交的任务;
2、如果当前线程池的线程数大于或等于基本大小(poolSize >= corePoolSize) 且任务队列未满时 ,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);
3、如果当前线程池的线程数大于或等于基本大小(poolSize >= corePoolSize) 且任务队列满时 ;
3.1、当前poolSize<maximumPoolSize,那么就新增线程 来处理任务;
3.2、当前poolSize=maximumPoolSize,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝 新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。
2. 线程生命周期
新创建(New)
就绪(Ready)
运行(Running)
阻塞(Blocked)
销毁(Dead)
3.Synchorized错误&正确使用方式
错误的使用方式如下:以下在线程的一个实例方法中加了Synchorized,以为万事大吉了。哪知道打印全部都乱套了。增加Synchorized关键字的时候一定要注意同一个实例的方法在多线程中才能有效,或者同一个类的static方法、或一个类 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 package com.jh.thread;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadMainTest { static class B implements Runnable { private Data data; public B (Data data) { this .data = data; } @Override public void run () { while (true ){ Random random = new Random(); try { int sleepTime = random.nextInt(10 ); Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } data.plusOne(); } } private synchronized void plusOne () { this .data.setDint(this .data.getDint() + 1 ); System.out.println(Thread.currentThread().getId() + " := " + data.getDint()); } } public static void main (String args[]) { ThreadMainTest.Data data = new ThreadMainTest.Data(1 ); ExecutorService executorService = Executors.newFixedThreadPool(3 ); executorService.submit(new B(data)); executorService.submit(new B(data)); executorService.submit(new B(data)); } public static class Data { public int dint = 1 ; public Data (int dint) { this .dint = dint; } public void setDint (int dint) { this .dint = dint; } public int getDint () { return dint; } private synchronized void plusOne () { this .setDint(this .getDint() + 1 ); System.out.println(Thread.currentThread().getId() + " := " + getDint()); } } }
4.Future使用
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) throws InterruptedException, ExecutionException { futureTest(); countDownLatchTest(); } private static void countDownLatchTest () throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5 ); CountDownLatch start = new CountDownLatch(1 ); CountDownLatch latch = new CountDownLatch(5 ); for (int i = 0 ; i < 5 ; i++) { executorService.submit(new CountDownThread(start, latch, "worker" + i)); } System.out.println("预备" ); start.countDown(); System.out.println("准备开始工作" ); latch.await(); System.out.println("all work finish" ); executorService.shutdown(); }
5.CountDownLatch使用
CountDownLatch是通过计数器的方式来实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务之后,就会对计数器减1,当计数器的值为0时,表示所有线程完成了任务,此时等待在闭锁上的线程才继续执行,从而达到等待其他线程完成任务之后才继续执行的目的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 static class CountDownThread implements Runnable { CountDownLatch start; CountDownLatch latch; String workName; public CountDownThread (CountDownLatch start, CountDownLatch latch, String workName) { this .start = start; this .latch = latch; this .workName = workName; } @Override public void run () { try { start.await(); this .doWork(); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } private void doWork () { Random random = new Random(); try { int sleepTime = random.nextInt(10 ); Thread.sleep(sleepTime*1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(workName); } } public static void futureTest () throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3 ); Future future = executorService.submit(new Callable() { @Override public Object call () throws Exception { System.out.println("开始执行 " + Thread.currentThread().getId() + "号线程" ); Thread.sleep(10000 ); System.out.println(Thread.currentThread().getId() + "号线程 执行完准备返回" ); return 1 ; } }); Future future1 = executorService.submit(new Callable() { @Override public Object call () throws Exception { System.out.println("开始执行 " + Thread.currentThread().getId() + "号线程" ); Thread.sleep(10000 ); System.out.println(Thread.currentThread().getId() + "号线程 执行完准备返回" ); return 2 ; } }); System.out.println("已经提交线程" ); System.out.println("future 返回结果 := " + future.cancel(true )); System.out.println("future1 返回结果 := " + future1.get()); FutureTask futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call () throws Exception { System.out.println("通过 Thread 执行的 FutureTask" ); return 11 ; } }); Thread t1 = new Thread(futureTask); t1.start(); System.out.println("thread futureTask return result:=" + futureTask.get()); executorService.shutdown(); }