创建线程
本质都是实现Runnable接口,实现run方法,来创建线程任务;
1、继承Thread
- 缺点:Java只有单继承,占用了继承的位置;
2、实现Runnable
- 无返回值的线程任务;
- 创建Thread,传入Runnable接口,也可以实现线程;
3、实现Callable
- 有返回值的线程任务:Future<T>
4、利用线程池
- 向线程池提交Runnable、Callable任务;
线程状态
1、new:新建; 2、Runnable:可执行;等待CPU调度; 3、Running:正在执行; 4、Blocked:等待获取一个排他锁; 5、Waiting:无限等待状态;线程可以主动进入Waiting;并等待被唤醒; 6、Time Waiting:等待固定时长状态,无需其他线程唤醒,超时后自动唤醒; 7、Terminated:任务执行完毕/抛出异常后的死亡状态;
如何中断线程
1、暴力中断:stop():x:禁止使用,直接杀死线程;
@Test public void stop_thread(){ CustomThread thread = new CustomThread(); thread.start(); thread.stop(); // native 方法暴力中断,不可把控 }
2、使用Thread自带的中断标志位,到达自定义的安全点中断:
@Test public void interrupted_thread() throws InterruptedException { CustomThread thread = new CustomThread(); thread.start(); TimeUnit.MILLISECONDS.sleep(100); thread.interrupt(); TimeUnit.SECONDS.sleep(1); } class CustomThread extends Thread { @Override public void run() { while (!Thread.interrupted()) { System.out.println("running"); } System.out.println("thread is stopped"); // quit } }
3、线程内部自行中断:
- 真正要中断,还是需要手动判断中断标志位,退出线程;
- 如果线程处于阻塞状态,则会抛出异常;
4、自定义标志位,与2相同
@Test public void interrupted_thread() throws InterruptedException { CustomThread thread = new CustomThread(); thread.start(); TimeUnit.MILLISECONDS.sleep(100); thread.cancel(); TimeUnit.SECONDS.sleep(1); } class CustomThread extends Thread { private boolean cancel; @Override public void run() { while (!cancel) { System.out.println("running"); } System.out.println("thread is stopped"); // quit } public void cancel() { this.cancel = true; } }
异步任务
主业务中某些不影响主业务,且耗时的操作可以使用异步的方式执行;
异步执行的方式:
- 直接创建一个线程执行;
- 提交到线程池;
- 如果需要获取执行结果,使用
- Future:通过get()阻塞等待结果
- ==CompletableFuture==:通过回调的方式处理结果;
- Future:通过
线程间通信
两种实现:
- 共享内存:存在并发安全问题
- 消息传递:消息队列;
1. synchronized + wait + notify
synchronized是依赖于对象的,线程间通信也需要依赖于对象;
因此要使用
2. ReentrantLock + Condition
ReentrantLock是JDK级别的锁,需要结合Condition的API进行线程通信;
3. volatile
volatile仅解决了并发原则中的可见性和有序性,不保证原子性;
同时Java线程是系统级线程,也就是说每个线程占用一个CPU核,同时拥有一个CPU缓存;因此不同线程操作同一个变量,线程间是不可见的;
volatile关键字修饰的变量禁用CPU缓存,是操作系统支持的,读取写入变量都必须基于内存;
其中可见性,也可以算是线程间的通信;
4. CountDownLatch
用于线程执行顺序,主任务等待一批子任务结束后,再执行;基于AQS;
类似与
构造方法
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
核心方法
- await():阻塞当前线程,直到计数器为零为止;
- await(long timeout, TimeUnit unit):可以指定阻塞时长;
- countDown():计数器减1,如果计数达到零,释放所有等待的线程。
- getCount():返回当前计数
5. CyclicBarrier
可以理解为重复使用的
启动一个线程组,其中有一个主线程,其他线程执行到await()点后,启动主线程,并且重置计数器,其他线程再继续执行;
public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { // 主线程 System.out.println("All Thread Is Ok."); }); // 其他线程 IntStream.range(0, 3).forEach(i -> new Thread(() -> { try { System.out.println("Thread-" + i + " start."); TimeUnit.SECONDS.sleep(1); cyclicBarrier.await(); // 到达 TimeUnit.SECONDS.sleep(1); System.out.println("Thread-" + i + " continue."); cyclicBarrier.await(); // 到达 TimeUnit.SECONDS.sleep(1); System.out.println("Thread-" + i + " done."); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { throw new RuntimeException(e); } }).start()); } ---------------------------------------------------- Thread-0 start. Thread-2 start. Thread-1 start. All Thread Is Ok. Thread-2 continue. Thread-1 continue. Thread-0 continue. All Thread Is Ok. Thread-2 done. Thread-1 done. Thread-0 done.
6. Semaphore
信号量:用来控制同时访问特定资源的线程数量;基于AQS实现;
结构上同ReentrantLock类似,同样拥有内部类:Sync、NonfairSync、FairSync;
构造方法
// 默认非公平 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 公平 true public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
核心方法:
- acquire(int permits):获取信号量:state减n
- 公平需要排队;
- 不公平各自自旋竞争;
- tryAcquire(int permits):不阻塞,直接返回true/false;
- tryAcquire(int permits, long timeout, TimeUnit unit):带超时时间;
- release(int permits):释放信号量:state加n
- availablePermits():返回可用信号量;
- reducePermits():减少总数;