0%

如何获取有性能问题的SQL

通过用户反馈获取存在性能问题的SQL

通过慢查询日志获取存在性能问题的SQL

实时获取存在性能问题的SQL

使用慢查询日志获取有性能问题的SQL

slow_query_log 启动停止记录慢查询日志

slow_query_log_file指定慢查询日志存储路径及文件

long_query_time指定记录慢查询日志SQL执行时间的伐值

  • 默认10秒

记录所有符合条件的SQL

  • 包括查询语句
  • 数据修改语句
  • 已经回滚的SQL

log_queries_not_using_indexes是否记录未使用索引的SQL

常用的慢查询日志分析工具

mysqldumpslow

pt-query-digest

实时获取性能问题SQL

information_schema -> PROCESSLIST表

SQL的解析预处理及生成执行计划

搞清楚这些查询为什么会慢

  • 客户端发送sQL请求到服务器
  • 检查是否可以在查询换存储命中
  • 服务器端执行SQL解析,预处理,再由优化器声称对应的执行计划
  • 根据执行计划,调用存储引擎API来查询数据
  • 将结果返回给客户端

查询缓存对SQL性能的影响

  • query_cache_type

    • 设置查询缓存是否可用
  • query_cache_size

    • 设置查询缓存的内存大小
  • query_cache_limit

    • 设置查询缓存可用存储的最大值
  • query_cache_wlock_invalidate

    • 设置数据表被锁后是否返回缓存中的数据
  • query_cache_min_res_unit

    • 设置查询缓存非配的内存块的最小单位
  • 读写比较频繁的系统,建议关闭缓存

MySQL依照执行计划和存储引擎进行交互

  • 解析SQL

    • 语法解析阶段通过关键字对MySQL语句进行解析,并生成一颗对应的解析树

    • MySQL解析器将使用MySQL语法规则验证和解析查询

      • 检查语法是否使用了正确的关键字
      • 关键字的顺序是否正确
  • 预处理

    • 根据MySQL规则进一步检查解析树是否合法

      • 查询中所涉及的表和数据列是否存在,名字或者别名是否存在歧义等
    • 查询优化器生成查询计划

  • 优化SQL执行计划

SQL的解析预处理以及生成执行计划

  • 会造成MySQL生成错误的执行计划的原因

    • 统计信息不准确

    • 执行计划中的成本估算不等同于实际的执行计划的成本

      • MySQL服务器层不知道哪些页面在内存中
      • 哪些页面在磁盘上
      • 哪些页面要顺序读取
      • 哪些页面需要随机读取
    • MySQL优化器所认为的最优可能与你所认为的最优不一样

      • 基于成本模型选择最优的执行计划
    • MySQL从不考虑其他并发的查询,这可能会影响当前查询的速度

    • MySQL有时候也会基于一些固定的规则来生成执行计划

    • MySQL不会考虑不收其控制的成本

      • 存储过程
      • 用户自定义的函数

MySQL优化器可优化的SQL类型

  • 重新定义表的关联顺序

  • 将外链接转化为内连接

  • 使用等价变换规则

  • 优化count,min,max

    • select tables optimized away
  • 建一个表达式转化为常数表达式

  • 子查询优化

    • 子查询->连接查询
  • 提前终止查询

  • 对in条件进行优化

如何确定查询处理各个阶段所消耗的时间

使用profile

  • set profiling=1

    • session级别
  • 执行查询

  • show profiles

  • show profile for query x

  • show profile cpu for query x

使用performance_schema

特定SQL的查询优化

大批量删除

  • 分批次删除

修改大表的表结构

  • 主从切换,先从后主

  • 新老表同步,比较复杂

    • pt-online-schema-change

优化not in 和<>查询

  • 优化为LEFT JOIN方法

使用汇总表优化查询

  • 截止到前一天的数据汇总count
  • 今天的全部数据进行count
  • 把两部分数据汇总

附Xmind

SQL查询优化

数据库分库分表的几种方式

把一个实例中的多个数据库拆分到不同的实例

把一个库中的表分离到不同的数据库中

数据库分片前的准备

对一个库中相关表进行水平拆分到不同实例的数据库中

如何选择分区键

  • 分区键要能尽量避免跨分片查询的发生
  • 尽量使各个分片中的数据平均

如何存储无需分片的表

  • 冗余到每一个分片中
  • 使用额外的节点统一存储

如何在节点上部署分片

  • 每个分片使用单一数据库,并且数据库名也相同
  • 将多个分片表存储在一个数据库中,并且在表名上加入分片号后缀
  • 在一个节点中部署多个数据库,每个数据库中包含一个分片

如何分配分片中的数据

  • 按分区键Hash值取模分配分片数据
  • 按分区键的范围来分配分片数据
  • 利用分区键和分片的映射表来分配分片数据

如何生成全局唯一Id

  • 使用auto_increment_increment和auto_increment_offset参数

  • 配置全局节点生成Id

    • 容易成为系统瓶颈
  • 在Redis等缓存服务器中创建全局ID

附件:Xmind图

数据库分库分表设计

MySQL支持的索引类型

Btree索引

  • 通过B+Tree结构存储数据

  • 加快数据的查询速度

  • 更适合进行范围查找

  • 顺序存储

  • 什么情况下可以使用到

    • 全值匹配的查询
    • 匹配最左前缀的查询
    • 匹配列前缀查询
    • 匹配范围值的查询
    • 精确匹配左前列并范围匹配另外一列
    • 只访问索引的查询
  • 使用限制

    • 如果不是按照索引最左列开始查找,则无法使用索引
    • 使用索引时不能跳过索引中的列
    • not in和<>不能使用索引
    • 如果查询中有某个列的范围查询,则其右边所有列都无法使用索引

Hash索引

  • 特点

    • 基于hash表实现,只有查询条件精确匹配hash索引中的所有列的时候才能够使用hash索引
    • 对于hash索引中的所有列,存储引擎都会为每一行计算一个hash码,hash索引中存储的就是hash码
  • 限制

    • hash索引必须进行二次查找
    • hash索引无法用于排序
    • hash索引无法进行范围查找,不支持部分索引查找
    • hash索引中的hash码的计算可能存在has冲突

为什么要使用索引

  • 索引大大减少了存储引擎需要扫描的数据量

  • 索引可以帮助我们进行排序,以避免使用临时表

  • 索引可以把随机I/O改变为顺序I/O

索引不是越多越好

  • 索引会增加写操作的成本

  • 太多的索引会增加查询优化器的选择时间

索引优化策略

索引列上不能使用表达式或者函数

前缀索引和索引列的选择性

  • 索引的选择性是不重复的索引值和表的记录数的比值

联合索引

  • 如何选择索引列的顺序

    • 经常被使用到的列优先
    • 选择性高的列优先
    • 宽度小的列优先

覆盖索引

  • 优点

    • 可以优化缓存,减少磁盘IO操作
    • 可以减少随机I/O,变随机I/O操作为顺序I/O操作
    • 可以避免对Innodb主键索引的二次查询
    • 可以避免MyISAM表进行系统调用
  • 无法使用覆盖索引的情况

    • 存储引擎不支持覆盖索引
    • 查询中使用了太多的列
    • 使用了双%号的like查询

使用索引来优化查询

  • 使用索引扫描来优化排序

    • 通过排序操作
    • 按照索引顺序扫描数据
    • 索引的列顺序和Order By子句的顺序完全一致
    • 索引中所有列的方向(升序,降序)和Order By子句完全一致
    • Order by中的字段全部在关联表中的第一张表中
  • 模拟Hash索引优化查询

    • 只能处理键值的全值匹配
    • 所使用的Hash函数决定着索引键的大小
  • 利用索引优化锁

    • 索引可以减少锁定的行数
    • 索引可以加快处理速度,同时也加快了锁的释放

索引的维护和优化

  • 删除重复和冗余的索引

  • 查找未被使用过的索引

  • 更新索引统计信息以及减少索引碎片

    • analyze table table_name
    • optimize table table_name

附件:Xmind图

MySQL数据库索引优化

目的

减少数据冗余

尽量避免数据维护中出现更新,插入和删除异常

  • 插入异常

    • 如果表中的某个实体随着另外一个实体存在而存在
  • 更新异常

    • 如果更高表中的某个实体的单独属性时,需要对多行进行更新
  • 删除异常

    • 如果删除表中的某一实体则会导致其他实体的消失

节约数据存储空间

提高查询效率

数据库设计范式

第一设计范式

  • 数据库中表的所有字段都只具有单一属性
  • 单一属性的列是由基本的数据类型所构成的
  • 设计出来的表都是简单的二维表

第二设计范式

  • 一个表中具有一个业务主键

第三设计范式

  • 每一个非主属性,既不部分依赖于也不传递依赖于业务主键

反范式化设计

  • 少量的数据冗余,提高查询效率。空间换时间。

优缺点

  • 范式化

    • 优点

      • 可以尽量的减少数据冗余

        • 数据表更新快,体积小
      • 范式化的更新操作比反范式化更快

      • 范式化的表通常比反范式化更小

    • 缺点

      • 对于查询需要对多个表进行关联
      • 更难进行索引优化
  • 反范式化

    • 优点

      • 减少表关联
      • 更好的进行索引优化
    • 缺点

      • 存在数据冗余以及数据维护异常
      • 对数据的修改需要更多的成本

物理设计原则

定义数据库,表以及字段的命名规范

  • 可读性原则
  • 表意性原则
  • 长名原则

选择合适的存储引擎

为表中的字段选择合适的数据类型

  • 当一个列可以选择多种数据类型的时候,应该优先考虑数字类型,其次是日期或者二进制类型,最后是字符类型。对于相同级别的数据类型,应该优先考虑占用空间小的数据类型

  • Varchar和char

    • varchar

      • Varchar类型的存储特点

        • VARCHAR用于存储变长字符串,只占用必要的存储空间
        • 列的最大长度小于255则只占用一个额外的字节,用于记录字符串长度
        • 列的最大长度大于255则占用两个额外的字节,用于记录字符串长度
      • VARCHAR长度的选择问题

        • 使用最小的符合需求的长度
        • varchar(5)和varchar(200)存储‘MYSQL’字符串性能不同
      • varchar的适用场景

        • 字符串列最大长度比平均长度大很多
        • 字符串列很少被更新
        • 使用了多字节字符集存储字符串
      • char类型是定长的

    • char

      • Char类型的存储特点

        • 字符串存储在char类型的列中会删除末尾的空格
        • Char类型的最大宽度为255
      • char类型的适用场景

        • char类型适合存储长度近似的值

          • 比如MD5,手机号,身份证号
        • char类型适合存储短字符串

        • char类型适合经常更新的字符串列

  • 如何存储日期类型

    • DATATIME类型

      • 占用8字节存储空间

      • 时区无关

      • 时间范围

        • 1000-01-01 00:00:00 ~ 9999-12-31 23:59:59
      • 存储格式

        • YYYY-MM-DD HH:MM:SS[.fraction]
    • TIMESTAMP

      • 存储了由格林尼治时间到当前时间的秒数

      • 占用4字节

      • 时间范围

        • 1970-01-01到2038-01-19
      • 依赖于所指定的时区

      • 在行的数据修改时,可以自动修改timestamp列的值(根据时间戳自动更新)

      • 默认第一个列是随着更改自动更新

    • date类型

      • 只需要3个字节
      • 可以使用时间函数
      • 时间范围1000-01-01~9999-12-31
    • time类型

      • 用于存储时间数据
      • HH:MM:SS
    • 存储日期时间数据的注意事项

      • 不要使用字符串类型来存储日期时间数据
      • 日期时间类型通常比字符串占用的存储空间小
      • 日期时间类型在进行查找过滤时可以利用日期来进行对比
      • 日期时间类型还有着丰富的处理函数,可以方便的对时间进行日期计算
      • 使用Int存储日期时间不如使用Timestamp类型

附件:Xmind图

数据库结构优化

服务器硬件

CPU资源

  • Web应用-Core重于频率

内存大小

  • 主板支持最大频率的内存

网络

IO子系统

  • 传统硬盘

    • 存储容量
    • 传输速度
    • 访问时间
    • 主轴转速
    • 物理尺寸
  • RAID

    • RAID0

      • 多个独立磁盘串行
      • 最简单
      • 性价比最高
      • 数据没有冗余
    • RAID1

      • 磁盘镜像
      • 安全性最高
    • RAID5

      • 分布式奇偶校验磁盘阵列
    • RAID10

      • 分片镜像
  • 固态存储

    • 更好的随机读写性能

    • 更好的支持并发

    • 更容易损坏

    • 使用场景

      • 大量随机I/O场景
      • 单线程负载I/O瓶颈
  • 网络存储

    • SAN

      • 大量顺序读写
    • NAS

      • 有一定网络延迟
    • 适合数据库备份

网络性能的限制

  • 延迟
  • 带宽
  • 网络质量影响

操作系统

CentOS

  • 内核相关参数(/etc/sysctl.conf)

    • 网络参数

      • net.core.somaxconn=65535 #监听队列长度
      • net.core.netdev_max_backlog=65535
      • net.ipv4.tcp_max_syn_backlog=65535
      • net.ipv4.tcp_fin_timeout =10
      • net.ipv4.tcp_tw_reuse =1
      • net.ipv4.tcp_tw_recycle =1
      • net.core.wmem_default = 87380
      • net.core.wmem_max = 16777216
      • net.core.rmem_default = 87380
      • net.core.rmem_max = 16777216
      • net.ipv4.tcp_keepalive_time =120
      • net.ipv4.tcp_keepalive_intvl =30
      • net.ipv4.tcp_keepalive_probes =3
    • 内核参数

      • kernel.shmmax=4294967295

        • Linux内核参数中最重要的参数之一,用于定义单个共享内存段的最大值

        • 注意事项

          • 这个参数应该设置的足够大,以便能在一个共享内存段下容纳下整个的Innodb缓冲池的大小
          • 这个值的大小对于64位Linux系统,可取的最大值为物理内存值-1byte,建议设置为大于物理内存的一半,一般取值大于Innodb缓冲池的大小即可。
      • vm.swappiness = 0

        • 当内存不足时会对性能产生比较明显的影响

        • 如果完全禁用Swap

          • 降低操作系统性能
          • 容易造成内存溢出,崩溃,或都被操作系统Kill
        • 结论

          • 需要保留交换分区,但是要控制何时使用交换分区
        • 为0

          • 除非内存使用满了,否则不要使用交换分区
  • 增加资源限制(/etc/security/limit.conf)

    • * soft nofile 65535

    • * hard nofile 65535

    • 解释

      • * 所有用户有效
      • soft指的是当前系统生效的配置
      • hard表明系统中所能设定的最大值
      • nofile表示所限制的资源师打开文件的最大数目
      • 65535 限制的数量
    • 结论:把可以打开的文件数量增加到65535个以保证可以打开足够多的文件句柄

    • 注意:这个文件的修改需要重启系统才可以生效

  • 磁盘调度策略(/sys/block/devname/queue/scheduler)

    • noop

      • 电梯式调度策略(适合闪存,RAM,嵌入式系统)
    • cfq

      • 完全公平队列(默认,不适合MySQL)
    • deadline

      • 截止时间调度策略(适合数据库)
    • anticipatory

      • 预料I/O调度测量(适合写入较多的环境,比如文件服务器,不适合数据库)
    • 修改方法

      • echo deadlin > /sys/block/sda/queue/scheduler
  • 文件系统影响

    • Linux

      • 推荐使用XFS

      • EXT3/4

        • 系统挂载参数(/etc/fstab)

          • data = writeback | ordered | journal

            • Innodb适用于writeback,ordered较慢,journal最慢,最安全
          • noatime,nodiratime

            • 禁用相关时间
          • 示例:

            • /dev/sda1/ext3 noatime,nodiratime,data=writeback 1 1

数据库存储引擎的选择

MyISAM

  • MySQL5.5之前的默认存储引擎

  • 存储为MYD,MYI两个文件

  • 特性

    • 并发性与锁级别

      • 表锁
    • 表损坏修复

      • check table tablename
      • repire table tablename
    • 支持索引类型

      • 全文索引
      • Text等前缀索引
    • 支持数据压缩

      • myisampack
      • 只读
  • 限制

    • 单表<256T
  • 适用场景

    • 非事务型应用

    • 只读类应用(支持压缩)

    • 空间类应用

      • 支持空间函数

Innodb

  • 使用表空间进行数据存储

    • innodb_file_per_table

      • ON:独立表空间:tablename.ibd
      • OFF:系统表空间:ibdataX
    • 比较

      • 系统表空间无法简单的收缩文件大小
      • 独立表空间可以通过optimize table命令收缩系统文件
      • 系统表空间会产生I/O瓶颈
      • 独立表空间可以同时向多个文件刷新数据
    • 建议

      • 对Innodb使用独立表空间
  • 系统表空间

    • Innodb数据字典信息
    • Undo回滚段
  • 特性

    • Innodb是一种事务性的存储引擎
    • 完全支持事务的ACID特性
    • Redo Log和Undo Log
    • 支持行级锁
    • 行级锁可以最大程度支持并发
    • 行级锁是由存储引擎层实现的
  • 什么是锁

    • 锁主要作用是管理共享资源的并发访问

    • 锁用于实现事务的隔离性

    • 锁的类型

      • 共享锁(读锁)
      • 独占锁(写锁)
    • 锁的粒度

      • 表级锁
      • 行级锁
    • 阻塞和死锁

  • Innodb状态检查

    • show engine innodb status
  • 适用场景

    • 使用大多数OLTP应用

CSV

  • 文件系统存储特点

    • 数据以文本方式存储在文件中
    • .csv文件存储表内容
    • CSM文件存储表的元数据如表的状态和数据量
    • frm文件存储表结构信息
  • 特点

    • 以CSV格式进行数据存储

    • 所有列必须都是不能为null的

    • 不支持索引

      • 不适合大表
    • 可以对数据文件直接进行编辑

      • 保存文本文件内容
  • 适用场景

    • 适合作为数据交换的中间表

Archive

  • 文件存储特点

    • 以Zlib对表数据进行压缩,磁盘I/O更少
    • 数据存储在ARZ为后缀的文件中
    • 只支持Insert和select操作
    • 只允许在自增ID列上加索引
  • 适用场景

    • 日志和数据采集类应用

Memory

  • 文件系统特点

    • 也称为HEAP存储引擎,数据保存在内存中
  • 功能特点

    • 支持HASH索引和BTree索引

      • HASH适合等值
      • BTree适合范围
    • 所有字段都为固定长度

    • 不支持Blog和Text等大字段

    • 使用表级锁

    • 最大大小由max_heap_table_size参数决定

      • 默认16M
  • 容易混淆的概念

    • Memory存储引擎表

    • 临时表

      • 系统使用临时表

        • 超过限制使用MyISAM临时表
        • 未超过限制使用Memory表
      • create temporary table 建立的临时表

  • 适用场景

    • 用于查找或者映射表
    • 用于保存数据分析过程中产生的中间表
    • 用于缓存周期性聚合数据的结果表
    • MEMory数据容易丢失,所以要求数据可再生

Federated

  • 特点

    • 提供了访问MySQL服务器上表的方法
    • 本地不存储数据,数据全部存放到远程服务器上
    • 本地需要保存表结构和服务器连接信息
  • 如何使用

    • 默认禁止,启用需要在启动时增加federated参数
  • 适用场景

    • 偶尔统计分析以及手工查询

如何选择正确的存储引擎

  • 一般选择InnoDB

  • 参考条件

    • 事务
    • 备份
    • 崩溃恢复
    • 存储引擎的特有特性
  • 尽量不要混合使用存储引擎

数据库参数配置

MySQL获取配置信息路径

  • 命令行参数
  • 配置文件

MySQL配置参数的作用域

  • 全局参数

    • set global 参数名=参数值;
    • set @@global.参数名:=参数值;
  • 会话参数

    • set [session] 参数名=参数值
    • set @@session.参数名:=参数值

内存配置相关参数

  • 确定可以使用的内存的上限

  • 确定MySQL的每个连接使用的内存

    • sort_buffer_size
    • join_buffer_size
    • read_buffer_size
    • read_rnd_buffer_size
  • 确定需要为操作系统保留多少内存

  • 如何为缓存池分配内存

    • Innodb_buffer_pool_size

      • 总内存 -(每个线程所需要的内存*连接数)- 系统保留内存
    • key_buffer_size

      • 主要MyISAM使用

IO相关配置参数

  • Innodb I/O 相关配置

    • Innodb_log_file_size

    • Innodb_log_files_in_group

    • 事务日志总大小 = Innodb_log_file_size * Innodb_log_files_in_group

    • Innodb_log_buffer_size

    • Innodb_flush_log_at_trx_commit

      • 0:每秒进行一次log写入cache,并flush log到磁盘
      • 1:每次事务提交执行log写入cache,并flush log到磁盘
      • 2:每次事务提交,执行log数据写入到cache,每秒执行一次flush log到磁盘
    • Innodb_flush_method=O_DIRECT

      • 关闭操作系统缓存(Linux建议)
    • Innodb_file_per_table = 1

    • Innodb_doublewriter = 1

  • MyISAM

    • delay_key_write

      • OFF:每次写操作后刷新键缓冲中的脏块到磁盘
      • ON: 支队在键表时制定了delay_key_write选项的表使用延迟刷新
      • ALL:对所有MyISAM表都是用延迟建写入

安全相关配置参数

  • expire_logs_days

    • 指定自动清理binlog的天数
  • max_allowed_packet

    • 控制MySQL可以接受的包的大小
  • skip_name_resolve

    • 禁用DNS查找
  • sysdate_is_now

    • 确保sysdate()返回确定性日期
  • read_only

    • 禁止非super权限的用户写权限
  • skip_slave_start

    • 禁用Salve恢复
  • sql_mode

    • 设置MySQL所使用的SQL模式
    • strict_trans_tables
    • no_engine_subtitution
    • no_zero_date
    • no_zero_in_date
    • only_full_group_by

其他常用配置参数

  • sync_binlog

    • 控制MySQL如何向磁盘刷新binlog
  • tmp_table_size/max_heap_table_size

    • 控制内存临时表大小
  • max_connections

    • 控制允许的最大连接数

数据库结构设计和SQL语句

数据库设计对性能的影响

  • 过分的反范式化为表建立太多的列
  • 过分的范式化造成太多的表关联
  • OLTP环境中使用了不恰当的分区表
  • 使用外键保证数据的完整性

附Xmind

什么影响了MySQL性能

Field

final Sync sync;

  • 核心Field,实际锁相关操作均在这里,实际上是对ReentrantLock的包装

方法

lock()

  • 获取锁操作,直接委托给sync,有公平和非公平两种实现,具体看NonfairSync和FairSync
1
2
3
public void lock() {
sync.lock();
}

lockInterruptibly()

  • 支持中断的获取锁
1
2
3
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

tryLock()

  • 非阻塞方式获取锁
1
2
3
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

tryLock(long timeout, TimeUnit unit)

  • 带有超时的获取锁
1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

unlock()

  • 释放锁
1
2
3
public void unlock() {
sync.release(1);
}

newCondition()

  • 创建条件变量
1
2
3
public Condition newCondition() {
return sync.newCondition();
}

getHoldCount()

  • 当前线程持有锁的个数
1
2
3
public int getHoldCount() {
return sync.getHoldCount();
}

isHeldByCurrentThread()

  • 锁是否被当前线程支持
1
2
3
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}

isLocked()

  • 是否处于锁定状态
1
2
3
public boolean isLocked() {
return sync.isLocked();
}

isFair()

  • 是否为公平锁
1
2
3
public final boolean isFair() {
return sync instanceof FairSync;
}

getOwner()

  • 持有锁的线程
1
2
3
protected Thread getOwner() {
return sync.getOwner();
}

hasQueuedThreads()

  • 是否有线程在等待获取锁
1
2
3
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

hasQueuedThread(Thread thread)

  • 线程是否在等待获取锁的队列中
1
2
3
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}

getQueueLength()

  • 等待队列的长度
1
2
3
public final int getQueueLength() {
return sync.getQueueLength();
}

getQueuedThreads()

  • 等待的线程集合
1
2
3
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}

hasWaiters(Condition condition)

  • 是否有线程阻塞在condition的await()的方法上
1
2
3
4
5
6
7
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}

getWaitQueueLength(Condition condition)

  • 阻塞在condition的await()的方法上的线程数量
1
2
3
4
5
6
7
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}

getWaitingThreads(Condition condition)

  • 阻塞在condition的await()的方法上的线程集合
1
2
3
4
5
6
7
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}

内部类

内部类有三个,公共抽象类Sync,非公平锁实现NonfairSync,公平锁实现FairSync,下面分别进行分析

Sync

void lock()

  • 核心加锁方法,因为公平锁和非公平锁实现不同,所以这里为抽象方法。

boolean nonfairTryAcquire(int acquires)

  • 非公平获取锁(资源)的实际实现,从以下源码可以看出,获取锁的时候,哪个先来,哪个就可以获取到,CAS操作成功的就获取到锁了,没有所谓的先来后到。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();//当前线程
    int c = getState();//获取加锁状态
    if (c == 0) {//为0则代表还没有上锁
    if (compareAndSetState(0, acquires)) {//执行CAS操作并且上锁
    setExclusiveOwnerThread(current);//设置持有锁的线程
    return true;//加锁成功
    }
    }
    else if (current == getExclusiveOwnerThread()) {//执行到这里说明已经有某个线程获取到锁了,因为是可重入锁,判断持有锁的线程是否为当前线程
    int nextc = c + acquires;//执行到这里说明是已经不是第一次上锁,并且当前线程是锁的持有线程,则可以直接进行累加(也就是重入)
    if (nextc < 0) // 额,超过int的最大值,出现溢出了(真的存在这种场景么= =??)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);//更新state
    return true;
    }
    return false;//获取失败
    }

boolean tryRelease(int releases)

  • 非阻塞方式尝试释放资源,具体看源码分析
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    protected final boolean tryRelease(int releases) {
    int c = getState() - releases;//待更新资源
    if (Thread.currentThread() != getExclusiveOwnerThread())//判断是否为锁的持有现成
    throw new IllegalMonitorStateException();
    boolean free = false;//释放标识位置。为true则代表当前线程不再持有当前锁的任何资源
    if (c == 0) {//如果释放资源后,资源数量为0,代表释放锁,其他线程可以尝试获取锁,如果不为0,则需要继续释放(因为是重入多次,需要释放多次)
    free = true;
    setExclusiveOwnerThread(null);//清空锁持有线程
    }
    setState(c);//更新状态标志位
    return free;
    }

boolean isHeldExclusively()

  • 判断当前线程是否为锁持有线程
    1
    2
    3
    protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
    }

ConditionObject newCondition()

  • 创建条件变量对象,ConditionObject之后分析
    1
    2
    3
    final ConditionObject newCondition() {
    return new ConditionObject();
    }

Thread getOwner()

  • 如果锁没有被线程持有,返回null,否则返回持有的线程
    1
    2
    3
    final Thread getOwner() {
    return getState() == 0 ? null : getExclusiveOwnerThread();
    }

int getHoldCount()

  • 获取重入次数。如果当前线程没有持有锁,返回0;
    1
    2
    3
    final int getHoldCount() {
    return isHeldExclusively() ? getState() : 0;
    }

boolean isLocked()

  • 是否处于锁定状态
    1
    2
    3
    final boolean isLocked() {
    return getState() != 0;
    }

NonfairSync

  • 非公平锁的委托实现,继承了Sync类,间接继承了AQS

void lock()

  • 获取锁操作,直接通过cas获取,失败则通过aqs的acquire获取,acquire在父类AQS中,会调用子类的tryAcquire方法。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }

    public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

boolean tryAcquire(int acquires)

  • 直接调用sync抽象类中的nonfairTryAcquire,非公平方式获取资源
    1
    2
    3
    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }

FairSync

  • 公平锁的委托实现,继承了Sync类,间接继承了AQS

void lock()

  • 没有快速路径,直接调用acquire去获取资源,内部委托依旧是调用tryAcquire
    1
    2
    3
    final void lock() {
    acquire(1);
    }

boolean tryAcquire(int acquires)

  • 与非公平获取资源相比,区别在于多了条件!hasQueuedPredecessors() ,也就是说按照队列的方式获取,如果队列中尚有未获取的在等待,则当前线程等待并且入队(参考AQS部分)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }

总结

  • 其实相对而言,可重入锁实现很简单,基本都是继承了AQS的方法,重点实现了可重入(线程相同则累加state),以及公平非公平。
  • 重点还是在于理解AQS,这里的可重入锁,以及之后的可重入读写锁,CountDownLatch,Semaphore,CyclicBarrier等都是基于AQS实现的。

平时写一些简单工具类,经常要打包ftp到服务器上面,懒得zip压缩,直接生成jar包。
maven配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<mainClass>com.xxx.xxx.xxx.xxx</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

AQS分析第一篇,整理得快吐血了,不过话说回来,看一遍和整理了一遍真的完全不一样,收获还是很多的。
这里基本把AQS整个类分析了一遍,剩下还有就是条件对象以及AQS应用了,后续有时间在整理了。


AbstractOwnableSynchronizer

  • 比较简单,内部一个exclusiveOwnerThread,附带get和set方法,不多说

Node

  • AbstractQueuedSynchronizer 核心依赖,内部队列就是由Node组成

核心Field

状态类

  • int CANCELLED = 1;
    • 代表被取消了
  • int SIGNAL = -1;
    • 代表需要被唤醒
  • int CONDITION = -2;
    • 代表在条件队列中等待
  • int PROPAGATE = -3;
    • 代表释放资源的时候需要通知其他Node
  • int waitStatus
    • 代表当前Node的等待状态,取值为CANCELLED、SIGNAL、CONDITION、PROPAGATE,默认初始化为0

记录阻塞模式

  • Node SHARED
    • 代表该Node是因为获取共享资源被阻塞而放入AQS
  • Node EXCLUSIVE
    • 代表该Node是因为获取独占资源被阻塞而放入AQS

链表相关

  • 提供前驱和后继节点的访问方法,也就是说链表是双向的
  • Node prev
    • 记录当前节点的前驱节点
  • Node next
    • 记录当前节点的后继节点

其他

  • Thread thread
    • thread用于存放进入AQS队列的里面的线程
  • Node nextWaiter
    • 在Node作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;
    • 在Node作为等待队列节点使用时,nextWaiter保存后继节点。

核心方法

boolean isShared()

  • 当前节点获取资源采用的是否为共享方式
    1
    2
    3
    final boolean isShared() {
    return nextWaiter == SHARED;
    }

Node predecessor()

  • 获取前置节点,如果前置节点为null,则抛出NPE异常
    1
    2
    3
    4
    5
    6
    7
    final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
    throw new NullPointerException();
    else
    return p;
    }

AbstractQueuedSynchronizer

核心Field

  • Node head
    • 内部队列的头结点
  • Node tail
    • 内部队列的尾节点
  • int state
    • 状态位,在不同的子类中有不同的含义
  • long spinForTimeoutThreshold
    • 自旋时间,低于这个时间则直接进行空循环,然后重新尝试获取资源

核心方法

int getState()

  • 用的很多,但是没啥可说的

void setState(int newState)

  • 用的很多,但是没啥可说的

boolean compareAndSetState(int expect, int update)

  • Cas更新状态操作,也没啥可说的

Node enq(final Node node)

  • 入队操作,如果队列没有节点,则tail为null,这个时候需要加入一个哨兵节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))//加入一个哨兵节点到队列尾部,再次循环
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;//返回原末尾节点
    }
    }
    }
    }

Node addWaiter(Node mode)

  • 这个作者实现比较有趣,先用快速方式尝试添加节点,成功则返回新添加的节点,失败则通过enq以循环的方式将node入队
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    private Node addWaiter(Node mode) {
    //根据当前线程以及模式(共享或者独占)创建一个节点
    Node node = new Node(Thread.currentThread(), mode);
    //尝试直接添加到队列尾部(所谓的快速添加)
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    //CAS添加成功则返回结果,失败则只需enq
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    //说明快速添加遇到竞争,通过enq进行入队操作
    enq(node);
    return node;
    }

void setHead(Node node)

  • 设置头结点,注意一下node的thread和prev会设置为null

void unparkSuccessor(Node node)

  • 该方法用于唤醒等待队列中的下一个线程,下一个线程并不一定是当前节点的next节点,需要根据其状态来进行查找,找到之后执行LockSupport.unpark唤醒对应的线程。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    }

void doReleaseShared()

  • 共享模式的释放操作,一般来说,只需要判断两种情况:
    • SIGNAL代表后继节点之前被阻塞了需要释放
    • PROPAGATE代表共享模式下可以继续进行acquire
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      private void doReleaseShared() {
      for (;;) {
      Node h = head;
      //这里的判断是处理头结点和尾结点都存在的情况,并且队列里节点总数大于1
      if (h != null && h != tail) {
      int ws = h.waitStatus;
      //Node.SIGNAL表示后继节点需要被唤醒
      if (ws == Node.SIGNAL) {
      //h从SIGNAL设置为0
      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
      continue;
      //执行唤醒操作,这里会将h.waitStatus设置为0,补充,每次只唤醒一个线程
      unparkSuccessor(h);
      }
      //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去,也就是h从0设置为PROPAGATE,
      else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
      continue;
      }
      //头节点没有发生变化,可以退出循环,如果头结点发生了变化,为了使自己的唤醒动作可以传递,必须进行重试
      if (h == head)
      break;
      }
      }

void setHeadAndPropagate(Node node, int propagate)

  • 首先执行setHead方法,在这之后检查条件,如果满足条件则唤醒后继节点(因为是共享模式,所以后继节点也一并唤醒)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    //检查条件
    // propagate > 0 表示调用方指明了后继节点需要被唤醒
    // 头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点(看第一行和第二行代码)
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    doReleaseShared();
    }
    }

void cancelAcquire(Node node)

  • 取消正在获取资源的操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    private void cancelAcquire(Node node) {
    if (node == null)
    return;
    //首先当前node不在关联任何线程
    node.thread = null;
    Node pred = node.prev;
    //CANCELLED的值为1,该判断也就是跳过已经取消的节点
    while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;
    //这里指找到一个有效的前置节点
    Node predNext = pred.next;
    //将节点node设置为CANCELLED状态
    node.waitStatus = Node.CANCELLED;
    //判断node是否为tail节点,如果是tail节点,则cas进行替换,替换为找到的有效前置节点pred
    if (node == tail && compareAndSetTail(node, pred)) {
    执行成则pred的下一个节点为null(已经是tail节点)
    compareAndSetNext(pred, predNext, null);
    } else {
    //执行到这里说明node不是tail节点,或者cas操作失败了
    int ws;
    // pred如果不是head节点,并且thread不为空,并且满足下面条件之一
    // 1. pred.waitStatus为SIGNAL
    // 2. pred.waitStatus <= 0 (SIGNAL,CONDITION,PROPAGATE,0),并成功将pred的WaitStatus进行cas替换为SIGNAL
    if (pred != head && ( (ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) ) && pred.thread != null) {
    Node next = node.next;
    //将前置节点的next指向当前节点的next(说白了就是删除链表中的当前节点,只不过是在cas中进行操作)
    if (next != null && next.waitStatus <= 0)
    compareAndSetNext(pred, predNext, next);
    } else {
    //不满足条件,也就是说node为head的后继节点,直接进行唤醒
    unparkSuccessor(node);
    }
    // 这个就是清除引用,快速gc用的
    node.next = node;
    }
    }

boolean shouldParkAfterFailedAcquire(Node pred, Node node)

  • 根据前置节点判断当前节点是否应该被阻塞,同时清理掉CANCELLED节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    //如果继的节点状态为SIGNAL,则当前节点需要unpark,返回true
    if (ws == Node.SIGNAL)
    return true;
    //否则返回false,并进行如下操作
    //ws > 0说明前置节点已经被取消(CANCELLED = 1), 这时需要继续往前找,直到找到 waitStatus 不为 CANCELLED ,然后返回false。所谓清理CANCELLED节点就是在这里跳过对应的节点。
    if (ws > 0) {
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    //如果节点状态不是CANCELLED,则cas更新waitStatus为SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

void selfInterrupt()

  • 这个方法比较简单,就是调用当前线程的中断方法
    1
    2
    3
    static void selfInterrupt() {
    Thread.currentThread().interrupt();
    }

boolean parkAndCheckInterrupt()

  • 阻塞当前线程并执行中断检查(会清除中断标识)
    1
    2
    3
    4
    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    }

boolean acquireQueued(final Node node, int arg)

  • 尝试获取锁,成功返回中断状态,失败则则阻塞。阻塞过程中被中断,会返回被中断过标识
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;//默认非中断
    for (;;) {
    //获取当前节点的前置节点
    final Node p = node.predecessor();
    //如果前置节点为head节点,则尝试获取资源
    //每次只允许当构造节点的前驱节点是头结点才去获取同步状态
    if (p == head && tryAcquire(arg)) { //只有一个线程可以通过
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    //否则根据是否可以进行park操作进行阻塞
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    //如果没有更新failed标志为,则发生异常,取消node节点
    if (failed)
    cancelAcquire(node);
    }
    }

void doAcquireInterruptibly(int arg)

  • 获取资源操作,如果阻塞过程中被中断,则会抛出异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
    //添加一个独占资源到队列末尾
    final Node node = addWaiter(Node.EXCLUSIVE);
    //以下代码基本同acquireQueued
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return;
    }
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    throw new InterruptedException();//这里直接抛出异常
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

boolean doAcquireNanos(int arg, long nanosTimeout)

  • 带有超时的去获取独占资源,如果被中断,会抛出异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)//时间判断
    return false;
    final long deadline = System.nanoTime() + nanosTimeout;//结束时间
    final Node node = addWaiter(Node.EXCLUSIVE);//添加独占资源node到队列
    boolean failed = true;
    try {
    for (;;) {
    //获取资源
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return true;
    }
    nanosTimeout = deadline - System.nanoTime();//当前还可以等待时间
    if (nanosTimeout <= 0L)//已经超时
    return false;
    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)//阻塞nanosTimeout
    LockSupport.parkNanos(this, nanosTimeout);
    if (Thread.interrupted())//线程被中断,则抛出异常
    throw new InterruptedException();
    }
    } finally {
    if (failed)//没有成功则取消节点
    cancelAcquire(node);
    }
    }

void doAcquireShared(int arg)

  • 以共享的方式获取资源,失败则阻塞
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//添加一个共享节点到队列尾部
    boolean failed = true;//失败标志位
    try {
    boolean interrupted = false;//中断标志位
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {//必须是头节点才可以
    int r = tryAcquireShared(arg);//获取资源
    //r等于0表示不用唤醒后继节点,大于0需要
    if (r >= 0) {
    //尝试唤醒后继节点
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    //没有中断,则返回
    if (interrupted)
    selfInterrupt();
    failed = false;
    return;
    }
    }
    //获取失败,则进行阻塞,并将前驱节点的状态改成SIGNAL
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

void doAcquireSharedInterruptibly(int arg)

  • 基本同doAcquireShared,被中断则抛出异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    throw new InterruptedException();//这里抛出异常
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

boolean doAcquireSharedNanos(int arg, long nanosTimeout)

  • 和上面的没啥区别,就是多了超时控制而已,被中断也是抛出异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
    return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return true;
    }
    }
    nanosTimeout = deadline - System.nanoTime();
    if (nanosTimeout <= 0L)
    return false;
    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
    LockSupport.parkNanos(this, nanosTimeout);
    if (Thread.interrupted())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

boolean tryAcquire(int arg)

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
    }

boolean tryRelease(int arg)

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
    }

int tryAcquireShared(int arg)

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
    }

boolean tryReleaseShared(int arg)

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
    }

boolean isHeldExclusively()

  • AQS没有提供具体实现,需要子类实现
    1
    2
    3
    protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
    }

void acquire(int arg)

  • 以独占的方式去获取资源,忽略中断。
    1
    2
    3
    4
    5
    public final void acquire(int arg) {
    //至少执行一次tryAcquire,成功则返回,失败则进行线程阻塞状态,等待唤醒重新获取资源
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

void acquireInterruptibly(int arg)

  • 以独占的方式去获取资源,等待期间会被中断。如果线程本身已经被中断,调用该方法会立即抛出异常
    1
    2
    3
    4
    5
    6
    public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (!tryAcquire(arg))
    doAcquireInterruptibly(arg);
    }

boolean tryAcquireNanos(int arg, long nanosTimeout)

  • 带有超时的获取独占资源,也会抛出中断异常
    1
    2
    3
    4
    5
    6
    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    return tryAcquire(arg) ||
    doAcquireNanos(arg, nanosTimeout);
    }

boolean release(int arg)

  • 资源释放
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public final boolean release(int arg) {
    //保证原子方式释放资源,同一时刻只能有一个线程成功
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);//唤醒当前节点的后继节点所包含的线程
    return true;
    }
    return false;
    }

void acquireShared(int arg)

  • 以共享模式获取状态
    1
    2
    3
    4
    5
    6
    public final void acquireShared(int arg) {
    //尝试获取共享状态
    if (tryAcquireShared(arg) < 0)
    //获取失败进入sync队列
    doAcquireShared(arg);
    }

void acquireSharedInterruptibly(int arg)

  • 相比acquireShared,只是增加了可中断
    1
    2
    3
    4
    5
    6
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }

boolean tryAcquireSharedNanos(int arg, long nanosTimeout)

  • 在acquireSharedInterruptibly的基础上增加了超时
    1
    2
    3
    4
    5
    6
    7
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
    doAcquireSharedNanos(arg, nanosTimeout);
    }

boolean releaseShared(int arg)

  • 释放共享资源
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final boolean releaseShared(int arg) {
    //尝试释放共享资源
    if (tryReleaseShared(arg)) {
    //唤醒的过程,上文已经分析
    doReleaseShared();
    return true;
    }
    return false;
    }

boolean hasQueuedThreads()

  • 队列中是否有线程在等待获取资源
    1
    2
    3
    public final boolean hasQueuedThreads() {
    return head != tail;
    }

boolean hasContended()

  • 是否其他线程也竞争获取资源(因为head是公用的)
    1
    2
    3
    public final boolean hasContended() {
    return head != null;
    }

Thread getFirstQueuedThread()

  • 返回队列中的第一个线程,如果快速路径失败(head == tail),则调用fullGetFirstQueuedThread查找
    1
    2
    3
    4
    public final Thread getFirstQueuedThread() {
    // handle only fast path, else relay
    return (head == tail) ? null : fullGetFirstQueuedThread();
    }

Thread fullGetFirstQueuedThread()

  • 返回队列中第一个(等待时间最长的)线程,如果目前没有将任何线程加入队列,则返回 null.
  • 在此实现中,该操作是以固定时间返回的,但是,如果其他线程目前正在并发修改该队列,则可能出现循环争用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    private Thread fullGetFirstQueuedThread() {
    Node h, s;
    Thread st;
    if (((h = head) != null && (s = h.next) != null &&
    s.prev == head && (st = s.thread) != null) ||
    ((h = head) != null && (s = h.next) != null &&
    s.prev == head && (st = s.thread) != null))
    return st;
    Node t = tail;
    Thread firstThread = null;
    while (t != null && t != head) {
    Thread tt = t.thread;
    if (tt != null)
    firstThread = tt;
    t = t.prev;
    }
    return firstThread;
    }

boolean isQueued(Thread thread)

  • 判断thread是否在队列中等待获取资源
    1
    2
    3
    4
    5
    6
    7
    8
    public final boolean isQueued(Thread thread) {
    if (thread == null)
    throw new NullPointerException();
    for (Node p = tail; p != null; p = p.prev)
    if (p.thread == thread)
    return true;
    return false;
    }

boolean apparentlyFirstQueuedIsExclusive()

  • 在head不为null,head的next不为null,head的next不是共享的,head的thread不为空的条件下返回true,否则返回false
  • 作用就是读锁不应该让写锁始终等待。
    1
    2
    3
    4
    5
    6
    7
    final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
    (s = h.next) != null &&
    !s.isShared() &&
    s.thread != null;
    }

boolean hasQueuedPredecessors()

  • 判断当前线程是不是在CLH队列的队首,来返回AQS中是不是有比当前线程等待更久的线程。
    1
    2
    3
    4
    5
    6
    7
    public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
    ((s = h.next) == null || s.thread != Thread.currentThread());
    }

int getQueueLength()

  • 获取队列长度
    1
    2
    3
    4
    5
    6
    7
    8
    public final int getQueueLength() {
    int n = 0;
    for (Node p = tail; p != null; p = p.prev) {
    if (p.thread != null)
    ++n;
    }
    return n;
    }

Collection getQueuedThreads()

  • 获取线程队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
    Thread t = p.thread;
    if (t != null)
    list.add(t);
    }
    return list;
    }

Collection getExclusiveQueuedThreads()

  • 获取独占资源的线程队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public final Collection<Thread> getExclusiveQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
    if (!p.isShared()) {
    Thread t = p.thread;
    if (t != null)
    list.add(t);
    }
    }
    return list;
    }

Collection getSharedQueuedThreads()

  • 获取共享资源的线程队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public final Collection<Thread> getSharedQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
    if (p.isShared()) {
    Thread t = p.thread;
    if (t != null)
    list.add(t);
    }
    }
    return list;
    }

boolean isOnSyncQueue(Node node)

  • 判断该节点是否在CLH队列中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    final boolean isOnSyncQueue(Node node) {
    //如果该节点的状态为CONDITION(该状态只能在CONDITION队列中出现,CLH队列中不会出现CONDITION状态),或者该节点的prev指针为null,则该节点一定不在CLH队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
    //如果该节点的next(不是nextWaiter,next指针在CLH队列中指向下一个节点)状态不为null,则该节点一定在CLH队列中
    if (node.next != null) // If has successor, it must be on queue
    return true;
    //遍历CLH队列(从尾节点开始遍历)查找该节点
    return findNodeFromTail(node);
    }

boolean findNodeFromTail(Node node)

  • 从tail往前寻找节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
    if (t == node)
    return true;
    if (t == null)
    return false;
    t = t.prev;
    }
    }

boolean transferForSignal(Node node)

  • 将节点添加到CLH队列中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
      final boolean transferForSignal(Node node) {
    //如果CAS失败,则当前节点的状态为CANCELLED
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
    //首先enq将该node添加到CLH队列中
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果p是一个取消(ws > 0)了的节点,或者对p进行CAS设置失败,则唤醒node节点,让node所在线程进入到acquireQueue方法中,重新进行相关操作
    //否则,由于该节点的前驱节点已经是signal状态了,不用在此处唤醒await中的线程,唤醒工作留给CLH队列中前驱节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);//唤醒
    return true;
    }

boolean transferAfterCancelledWait(Node node)

  • 将当前Node强制transfer到CLH队列中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    final boolean transferAfterCancelledWait(Node node) {
    //将该节点状态由CONDITION变成0,调用enq将该节点从CONDITION队列添加到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    enq(node);
    return true;
    }
    //循环检测该node是否已经成功添加到CLH队列中
    while (!isOnSyncQueue(node))
    Thread.yield();
    return false;
    }

int fullyRelease(Node node)

  • 完全释放锁,释放成功则返回,失败则将当前节点的状态设置成cancelled表示当前节点失效
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    final int fullyRelease(Node node) {
    boolean failed = true;
    try {
    int savedState = getState();
    if (release(savedState)) {
    failed = false;
    return savedState;
    } else {
    throw new IllegalMonitorStateException();
    }
    } finally {
    if (failed)
    node.waitStatus = Node.CANCELLED;//失败则当前node状态为CANCELLED
    }
    }

boolean owns(ConditionObject condition)

  • 判断条件对象拥有者
    1
    2
    3
    public final boolean owns(ConditionObject condition) {
    return condition.isOwnedBy(this);
    }

boolean hasWaiters(ConditionObject condition)

  • 条件队列是否有等待者
    1
    2
    3
    4
    5
    public final boolean hasWaiters(ConditionObject condition) {
    if (!owns(condition))
    throw new IllegalArgumentException("Not owner");
    return condition.hasWaiters();
    }

int getWaitQueueLength(ConditionObject condition)

  • 获取条件队列等待者数量
    1
    2
    3
    4
    5
    public final int getWaitQueueLength(ConditionObject condition) {
    if (!owns(condition))
    throw new IllegalArgumentException("Not owner");
    return condition.getWaitQueueLength();
    }

Collection getWaitingThreads(ConditionObject condition)

  • 获取条件队列等待者线程
    1
    2
    3
    4
    5
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
    if (!owns(condition))
    throw new IllegalArgumentException("Not owner");
    return condition.getWaitingThreads();
    }

辅助Field及方法

就不一一解释了

  • Cas相关Field
    • Unsafe unsafe;
    • long stateOffset;
    • long headOffset;
    • long tailOffset;
    • long waitStatusOffset;
    • long nextOffset;
  • Cas相关Method
    • boolean compareAndSetHead(Node update)
    • boolean compareAndSetTail(Node expect, Node update)
    • boolean compareAndSetWaitStatus(Node node, int expect, int update)
    • boolean compareAndSetNext(Node node, Node expect, Node update)

参考

内部只有若干方法

内部依赖方法

newTaskFor(Runnable runnable, T value)

  • 构造一个FutureTask,FutureTask 实现了 RunnableFuture,既是Runnable接口,也是Future接口,类似于适配器。
    1
    2
    3
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
    }

newTaskFor(Callable callable)

  • 同上,将一个Callable适配到RunnableFuture
    1
    2
    3
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
    }

submit 提交任务方法

submit(Runnable task)

  • 将Runnable接口封装为 RunnableFuture,并由子类实现执行逻辑
    1
    2
    3
    4
    5
    6
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
    }

submit(Runnable task, T result)

  • 将Runnable接口封装为 RunnableFuture,并由子类实现执行逻辑
    1
    2
    3
    4
    5
    6
    public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
    }

submit(Callable task)

  • 将Callable接口封装为 RunnableFuture,并由子类实现执行逻辑
    1
    2
    3
    4
    5
    6
    public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
    }

Invoke系列方法

doInvokeAny

  • 执行tasks任务,可以指定是否带有超时参数。invokeAny方法底层依赖该方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
    throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
    throw new IllegalArgumentException();
    //全部task对应的future集合
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    //实际执行的实体
    ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
    try {
    ExecutionException ee = null;
    //是否需要超时
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Iterator<? extends Callable<T>> it = tasks.iterator();
    //先提交一个任务
    futures.add(ecs.submit(it.next()));
    //任务数减一
    --ntasks;
    //工作中的线程数为1
    int active = 1;
    for (;;) {
    Future<T> f = ecs.poll();//获取一个执行的任务
    //判断任务是否完成,为null则还没有执行完成
    if (f == null) {
    //提交的任务是否已经全部由ecs执行,如果还有未提交的,则继续提交。
    if (ntasks > 0) {
    --ntasks;
    futures.add(ecs.submit(it.next()));
    ++active;
    }
    else if (active == 0)//没有存活的任务,说明任务已经完成,但是有异常,导致active=0,则中断循环,然后抛出异常
    break;
    else if (timed) {//检查是否需要超时
    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
    if (f == null)
    throw new TimeoutException();
    nanos = deadline - System.nanoTime();
    }
    else
    f = ecs.take();//不许要超时,则阻塞获取
    }
    if (f != null) {//有任务完成,active数量减一,并返回结果
    --active;
    try {
    return f.get();
    } catch (ExecutionException eex) {
    ee = eex;
    } catch (RuntimeException rex) {
    ee = new ExecutionException(rex);
    }
    }
    }
    if (ee == null)
    ee = new ExecutionException();
    throw ee;
    } finally {
    for (int i = 0, size = futures.size(); i < size; i++)
    futures.get(i).cancel(true);//已经完成或者抛出异常,取消其他正在执行的任务。
    }
    }

invokeAny

  • 忽略超时异常的执行方式

    1
    2
    3
    4
    5
    6
    7
    8
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    try {
    return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {//忽略超时异常
    assert false;
    return null;
    }
    }
  • 可以设置超时的执行方式

    1
    2
    3
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));//会抛出超时异常
    }

invokeAll

  • 全部执行并等待全部完成
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
    if (tasks == null)
    throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
    //全部任务提交并执行
    for (Callable<T> t : tasks) {
    RunnableFuture<T> f = newTaskFor(t);
    futures.add(f);
    execute(f);
    }
    //等待全部结果完成
    for (int i = 0, size = futures.size(); i < size; i++) {
    Future<T> f = futures.get(i);
    if (!f.isDone()) {
    try {
    f.get();
    } catch (CancellationException ignore) {
    } catch (ExecutionException ignore) {
    }
    }
    }
    //全部完成后标示位更新
    done = true;
    return futures;
    } finally {
    if (!done)//如果没有完成,说明有异常,则取消所有任务
    for (int i = 0, size = futures.size(); i < size; i++)
    futures.get(i).cancel(true);
    }
    }
  • 全部执行,并带有超时的等待完成
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
    throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
    for (Callable<T> t : tasks)
    futures.add(newTaskFor(t));
    final long deadline = System.nanoTime() + nanos;
    final int size = futures.size();
    //提交任务
    for (int i = 0; i < size; i++) {
    execute((Runnable)futures.get(i));
    //计算超时时间
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L)
    return futures;
    }
    //获取结果
    for (int i = 0; i < size; i++) {
    Future<T> f = futures.get(i);
    if (!f.isDone()) {
    if (nanos <= 0L)//已经超时,则直接返回现有的
    return futures;
    try {
    f.get(nanos, TimeUnit.NANOSECONDS);//带有超时的去获取,如果超时,则直接返回结果
    } catch (CancellationException ignore) {
    } catch (ExecutionException ignore) {
    } catch (TimeoutException toe) {
    return futures;
    }
    nanos = deadline - System.nanoTime();
    }
    }
    done = true;//正常完成
    return futures;
    } finally {
    if (!done)//非正常完成,则取消剩余任务
    for (int i = 0, size = futures.size(); i < size; i++)
    futures.get(i).cancel(true);
    }
    }

结构

其实从结构上来看,Worker十分简单。实现了Runnable接口,同时继承了AQS队列。如下图所示:
类结构

Worker的方法也不多,也比较简单,如下图所示:
方法

分析

内部Field

  • 内部Field不多,如下:
    • Thread thread 实际的工作线程
    • Runnable firstTask 初始化的第一个任务
    • long completedTasks 当前Worker已经完成的任务数
  • 在补充一下父类的state
    • 0 代表是未锁定状态
    • 1 代表是锁定状态
    • -1 代表是不允许被中断,在构造参数中设置
      接下来简单分析一下各个方法:

方法

public void run()

  • 直接调用线程池的runWorker方法,之后分析
    1
    2
    3
    public void run() {
    runWorker(this);
    }

protected boolean isHeldExclusively()

  • 是否是独占排他的
    1
    2
    3
    protected boolean isHeldExclusively() {
    return getState() != 0;
    }

protected boolean tryAcquire(int unused)

  • 尝试获取锁,参考AQS,这里还是使用CAS进行操作,失败则快速返回
    1
    2
    3
    4
    5
    6
    7
    protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
    setExclusiveOwnerThread(Thread.currentThread());
    return true;
    }
    return false;
    }

protected boolean tryRelease(int unused)

  • 尝试释放锁,这个貌似只会成功,不会失败
    1
    2
    3
    4
    5
    protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
    }

public void lock()

  • 加锁,不过这里是阻塞式的
    1
    public void lock()        { acquire(1); }

public boolean tryLock()

  • 尝试加锁,调用tryAcquire
    1
    public boolean tryLock()  { return tryAcquire(1); }

public void unlock()

  • 释放锁操作,参考AQS
    1
    public void unlock()      { release(1); }

public boolean isLocked()

  • 是否处于锁定状态,调用isHeldExclusively方法,也就是看state是否为0
    1
    public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted()

  • 如果处于运行状态,则进行中断。state>=0代表可以进行中断。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    try {
    t.interrupt();
    } catch (SecurityException ignore) {
    }
    }
    }

关联方法

runWorker

  • Worker直接调用线程池的runWorker方法,将自身作为参数,执行任务,具体如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    final void runWorker(Worker w) {
    //当前工作线程
    Thread wt = Thread.currentThread();
    //获取待执行的初始任务
    Runnable task = w.firstTask;
    //清除firstTask
    w.firstTask = null;
    //释放w锁(这个时候可以进行中断操作)
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    //开始循环获取任务
    while (task != null || (task = getTask()) != null) {
    //获取任务之后先进行加锁
    w.lock();
    //检查线程池状态,是否需要中断。
    //这块逻辑比较绕,整理一下
    // 首先wt也就是当前线程,不能被中断。
    // 如果线程池的状态为STOP,TIDYING,TERMINATED 则直接中断
    // (剩下的就是原文中的注释了)如果线程池正在停止过程中,确保线程是中断的。否则就确保线程不会被中断。
    if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(),STOP))) && !wt.isInterrupted())
    wt.interrupt();
    try {
    beforeExecute(wt, task);//这里实际上是空实现
    Throwable thrown = null;
    try {
    task.run();//实际执行
    } catch (RuntimeException x) {
    thrown = x; throw x;
    } catch (Error x) {
    thrown = x; throw x;
    } catch (Throwable x) {
    thrown = x; throw new Error(x);
    } finally {
    afterExecute(task, thrown);//这里也是空实现
    }
    } finally {
    task = null;
    w.completedTasks++;//执行完成后,完成任务+1
    w.unlock();//执行完成后释放锁
    }
    }
    completedAbruptly = false;
    } finally {
    //清理工作,执行到这里代表Worker已经准备销毁了
    processWorkerExit(w, completedAbruptly);
    }
    }

getTask

  • 获取任务的方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
    //先检查线程池的状态
    int c = ctl.get();
    int rs = runStateOf(c);
    //如果已经关闭并且队列为空则返回null,并减少一个工作线程
    //如果已经为STOP,TIDYING,TERMINATED 则减少一个工作线程
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
    }
    //获取工作线程数量
    int wc = workerCountOf(c);
    //判断是否需要清理Worker
    // 一种是allowCoreThreadTimeOut=true的情况
    //一种是工作线程数量已经超过核心线程数量了
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    //状态检查
    //满足以下两个条件,则会减少工作线程数量
    //1.已经超时或者工作线程数量超过最大线程数量的
    //2.至少有一个工作线程或者任务队列为空
    if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
    return null;
    continue;
    }
    try {
    //
    Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://带有超时的方式获取,超时之后返回null
    workQueue.take();//阻塞方式获取
    //没有超时则返回结果,否则设置超时状态
    if (r != null)
    return r;
    timedOut = true;
    } catch (InterruptedException retry) {
    timedOut = false;
    }
    }
    }

processWorkerExit

  • 做一些收尾工作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // 还记得runWork方法中的completedAbruptly么,就是这个了,为true代表没有执行的改变为false,突然执行到这里了。
    decrementWorkerCount();//减少工作线程数量

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    completedTaskCount += w.completedTasks;//更新一下总共完成的任务
    workers.remove(w);//从Worker集合中中移除自己
    } finally {
    mainLock.unlock();
    }
    //尝试设置线程池为TERMINATED,见线程池部分分析
    //线程池为SHUTDOWN且队列为空或者线程池状态为STOP,则触发设置线程池为TERMINATED
    tryTerminate();

    int c = ctl.get();//获取当前线程池状态
    if (runStateLessThan(c, STOP)) {//是否已经停止或者终止
    //是否突然过来的,如果不是突然过来的,代表正常结束
    if (!completedAbruptly) {
    //根据allowCoreThreadTimeOut获取线程池数量最小值
    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    //队列不为空则最小值不能为0
    if (min == 0 && ! workQueue.isEmpty())
    min = 1;
    //当前工作线程数量大于等于最小值,则代表还不能结束,继续执行
    if (workerCountOf(c) >= min)
    return; // replacement not needed
    }
    //执行到这里说明当前线程数量小于min的值,需要添加一个Worker
    //或者突然执行过来的,可能有异常,添加一个Worker
    addWorker(null, false);
    }
    }