多线程实现机制

并发编程简单复习

volatile

实现原则:

  • 通过lock前缀指令,引起处理器缓存写回内存。(锁总线)

  • 一个处理器的缓存写回内存,会导致其他处理器的缓存无效。(轮询)

synchronized

  • 对于普通同步方法,锁是当前实例对象。

  • 对于静态同步方法,锁是当前类的Class对象。

  • 对于同步方法块,锁是Synchonized括号里配置的对象。

原子操作

不可分割的操作

如何保证原子性呢?

  • 处理器的总线锁、缓存锁。

如何实现原子操作呢?

  • 自旋CAS

    存在三个问题:

    • ABA死锁 -> 每次更新加版本号

    • 循环时间长,开销大

    • 只能保证一个共享变量的原子操作

  • 使用锁机制

JVMTEP - 2 中简单带过了对象头机制,以下开始认识之。

可见,数组类型对象头为3字宽,否则为2

Java对象头里的MarkWord里默认存储对象的HashCode、分代年龄和锁标记位。

32bit情况
64-bit情况

公平锁与非公平锁

公平锁会确保线程按照请求的顺序获得锁,非公平锁允许一些线程在等待队列之前直接获取锁。前者性能较高,后者可能会导致饥饿。

比如说,ReentrantLock默认就是非公平锁。

锁级别升级

锁膨胀的方向:无锁->偏向锁->轻量级锁->重量级锁,并且膨胀方向不可逆。我们来认识这些锁级别。

偏向锁

研究发现,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID,匹配的线程不需要进行CAS(compare and swap)操作。因为偏向锁不会主动释放锁,每次获取锁都要比较。如果不相等(其他线程),查看原线程是否存活。不存活则锁对象被重置为无锁状态,供其他线程竞争。如果根据原线程是否还需要这个锁,设置为无锁状态或升级为轻量级锁进行后续操作。

全局安全点

轻量级锁

多个线程交替进入临界区。只需要依靠一条CAS原子指令就可以完成锁的获取及释放。当存在锁竞争的情况下,执行CAS指令失败的线程将调用操作系统互斥锁进入到阻塞状态,当锁被释放的时候被唤醒。通过CAS操作让它自旋等待锁对象的释放。

由于自旋存在CPU的空转,若存在多个自旋线程,则升级为重量级锁,由操作系统monitor统一管理。

重量级锁

当多个线程同时在竞争锁对象时,阻塞所有等待竞争的线程,而不是让它们空转,等待原线程释放锁后再竞争。

image.png
  1. 线程进入重量级锁获取队列,即Contention List队列的尾部。

  2. 等待Owner线程解锁:

    • 如果Entry List为空,那么会先将Contention List中队列尾部的部分线程移动到Entry List

    • 如果Entry List不为空,从Entry List中取一个线程,把锁竞争的权利交给OnDeck

  3. OnDeck线程获取到锁,成为Owner线程进行执行。

  4. Owner线程调用锁对象的wait()方法进行等待,会移动到Wait Set中,并且会释放CPU资源,也同时释放锁,

  5. 当其他线程调用锁对象的notify()方法,之前调用wait方法等待的这个线程才会从Wait Set移动到Entry List,等待获取锁。

双重检查锁定

这是懒汉单例模式的代码:

-> 初始化和设定对象分配的地址指令重排,则会导致对象未初始化。

线程 B 将会访问到一个还未初始化的对象 。

解决方案1:把instance声明为volatile

private volatile static Instance instance ;

解决方案2:延迟类初始化方案

其中第6行满足类初始化(还记得吗,“万不得已”时)的条件:

  • 类中声明的一个静态字段被使用,而且这个字段不是一个常量字段。

实质:允许非构造线程(这里指线程 B)“看到“这个重排序。

不允许非构造线程(这里指线程 B)“看到“这个重排序

下面我们来看JVM类初始化的同步处理机制,五步:

  1. 通过在 Class 对象上同步(即获取 Class 对象的初始化锁),来控制类或接口的初始化。这个获取锁的线程会一直等待,直到当前线程能够获取到这个初始化锁

  2. 线程 A 执行类的初始化,同时线程 B 在初始化锁对应的 condition 上等待 。

  3. 线程 A 设置 state = initialized, 然后唤醒在 condition 中等待的所有线程 。

  4. 线程 B 结束类的初始化处理 。

    这个 happens-before 关系将保证:线程 A 执行类的初始化时的写入操作(执行类的静态初始化和初始化类中声明的静态字段),线程 B 一定能看到

  5. 线程 C 执行类的初始化的处理。

    happens-before 关系将保证:线程 A 执行类的初始化时的写入操作,线程 C 一定能看到 。

并发编程实例概览

ThreadLocal的使用

通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。ThreadLocal类使得每一个线程都有自己的专属本地变量,只能被当前线程访问,其他线程无法访问和修改。ThreadLocal支持泛型,以 ThreadLocal 对象为键、任意对象为值。这个存储结构附带在线程上,也就是说一个线程可以根据键对象查询到绑定在这个线程上的专属值。可用于:订单ID等。

可以通过set(T)方法来设置这个值,在当前线程下再通过get()方法获取到原先设置的值。第一次get方法调用时会进行初始化(如果 set 方法没有调用)。

如何实现?

实际上,ThreadLocal的值是放入了当前线程的一个ThreadLocalMap实例中,所以只能在本线程中访问。

底层分析

因为ThreadLocal实例实际上也是被其创建的类持有(更顶端应该是被线程持有)。而ThreadLocal的值其实也是被线程实例持有。它们都是位于堆上,只是通过一些技巧将可见性修改成了线程可见。

ThreadLocal正确使用并不会产生内存泄露,因为ThreadLocalMap在选择key的时候,并不是直接选择ThreadLocal实例,而是ThreadLocal实例的弱引用。不过…使用完 ThreadLocal方法后,建议最好手动调用remove()方法。

线程池

池化

线程池技术能够很好地解决频繁的短小任务时的线程开销问题,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面,面对过量任务的提交能够平缓劣化。即:降低资源消耗、提高响应速度、提高线程的可管理性和拓展性。

如:线程池技术带来服务器吞吐量的提高。

线程池有七大参数:

ThreadPoolExecutor 执行 execute 方法分下面 4 种情况 :

  1. 如果当前运行的线程少于 corePoolSize, 则创建新线程来执行任务(需获取全局锁)。

  2. 如果运行的线程等于或多于 corePoolSize, 则将任务加入 BlockingQueue 。

  3. 如果无法将任务加入 BlockingQueue (队列已满),则创建新的线程来处理任务(需要获取全局锁) 。

  4. 如果创建新线程将使当前运行的线程超出 maximumPoolSize, 任务将被拒绝,并调用 RejectedExecutionHandler.rej ectedExecution()方法.

工作线程:线程池创建线程时,会将线程封装成工作线程 Worker。 在执行完任务后,还会循环获取工作队列里的任务来执行。

使用

  • 创建:通过 ThreadPoolExecutor(下文详述)。

  • 提交:execute方法提交不需要返回值的任务,submit()提交需要返回值的任务。线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功。

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。

生命周期:

  • 可以设置拒绝策略

配置和监控

CPU 密集型任务应配置尽可能小的线程,而 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程。

  • 建议使用有界队列 :有界队列能增加系统的稳定性和预警能力。

  • 通过扩展线程池进行监控。可通过继承线程池来自定义线程池,重写线程池的beforeExecuteafterExecuteterminated 方法,也可以在任务执行前后和线程池关闭前执行一些代码来进行监控 。 例如,监控任务的平均执行时间、最大执行时间和最小执行时间等几个方法在线程池里是空方法 。

Executor 框架

Java 的线程既是工作单元,也是执行机制。工作单元包括 Runnable 和 Callable , 而执行机制由 Executor 框架(用户级的调度器)提供 。

两级调度模型

Executor 框架主要由 3 大部分组成如下 。

  • 任务:包括被执行任务需要实现的接口:Runnable 接口或 Callable 接口 。

  • 任务的执行 。 包括任务执行机制的核心接口 Executor 以及继承自 Executor 的ExecutorService 接口 。

Executor 框架有两个关键类实现了 ExecutorService 接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor

  • 异步计算的结果 。 包括接口 Future 和实现 Future 接口的 FutureTask 类 。

主线程首先要创建实现 Runnable (无返回值)或者 Callable (有返回值)接口的任务对象,工具类 Executors 可以把一个 Runnable 对象封装为 一 个 Callable 对象,然后可以把 Runnable 对象直接交给ExecutorService 执行 或调用 submit。最后 ,主线程 可以执行 FutureTask.get()方法来等待任务执行完成 。

框架成员

  • ThreadPoolExecutor

    • FixedThreadPool:创建使用固定线程数的API 。

    • SingleThreadExecutor:使用单个线程的,常用于需顺序执行的任务。

    • CachedThreadPool:会根据需要创建新线程,是大小无界的线程池,适用于执行很多的短期异步任务的小程序。

  • ScheduledThreadPoolExecutor:适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数的应用场景。

    SingleThreadScheduledExecutor 是只含一个线程的ScheduledThreadPoolExecutor。适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务 的应用场景 。

  • Future接口:作为submit的返回值。

ThreadPoolExecutor 详解

它是线程池的实现类。

通过 Executor 框架的工具类 Executors, 可以创建 3 种类型的 ThreadPoolExecutor.

  • FixedThreadPool:可重用固定线程数的线程池 。

  • SingleThreadExecutor :使用单个 worker 线程的 Executor 。 区别仅仅在于,corePoolSize 和 maximumPoolSize 被设置为1。

  • CachedThreadPool:会根据需要创建新线程的线程池 。

CachedThreadPool 使用没有容量的 SynchronousQueue 作为线程池的工作队列,其maximumPool 是无界的 。

当初始 maximumPool 为空,或者 maximumPool 中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll (keepAliveTime, Time Unit.NANOSECONDS) 下,无法直接进行offerpoll的匹配 。 此时 CachedThreadPool 会创建一 个新线程执行任务。执行完后,会执行SynchronousQueue.poll,若60秒内主线程提交了新任务则将执行新任务,否则终止。

  • ScheduledThreadPoolExecutor:给定的延迟后运行任务,或者定期执行任务。

待调度任务 ScheduledFutureTask 主要包含 3 个成员变量:

  • long time, 表示这个任务将要被执行的具体时间 。

  • long sequenceNumber, 表示这个任务被添加到 ScheduledThreadPoolExecutor中的序号。

  • long period, 表示任务执行的间隔周期 。

DelayQueue 封装了 一 个 PriorityQueue,会对 ScheduledFutureTask 进行排序 。 (时间早的任务将被先执行 -> 序号小的任务先被执行)

获取任务分为 3 大步骤:

  1. 获取 Lock 。

  2. 获取周期任务 。

  • 如果 PriorityQueue 为空 ,当前线程到 Condition 中等待;

  • 如果 PriorityQueue 的头元素的 time 时间比当前时间大,等待到 time时间

  • 获取 PriorityQueue 的头元素,则唤醒在 Condition中等待的所有线程

  1. 释放 Lock 。

FutureTask

Future 接口和实现 Future 接口的FutureTask 类,代表异步计算的结果。当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行, 此时可以使用FutureTask 。

实现原理:AQS

当执行 get方法时,如果 FutureTask 不是处于执行完成状态或取消状态,当前执行线程将到 AQS 的线程等待队列中等待。当某个线程执行 FutureTask.run()方法或 FutureTask.cancel( …)方法时,会唤醒线程等待队列的第一个线程。


参考: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html 《Java并发编程的艺术》

最后更新于

这有帮助吗?