本文部分内容节选自Java Guide, 地址: https://javaguide.cn/java/collection/java-collection-questions-02.html

🚀 基础(上) → 🚀 基础(中) → 🚀基础(下) → 🤩集合(上) → 🤩集合(下) → 🤗JVM专题1 → 🤗JVM专题2 → 🤗JVM专题3 → 🤗JVM专题4 →😋JUC专题1 → 😋JUC专题2

Map

HashMap

HashMap 主要用于存储键值对, 它基于哈希表的 Map 接口实现, 是常用的 Java 集合之一, 是非线程安全的

HashMap 可以存储 null 的 key 和 value, 但 null 作为键只能有一个, 作为值可以有多个

JDK1.8之前 HashMap 是由数组+链表组成的(拉链法, 数组是本体, 链表解决哈希冲突)

JDK1.8之后, 当链表长度大于等于阈值 (默认为 8) 时, 会将链表转化为红黑树, 以减少搜索时间 (将链表转化为红黑树之前会判断, 如果当前数组的长度小于64, 那么会先进行数组扩容, 而不是转换为红黑树)

HashMap 的默认初始化大小为16. 之后每次扩容, 容量变为原来的 2 倍. 并且 HashMap 总是使用 2 的幂作为哈希表的大小

底层数据结构

JDK1.8之前

JDK1.8之前 HashMap 底层是 数组和链表 结合在一起使用, 也就是链表散列 (拉链法)

HashMap 通过 key 的 hashCode 经过扰动函数处理之后得到 hash值, 通过 hash(KEY) % n 判断当前元素存放的位置, 如果当前位置存在元素的话, 判断该元素和要存入元素的 hash 值以及 key 值是否相等, 相等的话直接覆盖, 不等的话就用链表解决哈希冲突

所谓扰动函数也就是 HashMap 的 Hash 方法, 使用扰动函数是为了防止一些实现比较差的 hashCode() 方法得到的哈希值发生碰撞

JDK1.8的 hash 方法源码

static final int hash(Object key) {
int h;
// key.hashCode():返回散列值也就是hashcode
// ^:按位异或
// >>>:无符号右移,忽略符号位,空位都以0补齐
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

JDK1.8之后

当链表长度大于等于阈值 (默认为 8) 时, 默认会先执行 treeifyBin() 方法, 这个方法会根据 HashMap 数组的长度决定是否转换为红黑树, 只有当数组长度大于等于 64 的情况下, 才会执行转换红黑树操作, 以减少搜索时间, 否则就只会执行 resize() 方法对数组扩容

public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {
// 序列号
private static final long serialVersionUID = 362498820763181265L;
// 默认的初始容量是16
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认的负载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 当桶(bucket)上的结点数大于等于这个值时会转成红黑树
static final int TREEIFY_THRESHOLD = 8;
// 当桶(bucket)上的结点数小于等于这个值时树转链表
static final int UNTREEIFY_THRESHOLD = 6;
// 桶中结构转化为红黑树对应的table的最小容量
static final int MIN_TREEIFY_CAPACITY = 64;
// 存储元素的数组,总是2的幂次倍
transient Node<k,v>[] table;
// 存放具体元素的集
transient Set<map.entry<k,v>> entrySet;
// 存放元素的个数,注意这个不等于数组的长度。
transient int size;
// 每次扩容和更改map结构的计数器
transient int modCount;
// 阈值(容量*负载因子) 当实际大小超过阈值时,会进行扩容
int threshold;
// 负载因子
final float loadFactor;
}
  • loadFactor 负载因子

loadFactor 负载因子也就是控制数组存放数据的疏密程度, loadFactor越趋近于 1 , 那么数组中存放的数据就越多, 也就越密, 链表的长度也就越长; loadFactor越趋近于0, 数组中存放的数据越少, 也就是越稀疏

loadFactor太大会导致查找效率变低, 太小会导致利用率低, 存放的元素会很分散. loadFactor的默认值为0.75f.

假设给定的默认的容量为16, 负载因子为 0.75, 当容器中元素数量超过 16 * 0.75 = 12 时就需要进行扩容, 而这个过程涉及到rehash, 复制数据等操作, 非常消耗性能

  • threshold

threshold = capacity * loadFactor, 也就是当threshold < Size 时, 就需要对数组进行扩容. 我刚刚提到的这个例子就是 size 大于 threshold 时需要扩容的一个情况

源码分析

构造方法

  // 默认构造函数。
public HashMap() {
this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
}

// 包含另一个“Map”的构造函数
public HashMap(Map<? extends K, ? extends V> m) {
this.loadFactor = DEFAULT_LOAD_FACTOR;
putMapEntries(m, false);//下面会分析到这个方法
}

// 指定“容量大小”的构造函数
public HashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR);
}

// 指定“容量大小”和“负载因子”的构造函数
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " + loadFactor);
this.loadFactor = loadFactor;
// 初始容量暂时存放到 threshold ,在resize中再赋值给 newCap 进行table初始化
this.threshold = tableSizeFor(initialCapacity);
}

上面四个构造方法中均初始化了负载因子, 但是由于HashMap中没有类似于Capacity的字段, 即使制定了初始化容量 initialCapacity, 也只能通过 tableSizeFor() 方法扩容到与 initialCapacity 最接近的 2 的整数幂次方大小, 然后暂时赋给 threshold, 后续通过 resize() 方法将 threshold 复制给 newCap.

put 方法

HashMap 只提供了一个 put 方法用于添加元素, putVal 为 put 实际调用的方法, 不提供给用户使用

对 putVal 的分析

  1. 如果定位到的数组位置没有元素就直接插入
  2. 如果定位到的数组位置有元素就和要插入的 key 比较, 如果 key 是相同的那就直接覆盖, 如果不等那就判断 p 是否是一个 树节点, 如果是 那就调用 e = ((TreeNode<K, V>)p).putTreeVal(this, tab, hash, key, value) 将元素添加进去, 如果不是就遍历链表插入元素
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// table未初始化或者长度为0,进行扩容
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// (n - 1) & hash 确定元素存放在哪个桶中,桶为空,新生成结点放入桶中(此时,这个结点是放在数组中)
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
// 桶中已经存在元素(处理hash冲突)
else {
Node<K,V> e; K k;
//快速判断第一个节点table[i]的key是否与插入的key一样,若相同就直接使用插入的值p替换掉旧的值e。
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 判断插入的是否是红黑树节点
else if (p instanceof TreeNode)
// 放入树中
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
// 不是红黑树节点则说明为链表结点
else {
// 在链表最末插入结点
for (int binCount = 0; ; ++binCount) {
// 到达链表的尾部
if ((e = p.next) == null) {
// 在尾部插入新结点
p.next = newNode(hash, key, value, null);
// 结点数量达到阈值(默认为 8 ),执行 treeifyBin 方法
// 这个方法会根据 HashMap 数组来决定是否转换为红黑树。
// 只有当数组长度大于或者等于 64 的情况下,才会执行转换红黑树操作,以减少搜索时间。否则,就是只是对数组扩容。
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
// 跳出循环
break;
}
// 判断链表中结点的key值与插入的元素的key值是否相等
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
// 相等,跳出循环
break;
// 用于遍历桶中的链表,与前面的e = p.next组合,可以遍历链表
p = e;
}
}
// 表示在桶中找到key值、hash值与插入元素相等的结点
if (e != null) {
// 记录e的value
V oldValue = e.value;
// onlyIfAbsent为false或者旧值为null
if (!onlyIfAbsent || oldValue == null)
//用新值替换旧值
e.value = value;
// 访问后回调
afterNodeAccess(e);
// 返回旧值
return oldValue;
}
}
// 结构性修改
++modCount;
// 实际大小大于阈值则扩容
if (++size > threshold)
resize();
// 插入后回调
afterNodeInsertion(evict);
return null;
}

get 方法

public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
// 数组元素相等
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
// 桶中不止一个节点
if ((e = first.next) != null) {
// 在树中get
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
// 在链表中get
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

resize 方法

进行扩容, 会伴随着一次 rehash, 并且会遍历 HashMap 中的所有元素, 是非常耗时的

final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
// 超过最大值就不再扩充了,就只好随你碰撞去吧
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 没超过最大值,就扩充为原来的2倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
// 创建对象时初始化容量大小放在threshold中,此时只需要将其作为新的数组容量
newCap = oldThr;
else {
// signifies using defaults 无参构造函数创建的对象在这里计算容量和阈值
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
// 创建时指定了初始化容量或者负载因子,在这里进行阈值初始化,
// 或者扩容前的旧容量小于16,在这里计算新的resize上限
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
// 把每个bucket都移动到新的buckets中
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
// 只有一个节点,直接计算元素新的位置即可
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
// 将红黑树拆分成2棵子树,如果子树节点数小于等于 UNTREEIFY_THRESHOLD(默认为 6),则将子树转换为链表。
// 如果子树节点数大于 UNTREEIFY_THRESHOLD,则保持子树的树结构。
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else {
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
// 原索引
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
// 原索引+oldCap
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
// 原索引放到bucket里
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
// 原索引+oldCap放到bucket里
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

HashMap 和 Hashtable 的区别

  • 线程安全 : HashMap 是非线程安全的, Hashtable 是线程安全的, 因为 Hashtable 内部的方法都经过 synchronized 修饰.
  • 效率 : HashMap 的效率比 Hashtable 效率更高
  • 对 Null key 和 Null value 的支持 : HashMap 可以存储 null 的 key 或 value, 但 null 作为 key 只能存一个, 作为 value 可以存多个. Hashtable 不允许有 null值 或 null 键
  • 初始容量大小和每次扩容大小 : 1. 创建时如果不指定初始容量, Hashtable 的初始容量为 11, 之后每次扩容容量都会变成原来的 2n + 1, 而 HashMap 的初始容量为 16, 之后每次扩容都会变成原来的两倍 2. 创建时如果给定了容量初始值, 那么 Hashtable 会直接使用给定的大小, 而 HashMap 会将其扩容为 2 的 幂次方大小 (HashMaptableSizeFor 方法保证)
  • 底层数据结构 : JDK1.8之后的 HashMap 解决哈希冲突时有较大的变化, 前面已经有提及

HashMap 和 HashSet 的区别

HashMapHashSet
实现了 Map 接口实现了 Set 接口
存储键值对存储对象
调用 put() 方法向 map 中添加元素调用 add() 方法向 set 中添加元素
HashMap 使用键计算 HashcodeHashSet 使用成员对象计算 hashCode 值, 对于两个对象来说 hashCode 值可能相等, 所以 equal() 方法用来判断对象的相等性

HashMap 和 TreeMap 的区别

TreeMapHashMap 都继承自 AbstractMap , 但是 TreeMap 还实现了 NavigableMapSortedMap 接口

实现 NavigableMap 接口让 TreeMap 有了对集合内元素的搜索的能力

实现SortedMap接口让 TreeMap 有了对集合中的元素根据键排序的能力. 默认是按 key 的升序排序, 不过我们也可以指定排序的比较器

HashSet 如何检查重复

在JDK1.8中, HashSetadd() 方法只是简单的调用了 HashMapput() 方法, 并且判断了一下返回值以确保是否有重复元素

也就是说, 在 JDK1.8 中, 实际上无论HashSet中是否已经存在了某元素, HashSet都会直接插入, 只是会在add()方法的返回值处告诉我们插入前是否存在相同元素

为什么HashMap的长度是 2 的整数次幂?

这里我觉得 Java Guide解释得不好, 有点让人摸不着头脑了. 我重新写一下

为什么HashMap的长度是 2 的整数次幂? 理由有两个: 加快哈希运算 , 减少哈希冲突

  1. 加快哈希运算

前面已经提及, 计算Key存储的位置需要用 hash(KEY) % n , 但是 % 运算的速度太慢了, 所以我们可以用 & 运算代替 % 运算, 为了保证 & 的计算结果等于 % 的结果需要把 长度 length 减一

也就是, hash(KEY) % n == hash(KEY) & (length - 1)

因为hash值需要用到低位信息, 那么根据 & 运算, &运算的另一个数最好低位全部为1, 所以取长度为 2 的整数次幂

还有一个有趣的事实: 因为扩容为 2 的倍数, 根据 hash 桶的计算方法, 元素哈希值不变而通过 % 计算的方式会因为 length 的变化导致计算出来的 hash 桶的位置不断变化. 数据一致在漂移, 影响性能

  1. 减少哈希冲突

当长度为偶数时, length - 1 为奇数, 奇数的二进制最后一位为 1, 这就保证了 hash & (length - 1) 的最后一位可能是 0, 也可能是 1(这取决于hash值), 这样就保证了散列的均匀性

HashMap 多线程操作导致死锁问题

JDK1.8之前的版本的 HashMap 在多线程环境下可能出现死锁. 这是因为当一个桶位需要由多个元素进行扩容时, 多个线程同时对链表进行操作, 头插法可能导致链表中的节点指向错误的位置, 从而形成环形链表, 进而导致查询元素的操作变成死循环无法结束

在JDK1.8及其之后的版本中, HashMap 采用了尾插法而不是头插法以避免链表倒置, 使得插入的节点永远放在链表的末尾, 避免了链表出现环形结构. 但是在并发环境下仍然不推荐使用 HashMap , 推荐使用 ConcurrentHashMap

HashMap为什么是线程不安全的

前面已经提及了JDK1.7及其之前的版本下 HashMap 的多线程操作可能导致死锁. 实际上, 在 JDK1.8及其之后的版本下, 在多线程环境中, HashMap 在扩容时可能会导致数据丢失的问题

JDK1.8之后, 在 HashMap 中, 多个键值对可能会被分配到一个桶中, 并以链表或红黑树的形式存储. 多个线程对 HashMapput 操作 会导致数据不安全, 具体来说会导致数据覆盖

例如:

  1. 两个线程1, 2同时进行 put 操作, 并且发生了哈希冲突
  2. 不同的线程在不同的时间片下获得了CPU执行的机会, 线程1执行完哈希冲突判断之后, 由于时间片被耗尽所以线程被挂起, 线程2开始执行, 先完成了插入操作
  3. 随后, 线程1获得了时间片, 根据上一个时间片中哈希冲突的判断, 线程1会直接覆盖掉线程2插入的数据

HashMap的遍历方式有什么

HashMap 遍历从大的方向来说, 可分为以下 4 类

  1. 迭代器(Iterator)方式遍历;
  2. For Each 方式遍历;
  3. Lambda 表达式遍历(JDK 1.8+);
  4. Streams API 遍历(JDK 1.8+)

但每种类型下又有不同的实现方式, 因此具体的遍历方式又可以分为以下 7 种:

  1. 使用迭代器(Iterator)EntrySet 的方式进行遍历;
  2. 使用迭代器(Iterator)KeySet 的方式进行遍历;
  3. 使用 For Each EntrySet 的方式进行遍历;
  4. 使用 For Each KeySet 的方式进行遍历;
  5. 使用 Lambda 表达式的方式进行遍历;
  6. 使用 Streams API 单线程的方式进行遍历;
  7. 使用 Streams API 多线程的方式进行遍历

接下来我们来看每种遍历方式的具体实现代码

  1. 迭代器 EntrySet
public class HashMapTest {
public static void main(String[] args) {
// 创建并赋值 HashMap
Map<Integer, String> map = new HashMap();
map.put(1, "Java");
map.put(2, "JDK");
map.put(3, "Spring Framework");
map.put(4, "MyBatis framework");
map.put(5, "Java中文社群");
// 遍历
Iterator<Map.Entry<Integer, String>> iterator = map.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, String> entry = iterator.next();
System.out.println(entry.getKey());
System.out.println(entry.getValue());
}
}
}
  1. 迭代器 KeySet
public class HashMapTest {
public static void main(String[] args) {
// 创建并赋值 HashMap
Map<Integer, String> map = new HashMap();
map.put(1, "Java");
map.put(2, "JDK");
map.put(3, "Spring Framework");
map.put(4, "MyBatis framework");
map.put(5, "Java中文社群");
// 遍历
Iterator<Integer> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
Integer key = iterator.next();
System.out.println(key);
System.out.println(map.get(key));
}
}
}
  1. ForEach EntrySet
public class HashMapTest {
public static void main(String[] args) {
// 创建并赋值 HashMap
Map<Integer, String> map = new HashMap();
map.put(1, "Java");
map.put(2, "JDK");
map.put(3, "Spring Framework");
map.put(4, "MyBatis framework");
map.put(5, "Java中文社群");
// 遍历
for (Map.Entry<Integer, String> entry : map.entrySet()) {
System.out.println(entry.getKey());
System.out.println(entry.getValue());
}
}
}
  1. ForEach KeySet
public class HashMapTest {
public static void main(String[] args) {
// 创建并赋值 HashMap
Map<Integer, String> map = new HashMap();
map.put(1, "Java");
map.put(2, "JDK");
map.put(3, "Spring Framework");
map.put(4, "MyBatis framework");
map.put(5, "Java中文社群");
// 遍历
for (Integer key : map.keySet()) {
System.out.println(key);
System.out.println(map.get(key));
}
}
}
  1. Lambda
public class HashMapTest {
public static void main(String[] args) {
// 创建并赋值 HashMap
Map<Integer, String> map = new HashMap();
map.put(1, "Java");
map.put(2, "JDK");
map.put(3, "Spring Framework");
map.put(4, "MyBatis framework");
map.put(5, "Java中文社群");
// 遍历
map.forEach((key, value) -> {
System.out.println(key);
System.out.println(value);
});
}
}
  1. Streams API单线程
public class HashMapTest {
public static void main(String[] args) {
// 创建并赋值 HashMap
Map<Integer, String> map = new HashMap();
map.put(1, "Java");
map.put(2, "JDK");
map.put(3, "Spring Framework");
map.put(4, "MyBatis framework");
map.put(5, "Java中文社群");
// 遍历
map.entrySet().stream().forEach((entry) -> {
System.out.println(entry.getKey());
System.out.println(entry.getValue());
});
}
}
  1. Streams API多线程
public class HashMapTest {
public static void main(String[] args) {
// 创建并赋值 HashMap
Map<Integer, String> map = new HashMap();
map.put(1, "Java");
map.put(2, "JDK");
map.put(3, "Spring Framework");
map.put(4, "MyBatis framework");
map.put(5, "Java中文社群");
// 遍历
map.entrySet().parallelStream().forEach((entry) -> {
System.out.println(entry.getKey());
System.out.println(entry.getValue());
});
}
}

在性能表现上, entrySet的性能表现最好, 接下来是 单线程stream, 然后是keySet, 最差的是lambda

其中, 对于 多线程stream, 当不存在阻塞时, 多线程stream的性能是最差的

但是当存在阻塞时, 多线程stream的性能是最好的

ConcurrentHashMap

ConcurrentHashMap 1.7

Java7 中的 ConcurrentHashMap 有很多个 Segment 组合, 每个 Segment 又类似于一个 HashMap 的结构, 但是 Segment 的个数一旦初始化之后就不能改变, 默认 Segment 的个数为 16 个

初始化

/**
* Creates a new, empty map with a default initial capacity (16),
* load factor (0.75) and concurrencyLevel (16).
*/
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

无参构造中调用了有参构造, 传入了三个参数的默认值, 他们的值是:

/**
* 默认初始化容量
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
* 默认负载因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
* 默认并发级别
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

有参构造函数

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// 参数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 校验并发级别大小,大于 1<<16,重置为 65536
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 2的多少次方
int sshift = 0;
int ssize = 1;
// 这个循环可以找到 concurrencyLevel 之上最近的 2的次方值
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 记录段偏移量
this.segmentShift = 32 - sshift;
// 记录段掩码
this.segmentMask = ssize - 1;
// 设置容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c = 容量 / ssize ,默认 16 / 16 = 1,这里是计算每个 Segment 中的类似于 HashMap 的容量
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//Segment 中的类似于 HashMap 的容量至少是2或者2的倍数
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 创建 Segment 数组,设置 segments[0]
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

Java7 中 ConcurrentHashMap的初始化:

  1. 参数校验
  2. 校验并发参数 concurrencyLevel 大小, 如果大于最大值, 重置为最大值. 无参构造默认值为 16
  3. 寻找并发级别 concurrencyLevel 之上最近的 2 的幂次方值, 作为初始化容量大小, 默认为 16
  4. 记录 segmentShift 偏移量, 这个值为 “容量 = 2 ^ N” 中的 N. 默认为 32 - sshift = 28
  5. 记录 segmentMask , 默认为 ssize - 1 = 15
  6. 初始化 segment[0] , 默认大小为2, 负载因子0.75, 扩容阈值为 2 * 0.75 = 1.5.

put

/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p> The value can be retrieved by calling the <tt>get</tt> method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>
* @throws NullPointerException if the specified key or value is null
*/
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算
// 其实也就是把高4位与segmentMask(1111)做与运算
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 如果查找到的 Segment 为空,初始化
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

/**
* Returns the segment for the given index, creating it and
* recording in segment table (via CAS) if not already present.
*
* @param k the index
* @return the segment
*/
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
// 判断 u 位置的 Segment 是否为null
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
// 获取0号 segment 里的 HashEntry<K,V> 初始化长度
int cap = proto.table.length;
// 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的
float lf = proto.loadFactor;
// 计算扩容阀值
int threshold = (int)(cap * lf);
// 创建一个 cap 容量的 HashEntry 数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
// 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 自旋检查 u 位置的 Segment 是否为null
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
// 使用CAS 赋值,只会成功一次
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

put的流程:

  1. 计算要 put 的 key 的位置, 获取指定位置的 Segment .

  2. 如果指定位置的 Segment 为空, 则初始化这个 Segment

    初始化 Segment 流程 :

    1. 检查计算得到的位置的 Segment 是否为 null.
    2. 为 null 继续初始化, 使用 Segment[0] 的容量和负载因子创建一个 HashEntry 数组
    3. 再次检查计算得到的指定位置的 Segment 是否为 null
    4. 使用创建的 HashEntry 数组初始化这个 Segment.
    5. 自旋判断计算得到的指定位置的 Segment 是否为 null,使用 CAS 在这个位置赋值为 Segment.
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 计算要put的数据位置
int index = (tab.length - 1) & hash;
// CAS 获取 index 坐标的值
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
// 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 value
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// first 有值没说明 index 位置已经有值了,有冲突,链表头插法。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 容量大于扩容阀值,小于最大容量,进行扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// index 位置赋值 node,node 可能是一个元素,也可能是一个链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

由于 Segment 继承了 ReentrantLock , 所以 Segment 内部可以很方便的获取锁, put 流程就用到了这个功能

  1. tryLock() 获取锁, 获取不到使用 scanAndLockForPut 方法继续获取

  2. 计算 put 的数据要放入的 index 位置, 然后获取这个位置上的 HashEntry

  3. 遍历 put 新元素, 为什么要遍历?因为这里获取的 HashEntry 可能是一个空元素, 也可能是链表已存在, 所以要区别对待

    如果这个位置上的 HashEntry 不存在

    1. 如果当前容量大于扩容阀值, 小于最大容量, 进行扩容
    2. 直接头插法插入

    如果这个位置上的 HashEntry 存在

    1. 判断链表当前元素 key 和 hash 值是否和要 put 的 key 和 hash 值一致. 一致则替换值
    2. 不一致, 获取链表下一个节点, 直到发现相同进行值替换, 或者链表表里完毕没有相同的
      1. 如果当前容量大于扩容阀值, 小于最大容量, 进行扩容
      2. 直接链表头插法插入
  4. 如果要插入的位置之前已经存在, 替换后返回旧值, 否则返回 null

这里面的第一步中的 scanAndLockForPut 操作这里没有介绍,这个方法做的操作就是不断的自旋 tryLock() 获取锁。当自旋次数大于指定次数时,使用 lock() 阻塞获取锁。在自旋时顺表获取下 hash 位置的 HashEntry

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 自旋获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
// 自旋达到指定次数后,阻塞等到只到获取到锁
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

扩容rehash

private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
// 老容量
int oldCapacity = oldTable.length;
// 新容量,扩大两倍
int newCapacity = oldCapacity << 1;
// 新的扩容阀值
threshold = (int)(newCapacity * loadFactor);
// 创建新的数组
HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,默认2扩容后是4,-1是3,二进制就是11。
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
// 遍历老数组
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算新的位置,新的位置只可能是不便或者是老的位置+老的容量。
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
// 如果当前位置还不是链表,只是一个元素,直接赋值
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// 如果是链表了
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 新的位置只可能是不便或者是老的位置+老的容量。
// 遍历结束后,lastRun 后面的元素位置都是相同的
for (HashEntry<K,V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// ,lastRun 后面的元素位置都是相同的,直接作为链表赋值到新位置。
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
// 遍历剩余元素,头插法到指定 k 位置。
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 头插法插入新的节点
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

get

  1. 计算 key 的存放位置
  2. 遍历指定位置查找相同 key 的 value值
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 计算得到 key 的存放位置
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
// 如果是链表,遍历查找到相同 key 的 value。
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

ConcurrentHashMap 1.8

Java8之后, ConcurrentHashMap 不再是原来的 Segment 数组 + HashEntry 数组 + 链表, 而是 Node 数组 + 链表/红黑树

初始化

/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。
if ((sc = sizeCtl) < 0)
// 让出 CPU 使用权
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

ConcurrentHashMap的初始化是通过 自旋CAS 操作完成的

put

public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value 不能为空
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f = 目标位置元素
Node<K,V> f; int n, i, fh;// fh 后面存放目标位置的元素 hash 值
if (tab == null || (n = tab.length) == 0)
// 数组桶为空,初始化数组桶(自旋+CAS)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 桶内为空,CAS 放入,不加锁,成功了就直接 break 跳出
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 使用 synchronized 加锁加入节点
synchronized (f) {
if (tabAt(tab, i) == f) {
// 说明是链表
if (fh >= 0) {
binCount = 1;
// 循环加入新的或者覆盖节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

流程:

  1. 根据 key 计算出 hashcode
  2. 判断是否需要初始化
  3. 根据当前 key 定位到的 Node, 如果为空表示当前位置可以写入数据, 利用 CAS 尝试写入, 失败则自旋保证成功
  4. 如果当前位置的 hashcode == MOVED == -1 , 则需要扩容
  5. 如果都不满足, 则利用 synchronized 写入数据
  6. 如果数量大于 TREEIFY_THRESHOLD 则要执行树化方法

get

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// key 所在的 hash 位置
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果指定位置元素存在,头结点hash值相同
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// key hash 值相等,key值相同,直接返回元素 value
return e.val;
}
else if (eh < 0)
// 头结点hash值小于0,说明正在扩容或者是红黑树,find查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 是链表,遍历查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

流程:

  1. 根据hash值计算位置
  2. 查找到指定位置, 如果头节点就是要找的节点, 直接返回它的 value
  3. 如果头节点hash值小于0, 说明正在扩容或者是红黑树, 直接查找
  4. 如果是链表, 遍历查找

ConcurrentHashMap 和 Hashtable 的区别

  • 底层数据结构 : JDK1.7的 ConcurrentHashMap底层采用 Segment数组 + 链表实现, JDK1.8之后采用的数据结构和 HashMap一样, 数组 + 链表 / 红黑树. Hashtable 采用的数据结构是数组 + 链表
  • 实现线程安全的方式 :
    • JDK1.7时, ConcurrentHashMap对整个桶数组进行了分割分段, 每一把锁只锁容器其中一部分数据, 多线程访问容器中不同数据段的数据, 就不会存在锁竞争, 提高并发访问率
    • 在JDK1.8之后, ConcurrentHashMap摒弃了 Segment 的概念, 直接用 Node 数组 + 链表 + 红黑树的数据结构来实现, 并发控制用 synchronized 和 CAS 来操作
    • Hashtable 使用 synchronized 来保证线程安全, 效率非常底下. 当一个线程访问同步方法时, 其他线程也访问同步方法, 可能会进入阻塞或轮询状态, 如果使用 put 添加元素, 另一个线程不能使用put 添加元素, 也不能使用 get, 竞争会越来越激烈效率越低

ConcurrentHashMap 为什么 key 和 value 不能为 null

ConcurrentHashMap 的 key 和 value 不能为 null 主要是为了避免二义性. null 是一个特殊的值, 表示没有对象或没有引用. 如果你用 null 作为键, 那么你就无法区分这个键是否存在于 ConcurrentHashMap 中, 还是根本没有这个键. 同样, 如果你用 null 作为值, 那么你就无法区分这个值是否是真正存储在 ConcurrentHashMap 中的, 还是因为找不到对应的键而返回的

ConcurrentHashMap 能保证复合操作的原子性吗

答案是不能

复合操作是指多个基本操作(如 put, get, remove, containsKey 等) 组合的操作

如何保证 ConcurrentHashMap 复合操作的原子性?

ConcurrentHashMap 提供了一些原子性的复合操作, 如 putIfAbsentcomputecomputeIfAbsentcomputeIfPresentmerge等. 这些方法都可以接受一个函数作为参数, 根据给定的 key 和 value 来计算一个新的 value, 并且将其更新到 map 中