王富贵

Stay hungry,Stay foolish

若你自认理性且正确,那更应该心平气和,言之有据地与人交流,即使不能立刻说服对方,但也至少埋下了一颗种子,冷嘲热讽或居高临下的态度,总会加大人与人之间的隔阂,最终只是在自己的小圈子中制造回声。愿我们最终都能成为乐观且包容的人,愿意耐心听取他人的苦衷和心声。
  menu
69 文章
0 浏览
0 当前访客
ღゝ◡╹)ノ❤️

并发编程之ThreadLocal

1.ThreadLocal

1.1、ThreadLocal简单介绍

ThreadLocal和Synchronized都是为了解决多线程中相同变量的访问冲突问题,不同的点是

  • Synchronized是通过线程等待,牺牲时间来解决访问冲突
  • ThreadLocal是通过每个线程单独一份存储空间,牺牲空间来解决冲突,并且相比于Synchronized,ThreadLocal具有线程隔离的效果,只有在线程内才能获取到对应的值,线程外则不能访问到想要的值
public class ThreadLocalUtils {

    private static final ThreadLocal<String> LOCAL = new ThreadLocal<>();

    //放入用户信息
    public static void put(String value){
        LOCAL.set(value);
    }

    //获取用户信息
    public static String get(){
        return LOCAL.get();
    }

    //移除用户信息
    public static void remove(){
        LOCAL.remove();
    }
}

我们可以通过put来放入变量,通过get来获取变量,同时,一旦我们操作完ThreadLocal,必须remove移除,否则就会内存泄露

1.2、内存泄露

前面我们提到,用完必须remove移除,否则就会内存泄露

那么,为什么呢?

这里需要理解ThreadLocal底层

1.2.1、引用关系

实现代表强引用,虚线代表弱引用

强引用:是使用最普遍的引用,一个对象具有强引用,不会被垃圾回收所回收,当内存空间不足的时候,Java虚拟机宁愿抛出OutOfMemoryError错误,是程序异常终止,也不会回收的对象

弱引用,jvm进行垃圾回收的时候,无论内存是否充足,弱引用对象都会被回收,在java中,用java.lang.ref.WeakReference类表示

1.2.2、内存泄露原理

如图所示:

1

1

我们每一个线程都会维护一个ThreadLocalMapThreadLocalMap 中的Entry中的 key 为 ThreadLocal 的弱引用,而 value 是强引用(实际上是entry键值对被ThreadLocalMap强引用)。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。

1.3、ThreadLocal使用场景

1.3.1、解决线程安全问题

例:数据库连接问题

我们知道,连接对象为一个变量,用于连接数据库,当我们一个线程使用连接变量去连接数据库的时候,其他变量不能获取这个连接变量来连接数据库,这就是我们所说的线程安全问题。而在这个案例中,我们的连接变量是单例的,通常我们需要给他加锁来解决问题。

同时,我们也可以使用ThreadLocal来解决线程安全。我们知道了,每个线程都会维护一个ThreadLocalMap,所以,如果将连接变量放入ThreadLocal时,那么每个线程,都会创建一个链接变量,这就保证了每个线程都使用的自己的连接变量,保障了线程安全问题。

区别:

我们知道,变量创建需要占用内存

  1. 那么加锁的方式,就是三个萝卜一个坑,一个一个轮流蹲,解决了空间占用,可能排队会消耗时间
  2. 使用ThreadLocal的方式,就是三个萝卜三个坑,萝卜都蹲自己坑,节约了时间,消耗了更大的空间占用

1.3.2、代替参数显式传递

我们知道,在项目中,如果service层要使用controller的变量,需要在方法中进行显式的声明传递

如果我们使用ThreadLocal的话,就可以通过set和get方法来存储,这样就可以隐性的拿到其他方法中的变量

1.3.3、全局存储用户信息

现在的项目中,觉大多是都是前后端分离,其特点就是不用建立长连接,这就表示我们不再使用以前的session来放置用户信息。那么有什么方法可以将用户信息在全局中都可以拿出来呢?

方法一:显式的方法,如果我们使用变量来全局传递,这显然是不好的,我们需要定义大量的用户信息在方法中传递

方式二:使用redis,我们将用户信息存在redis中,并通过解析token中的关键字如用户id,拼接成key,从而从redis中拿到用户信息,这种方式的坏处就是需要使用用户信息的时候我们都需要连接查询redis

方式三:使用ThreadLocal,将用户信息存在ThreadLocal中,我们只要在需要用户信息的地方调用ThreadLocal.get()即可

1.4、深入理解ThreadLocal

掘金:ThreadLocal几个问题

掘金:ThreadLocal底层源码解析

b站:ThreadLocal原理解析

1.4.1、ThreadLocal结构

3

线程里面拥有一个名为threadLocals的对象,对象的类型为ThreadLocalMap,理解如下:

public class Thread{
    //名为threadLocals的对象类型为ThreadLocalMap,每个线程都维护一个这个东西
    ThreadLocalMap threadLocals = new ThreadLocalMap();
}

ThreadLocalMap是一个map对象,里面存着Entry数组,一个Entry也就是一个键值对。ThreadLocal对象是多个线程都能够读取到的对象,并且ThreadLocal也可以是多个。当我们创建ThreadLocal存放东西的时候,一个Entry就对应一个ThreadLocal。

1.4.2、弱引用那些事

我们来看看每个Entry对象

static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

可以看见Entry是WeakReference的子类,继承了这个对象并使用到了Entry的key上,说明ThreadLocal对象被key弱引用,我们就可以通过ThreadLocal对象获取到当前线程的Entry了。们可以思考一个问题:

"threadlocal的key是弱引用,那么在threadlocal.get()的时候,发生GC之后,key是否是null?"

这个问题晃眼一看,弱引用嘛,还有垃圾回收那肯定是为null,这其实是不对的,因为题目说的是在做threadlocal.get()操作,证明其实还是有强引用存在的。所以key并不为null。如果我们的强引用不存在的话,那么Key就会被回收,但是整个Entry是被当前线程强引用的,虽然Key被回收了,但value就被留下来了,也就出现了内存泄露。

这个内存泄露是永久的么?

答案当然是:不是。因为我们发现了强引用关系在了当前线程上,也就是说,如果线程都不需要使用被回收了,当然线程里面的一部分ThreadLocalMap都被回收了。也就是说,出现了内存泄露,也会随着当前线程的结束,而被回收。

1.4.3、线程泄露优化

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        map.set(this, value);
    } else {
        createMap(t, value);
    }
}

上面是ThreadLocal的set方法,当我们获取到的ThreadLocalMap不为空的时候会执行map.set(this, value);方法,我们看看这个方法。

private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);

    //遍历所有entry
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();

        if (k == key) {
            e.value = value;
            return;
        }

        //当key为null的时候(null,value),也就是发送内存泄露,替换掉这个值
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

总结:源码优化了可能出现的内存泄露问题,也就是在set的过程中,会遍历所有key,当遇到了内存泄露情况,替换入我们set的值。来回收泄露内存。

1.4.4、ThreadLocal源码分析

1.四个属性
// 源码
static class ThreadLocalMap {
    //初始容量默认为16,必须是2的幂
    private static final int INITIAL_CAPACITY = 16;

    // table每次resized,容量都得是2的幂
    private Entry[] table;

    // 当前table中的存放的元素数量
    private int size = 0;

    // 扩容阀值
    private int threshold; // Default to 0

    /**
	 * ...其他源码...
     */
} 
  1. int INITIAL_CAPACITY:默认初始容量值
  2. Entry[] table:Entry数组,存放ThreadLocal的键值对
  3. int size:存放元素的数量
  4. int threshold:扩容阈值,默认为0
2.Hash冲突

我们知道每个线程维护了一个 ThreadLocalMap,近似于 HashMap ,也是通过哈希对容量取模得到具体会存放到哪个数组地址中去的。但是不同于 HashMap的拉链法和红黑树。ThreadLocalMap 使用的是线性探测的开放地址法去解决 hash 冲突。 当当前 key 存在 hash 冲突,会线性地往后探测直到找到为 null 的位置存入对象,或者找到 key 相同的位置覆盖更新原来的对象。

在这过程中若发现不为空但 key 为 null 的桶(key 过期的 Entry 数据)则启动探测式清理操作。也就是上面提到的内存泄露,会进行全扫描清理所有出现内存泄露的地方。

为了更好的理解ThreadLocalMap,我们定义两个名词,如果Key进行Hash计算之后就插入到了本该插入的位置,我们叫他原生Entry,如果是因为哈希冲突而向后遍历Entry数组,找到Entry为null的地方插入,我们叫他非原生Entry

3.set()原理

ThreadLocalMapset() 方法可以新增或更新数据,大致分为四种情况:

  1. 通过 Hash 计算后位置对应的 Entry 数据为空,直接将数据存入该位置
  2. 通过 Hash 计算后位置对应的 Entry 数据不为空,但他们两个的Key值完全相同(Key值实际上是ThreadLocal,key值相同代表着他们是同一个ThreadLocal),直接将数据覆盖到档期你位置。
  3. 通过 Hash 计算后位置对应的 Entry 数据不为空,但是他们两个的key值不同且Hash值相同(是两个不同ThreadLocal的发生了真正的哈希冲突),向后遍历Entry为空或者key值相同的位置之前,未遇到内存泄露(Key为空,value不为空的情况),存入数据或更新数据。
  4. 通过 Hash 计算后位置对应的 Entry 数据不为空,并且两个ThreadLocal也不相同(发生了真正的Hash冲突),向后遍历插入的时候遇到了内存泄露的情况(Key为空,value不为空的情况),假设当前发生内存泄露的 Entry 下标为 x
    1. 执行 replaceStaleEntry() 方法(替换过期数据),从下标 x 向前遍历,初始化探测式清理开始位置:slotToExpunge = staleSlot = x,进行探测式数据清理。
    2. staleSlot 开始向前遍历查找其他的过期数据,并更新清理过期数据的起始下标 slotToExpunge(遇到 key 为 null 的位置则更新 slotToExpu nge = 当前下标 ),直到遇到 Entry = null 停止向前遍历。
    3. staleSlot 开始向后遍历,直至遇到 Entry = null 或者 key = hash计算后得到的 keyEntry = null:将数据覆盖替换掉staleSlot 位置上的Entry
    4. 在前 3 步的过程中若发现有两个或以上的key = null 则调用cleanSomeSlots(expungeStaleEntry(slotToExpunge), len) 方法清理过期元素。(从 slotToExpunge 开始向后检查并清理过期元素,此时主要是通过 expungeStaleEntry()cleanSomeSlots() 两个方法工作。)

图解:

4

纠错机制:我们定义了一个区间下标,我们在此称之为左下标和右下标,起始位置都是当前Entry下标。

  • 向左找下标位置,直到遇见了Entry为null的情况停止寻找,寻找过程中遇见内存泄露(key为null但value不为null),更新下标。
  • 向右找下标位置,直到遇见了Entry为null或者原生Entry的情况停止寻找,寻找过程中遇见了内存泄露(key为null但value不为null),更新下标。
  • 在这个区间内如果存在两个或两个以上内存泄露情况,执行启发式清理。

我们看看 set() 方法的具体代码,需要注意的是第四种情况封装到了 replaceStaleEntry() 方法中。

// ThreadLocal.ThreadLocalMap.set()方法
private void set(ThreadLocal<?> key, Object value) {
    // 通过 key 计算出当前 key 在散列表对应的位置——i
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
  
    // 从 i 开始向后遍历,查找找到为空的位置(也就是得到 tab[i]),注意:通过nextIndex()方法,在遍历完散列数组的最后位置后,遍历的下一个位置是 index=0
    /** 
     * private static int nextIndex(int i, int len) {
     *     return ((i + 1 < len) ? i + 1 : 0);
     * }
	*/
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
		// 遇到key相同,直接更新覆盖,返回
        if (k == key) {
            e.value = value;
            return;
        }
		// 遍历到到key=null(过期元素),执行replaceStaleEntry(),返回
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    // 在 空位置 存放数据
    tab[i] = new Entry(key, value);
    // size++
    int sz = ++size;
    // 调用boolean cleanSomeSlots()进行启发式清理过期元素
    // 若未清理到任何数据且size超过阈值threshold(len*2/3)则rehash(),rehash()中会先进行探测式清理过期元素,若此时size>=len/2(threshold-threshold/4)则扩容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

第四种处理情况 ThreadLocal.ThreadLocalMap.replaceStaleEntry() 方法代码如下:

// ThreadLocal.ThreadLocalMap.replaceStaleEntry()
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;
	// 从staleSlot向前遍历直到遇到Entry=null,期间遇到key=null时更新slotToExpunge
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;
	// 从staleSlot向后遍历,直到Entry=null停止
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {

        ThreadLocal<?> k = e.get();
		// 遇到key=key
        if (k == key) {
            // 更新该位置Entry并将该位置和staleSlot的Entry交换
            e.value = value;
		
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;
			// 若此时slotToExpunge=staleSlot,说明向前遍历时没有发现过期元素以及向后遍历也没发现过期元素,此时修改探测式清理过期元素的起始下标为i(也就是从i作为起始下标开始探测式清理)
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            // cleanSomeSlots()为启发式清理,expungeStaleEntry()为探测式清理
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }
		// 若遇到key=null 且 slotToExpunge=staleSlot,说明向前遍历未遇到过期元素但向后遍历遇到了过期元素,此时修改探测式清理过期元素的起始下标为i
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }
	// 从staleSlot向后遍历过程中遇到了Entry=null,此时直接将数据更新到staleSlot位置
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);
	// 若slotToExpunge!=staleSlot,说明向前遍历或者向后遍历过程中有遇到过期元素,此时slotToExpunge为向前遍历中“最远”的或者向后遍历中遇到的“最远”的key为null的下标,启动探测式清理后启动启发式清理。
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
4.探测式清理

探测式清理,也就是 expungeStaleEntry() 方法。

从开始位置向后遍历,清除过期元素,将遍历到的过期数据的 Entry 设置为 null ,沿途碰到的未过期的数据则将其 rehash 后重新在 table 中定位,如果定位到的位置有数据则往后遍历找到第一个 Entry=null 的位置存入。接着继续往后检查过期数据,直到遇到空的桶才终止探测。

总结来说就是:从开始元素向后遍历直到遇见空Entry,遇到内存泄露,清空Entry,遇到其他元素,重新进行Hash计算插入。

代码如下:

private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
	// 传入的staleSlot位置上的数据一定是过期数据,将staleSlot位置的置空
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;
	// for循环是向后遍历,直到遇到 Entry=null
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        // 若当前遍历的 key 为 null则将 Entry置空
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        //若当前遍历的 key 不为null,将其rehash并将key的原本位置Entry置空,再将key的Entry放入rehash后的位置以及其后面位置的第一个为null的位置
        } else {
            int h = k.threadLocalHashCode & (len - 1);
            if (h != i) {
                tab[i] = null;

                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    // 返回i,也就是探测式清理向后遍历中遇到的第一个为null的位置
    return i;
}

这可以使得 rehash 后的数据距离正确的位置(i= key.hashCode & (tab.len - 1))更近一些。能过提高整个散列表的查询性能。

5.启发式清理

启发式清理,cleanSomeSlots(int i, int n)

向后遍历个位置[log2(n)],下标 i 作为遍历的第一个位置。遍历中遇到位置上 key=null 时(假设该位置为 i ),同步调用 expungeStaleEntry(i) 探测式清理方法。

代码如下:

private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        i = nextIndex(i, len);
        Entry e = tab[i];
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}

注意:在 ThreadLocalMap.set() 方法的调用方法 ThreadLocalMap.replaceStaleEntry() ,一般会这样调用—— cleanSomeSlots(expungeStaleEntry(slotToExpunge), len)

6.扩容

前面我们提到了一个属性,threshold,就是THreadLocal扩容阈值。但不同于HashMap的 len * 0.75

ThreadLocalMap是 len * 2 / 3

ThreadLocalMap 的扩容是在 set() 方法之后才有可能执行的。在 set() 方法的最后,若在 set() 未清理到任何数据且 size 超过或等于阈值 threshold(也就是 len*2/3)则 rehash()rehash() 中会先进行探测式清理过期元素,若在 rehash() 清除过后 size>=len/2(也就是 threshold-threshold/4)则调用 resize() 扩容。

rehash() 的代码如下:

private void rehash() {
    // 该方法为从下标0出发,找到第一个 key=null 的位置j,以j为起始开始探测式清理
    expungeStaleEntries();
    // 阈值 threshold=len*2/3
	// 当前size超过或等于阈值的3/4时执行扩充
    if (size >= threshold - threshold / 4)
        resize();
}

private void expungeStaleEntries() {
    Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {
        Entry e = tab[j];
        if (e != null && e.get() == null)
            expungeStaleEntry(j);
    }
}

扩容的具体实现是 resize() 。首先,扩容是 tab 直接扩容为原来的 2 倍的,然后遍历旧的散列表,重新计算每个元素的 hash 位置放到新的 tab 数组中,遇到 hash 冲突则往后寻找最近的 entry=null 的位置存放。最后重新计算 tab 执行扩容的阈值。

resize() 的代码如下:

private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null;
            } else {
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }

    setThreshold(newLen);
    size = count;
    table = newTab;
}
7.get()原理

通过对set方法的理解,那么获取就非常简单了。

  1. 计算Hash值,拿Hash取模找到对应的数组下标,判断Key值是否相同
    1. 相同,返回即可。
    2. 不同,向后寻找Key值相同的。

这里回顾一下,ThreadLocalMap的key实际上是ThreadLocal的弱引用。


标题:并发编程之ThreadLocal
作者:1938857445
地址:https://www.lmlx66.top/articles/2022/01/20/1642658757579.html