Java的线程和通信

Java的线程使用和线程间通信

创建线程

本质都是实现Runnable接口,实现run方法,来创建线程任务;

1、继承Thread

  • 缺点:Java只有单继承,占用了继承的位置;

2、实现Runnable

  • 无返回值的线程任务;
  • 创建Thread,传入Runnable接口,也可以实现线程;

3、实现Callable

  • 有返回值的线程任务:
    Future<T>

4、利用线程池

  • 向线程池提交Runnable、Callable任务;

线程状态

线程状态

1、new:新建; 2、Runnable:可执行;等待CPU调度; 3、Running:正在执行; 4、Blocked:等待获取一个排他锁; 5、Waiting:无限等待状态;线程可以主动进入Waiting;并等待被唤醒; 6、Time Waiting:等待固定时长状态,无需其他线程唤醒,超时后自动唤醒; 7、Terminated:任务执行完毕/抛出异常后的死亡状态;

如何中断线程

1、暴力中断:stop():x:禁止使用,直接杀死线程;

@Test
public void stop_thread(){
    CustomThread thread = new CustomThread();
    thread.start();
    thread.stop();  // native 方法暴力中断,不可把控
}

2、使用Thread自带的中断标志位,到达自定义的安全点中断:

@Test
public void interrupted_thread() throws InterruptedException {
    CustomThread thread = new CustomThread();
    thread.start();
    TimeUnit.MILLISECONDS.sleep(100);
    thread.interrupt();
    TimeUnit.SECONDS.sleep(1);
}

class CustomThread extends Thread {
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            System.out.println("running");
        }
        System.out.println("thread is stopped"); // quit
    }
}

3、线程内部自行中断:

interrupt()
,设置线程中断标志位为true;仅仅是改变中断状态,不会中断正在运行的线程;

  • 真正要中断,还是需要手动判断中断标志位,退出线程;
  • 如果线程处于阻塞状态,则会抛出异常;

4、自定义标志位,与2相同

@Test
public void interrupted_thread() throws InterruptedException {
    CustomThread thread = new CustomThread();
    thread.start();
    TimeUnit.MILLISECONDS.sleep(100);
    thread.cancel();
    TimeUnit.SECONDS.sleep(1);
}

class CustomThread extends Thread {
    private boolean cancel;
    @Override
    public void run() {
        while (!cancel) {
            System.out.println("running");
        }
        System.out.println("thread is stopped"); // quit
    }

    public void cancel() {
        this.cancel = true;
    }
}

异步任务

主业务中某些不影响主业务,且耗时的操作可以使用异步的方式执行;

异步执行的方式:

  1. 直接创建一个线程执行;
  2. 提交到线程池;
  3. 如果需要获取执行结果,使用
    • Future:通过
      get()
      阻塞等待结果
    • ==CompletableFuture==:通过回调的方式处理结果;

线程间通信

两种实现:

  • 共享内存:存在并发安全问题
  • 消息传递:消息队列;

1. synchronized + wait + notify

synchronized是依赖于对象的,线程间通信也需要依赖于对象;

因此要使用

Object
下的
wait()
notify()
方法

2. ReentrantLock + Condition

ReentrantLock是JDK级别的锁,需要结合Condition的API进行线程通信;

3. volatile

volatile仅解决了并发原则中的可见性和有序性,不保证原子性;

同时Java线程是系统级线程,也就是说每个线程占用一个CPU核,同时拥有一个CPU缓存;因此不同线程操作同一个变量,线程间是不可见的;

volatile关键字修饰的变量禁用CPU缓存,是操作系统支持的,读取写入变量都必须基于内存;

其中可见性,也可以算是线程间的通信;

4. CountDownLatch

用于线程执行顺序,主任务等待一批子任务结束后,再执行;基于AQS;

类似与

Thread.join()
,join基于
Object的wait()
notify()
实现

构造方法

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

核心方法

  • await():阻塞当前线程,直到计数器为零为止;
  • await(long timeout, TimeUnit unit):可以指定阻塞时长;
  • countDown():计数器减1,如果计数达到零,释放所有等待的线程。
  • getCount():返回当前计数

5. CyclicBarrier

可以理解为重复使用的

CountDownLatch

启动一个线程组,其中有一个主线程,其他线程执行到await()点后,启动主线程,并且重置计数器,其他线程再继续执行;

public static void main(String[] args) {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // 主线程
        System.out.println("All Thread Is Ok.");
    });
    // 其他线程
    IntStream.range(0, 3).forEach(i -> new Thread(() -> {
        try {
            System.out.println("Thread-" + i + " start.");
            TimeUnit.SECONDS.sleep(1);
            cyclicBarrier.await();  // 到达

            TimeUnit.SECONDS.sleep(1);
            System.out.println("Thread-" + i + " continue.");
            cyclicBarrier.await();  // 到达

            TimeUnit.SECONDS.sleep(1);
            System.out.println("Thread-" + i + " done.");
            TimeUnit.SECONDS.sleep(1);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }).start());
}
----------------------------------------------------
Thread-0 start.
Thread-2 start.
Thread-1 start.
All Thread Is Ok.
Thread-2 continue.
Thread-1 continue.
Thread-0 continue.
All Thread Is Ok.
Thread-2 done.
Thread-1 done.
Thread-0 done.

6. Semaphore

信号量:用来控制同时访问特定资源的线程数量;基于AQS实现;

结构上同ReentrantLock类似,同样拥有内部类:Sync、NonfairSync、FairSync;

构造方法

// 默认非公平
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// 公平 true
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

核心方法:

  • acquire(int permits):获取信号量:state减n
    • 公平需要排队;
    • 不公平各自自旋竞争;
  • tryAcquire(int permits):不阻塞,直接返回true/false;
  • tryAcquire(int permits, long timeout, TimeUnit unit):带超时时间;
  • release(int permits):释放信号量:state加n
  • availablePermits():返回可用信号量;
  • reducePermits():减少总数;