温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

ConcurrentHashMap 源码浅析 1.7

发布时间:2020-06-25 19:59:11 来源:网络 阅读:4438 作者:wx5c78c8b1dbb1b 栏目:编程语言
  1. 简介

    (1) 背景
    HashMap死循环:HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry.
    HashTable效率低下:HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下.因为当一个线程访问HashTable的同步方法,其它线程也访问HashTable的同步方法时,会进入阻塞或轮询状态.如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法获取元素,所以竞争越激烈效率越低.
    (2) 简介
    HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么多线程访问容器里不同的数据段时,线程间不会存在竞争,从而可以有效提高并发访问效率,这就是ConcurrentHash所使用的锁分段技术.首先将数据分成一段一段地储存,然后给每一段配一把锁,当一个线程占用锁访问其中一段数据时,其它段的数据也能被其它线程访问.

  2. 结构

    ConcurrentHashMap是由Segments数组结构和HashEntry数组结构组成.Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的色;HashEntry则用于存储键值对数据.一个ConcurrentHashMap里包含一个Segment组.Segment的结构和HashMap类似,是一种数组加链表的结构.一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护者一个HashEntry数组里面的元素,当对HashEntry数组的数据进行修改时,必须先获得与它对应的Segment锁,如下图所示.

    ConcurrentHashMap 源码浅析 1.7

  3. 基本成员
    default_initial_capacitymap默认容量,必须是2的冥

    /**
     * 默认的初始容量 16
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    default_load_factor默认负载因子(存储的比例)

    /**
     * 默认的负载因子
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    default_concurrency_level默认并发数量,segments数组量(ps:初始化后不能修改)

    /**
     * 默认的并发数量,会影响segments数组的长度(初始化后不能修改)
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    maximum_capacitymap最大容量

    /**
     * 最大容量,构造ConcurrentHashMap时指定的值超过,就用该值替换
     * ConcurrentHashMap大小必须是2^n,且小于等于2^30
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    min_segment_table_capacityHashEntry[]默认容量

    /**
     * 每个segment中table数组的长度,必须是2^n,至少为2
     */
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    max_segments最大并发数,segments数组最大量

    /**
     * 允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n
     */
    static final int MAX_SEGMENTS = 1 << 16;

    retries_before_lock重试次数,在加锁之前

    /**
     * 非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试
     */
    static final int RETRIES_BEFORE_LOCK = 2;

    segmentMask计算segment位置的掩码(segments.length-1)

    /**
     * 用于segment的掩码值,用于与hash的高位进行取&
     */
    final int segmentMask;

    segmentShift

    /**
     * 用于算segment位置时,hash参与运算的位数
     */
    final int segmentShift;

    segmentssegment数组

    /**
     * segments数组
     */
    final Segment<K,V>[] segments;

    HashEntry存储数据的链式结构

    static final class HashEntry<K,V> {
        // hash值
        final int hash;
        // key
        final K key;
        // 保证内存可见性,每次从内存中获取
        volatile V value;
        volatile HashEntry<K,V> next;
    
        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    
        /**
         * 使用volatile语义写入next,保证可见性
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

    Segment继承ReentrantLock锁,用于存放HashEntry[]

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
    
        /**
         * 对segment加锁时,在阻塞之前自旋的次数
         *
         */
        static final int MAX_SCAN_RETRIES =
                Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    
        /**
         * 每个segment的HashEntry table数组,访问数组元素可以通过entryAt/setEntryAt提供的volatile语义来完成
         * volatile保证可见性
         */
        transient volatile HashEntry<K,V>[] table;
    
        /**
         * 元素的数量,只能在锁中或者其他保证volatile可见性之间进行访问
         */
        transient int count;
    
        /**
         * 当前segment中可变操作发生的次数,put,remove等,可能会溢出32位
         * 它为chm isEmpty() 和size()方法中的稳定性检查提供了足够的准确性.
         * 只能在锁中或其他volatile读保证可见性之间进行访问
         */
        transient int modCount;
    
        /**
         * 当table大小超过阈值时,对table进行扩容,值为(int)(capacity *loadFactor)
         */
        transient int threshold;
    
        /**
         * 负载因子
         */
        final float loadFactor;
    
        /**
         * 构造方法
         */
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
  4. 构造方法
    有参构造
    /**
     * ConcurrentHashMap 构造方法
     * @param initialCapacity 初始化容量
     * @param loadFactor 负载因子
     * @param concurrencyLevel 并发segment,segments数组的长度
     */
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // 大于最大segments容量,取最大容量
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        // 2^sshift = ssize 例如:sshift = 4,ssize = 16
        // 根据concurrencyLevel计算出ssize为segments数组的长度
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) { // 第一次 满足
            ++sshift;  // 第一次 1
            ssize <<= 1; // 第一次 ssize = ssize << 1 (1 * 2^1)
        }
        // segmentShift和segmentMask的定义
        this.segmentShift = 32 - sshift; // 用于计算hash参与运算位数
        this.segmentMask = ssize - 1; // segments位置范围
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 计算每个segment中table的容量
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        // HashEntry[]默认 容量
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        // 确保cap是2^n
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // 创建segments并初始化第一个segment数组,其余的segment延迟初始化
        Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                        (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

    无参构造使用默认参数
    public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

  5. 基本方法


    一些UNSAFE方法
    HashEntry
    setNext

    /**
         * 使用volatile语义写入next,保证可见性
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

    entryAt get HashEntry

    /**
     * 获取给定table的第i个元素,使用volatile读语义
     */
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
                (HashEntry<K,V>) UNSAFE.getObjectVolatile
                        (tab, ((long)i << TSHIFT) + TBASE);
    }

    setEntryAt set HashEntry

    /**
     * 设置给定的table的第i个元素,使用volatile写语义
     */
    static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

    put 插入元素
    执行流程分析
    (1) map的put方法就做了三件事情,找出segments的位置;判断当前位置有没有初始化,没有就调用ensureSegment()方法初始化;然后调用segment的put方法.
    (2) segment的put方法.,获取当前segment的锁,成功接着执行,失败调用scanAndLockForPut方法自旋获取锁,成功后也是接着往下执行.
    (3) 通过hash计算出位置,获取节点,找出相同的key和hash替换value,返回.没有找到相同的,设置找出的节点为当前创建节点的next节点,设置创建节点前,判断是否需要扩容,需要调用扩容方法rehash();不需要,设置节点,返回,释放锁.

    /**
     * map的put方法,定位segment
     */
    public V put(K key, V value) {
        Segment<K,V> s;
        // value不能为空
        if (value == null)
            throw new NullPointerException();
        // 获取hash
        int hash = hash(key);
        // 定位segments 数组的位置
        int j = (hash >>> segmentShift) & segmentMask;
        // 获取这个segment
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            // 为null 初始化当前位置的segment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
        /**
         * put到table方法
         */
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // 是否获取锁,失败自旋获取锁(直到成功)
            HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                // 定义位置
                int index = (tab.length - 1) & hash;
                // 获取第一个桶的第一个元素
                // entryAt 底层调用getObjectVolatile 具有volatile读语义
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) { // 证明链式结构有数据 遍历节点数据替换,直到e=null
                        K k;
                        if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) { //  找到了相同的key
                            oldValue = e.value;
                            if (!onlyIfAbsent) { // 默认值false
                                e.value = value; // 替换value
                                ++modCount;
                            }
                            break; // 结束循环
                        }
                        e = e.next;
                    }
                    else { // e=null (1) 之前没有数据 (2) 没有找到替换的元素
                        // node是否为空,这个获取锁的是有关系的
                        // (1) node不为null,设置node的next为first
                        // (2) node为null,创建头节点,指定next为first
                        if (node != null)
                            // 底层使用 putOrderedObject 方法 具有volatile写语义
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        // 扩容条件 (1)entry数量大于阈值 (2) 当前table的数量小于最大容量  满足以上条件就扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            // 扩容方法,方法里面具体讲
                            rehash(node);
                        else
                            // 给table的index位置设置为node,
                            // node为头结点,原来的头结点first为node的next节点
                            // 底层也是调用的 putOrderedObject 方法 具有volatile写语义
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

    解释下(hash >>> segmentShift) & segmentMask定位segment位置(个人理解)
    ConcurrentHashMap 源码浅析 1.7
    ensureSegment初始化segment方法
    执行流程
    (1) 计算位置,使用UNSAFE的方法判断当前位置有没有初始化,然后使用segmets[0]的模板创建一个新的HashEntry[],再次判断当前位置有没有初始化,可能存在多线程同时初始化,然后创建一个新的segment,最后使用自旋cas设置新的segment的位置,保证只有一个线程初始化成功.

    /**
     *
     * @param k 位置
     * @return segments
     */
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;  // 当前的segments数组
        long u = (k << SSHIFT) + SBASE; // raw offset // 计算原始偏移量,在segments数组的位置
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 判断没有被初始化
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype // 获取第一个segment ss[0]
            // 这就是为什么要在初始化化map时要初始化一个segment,需要用cap和loadFactoe 为模板
            int cap = proto.table.length; // 容量
            float lf = proto.loadFactor; // 负载因子
            int threshold = (int)(cap * lf); // 阈值
            // 初始化ss[k] 内部的tab数组 // recheck
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            // 再次检查这个ss[k]  有没有被初始化
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) { // recheck
                // 创建一个Segment
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                // 这里用自旋CAS来保证把segments数组的u位置设置为s
                // 万一有多线程执行到这一步,只有一个成功,break
                // getObjectVolatile 保证了读的可见性,所以一旦有一个线程初始化了,那么就结束自旋
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                        == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

    scanAndLockForPut自旋获取锁方法
    具体流程看代码注释,解释下这个方法的优化(看的某位大佬的博客)
    (1) 我们在put方法获取锁失败,才会进入这个方法,这个方法采用自旋获取锁,直到成功才返回,但是使用了自旋次数的限制,这么做的好处是什么了,就是竞争太激烈的话,这个线程可能一直获取不到锁,自旋也是消耗cpu性能的,所以当达到自旋次数时,就阻塞当前线程,直到有线程释放了锁,通知这些线程.在等待过程中是不消耗cpu的.
    (2) 当我们进入这个方法时,说明获取锁失败,那么可别是别的线程在对这个segment进行修改操作,所以说如果别的线程在操作之后,我们自己的工作内存中的数据可能已经不是最新的了,这个时候我们使用具有volatile语义的方法重新读了数据,在自旋过程中遍历这些数据,把最新的数据缓存在工作内存中,当前线程再次获取锁时,我们的数据是最新的,就不用重新去住内存中获取,这样在自旋获取的锁的过程中就预热了这些数据,在获取锁之后的执行中就提升了效率.

    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash); // 根据hash获取头结点
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // 是为了找到对应hash桶,遍历链表时找到就停止
            while (!tryLock()) { // 尝试获取锁,成功就返回,失败就开始自旋
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {  // 结束遍历节点
                        if (node == null) // 创造新的节点
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0; // 结束遍历
                    }
                    else if (key.equals(e.key)) // 找到节点 停止遍历
                        retries = 0;
                    else
                        e = e.next; // 下一个节点 直到为null
                }
                else if (++retries > MAX_SCAN_RETRIES) { // 达到自旋的最大次数
                    lock(); // 进入加锁方法,失败进入队列,阻塞当前线程
                    break;
                }
                else if ((retries & 1) == 0 &&
                        (f = entryForHash(this, hash)) != first) {
                    e = first = f; // 头结点变化,需要重新遍历,说明有新的节点加入或者移除
                    retries = -1;
                }
            }
            return node;
        }

    rehash扩容方法
    解释下节点位置变化这一块的处理,如下图所示.
    ConcurrentHashMap 源码浅析 1.7

    /**
         *扩容方法
         */
        private void rehash(HashEntry<K,V> node) {
    
            // 旧的table
            HashEntry<K,V>[] oldTable = table;
            // 旧的table的长度
            int oldCapacity = oldTable.length;
            // 扩容原来capacity的一倍
            int newCapacity = oldCapacity << 1;
            // 新的阈值
            threshold = (int)(newCapacity * loadFactor);
            // 新的table
            HashEntry<K,V>[] newTable =
                    (HashEntry<K,V>[]) new HashEntry[newCapacity];
            // 新的掩码
            int sizeMask = newCapacity - 1;
            // 遍历旧的table
            for (int i = 0; i < oldCapacity ; i++) {
                // table中的每一个链表元素
                HashEntry<K,V> e = oldTable[i];
                if (e != null) { // e不等于null
                    HashEntry<K,V> next = e.next; // 下一个元素
                    int idx = e.hash & sizeMask;  // 重新计算位置,计算在新的table的位置
                    if (next == null)   //  Single node on list 证明只有一个元素
                        newTable[idx] = e; // 把当前的e设置给新的table
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e; // 当前e
                        int lastIdx = idx;          // 在新table的位置
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) { // 遍历链表
                            int k = last.hash & sizeMask; // 确定在新table的位置
                            if (k != lastIdx) { // 头结点和头结点的next元素的节点发生了变化
                                lastIdx = k;    // 记录变化位置
                                lastRun = last; // 记录变化节点
                            }
                        }
                        // 以下把链表设置到新table分为两种情况
                        // (1) lastRun 和 lastIdx 没有发生变化,也就是整个链表的每个元素位置和一样,都没有发生变化
                        // (2) lastRun 和 lastIdx 发生了变化,记录变化位置和变化节点,然后把变化的这个节点设置到新table
                        //     ,但是整个链表的位置只有变化节点和它后面关联的节点是对的
                        //      下面的这个遍历就是处理这个问题,遍历当前头节点e,找出不等于变化节点(lastRun)的节点重新处理
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            // 处理扩容时那个添加的节点
    
            // 计算位置
            int nodeIndex = node.hash & sizeMask; // add the new node
            // 设置next节点,此时已经扩容完成,要从新table里面去当前位置的头结点为next节点
            node.setNext(newTable[nodeIndex]);
            // 设置位置
            newTable[nodeIndex] = node;
            // 新table替换旧的table
            table = newTable;
        }

    get 没有加锁,效率高
    注意:get方法使用了getObjectVolatile方法读取segment和hashentry,保证是最新的,具有锁的语义,可见性
    分析:为什么get不加锁可以保证线程安全
    (1) 首先获取value,我们要先定位到segment,使用了UNSAFE的getObjectVolatile具有读的volatile语义,也就表示在多线程情况下,我们依旧能获取最新的segment.
    (2) 获取hashentry[],由于table是每个segment内部的成员变量,使用volatile修饰的,所以我们也能获取最新的table.
    (3) 然后我们获取具体的hashentry,也时使用了UNSAFE的getObjectVolatile具有读的volatile语义,然后遍历查找返回.
    (4) 总结我们发现怎个get过程中使用了大量的volatile关键字,其实就是保证了可见性(加锁也可以,但是降低了性能),get只是读取操作,所以我们只需要保证读取的是最新的数据即可.

    /**
     * get 方法
     */
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 获取segment的位置
        // getObjectVolatile getObjectVolatile语义读取最新的segment,获取table
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
            // getObjectVolatile getObjectVolatile语义读取最新的hashEntry,并遍历
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                // 找到相同的key 返回
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

    size 尝试3次不加锁获取sum,如果发生变化就全部加锁,size和containsValue方法的思想也是基本类似.
    执行流程
    (1) 第一次,retries++=0,不满足全部加锁条件,遍历所有的segment,sum就是所有segment的容量,last等于0,第一次不相等,last=sum.
    (2) 第二次,retries++=1,不满足加锁条件,计算所有的segment,sum就是所有的segment的容量,last是上一次的sum,相等结束循环,不相等下次循环.
    (3) 第三次,retries++=2,先运算后赋值,所以此时还是不满足加锁条件和上面一样统计sum,判断这一次的sum和last(上一次的sum)是否相等,相等结束,不相等,下一次循环.
    (4) 第四次,retries++=2,满足加锁条件,给segment全部加锁,这样所有线程就没有办法进行修改操作,统计每个segment的数量求和,然后返回size.(ps:全部加锁提高了size的准确率,但是降低了吞吐量,统计size的过程中如果其它线程进行修改操作这些线程全部自旋或者阻塞).

    /**
     * size
     * @return
     */
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // 为true表示size溢出32位
        long sum;         // modCounts的总和
        long last = 0L;   // previous sum
        int retries = -1; // 第一次不计算次数,所以会重试三次
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) { // 重试次数达到3次 对所有segment加锁
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) { // seg不等于空
                        sum += seg.modCount; // 不变化和size一样
                        int c = seg.count; // seg 的size
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last) // 没有变化
                    break;
                last = sum; // 变化,记录这一次的变化值,下次循环时对比.
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

    remove replace和remove都是用了scanAndLock这个方法
    解释下scanAndLock这个方法,和put方法的scanAndLockForPut方法思想类似,都采用了同样的优化手段.

    /**
         * 删除方法
         */
        final V remove(Object key, int hash, Object value) {
            if (!tryLock()) // 获取锁
                scanAndLock(key, hash); // 自旋获取锁
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table; // 当前table
                int index = (tab.length - 1) & hash; // 获取位置
                HashEntry<K,V> e = entryAt(tab, index);// 找到元素
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next; // 下一个元素
                    if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) { // e的key=传入的key
                        V v = e.value; // 获取value
                        if (value == null || value == v || value.equals(v)) { // 如果value相等
                            if (pred == null) // 说明是头结点,让next节点为头结点即可
                                setEntryAt(tab, index, next);
                            else //  说明不是头结点,就把当前节点e的上一个节点pred的next节点设置为当前e的next节点
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

    isEmpty 其实也和size的思想类似,不过这个始终没有加锁,提高了性能
    执行流程
    (1) 第一次遍历就干了一件事,确定map的每个segment是否为0,其中任何一个segment的count不为0,就返回,都为0,就累加modCount为sum.
    (2) 判断sum是否为0,不为0,就代表了map之前是有数据的,被remove和clean了,modCount指的是操作次数,再次确定map的每个segment是否为0,其中任何一个segment的count不为0,就返回,并且累减sum,最后判断sum是否为0,为0就代表没有任何变化,不为0,就代表在第一次统计过程中有线程又添加了元素,所以返回false.但是如果在第二次统计时又发生了变化了,所以这个不是那么的准确,但是不加锁提高了性能,也是一个可以接受的方案.
    ```public boolean isEmpty() {

    long sum = 0L;
    final Segment<K,V>[] segments = this.segments;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            if (seg.count != 0)
                return false; // 某一个不为null,立即返回
            sum += seg.modCount;
        }
    }
    // 上面执行完 说明不为空,并且过程可能发生了变化
    // 发生变化
    if (sum != 0L) { // recheck unless no modifications
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum -= seg.modCount;
            }
        }
        if (sum != 0L) // 变化值没有为0,说明不为空
            return false;
    }
    
    // 没有发生变化
    return true;

    }

  6. 总结

    1.7 ConcurrentHashMap 使用了分段锁的思想提高了并发的的访问量,就是使用很多把锁,每一个segment代表了一把锁,每一段只能有一个线程获取锁;但是segment的数量初始化了,就不能修改,所以这也代表了并发的不能修改,这也是1.7的一个局限性.从get方法可以看出使用了UNSAFE的一些方法和volatile关键字来代替锁,提高了并发性.在size和containsValue这些方法提供一种尝试思想,先不加锁尝试统计,如果其中没有变化就返回,有变化接着尝试,达到尝试次数再加锁,这样也避免了立即加锁对并发的影响.

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI