Zayton Squid约 2977 字大约 10 分钟

JDK1.7的线程安全问题

JDK7版本的HashMap底层采用数组加链表的形式存储元素,假设需要存储的键值对通过计算发现存放的位置已经有元素了,那么HashMap就会用头插法将新节点插入到这个位置。

JDK7HashMap头插法
JDK7HashMap头插法

这一点我们可以从put方法去验证,它会根据key计算获得元素的存放位置,如果位置为空则直接调用addEntry插入,如果不为空,则需要判断该位置的数组是否存在一样的key。如果存在key一致则覆盖并返回,若遍历当前索引的整个链表都不存在一致的key则通过头插法将元素添加至链表首部。

public V put(K key, V value) {
    //判断是否是空表
    if (table == EMPTY_TABLE) {
        //初始化
        inflateTable(threshold);
    }
    //判断是否是空值
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key);
    int i = indexFor(hash, table.length);
     //得到元素要存储的位置table[i],如果位置不为空则进行key比对,若一样则进行覆盖操作并返回,反之继续向后遍历,直到走到链表尽头为止
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    //封装所需参数,准备添加
    addEntry(hash, key, value, i);
    return null;
}

addEntry方法是完成元素插入的具体实现,它会判断数组是否需要扩容,如果不需要扩容则直接调用createEntry,如果需要扩容,会将容量翻倍,然后调用createEntry通过头插法将元素插入。

void addEntry(int hash, K key, V value, int bucketIndex) {
    //判断是否需要扩容
    if ((size >= threshold) && (null != table[bucketIndex])) 
        //扩容
        resize(2 * table.length);
        //重新计算hash值
        hash = (null != key) ? hash(key) : 0;
        //计算所要插入的桶的索引值
        bucketIndex = indexFor(hash, table.length);
    }
    //使用头插法将节点插入
    createEntry(hash, key, value, bucketIndex);
}

那么当HashMap扩容的时候,它具体会如何实现呢?且看下文源码分析

void resize(int newCapacity) {
        Entry[] oldTable = table;
        int oldCapacity = oldTable.length;
        
        if (oldCapacity == MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return;
        }

		//创建新的容器
        Entry[] newTable = new Entry[newCapacity];
        //将旧的容器的元素转移到新数组中
        transfer(newTable, initHashSeedAsNeeded(newCapacity));
        table = newTable;
        threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
    }

可以看出它会根据newCapactity创建出一个新的容器newTable,然后将原数组的元素通过transfer方法转移到新的容器中。接下来我们看看transafer的源码:

void transfer(Entry[] newTable, boolean rehash) {
        int newCapacity = newTable.length;
        for (Entry<K,V> e : table) {
            while(null != e) {
            //记录要被转移到新数组的e节点的后继节点
                Entry<K,V> next = e.next;
                if (rehash) {
                    e.hash = null == e.key ? 0 : hash(e.key);
                }
                //计算e节点要存放的新位置i
                int i = indexFor(e.hash, newCapacity);
                //e的next指针指向i位置的节点
                e.next = newTable[i];
                //i位置的指针指向e
                newTable[i] = e;
                //e指向后继,进行下一次循环转移操作
                e = next;
            }
        }
    }

那么通过源码了解整体过程之后,接下来我们来聊聊今日主题,JDK1.7中HashMap在多线程中容易出现死循环,下面我们从这段代码分析死循环的情况。

public class HashMapDeadCycle {

    public static void main(String[] args) {
        HashMapThread thread0 = new HashMapThread();
        HashMapThread thread1 = new HashMapThread();
        HashMapThread thread2 = new HashMapThread();
        HashMapThread thread3 = new HashMapThread();
        HashMapThread thread4 = new HashMapThread();
        thread0.start();
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
    }
}

class HashMapThread extends Thread {
    private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
    private static final Map<Integer, Integer> MAP = new HashMap<>();

    private static final Integer SIZE = 1000_000;

    @Override
    public void run() {
        while (ATOMIC_INTEGER.get() < SIZE) {
            MAP.put(ATOMIC_INTEGER.get(), ATOMIC_INTEGER.get());
            ATOMIC_INTEGER.incrementAndGet();
        }
    }
}

上诉代码就是开启多个线程不断地进行put操作,然后AtomicInteger和HashMap全局共享,运行几次后就会出现死循环。 运行过程中可能还会出现数组越界的情况

数组越界
数组越界

当出现死循环后我们可以通过jpsjstack命令来分析死循环的情况。

JDK7HashMap死循环堆栈信息
JDK7HashMap死循环堆栈信息

在上图中我们从堆栈信息可以看到死循环是发生在HashMap的resize方法中,根源在transfer方法中。transfer方法在对table进行扩容到newTable后,需要将原来数据转移到newTable中,使用的是头插法,也就是链表的顺序会翻转,这里也是形成死循环的关键点。我们不妨通过画图的方式来了解一下这个过程。 我们假设map的sieze=2,我们插入下一个元素到0索引位置时发现,0索引位置的元素个数已经等于2,此时触发扩容。

JDK7HashMap初始Map
JDK7HashMap初始Map

于是创建了一个两倍大小的新数组

JDK7扩容
JDK7扩容

在迁移到新容器前,会使用e和next两个指针指向旧容器的元素

e和next指针
e和next指针

此时经过哈希计算,旧容器中索引0位置的元素存到新容器中的索引3上。e的next指向新容器中的索引i位置上,由于是第一次插入,newTable[i]实际上等于NULL。因为有next指向,所以当e指向的元素插入到新数组中时指向消失,next指向的元素不会被垃圾清除。

JDK7HashMap头指针指向
JDK7HashMap头指针指向

此时新数组i索引位置的指针指向e,此时逻辑上e已经存在到新数组中。

  newTable[i] = e
newTable[i] = e

此时e指向next,然后准备下一次的插入

 e = next
e = next

因为当前e没有后继节点,故而next指向null;此时当前e节点经过计算,位置也是在3索引,所以next域指向3索引头节点

index=3
index=3

此时新数组i索引位置的指针指向当前的e,完成迁移,此时循环发现next为null结束本次循环,继而迁移旧容器的其他索引位置的节点。

next=null2
next=null2

上诉就是单线程情况正常扩容的一个流程,但是在多线程情况下会出现什么呢,我们这里简化假设只有两个线程同时执行操作。 未resize前的数据结构如下:

未resize
未resize

我们假设线程A,执行到Entry<K,V> next = e.next;时线程被挂起,此时线程A的新容器和旧容器如下图所示:

线程A的新旧容器
线程A的新旧容器

线程A挂起后,此时线程B正常执行,并完成resize操作,结果如下:

完成resize
完成resize

此时切换到线程A,在线程A挂起时内存中值如下:e指向3,next指向7,此时结果如下:

e指向3并next指向7 接下来我们不妨按照代码逻辑继续往下看,首先e的next域指向头节点,此时3的next指针指向7,可以看到此时7和3构成了一个环。

7和3形成闭环 我们接着往下看,执行 newTable[i] = e;代码,此时将3插入到7前面;

3插入到7前面
3插入到7前面

然后e指向next,而next为7,再次循环,此时e.next=3,而在上次循环中3.next=7,出现环形链表,构成一个死循环,最终导致CPU100。

死循环
死循环

JDK1.8的线程安全问题

JDK1.8中对HashMap进行了优化,发生hash碰撞时不再采用头插法,而是使用尾插法,因此不会出现环形链表的情况,但是JDK1.8就安全了吗? 我们来看看JDK1.8中的put操作代码,整体逻辑大概可以分为四个分支:

  1. 如果没有hash碰撞,则直接插入元素
  2. 如果算出来索引位置有值且转成红黑树则调用插入红黑树节点的方法完成插入
  3. 如果算出来索引位置有值且转为链表则遍历链表将节点插入到末端。
  4. 如果key已存在则覆盖原有的value
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
            //如果没有hash碰撞,则直接插入元素
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
                //如果算出来索引位置有值且转成红黑树则调用插入红黑树节点的方法完成插入
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            else {
            //如果算出来索引位置有值且是链表则遍历链表,将节点追加到末端
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    //如果key已存在则覆盖原有的value
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }

其中这段代码不难看出有点问题

		//如果没有hash碰撞,则直接插入元素
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);

假设有两个线程A、B都在进行put操作,并且算出来的插入下标一致,当线程A执行完上面这段代码时时间片耗尽被挂起,此线程B抢到时间片完成插入元素,然后线程A重新获得时间片继续往下执行代码,直接插入,这就会导致线程B插入的数据被覆盖,从而线程不安全。接下来我们画图加深理解。 1.线程A执行到 if ((p = tab[i = (n - 1) & hash]) == null),判断到索引2为空后被挂起。

索引2为空
索引2为空

2.线程B判断到索引也是2且为空后执行完代码直接插入。

结束线程
结束线程

3.线程1被唤醒执行后续逻辑,这就会导致线程2的key被覆盖。

线程2key被覆盖
线程2key被覆盖

那么下面我们从这段代码验证一下键值对覆盖问题,创建一个size为2的map,然后设置两个线程A、B往map同一个索引位置插入数据。

public class DeadCycle {
    private static final HashMap<String, String> MAP = new HashMap<>(2, 1.5f);
    public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(() -> {
            MAP.put("3", "zayton");
            countDownLatch.countDown();
        }, "t1").start();

        new Thread(() -> {
            MAP.put("5", "squid");
            countDownLatch.countDown();
        }, "t2").start();
        
        countDownLatch.await();

        System.out.println(MAP.get("3"));
        System.out.println(MAP.get("5"));
        
    }
}

在put方法中的if ((p = tab[i = (n - 1) & hash]) == null)处打上断点,然后调试模式设置为thread,设置条件

"t1".equals(Thread.currentThread().getName())||"t2".equals(Thread.currentThread().getName()) 
断点1
断点1

然后启动程序,当t1完成判断,准备创建节点时将线程切换成t2。

断点2
断点2

可以看到t2将(5,squid)键值对准备放入数组中,然后我们放行代码。

断点3
断点3

此时线程自动切换成t1,t1再上面已经完成判断认为当前索引位置的数组为null,所有在这里可以看到t2插入的键值对被覆盖成了(3,zayton)

断点4
断点4

此时放行代码,然后可以看出map.get("5")为null,即验证了hashMap在多线程情况下会出现索引覆盖问题。

索引覆盖
索引覆盖

参考文献

面试官:HashMap 为什么线程不安全?open in new window

大厂常问的HashMap线程安全问题,看这一篇就够了!open in new window