`
greemranqq
  • 浏览: 965918 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论
阅读更多

连接池的管理用了了享元模式,这里对连接池进行简单设计。

一、设计思路

     1.连接池配置属性DBbean:里面存放可以配置的一些属性

     2.连接池接口IConnectionPool:里面定义一些基本的获取连接的一些方法

     3.接口实现ConnectionPool:对上面操作进行实现,并加入一些其他方法

     4.连接池管理ConnectionPoolManager:管理所有的不同的连接池,所有的连接都能通过这里进行获得连接

     5.另外还有几个测试类,和连接信息模拟的类,这里就不进行xml 和配置文件信息的读取了

package pool;
/**
 * 这是外部可以配置的连接池属性
 * 可以允许外部配置,拥有默认值
 * @author Ran
 *
 */
public class DBbean {
	// 连接池属性
	private String driverName;
	private String url;
	private String userName;
	private String password;
	// 连接池名字
	private String poolName;
	private int minConnections = 1; // 空闲池,最小连接数
	private int maxConnections = 10; // 空闲池,最大连接数
	
	private int initConnections = 5;// 初始化连接数
	
	private long connTimeOut = 1000;// 重复获得连接的频率
	
	private int maxActiveConnections = 100;// 最大允许的连接数,和数据库对应
	
	private long connectionTimeOut = 1000*60*20;// 连接超时时间,默认20分钟
	
	private boolean isCurrentConnection = true; // 是否获得当前连接,默认true
	
	private boolean isCheakPool = true; // 是否定时检查连接池
	private long lazyCheck = 1000*60*60;// 延迟多少时间后开始 检查
	private long periodCheck = 1000*60*60;// 检查频率
	
	
	
	public DBbean(String driverName, String url, String userName,
			String password, String poolName) {
		super();
		this.driverName = driverName;
		this.url = url;
		this.userName = userName;
		this.password = password;
		this.poolName = poolName;
	}
	public DBbean() {
	}
	public String getDriverName() {
		if(driverName == null){
			driverName = this.getDriverName()+"_"+this.getUrl();
		}
		return driverName;
	}
	public void setDriverName(String driverName) {
		this.driverName = driverName;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getUserName() {
		return userName;
	}
	public void setUserName(String userName) {
		this.userName = userName;
	}
	public String getPassword() {
		return password;
	}
	public void setPassword(String password) {
		this.password = password;
	}
	public String getPoolName() {
		return poolName;
	}
	public void setPoolName(String poolName) {
		this.poolName = poolName;
	}
	public int getMinConnections() {
		return minConnections;
	}
	public void setMinConnections(int minConnections) {
		this.minConnections = minConnections;
	}
	public int getMaxConnections() {
		return maxConnections;
	}
	public void setMaxConnections(int maxConnections) {
		this.maxConnections = maxConnections;
	}
	public int getInitConnections() {
		return initConnections;
	}
	public void setInitConnections(int initConnections) {
		this.initConnections = initConnections;
	}

	public int getMaxActiveConnections() {
		return maxActiveConnections;
	}
	public void setMaxActiveConnections(int maxActiveConnections) {
		this.maxActiveConnections = maxActiveConnections;
	}
	public long getConnTimeOut() {
		return connTimeOut;
	}
	public void setConnTimeOut(long connTimeOut) {
		this.connTimeOut = connTimeOut;
	}
	public long getConnectionTimeOut() {
		return connectionTimeOut;
	}
	public void setConnectionTimeOut(long connectionTimeOut) {
		this.connectionTimeOut = connectionTimeOut;
	}
	public boolean isCurrentConnection() {
		return isCurrentConnection;
	}
	public void setCurrentConnection(boolean isCurrentConnection) {
		this.isCurrentConnection = isCurrentConnection;
	}
	public long getLazyCheck() {
		return lazyCheck;
	}
	public void setLazyCheck(long lazyCheck) {
		this.lazyCheck = lazyCheck;
	}
	public long getPeriodCheck() {
		return periodCheck;
	}
	public void setPeriodCheck(long periodCheck) {
		this.periodCheck = periodCheck;
	}
	public boolean isCheakPool() {
		return isCheakPool;
	}
	public void setCheakPool(boolean isCheakPool) {
		this.isCheakPool = isCheakPool;
	}
	
	
	
}

 

package pool;

import java.sql.Connection;
import java.sql.SQLException;

public interface IConnectionPool {
	// 获得连接
	public Connection  getConnection();
	// 获得当前连接
	public Connection getCurrentConnecton();
	// 回收连接
	public void releaseConn(Connection conn) throws SQLException;
	// 销毁清空
	public void destroy();
	// 连接池是活动状态
	public boolean isActive();
	// 定时器,检查连接池
	public void cheackPool();
}

 

package pool;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;

public class ConnectionPool implements IConnectionPool {
	// 连接池配置属性
	private DBbean dbBean;
	private boolean isActive = false; // 连接池活动状态
	private int contActive = 0;// 记录创建的总的连接数
	
	// 空闲连接
	private List<Connection> freeConnection = new Vector<Connection>();
	// 活动连接
	private List<Connection> activeConnection = new Vector<Connection>();
	// 将线程和连接绑定,保证事务能统一执行
	private static ThreadLocal<Connection> threadLocal = new ThreadLocal<Connection>();
	
	public ConnectionPool(DBbean dbBean) {
		super();
		this.dbBean = dbBean;
		init();
		cheackPool();
	}

	// 初始化
	public void init() {
		try {
			Class.forName(dbBean.getDriverName());
			for (int i = 0; i < dbBean.getInitConnections(); i++) {
				Connection conn;
				conn = newConnection();
				// 初始化最小连接数
				if (conn != null) {
					freeConnection.add(conn);
					contActive++;
				}
			}
			isActive = true;
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}
	
	// 获得当前连接
	public Connection getCurrentConnecton(){
		// 默认线程里面取
		Connection conn = threadLocal.get();
		if(!isValid(conn)){
			conn = getConnection();
		}
		return conn;
	}

	// 获得连接
	public synchronized Connection getConnection() {
		Connection conn = null;
		try {
			// 判断是否超过最大连接数限制
			if(contActive < this.dbBean.getMaxActiveConnections()){
				if (freeConnection.size() > 0) {
					conn = freeConnection.get(0);
					if (conn != null) {
						threadLocal.set(conn);
					}
					freeConnection.remove(0);
				} else {
					conn = newConnection();
				}
				
			}else{
				// 继续获得连接,直到从新获得连接
				wait(this.dbBean.getConnTimeOut());
				conn = getConnection();
			}
			if (isValid(conn)) {
				activeConnection.add(conn);
				contActive ++;
			}
		} catch (SQLException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return conn;
	}

	// 获得新连接
	private synchronized Connection newConnection()
			throws ClassNotFoundException, SQLException {
		Connection conn = null;
		if (dbBean != null) {
			Class.forName(dbBean.getDriverName());
			conn = DriverManager.getConnection(dbBean.getUrl(),
					dbBean.getUserName(), dbBean.getPassword());
		}
		return conn;
	}

	// 释放连接
	public synchronized void releaseConn(Connection conn) throws SQLException {
		if (isValid(conn)&& !(freeConnection.size() > dbBean.getMaxConnections())) {
			freeConnection.add(conn);
			activeConnection.remove(conn);
			contActive --;
			threadLocal.remove();
			// 唤醒所有正待等待的线程,去抢连接
			notifyAll();
		}
	}

	// 判断连接是否可用
	private boolean isValid(Connection conn) {
		try {
			if (conn == null || conn.isClosed()) {
				return false;
			}
		} catch (SQLException e) {
			e.printStackTrace();
		}
		return true;
	}

	// 销毁连接池
	public synchronized void destroy() {
		for (Connection conn : freeConnection) {
			try {
				if (isValid(conn)) {
					conn.close();
				}
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
		for (Connection conn : activeConnection) {
			try {
				if (isValid(conn)) {
					conn.close();
				}
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
		isActive = false;
		contActive = 0;
	}

	// 连接池状态
	@Override
	public boolean isActive() {
		return isActive;
	}
	
	// 定时检查连接池情况
	@Override
	public void cheackPool() {
		if(dbBean.isCheakPool()){
			new Timer().schedule(new TimerTask() {
			@Override
			public void run() {
			// 1.对线程里面的连接状态
			// 2.连接池最小 最大连接数
			// 3.其他状态进行检查,因为这里还需要写几个线程管理的类,暂时就不添加了
			System.out.println("空线池连接数:"+freeConnection.size());
			System.out.println("活动连接数::"+activeConnection.size());
			System.out.println("总的连接数:"+contActive);
				}
			},dbBean.getLazyCheck(),dbBean.getPeriodCheck());
		}
	}
}

 

package pool;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Hashtable;
/**
 * 连接管理类
 * @author Ran
 *
 */
public class ConnectionPoolManager {
	
	
	// 连接池存放
	public Hashtable<String,IConnectionPool> pools = new Hashtable<String, IConnectionPool>();
	
	// 初始化
	private ConnectionPoolManager(){
		init();
	}
	// 单例实现
	public static ConnectionPoolManager getInstance(){
		return Singtonle.instance;
	}
	private static class Singtonle {
		private static ConnectionPoolManager instance =  new ConnectionPoolManager();
	}
	
	
	// 初始化所有的连接池
	public void init(){
		for(int i =0;i<DBInitInfo.beans.size();i++){
			DBbean bean = DBInitInfo.beans.get(i);
			ConnectionPool pool = new ConnectionPool(bean);
			if(pool != null){
				pools.put(bean.getPoolName(), pool);
				System.out.println("Info:Init connection successed ->" +bean.getPoolName());
			}
		}
	}
	
	// 获得连接,根据连接池名字 获得连接
	public Connection  getConnection(String poolName){
		Connection conn = null;
		if(pools.size()>0 && pools.containsKey(poolName)){
			conn = getPool(poolName).getConnection();
		}else{
			System.out.println("Error:Can't find this connecion pool ->"+poolName);
		}
		return conn;
	}
	
	// 关闭,回收连接
	public void close(String poolName,Connection conn){
			IConnectionPool pool = getPool(poolName);
			try {
				if(pool != null){
					pool.releaseConn(conn);
				}
			} catch (SQLException e) {
				System.out.println("连接池已经销毁");
				e.printStackTrace();
			}
	}
	
	// 清空连接池
	public void destroy(String poolName){
		IConnectionPool pool = getPool(poolName);
		if(pool != null){
			pool.destroy();
		}
	}
	
	// 获得连接池
	public IConnectionPool getPool(String poolName){
		IConnectionPool pool = null;
		if(pools.size() > 0){
			 pool = pools.get(poolName);
		}
		return pool;
	}
}

 

package pool;

import java.util.ArrayList;
import java.util.List;
/**
 * 初始化,模拟加载所有的配置文件
 * @author Ran
 *
 */
public class DBInitInfo {
	public  static List<DBbean>  beans = null;
	static{
		beans = new ArrayList<DBbean>();
		// 这里数据 可以从xml 等配置文件进行获取
		// 为了测试,这里我直接写死
		DBbean beanOracle = new DBbean();
		beanOracle.setDriverName("oracle.jdbc.driver.OracleDriver");
		beanOracle.setUrl("jdbc:oracle:thin:@7MEXGLUY95W1Y56:1521:orcl");
		beanOracle.setUserName("mmsoa");
		beanOracle.setPassword("password1234");
		
		beanOracle.setMinConnections(5);
		beanOracle.setMaxConnections(100);
		
		beanOracle.setPoolName("testPool");
		beans.add(beanOracle);
	}
}

 

    测试:

   

package pool;

import java.sql.Connection;
/**
 * 模拟线程启动,去获得连接
 * @author Ran
 *
 */
public class ThreadConnection implements Runnable{
	private IConnectionPool pool;
	@Override
	public void run() {
		pool = ConnectionPoolManager.getInstance().getPool("testPool");
	}
	
	public Connection getConnection(){
		Connection conn = null;
		if(pool != null && pool.isActive()){
			conn = pool.getConnection();
		}
		return conn;
	}
	
	public Connection getCurrentConnection(){
		Connection conn = null;
		if(pool != null && pool.isActive()){
			conn = pool.getCurrentConnecton();
		}
		return conn;
	}
}

 

package pool;



public class Client {
	public static void main(String[] args) throws InterruptedException {
		// 初始化连接池
		Thread t = init();
		t.start();
		t.join();
		
		ThreadConnection a = new ThreadConnection();
		ThreadConnection b = new ThreadConnection();
		ThreadConnection c = new ThreadConnection();
		Thread t1 = new Thread(a);
		Thread t2 = new Thread(b);
		Thread t3 = new Thread(c);
		
		
		// 设置优先级,先让初始化执行,模拟 线程池 先启动
		// 这里仅仅表面控制了,因为即使t 线程先启动,也不能保证pool 初始化完成,为了简单模拟,这里先这样写了
		t1.setPriority(10);
		t2.setPriority(10);
		t3.setPriority(10);
		t1.start();
		t2.start();
		t3.start();
		
		System.out.println("线程A-> "+a.getConnection());
		System.out.println("线程B-> "+b.getConnection());
		System.out.println("线程C-> "+c.getConnection());
	}

	// 初始化
	public static Thread init() {
		Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				IConnectionPool  pool = initPool();
				while(pool == null || !pool.isActive()){
					pool = initPool();
				}
			}
		});
		return t;
	}
	
	public static IConnectionPool initPool(){
		return ConnectionPoolManager.getInstance().getPool("testPool");
	}

}

 

小结 :

          1.连接池诞生原因是,如果每次都从数据库获得连接,时间比较长,因此我们提前做建立一些连接,放在连接池里面,每次都从里面取

          2.上面仅仅写了连接池基本原理,关于多线程下连接池的管理没写,后面对多线程操作熟练了添加吧

             

15
5
分享到:
评论
13 楼 lc464297691 2016-04-21  
给个我实现的代码,需要自己写一个类继承之~
public abstract class ResourcePool<T> {

	private final int ini;
	private final int max;
	private LinkedList<T> freeQueue = null;
	private final AtomicInteger freeCount = new AtomicInteger();
	private ArrayList<T> busyList = null;
	private final AtomicInteger busyCount = new AtomicInteger();
	private final AtomicInteger aliveCount = new AtomicInteger();

	/** Lock held by getConnection */
	private final ReentrantLock getLock = new ReentrantLock();
	/** Wait pool for waiting getConnection */
	private final Condition empty = getLock.newCondition();


	private void signalEmpty() {
		final ReentrantLock getLock = this.getLock;
		getLock.lock();
		try {
			empty.signal();
		} finally {
			getLock.unlock();
		}
	}

	public ResourcePool(int ini, int max) {
		if (ini > max) {
			throw new IllegalArgumentException();
		}
		this.ini = ini;
		this.max = max;
		this.freeQueue = new LinkedList<T>();
		this.busyList = new ArrayList<T>(this.max);
		for (int i = 0; i < this.ini; i++) {
			this.freeQueue.offer(this.newConnection());
			this.freeCount.incrementAndGet();
			this.aliveCount.incrementAndGet();
		}

	}

	public T getConnection() {
		final AtomicInteger freeCount = this.freeCount;
		final ReentrantLock getLock = this.getLock;
		T conn = null;

		getLock.lock();
		try {
			log.info("Step in lock 1 . "+Thread.currentThread().getName() );
			if (freeCount.get() > 0) {

				conn = this.freeQueue.getFirst();
				if (this.freeCount.decrementAndGet() > 0) {
					this.empty.signal();
				}
				if (this.busyList.add(conn)) {
					this.busyCount.incrementAndGet();
				}

				return conn;
			}
			if (this.aliveCount.get() >=  this.max) {
				while (this.freeCount.get() <= 0) {
					log.info("Waiting for free. " + Thread.currentThread().getName());
					//empty.await(2, TimeUnit.SECONDS);
					empty.await();
				}
				conn = this.freeQueue.getFirst();
				this.busyList.add(conn);
				if (this.freeCount.decrementAndGet() > 0) {
					this.empty.signal();
				}
				this.busyCount.incrementAndGet();
				return conn;
			}

		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			return conn;
		} finally {
			getLock.unlock();
		}

		getLock.lock();
		try {
			log.info("Step in lock 3 . "+Thread.currentThread().getName() );
			if (this.aliveCount.get() < this.max) {
				conn = this.newConnection();
				this.busyList.add(conn);
				this.aliveCount.incrementAndGet();
				this.busyCount.incrementAndGet();
			}
			return conn;
		} finally {
			getLock.unlock();
		}


	}

	protected abstract T newConnection();

	public boolean release(T conn) {
		if (conn == null) {
			throw new NullPointerException();
		}

		final ReentrantLock releaseLock = this.getLock;

		boolean rz = false;
		releaseLock.lock();

		try {
			
			if (this.freeCount.get() >= this.max || this.busyCount.get() < 0) {
				log.error("Pool is full, The conn is not belong to our pool or is already release");
				return false;
			}
			if (this.busyList.remove(conn)) {
				this.busyCount.decrementAndGet();
				if (this.freeQueue.offer(conn) && this.freeCount.incrementAndGet() > 0) {
					this.empty.signal();
					//log.info("Released! " + Thread.currentThread().getName());
					rz = true;
				}else{
					log.error("Can't Release");
				}
			}else{
				log.error("Can't Release");
				rz = false;
			}
			
		} finally {
			log.info(Thread.currentThread().getName() + "Release. alive : " + this.aliveCount.get() + "; free :" + this.freeCount.get() + "; busy : " + this.busyCount.get());

			releaseLock.unlock();
		}
		signalEmpty();
	
		return rz;
	}

}

12 楼 lc464297691 2016-04-21  
100%发生死锁! 
 if (isValid(conn)) {  
                activeConnection.add(conn);  
                contActive ++;  
            }  
  这个在递归调用时会重复++
11 楼 herman_liu76 2016-02-15  
        System.out.println("线程A-> "+a.getConnection());  
        System.out.println("线程B-> "+b.getConnection());  
        System.out.println("线程C-> "+c.getConnection());
System.out.println("线程A-> "+a.getCurrentConnection());
System.out.println("线程B-> "+b.getCurrentConnection());
System.out.println("线程C-> "+c.getCurrentConnection());

运行时,后面3个语句Current取得的ThreadLocal中的都是c的。
因为这里是主线程执行,前3个put进ThreadLocal中记为主线程中的连接。不断被替换,最后一个是c取得的,所以后面都是c的。
所以LZ并没有用真正的线程中去获取Conn。
10 楼 liuyuhua0066 2014-07-16  
Concurrent用的过少。活动连接数这些都应该是Atomic的,这些都没有考虑。
9 楼 lvwenwen 2013-11-05  
写得不错
8 楼 javatozhang 2013-11-05  
lz学习精神值得称赞
7 楼 sosojustdo 2013-11-04  
首先还是值得称赞的,楼主自己实现DataBase 连接池,没有借助CommonPool第三方的工具包。
ThreadLocal和synchronized的使用,保证线程安全;ReentrantLock是不是好点,其次建议引用Concurrent包下面的类完善上述代码,期待中
6 楼 cywhoyi 2013-11-04  
greemranqq 写道
cywhoyi 写道
cywhoyi 写道
在之前的blog文中也已经略微提到过pool的实现
http://cywhoyi.iteye.com/blog/1966606 & http://cywhoyi.iteye.com/admin/blogs/1954393,建议LZ多多的采用exectue framework,new Time()的方式在mutli thread下unsafe,而且效率低下。

写错地址了http://cywhoyi.iteye.com/blog/1954393,boneCP framework代码比较易读,你不放从boneCP吸取些知识,分区 guava 模拟回放功能...


谢谢哥们提醒,我这里仅仅模拟了简单的连接池功能,没有采用多线程的一些工具,那些工具我还不熟悉,后面会把源码实现都了解了之后再进行操作。这里也想自己写写才能看出问题。
关于boneCP 我了解了一下源码的框架,对具体实现 也没进行具体分析,逻辑上没理解透,就暂时没像那样写了,怕写得四不像的。我的想法是 从一些基础开始写,主要了解一些流程,然后和现在开源的一些东西比较,看看他们的有点是什么,然后改进,总结优点,从而写一些更加适应的东西,也能让自己从一个简单的问题,慢慢扩展深入,理解整个演变过程

嗯,多探讨
5 楼 greemranqq 2013-11-04  
cywhoyi 写道
cywhoyi 写道
在之前的blog文中也已经略微提到过pool的实现
http://cywhoyi.iteye.com/blog/1966606 & http://cywhoyi.iteye.com/admin/blogs/1954393,建议LZ多多的采用exectue framework,new Time()的方式在mutli thread下unsafe,而且效率低下。

写错地址了http://cywhoyi.iteye.com/blog/1954393,boneCP framework代码比较易读,你不放从boneCP吸取些知识,分区 guava 模拟回放功能...


谢谢哥们提醒,我这里仅仅模拟了简单的连接池功能,没有采用多线程的一些工具,那些工具我还不熟悉,后面会把源码实现都了解了之后再进行操作。这里也想自己写写才能看出问题。
关于boneCP 我了解了一下源码的框架,对具体实现 也没进行具体分析,逻辑上没理解透,就暂时没像那样写了,怕写得四不像的。我的想法是 从一些基础开始写,主要了解一些流程,然后和现在开源的一些东西比较,看看他们的有点是什么,然后改进,总结优点,从而写一些更加适应的东西,也能让自己从一个简单的问题,慢慢扩展深入,理解整个演变过程
4 楼 ls403718787 2013-11-04  
学习了
3 楼 xiaohuafyle 2013-11-04  
写得不错
2 楼 cywhoyi 2013-11-03  
cywhoyi 写道
在之前的blog文中也已经略微提到过pool的实现
http://cywhoyi.iteye.com/blog/1966606 & http://cywhoyi.iteye.com/admin/blogs/1954393,建议LZ多多的采用exectue framework,new Time()的方式在mutli thread下unsafe,而且效率低下。

写错地址了http://cywhoyi.iteye.com/blog/1954393,boneCP framework代码比较易读,你不放从boneCP吸取些知识,分区 guava 模拟回放功能...
1 楼 cywhoyi 2013-11-03  
在之前的blog文中也已经略微提到过pool的实现
http://cywhoyi.iteye.com/blog/1966606 & http://cywhoyi.iteye.com/admin/blogs/1954393,建议LZ多多的采用exectue framework,new Time()的方式在mutli thread下unsafe,而且效率低下。

相关推荐

Global site tag (gtag.js) - Google Analytics