13) 创建类:IConverter
package com.atguigu.analysis.converter; import com.atguigu.analysis.kv.base.BaseDimension; import java.io.Closeable; import java.io.IOException; public interface IConverter { // 根据传入的dimension对象,获取数据库中对应该对象数据的id,如果不存在,则插入该数据再返回 int getDimensionId(BaseDimension dimension) throws IOException; } |
14) 创建类:DimensionConverter
package com.atguigu.analysis.converter.impl; import com.atguigu.analysis.converter.IConverter; import com.atguigu.analysis.kv.base.BaseDimension; import com.atguigu.analysis.kv.impl.ContactDimension; import com.atguigu.analysis.kv.impl.DateDimension; import com.atguigu.utils.JDBCCacheBean; import com.atguigu.utils.JDBCUtil; import com.atguigu.utils.LRUCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; /** * 维度对象转维度id类 * * @author 尽际 */ public class DimensionConverter implements IConverter { //日志记录类,注意导包的正确性 private static final Logger logger = LoggerFactory.getLogger(DimensionConverter.class); //每个线程保留自己的Connection实例 private ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<>(); //创建数据缓存队列 private LRUCache<String, Integer> lruCache = new LRUCache(3000); public DimensionConverter() { // JVM虚拟机关闭时,尝试关闭数据库连接 Runtime.getRuntime().addShutdownHook(new Thread(() -> { logger.info(“stopping mysql connection…”); JDBCUtil.close(threadLocalConnection.get(), null, null); logger.info(“mysql connection is successfully closed”); })); } /** * 1、判断内存缓存中是否已经有该维度的id,如果存在则直接返回该id * 2、如果内存缓存中没有,则查询数据库中是否有该维度id,如果有,则查询出来,返回该id,并缓存到内存中。 * 3、如果数据库中也没有该维度id,则直接插入一条新的维度信息,成功插入后,重新查询该维度,返回该id,并缓存到内存中。 * * @param dimension * @return */ @Override public int getDimensionId(BaseDimension dimension) throws IOException { //1、根据传入的维度对象取得该维度对象对应的cachekey String cacheKey = genCacheKey(dimension); //2、判断缓存中是否存在该cacheKey的缓存 if (lruCache.containsKey(cacheKey)) { return lruCache.get(cacheKey); } //3、缓存中没有,查询数据库 String[] sqls = null; if (dimension instanceof DateDimension) { // 时间维度表tb_dimension_date sqls = genDateDimensionSQL(); } else if (dimension instanceof ContactDimension) { //联系人表tb_contacts sqls = genContactSQL(); } else { //抛出Checked异常,提醒调用者可以自行处理。 throw new IOException(“Cannot match the dimession, unknown dimension.”); } try { Connection conn = this.getConnection(); int id = -1; synchronized (this) { id = execSQL(conn, sqls, dimension); } //将该id缓存到内存中 lruCache.put(cacheKey, id); return id; } catch (SQLException e) { e.printStackTrace(); throw new IOException(e); } } /** * LRUCACHE中缓存的键值对形式例如:<date_dimension20170820, 3> * * @param dimension * @return */ private String genCacheKey(BaseDimension dimension) { StringBuilder sb = new StringBuilder(); if (dimension instanceof DateDimension) { DateDimension dateDimension = (DateDimension) dimension; //拼装缓存id对应的key sb.append(“date_dimension”); sb.append(dateDimension.getYear()).append(dateDimension.getMonth()).append(dateDimension.getDay()); } else if (dimension instanceof ContactDimension) { ContactDimension contactDimension = (ContactDimension) dimension; //拼装缓存id对应的key sb.append(“contact_dimension”); sb.append(contactDimension.getTelephone()).append(contactDimension.getName()); } if (sb.length() <= 0) throw new RuntimeException(“Cannot create cachekey.” + dimension); return sb.toString(); } /** * 生成时间维度的数据库查询语句和插入语句 * * @return */ private String[] genDateDimensionSQL() { String query = “SELECT `id` FROM `tb_dimension_date` WHERE `year` = ? AND `month` = ? AND `day` = ? order by `id`;”; String insert = “INSERT INTO `tb_dimension_date`(`year`, `month`, `day`) VALUES(?, ?, ?);”; return new String[]{query, insert}; } /** * 生成联系人的数据库查询语句和插入语句 * * @return */ private String[] genContactSQL() { String query = “SELECT `id` FROM `tb_contacts` WHERE `telephone` = ? AND `name` = ? order by `id`;”; String insert = “INSERT INTO `tb_contacts`(`telephone`, `name`) VALUES(?, ?);”; return new String[]{query, insert}; } /** * 尝试获取数据库连接对象:先从线程缓冲中获取,没有可用连接则创建。 * * @return * @throws SQLException */ private Connection getConnection() throws SQLException { Connection conn = null; synchronized (this) { conn = threadLocalConnection.get(); if (conn == null || conn.isClosed() || conn.isValid(3)) { conn = JDBCCacheBean.getInstance(); } threadLocalConnection.set(conn); } return conn; } /** * @param conn * @param sqls 第一个为查询语句,第二个为插入语句 * @param dimension * @return */ private int execSQL(Connection conn, String[] sqls, BaseDimension dimension) throws SQLException { PreparedStatement preparedStatement = null; ResultSet resultSet = null; try { //1、假设数据库中有该条数据 //封装查询sql语句 preparedStatement = conn.prepareStatement(sqls[0]); setArguments(preparedStatement, dimension); //执行查询 resultSet = preparedStatement.executeQuery(); if (resultSet.next()) { return resultSet.getInt(1); } //2、假设 数据库中没有该条数据 //封装插入sql语句 preparedStatement = conn.prepareStatement(sqls[1]); setArguments(preparedStatement, dimension); preparedStatement.executeUpdate(); JDBCUtil.close(null, preparedStatement, resultSet); //重新获取id,调用自己即可。 preparedStatement = conn.prepareStatement(sqls[0]); setArguments(preparedStatement, dimension); //执行查询 resultSet = preparedStatement.executeQuery(); if (resultSet.next()) { return resultSet.getInt(1); } } finally { JDBCUtil.close(null, preparedStatement, resultSet); } throw new RuntimeException(“Failed to get id”); } private void setArguments(PreparedStatement preparedStatement, BaseDimension dimension) throws SQLException { int i = 0; if (dimension instanceof DateDimension) { DateDimension dateDimension = (DateDimension) dimension; preparedStatement.setInt(++i, dateDimension.getYear()); preparedStatement.setInt(++i, dateDimension.getMonth()); preparedStatement.setInt(++i, dateDimension.getDay()); } else if (dimension instanceof ContactDimension) { ContactDimension contactDimension = (ContactDimension) dimension; preparedStatement.setString(++i, contactDimension.getTelephone()); preparedStatement.setString(++i, contactDimension.getName()); } } } |
上一篇: 尚硅谷大数据技术之电信客服
下一篇: 尚硅谷大数据技术之电信客服