ThreadLocal和连接池

gyp666 / 2024-10-13 / 原文

ThreadLocal

线程隔离工具

用来存储一些只有线程才可以访问的内容。

你可能会想,既然我只想本线程才能访问,那么我使用局部变量不就行了吗?
局部变量的问题在于它只能存在于本方法内部,没有办法让本线程内的其他方法访问使用。
public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " first "+MySqlConUtil.getCon());
                    System.out.println(Thread.currentThread().getName() + " second "+MySqlConUtil.getCon());
                } catch (ClassNotFoundException e) {
                    throw new RuntimeException(e);
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }
        };

        for(int i=0;i<5;i++){
            new Thread(runnable).start();
        }
        
    }

}
class MySqlConUtil{


    private static ThreadLocal<Connection> connectionThreadLocal = new ThreadLocal<>();


    static public Connection  getCon() throws ClassNotFoundException, SQLException {
        Connection result = connectionThreadLocal.get();
        if(result!=null){
            return result;
        }
        Class.forName("com.mysql.cj.jdbc.Driver");
        //获取链接
        String url="jdbc:mysql://localhost:3306/practiceDemo?useSSL=false";
        String username="root";
        String password="775210";
        Connection conn = DriverManager.getConnection(url, username, password);
        connectionThreadLocal.set(conn);
        return conn;
    }
}
Thread-0 first com.mysql.cj.jdbc.ConnectionImpl@528d2c99
Thread-0 second com.mysql.cj.jdbc.ConnectionImpl@528d2c99
Thread-3 first com.mysql.cj.jdbc.ConnectionImpl@2474c971
Thread-3 second com.mysql.cj.jdbc.ConnectionImpl@2474c971
Thread-4 first com.mysql.cj.jdbc.ConnectionImpl@70e35ef0
Thread-4 second com.mysql.cj.jdbc.ConnectionImpl@70e35ef0
Thread-1 first com.mysql.cj.jdbc.ConnectionImpl@626f3cc3
Thread-1 second com.mysql.cj.jdbc.ConnectionImpl@626f3cc3
Thread-2 first com.mysql.cj.jdbc.ConnectionImpl@6ed28130
Thread-2 second com.mysql.cj.jdbc.ConnectionImpl@6ed28130


通过控制台输出可以看到每个线程都获取到了一个不同的connection对象,在一个线程中多次获取也会返回相同的对象。

实现原理

ThreadLocalMap是ThreadLocal中的一个静态内部类,访问权限符是默认(只允许同一个包内的访问)


static class ThreadLocalMap {
				//数组元素类型
				static class Entry extends WeakReference<ThreadLocal<?>> {
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

				private static final int INITIAL_CAPACITY = 16;//初始容量
        private void setThreshold(int len) {
        		//扩容因子
            threshold = len * 2 / 3;
        }
        private Entry[] table;//底层数组

}

在线程类Thread内部,有一个ThreadLocalMap对象。可以发现ThreadLocalMap中的key是一个ThreadLocal对象。也就是说ThreadLocal只是一个标识而已,数据真实存储在各个线程自己内部的ThreadLocalMap中。

ThreadLocalMap有点类似于HashMap在扩容上,但是HashMap是拉链法解决hash冲突,ThreadLocalMap是使用开放寻址法解决的hash冲突。

开放寻址法:key根据hash算法发现当前地址上已经有元素存在,则向下遍历,直到下一个没有元素的空位,放入。

弱引用:如果内存不足时,一个对象只有弱引用,那么也会被标识为垃圾进行回收

内存泄漏

如果线程不再被使用,但ThreadLocal变量还被强引用着,那么这些变量就不会被垃圾回收

ThreadLocal对象只是一个key,他是独立于线程外的,当线程执行结束后,Thread中ThreadLocalMap中的key-value肯定也不会被再次使用(因为Thread已经结束了,对于线程状态来说就是TERMINATED状态了,是不可恢复的)此时如果为强引用的话那么这部分内存就没办法被回收,只能等待ThreadLocal对象被释放,所以使用弱引用此处
  1. 及时清理:在不再需要使用ThreadLocal变量时,应该调用其remove()方法,将其从当前线程的Map中移除,从而避免内存泄漏。
  2. 静态ThreadLocal:尽量避免使用静态的ThreadLocal,因为静态的ThreadLocal的生命周期与应用程序的生命周期一样长,这可能导致内存泄漏。
  3. 避免在线程池中使用:如果在线程池中使用ThreadLocal,那么必须确保在使用完ThreadLocal后将其清理掉,否则可能会导致内存泄漏。

连接池

连接池模板

// 导入需要的包
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// 创建连接池类
public class SocketPool {
    // 连接池大小
    private static final int POOL_SIZE = 10;
    // 连接池
    private static BlockingQueue<Socket> pool;

    // 初始化连接池
    static {
        pool = new ArrayBlockingQueue<>(POOL_SIZE);//最好使用linkBlockingQueue
        for (int i = 0; i < POOL_SIZE; i++) {
            Socket socket = createSocket();
            pool.offer(socket);
        }
    }

    // 创建Socket连接
    private static Socket createSocket() {
        // 创建和初始化Socket对象的代码
        return socket;
    }

    // 从连接池中获取连接
    public static Socket getConnection() {
        Socket socket = null;
        try {
            socket = pool.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return socket;
    }

    // 释放连接到连接池
    public static void releaseConnection(Socket socket) {
        try {
            pool.offer(socket);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

SOCKET连接池

WEBSERVICE连接池

package com.msun.common.util;

import com.msun.common.config.PrefrontConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.bus.CXFBusFactory;
import org.apache.cxf.configuration.jsse.TLSClientParameters;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.dynamic.DynamicClientFactory;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPConduitConfigurer;
import org.apache.cxf.transports.http.configuration.ConnectionType;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * webservice
 * 连接池
 */
@Slf4j
@Component
public class WebServicePool {
    private static final Map<String, Pool> map=new ConcurrentHashMap<String, Pool>();
    private static ReentrantLock lock = new ReentrantLock();

    @Resource
    private PrefrontConfig prefrontConfig;

    public Client get(String url, String method, int connectionTimeout, int readTimeout) throws Exception {
        String key = MD5Util.get(url+ ":" + method);
        if(!map.containsKey(key)){
            init(key);
        }
        return map.get(key).get(url, connectionTimeout, readTimeout, prefrontConfig);
    }

    public void close(String url, String method, Client client){
        try {
            String key = MD5Util.get(url + ":" + method);
            if (!map.containsKey(key)) {
                return;
            }
            map.get(key).close(client);
        }catch (Exception ex){
            log.error("将连接归还连接池出错",ex);
        }
    }

    /**
     * 初始化
     * @param key
     */
    private void init(String key){
        lock.lock();
        try {
            if(!map.containsKey(key)){
                map.put(key,new Pool());
            }
        }catch (Exception e){
            log.error("webservice-pool-init",e);
        }finally {
            lock.unlock();
        }
    }
}

/**
 * 连接池
 */
@Slf4j
class Pool{
    private static final long WAIT_TIMEOUT=3;
    private static final long CREATE_TIMEOUT=10;
    private static final int MAX=100;
    private static ExecutorService pool = new ThreadPoolExecutor(1, 100, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    private BlockingQueue<Client> queue;

    public Pool(){
        queue = new ArrayBlockingQueue<>(Pool.MAX);
    }

    //在一个方法中连续调用多次WebService接口,每次调用前需要重置上下文。
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

    /**
     * 获取连接
     * @param url
     * @return
     * @throws Exception
     */
    public Client get(String url, int connectionTimeout, int readTimeout,PrefrontConfig prefrontConfig) throws Exception {
        if(queue.remainingCapacity()<=0){
            return queue.poll(WAIT_TIMEOUT,TimeUnit.SECONDS);
        }
        if(queue.size() >0){
            return queue.poll(WAIT_TIMEOUT,TimeUnit.SECONDS);
        }
        if(url.contains("https")){
            createHttpsClient(url,connectionTimeout,readTimeout, prefrontConfig);
        }else{
            createClient(url, connectionTimeout, readTimeout);
        }
        return queue.poll(CREATE_TIMEOUT,TimeUnit.SECONDS);
    }

    /**
     * 将连接归还到连接池
     * @param c 连接
     */
    public void close(Client c){
        if(c == null){
            return;
        }
        queue.offer(c);
    }


    /**
     * 添加连接
     * @param client
     */
    private void add(Client client){
        queue.offer(client);
    }
    /**
     * 创建webservice客户端
     * @param url
     * @throws Exception
     */
    private void createClient(String url, int connectionTimeout, int readTimeout) throws Exception {
        pool.execute(() -> {
            try {
                Thread.currentThread().setContextClassLoader(classLoader);//在获取连接之前 还原上下文
                DynamicClientFactory dcf = DynamicClientFactory.newInstance();
                Client c = dcf.createClient(url);
                // 设置超时单位为毫秒
                HTTPConduit conduit = (HTTPConduit) c.getConduit();
                HTTPClientPolicy policy = new HTTPClientPolicy();
                policy.setConnectionTimeout(connectionTimeout == 0 ? 10000 : connectionTimeout);
                policy.setAllowChunking(false);
                policy.setReceiveTimeout(readTimeout == 0 ? 10000 : readTimeout);
                policy.setConnection(ConnectionType.KEEP_ALIVE);
                conduit.setClient(policy);
                add(c);
            } catch (Exception ex) {
                log.error("创建webservice客户端", ex);
            }
        });
    }


    /**
     *
     *
    private void createHttpsClient(String url, int connectionTimeout, int readTimeout, PrefrontConfig prefrontConfig) throws Exception {
        pool.execute(() -> {
            try {
                Thread.currentThread().setContextClassLoader(classLoader);//在获取连接之前 还原上下文
                log.info("加载证书密钥...");
                System.setProperty("javax.net.ssl.keyStore",prefrontConfig.getConfigByKey("trustStorePath"));
                System.setProperty("javax.net.ssl.keyStorePassword",prefrontConfig.getConfigByKey("trustStorePasswordPath"));
                java.security.Security.addProvider(new com.sun.net.ssl.internal.ssl.Provider());
                DynamicClientFactory dcf = DynamicClientFactory.newInstance();
                Client c = dcf.createClient(url);
                // 设置超时单位为毫秒
                HTTPConduit conduit = (HTTPConduit) c.getConduit();
                HTTPClientPolicy policy = new HTTPClientPolicy();
                policy.setConnectionTimeout(connectionTimeout == 0 ? 10000 : connectionTimeout);
                policy.setAllowChunking(false);
                policy.setReceiveTimeout(readTimeout == 0 ? 10000 : readTimeout);
                policy.setConnection(ConnectionType.KEEP_ALIVE);
                conduit.setClient(policy);
                add(c);
            } catch (Exception ex) {
                log.error("创建webservice客户端", ex);
            }
        });
    }
     **/


    private void createHttpsClient(String url, int connectionTimeout, int readTimeout, PrefrontConfig prefrontConfig) throws Exception {
        pool.execute(() -> {
            try {
                Thread.currentThread().setContextClassLoader(classLoader);//在获取连接之前 还原上下文
                log.info("加载证书密钥开始...");
                BusFactory bf = CXFBusFactory.newInstance();
                Bus bus = bf.createBus();
                bus.setExtension(new SoapHTTPConduitConfigurer(), HTTPConduitConfigurer.class);
                DynamicClientFactory dcf = DynamicClientFactory.newInstance(bus);
                Client c = dcf.createClient(url);
                log.info("加载证书密钥结束...");
                add(c);
            } catch (Exception ex) {
                log.error("创建webservice客户端", ex);
            }
        });
    }

    private static class SoapHTTPConduitConfigurer implements HTTPConduitConfigurer {
        public void configure(String name, String address, HTTPConduit c) {
            try{
                HTTPClientPolicy httpClientPolicy = new HTTPClientPolicy();
//                httpClientPolicy.setConnectionTimeout(itfConfig.getConnectionTimeout());
//                httpClientPolicy.setReceiveTimeout(itfConfig.getSoTimeout());
                c.setClient(httpClientPolicy);
                SSLContext context = createIgnoreVerifySSL();
                TLSClientParameters params = c.getTlsClientParameters();
                if (params == null) {
                    params = new TLSClientParameters();
                }
                params.setSSLSocketFactory(context.getSocketFactory());
                params.setHostnameVerifier((s, sslSession) -> true);
                c.setTlsClientParameters(params);
            }catch(Exception ex){
                log.error("创建SoapHTTPConduitConfigurer异常", ex);
            }
        }

    }

    public static SSLContext createIgnoreVerifySSL() throws NoSuchAlgorithmException, KeyManagementException {
        SSLContext sc = SSLContext.getInstance("TLS");
        // 实现一个X509TrustManager接口,用于绕过验证,不用修改里面的方法
        X509TrustManager trustManager = new X509TrustManager() {
            @Override
            public void checkClientTrusted(
                    java.security.cert.X509Certificate[] paramArrayOfX509Certificate,
                    String paramString) throws CertificateException {
            }
            @Override
            public void checkServerTrusted(
                    java.security.cert.X509Certificate[] paramArrayOfX509Certificate,
                    String paramString) throws CertificateException {
            }
            @Override
            public java.security.cert.X509Certificate[] getAcceptedIssuers() {
                return null;
            }
        };
        sc.init(null, new TrustManager[]{trustManager}, null);
        return sc;
    }

}