动态数据源切换

(本文基于Springboot+MyBatis)

背景:业务数据和其他业务的数据都在一个库中,而库中的表也很多,因此需要为自己的业务独立建库,并在新库中创建和老库中结构相同的业务表(自己业务使用到的表)。

整体流程:

  1. 用户 -> 新库: 新建业务表
  2. 老库 -> 新库: 同步数据
  3. 用户 -> 新库: 暂停新库写操作
  4. 新库 -> 老库: 反向同步数据
  5. 用户 -> 新库: 恢复新库写操作

1.方案

在Mapper层接口方法执行前进行动态数据源的切换。即使用AOP切面,切点是Mapper接口中的所有方法,并在mapper层的AOP切面中修改线程变量进行数据源切换。

2.实现步骤

2.1 数据源配置

    <!-- Data source 1: connection pool for the old (legacy) database, schema test1. -->
    <!-- NOTE(review): URL and credentials are hard-coded here; presumably sample values - externalize them for real deployments. -->
    <bean id="dataSource1" class="com.zaxxer.hikari.HikariDataSource">
        <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/test1"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
        <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
    </bean>
    <!-- MyBatis SqlSessionFactory bound directly to dataSource1; it loads only the ds1 mapper XML files. -->
    <bean id="ds1SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource1" />
        <property name="configLocation" value="classpath:com/example/demo/common/dal/mybatis-config.xml"/>
        <property name="mapperLocations" value="classpath:com/example/demo/common/dal/ds1/mapper/*Mapper.xml"/>
    </bean>
    <!-- SqlSessionTemplate built on ds1SqlSessionFactory; injected by qualifier "ds1SqlSessionTemplate". -->
    <bean id="ds1SqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate">
        <constructor-arg index="0" ref="ds1SqlSessionFactory"/>
    </bean>

    <!-- Data source 2: connection pool for the new database, schema test2. -->
    <!-- NOTE(review): URL and credentials are hard-coded here; presumably sample values - externalize them for real deployments. -->
    <bean id="dataSource2" class="com.zaxxer.hikari.HikariDataSource">
        <property name="jdbcUrl" value="jdbc:mysql://localhost:3306/test2"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
        <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
    </bean>
    <!-- MyBatis SqlSessionFactory bound directly to dataSource2. -->
    <bean id="ds2SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource2" />
        <property name="configLocation" value="classpath:com/example/demo/common/dal/mybatis-config.xml"/>
        <!-- A wildcard directory is used here so that the ds1 mapper XML files are loaded as well and can be reused against the new database. -->
        <property name="mapperLocations" value="classpath:com/example/demo/common/dal/**/mapper/*Mapper.xml"/>
    </bean>
    <!-- SqlSessionTemplate built on ds2SqlSessionFactory; injected by qualifier "ds2SqlSessionTemplate". -->
    <bean id="ds2SqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate">
        <constructor-arg index="0" ref="ds2SqlSessionFactory"/>
    </bean>

2.2 通用代码部分

import lombok.Getter;

/**
 * Identifies the data sources that the dynamic data source can route to.
 *
 * <p>Each constant carries the lookup key (its {@code value}) under which the corresponding
 * {@code DataSource} is registered. The getter for {@code value} is generated by Lombok.
 */
@Getter
public enum DataSourceType {
    /** Lookup key {@code "dataSource1"}. */
    DS_1("dataSource1"),
    /** Lookup key {@code "dataSource2"}. */
    DS_2("dataSource2");

    /** Lookup key of this data source; immutable because enum constants are shared singletons. */
    private final String value;

    DataSourceType(String value) {
        this.value = value;
    }

    /**
     * Resolves a lookup key to its enum constant.
     *
     * @param value the lookup key, may be {@code null}
     * @return the constant whose key equals {@code value}; {@link #DS_1} when nothing matches
     *         (including a {@code null} key), so callers never receive {@code null}
     */
    public static DataSourceType getByValue(String value) {
        for (DataSourceType type : DataSourceType.values()) {
            if (type.value.equals(value)) {
                return type;
            }
        }
        // Unknown or missing key: fall back to DS_1.
        return DS_1;
    }
}

/**
 * Per-thread holder for the data source type selected for the current mapper call.
 *
 * <p>The value lives in a {@link ThreadLocal}, so each thread sees only what it stored itself.
 */
public class DynamicDataSourceContextHolder {

    /** Backing thread-local; holds no value until {@link #setCurrentMapperType} is called. */
    private static final ThreadLocal<DataSourceType> HOLDER = new ThreadLocal<>();

    /**
     * Stores the data source type for the calling thread.
     *
     * @param dataSourceType the type to store
     */
    public static void setCurrentMapperType(DataSourceType dataSourceType) {
        HOLDER.set(dataSourceType);
    }

    /**
     * Reads the data source type stored for the calling thread.
     *
     * @return the stored type, or {@code null} when nothing has been stored
     */
    public static DataSourceType getCurrentMapperType() {
        return HOLDER.get();
    }

    /** Removes the calling thread's stored data source type. */
    public static void clearCurrentMapperType() {
        HOLDER.remove();
    }
}

2.3 实现动态数据源

动态数据源需要继承org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource抽象类,并实现其determineCurrentLookupKey方法。

import com.google.common.base.MoreObjects;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;


/**
 * Routing data source that picks its target from the data source type stored in
 * {@link DynamicDataSourceContextHolder} for the current thread.
 */
@Slf4j
public class DynamicDataSource extends AbstractRoutingDataSource {
    // Copy of every target data source map handed to setTargetDataSources.
    // It is only written to, never read, inside this class.
    private static final Map<Object, Object> dataSourceMap = new HashMap<Object, Object>();

    /**
     * Registers the target data sources and immediately re-initializes the routing table.
     *
     * @param targetDataSources lookup key to data source mapping
     */
    @Override
    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
        super.setTargetDataSources(targetDataSources);
        dataSourceMap.putAll(targetDataSources);
        // Required: without re-running initialization, newly added data sources are not recognized.
        super.afterPropertiesSet();
    }

    /**
     * Determines the lookup key for the current call.
     *
     * @return the key of the data source type chosen by the mapper-layer aspect, or {@code null}
     *         when none is set so that the default target data source is used
     */
    @Override
    protected Object determineCurrentLookupKey() {
        // Set by the mapper-layer aspect.
        DataSourceType currentMapper = DynamicDataSourceContextHolder.getCurrentMapperType();
        if (Objects.isNull(currentMapper)) {
            // Nothing selected: use the default (old database) data source.
            log.info("determineCurrentLookupKey, dataSourceType: {}", DataSourceType.DS_1);
            return null;
        }

        // A type was selected for this thread: route by its lookup key.
        log.info("determineCurrentLookupKey, dataSourceType:{}", currentMapper);
        return currentMapper.getValue();
    }
}

2.4 配置动态数据源

将多个数据源都配置到动态数据源中。并将SqlSessionFactory、SqlSessionTemplate和PlatformTransactionManager都指向动态数据源的实例。

import com.google.common.collect.Lists;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@AutoConfigureBefore(DataSourceAutoConfiguration.class)
public class DynamicDataSourceConfig {
    // 将多个数据源统一放置到动态数据源中
    @Primary
    @Bean("dynamicDataSource")
    public DynamicDataSource dynamicDataSource(@Qualifier("dataSource1") DataSource master,
                                    @Qualifier("dataSource2") DataSource slave) {
        DynamicDataSource ds = new DynamicDataSource();
        // 设置默认数据源
        ds.setDefaultTargetDataSource(master);
        // 设置数据源集合
        Map<Object, Object> dataSourceList = new HashMap<>();
        // 旧数据源
        dataSourceList.put("dataSource1", master);
        // 新数据源
        dataSourceList.put("dataSource2", slave);
        ds.setTargetDataSources(dataSourceList);
        return ds;
    }

    // 将SqlSessionFactory指向动态数据源
    @Bean("dynamicSqlSessionFactory")
    public SqlSessionFactory dynamicSqlSessionFactory(@Qualifier("dynamicDataSource")
                                                                  DynamicDataSource dynamicDataSource) throws Exception {
        SqlSessionFactoryBean fb = new SqlSessionFactoryBean();
        fb.setDataSource(dynamicDataSource);
        // 设置Mybatis的配置文件
        fb.setConfigLocation(new PathMatchingResourcePatternResolver().getResource("classpath:com/example/demo/common/dal/mybatis-config.xml"));
        // 设置Mapper XML文件的位置
        List<Resource> resources = Lists.newArrayList();
        resources.add(new PathMatchingResourcePatternResolver().getResource("classpath:com/example/demo/common/dal/ds1/mapper/*Mapper.xml"));
        resources.add(new PathMatchingResourcePatternResolver().getResource("classpath:com/example/demo/common/dal/ds2/mapper/*Mapper.xml"));
        fb.setMapperLocations(resources.toArray(new Resource[resources.size()]));
        //返回对象
        return fb.getObject();
    }

    // SqlSession
    @Bean("dynamicSqlSessionTemplate")
    public SqlSessionTemplate dynamicSqlSessionTemplate(@Qualifier("dynamicSqlSessionFactory")
                                                                SqlSessionFactory dynamicSqlSessionFactory) {
        return new  SqlSessionTemplate(dynamicSqlSessionFactory);
    }

    // 扫描指定包路径下的Mapper接口,并注册成Spring Bean,并绑定到数据源上
    // BeanDefinitionRegistryPostProcessor:Mybatis-spring的后置处理器
    // 推荐使用 @MapperScan 方式,简洁
    @Bean
    public MapperScannerConfigurer mapperScannerCOnfigurer() {
        MapperScannerConfigurer msc = new MapperScannerConfigurer();
        // 指定mapper接口的包路径,多个时使用逗号分隔
        msc.setBasePackage("com.example.demo.common.dal.ds1.mapper,com.example.demo.common.dal.ds2.mapper");
        // 绑定数据源
        msc.setSqlSessionTemplateBeanName("dynamicSqlSessionTemplate");
        return msc;
    }

    // 主事务管理器
    @Primary
    @Bean(name="dynamicTransactionManager")
    public PlatformTransactionManager dynamicTransactionManager(@Qualifier("dynamicDataSource")
                                                                            DynamicDataSource dynamicDataSource) {
        DataSourceTransactionManager mg = new DataSourceTransactionManager(dynamicDataSource);
        mg.setNestedTransactionAllowed(true);
        return mg;
    }
}

2.5 Mapper层切面

import com.example.demo.common.dal.config.DataSourceType;
import com.example.demo.common.dal.config.DynamicDataSourceContextHolder;
import com.example.demo.common.dal.config.MultiSqlSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Aspect around every mapper interface method in the ds1 and ds2 mapper packages. Before the
 * mapper call it stores the data source type configured for that mapper in
 * {@link DynamicDataSourceContextHolder}, and afterwards puts back whatever was stored on entry.
 */
@Slf4j
@Aspect
@Component
public class MapperTestAspect {
    /** Matches every method of every type in the ds1 mapper package. */
    @Pointcut("execution(* com.example.demo.common.dal.ds1.mapper.*.*(..))")
    public void mapperInterface1() {
    }

    /** Matches every method of every type in the ds2 mapper package. */
    @Pointcut("execution(* com.example.demo.common.dal.ds2.mapper.*.*(..))")
    public void mapperInterface2() {
    }

    // Mapper interface name -> data source name. Built as a plain typed HashMap rather than with
    // double-brace initialization, which creates a raw anonymous subclass that captures this aspect.
    private final Map<String, String> mapps = defaultMappings();

    /** Builds the hard-coded mapper-to-data-source table. */
    private static Map<String, String> defaultMappings() {
        Map<String, String> mappings = new HashMap<>();
        mappings.put("com.example.demo.common.dal.ds1.mapper.StudentInfoMapper", "dataSource1");
        mappings.put("com.example.demo.common.dal.ds2.mapper.TeacherInfoMapper", "dataSource2");
        return mappings;
    }

    /**
     * Selects the data source for the intercepted mapper method and then runs it.
     *
     * @param joinPoint the intercepted mapper call
     * @return whatever the mapper method returns
     * @throws Throwable whatever the mapper method throws
     */
    @Around("mapperInterface1() || mapperInterface2()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        String mapperClassName = method.getDeclaringClass().getName();
        // Value stored for this thread before this call; restored in finally.
        DataSourceType currentMapper = DynamicDataSourceContextHolder.getCurrentMapperType();
        // Look up the data source name by the mapper's fully qualified name. The original note
        // says this is meant to come from dynamic configuration (Nacos or DRM).
        String dataSourceName = mapps.get(mapperClassName);
        // getByValue never returns null: unknown or unmapped names resolve to DS_1.
        DataSourceType dataSourceType = DataSourceType.getByValue(dataSourceName);
        log.info("当前使用的数据源,currentMapperDataSource={},toMapperDataSource:{},mapperClassName:{}#{}", currentMapper,
                dataSourceType.getValue(), mapperClassName, method.getName());
        try {
            DynamicDataSourceContextHolder.setCurrentMapperType(dataSourceType);
            return joinPoint.proceed();
        } finally {
            // Restore instead of blindly clearing, so a value stored by an outer caller survives.
            if (currentMapper == null) {
                DynamicDataSourceContextHolder.clearCurrentMapperType();
            } else {
                DynamicDataSourceContextHolder.setCurrentMapperType(currentMapper);
            }
        }
    }
}

以上代码在无事务操作的方法中,可以做到随时切换数据源操作。

3. 事务中切换数据源

在事务方法中使用上面的代码时,虽然日志会显示切换了数据源,但实际上并没有切换。这是因为事务开启时,事务管理器已经确定了数据源并调用了DataSource.getConnection()方法,把获取到的连接绑定到了当前线程,事务内后续的Mapper调用都会复用这个连接,不会再重新选择数据源。那么该如何解决呢?下文讲述了一种不使用分布式事务的方式来实现多数据源事务中正确使用对应数据源的方法,即使用SqlSession。

3.1 SqlSession管理类

import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * Holds one {@code SqlSession} per physical data source and hands out the one matching a
 * data source key.
 */
@Component
public class MultiSqlSessionManager {
    /** Session injected by qualifier {@code ds1SqlSessionTemplate}; serves dataSource1. */
    @Autowired
    @Qualifier("ds1SqlSessionTemplate")
    private SqlSession sqlSession1;

    /** Session injected by qualifier {@code ds2SqlSessionTemplate}; serves dataSource2. */
    @Autowired
    @Qualifier("ds2SqlSessionTemplate")
    private SqlSession sqlSession2;

    /**
     * Picks the session for a data source key.
     *
     * @param dataSourceKey the data source name, may be {@code null}
     * @return the dataSource2 session when the key is exactly {@code "dataSource2"}; the
     *         dataSource1 session for {@code "dataSource1"} and for every other key
     */
    public SqlSession getSqlSession(String dataSourceKey) {
        // Only "dataSource2" selects the second session; everything else falls back to the first.
        return "dataSource2".equals(dataSourceKey) ? sqlSession2 : sqlSession1;
    }
}

3.2 改造Mapper切面

注入SqlSession管理类,并新增了是否启用多数据源事务的判断。

    // Provides the SqlSession for a given data source key; the around advice uses it when a
    // multi-data-source transaction is active.
    @Autowired
    private MultiSqlSessionManager multiSqlSessionManager;
    
    /**
     * Routes the intercepted mapper call to the data source configured for its mapper interface.
     *
     * <p>When a multi-data-source transaction is active, the call is re-dispatched to a mapper
     * obtained from the SqlSession of the target data source. Otherwise the data source type is
     * stored in {@link DynamicDataSourceContextHolder} for the duration of the call.
     *
     * @param joinPoint the intercepted mapper call
     * @return whatever the mapper method returns
     * @throws Throwable whatever the mapper method throws
     */
    @Around("mapperInterface1() || mapperInterface2()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        String mapperClassName = method.getDeclaringClass().getName();
        // Value stored for this thread before this call; restored in finally.
        DataSourceType currentMapper = DynamicDataSourceContextHolder.getCurrentMapperType();
        // Look up the data source name by the mapper's fully qualified name. The original note
        // says this is meant to come from dynamic configuration (Nacos or DRM).
        String dataSourceName = mapps.get(mapperClassName);
        // getByValue never returns null: unknown or unmapped names resolve to DS_1.
        DataSourceType dataSourceType = DataSourceType.getByValue(dataSourceName);
        log.info("当前使用的数据源,currentMapperDataSource={},toMapperDataSource:{},mapperClassName:{}#{}", currentMapper,
                dataSourceType.getValue(), mapperClassName, method.getName());

        // Is a multi-data-source transaction active on this thread? (null counts as "no")
        if (Boolean.TRUE.equals(TransactionManageAspect.getEnableMultiTransaction())) {
            // Session of the target data source, and that session's mapper for this interface.
            SqlSession targetSqlSession = multiSqlSessionManager.getSqlSession(dataSourceType.getValue());
            Object targetMapper = targetSqlSession.getMapper(method.getDeclaringClass());
            try {
                return method.invoke(targetMapper, joinPoint.getArgs());
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Reflection wraps the mapper's exception; rethrow the real one so callers see
                // the same exception types as with a direct mapper call.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
        }
        try {
            DynamicDataSourceContextHolder.setCurrentMapperType(dataSourceType);
            return joinPoint.proceed();
        } finally {
            // Restore instead of blindly clearing, so a value stored by an outer caller survives.
            if (currentMapper == null) {
                DynamicDataSourceContextHolder.clearCurrentMapperType();
            } else {
                DynamicDataSourceContextHolder.setCurrentMapperType(currentMapper);
            }
        }
    }

3.3 改造多数据源事务切面

本切面在上一篇文章已经介绍过,下面是改造后的全部代码。新增了线程变量ENABLE_MULTI_DS和获取其值的方法:getEnableMultiTransaction。

package com.example.demo.common.dal.aspect;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Stack;
import java.util.stream.Collectors;

/**
 * Aspect that wraps methods annotated with {@code @MultiTransactionManager} in one transaction
 * per configured transaction manager.
 *
 * <p>All transactions are begun before the method runs. On success they are committed in reverse
 * order of creation; on any failure the still-open ones are rolled back. Commits happen one after
 * another, so this is best effort and not atomic across databases.
 *
 * <p>While such a method runs, a thread-local flag is set so that other components can ask via
 * {@link #getEnableMultiTransaction()} whether a multi-data-source transaction is active.
 */
@Slf4j
@Aspect
@Component
@Order(0)
public class TransactionManageAspect {

    @Resource
    private ApplicationContext applicationContext;

    /** {@code true} on threads that are currently inside a multi-data-source transaction. */
    private static final ThreadLocal<Boolean> ENABLE_MULTI_DS = new ThreadLocal<>();

    /**
     * Tells whether the calling thread is inside a multi-data-source transaction.
     *
     * @return {@code Boolean.TRUE} while such a transaction is active; {@code null} when the flag
     *         has not been set for this thread
     */
    public static Boolean getEnableMultiTransaction() {
        return ENABLE_MULTI_DS.get();
    }

    /**
     * Pointcut: methods annotated with {@code @MultiTransactionManager}.
     */
    @Pointcut("@annotation(com.example.demo.common.dal.aspect.MultiTransactionManager)")
    public void pointCut() {
    }

    /**
     * Runs the intercepted method inside the transactions named by its annotation.
     *
     * @param joinPoint the intercepted call
     * @return whatever the intercepted method returns
     * @throws Throwable the failure of the intercepted method, of beginning a transaction or of a
     *                   commit; failures of individual rollbacks are attached as suppressed
     */
    @Around("pointCut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        log.info("事务管理器切面start, methodName={}", method.getName());

        // A class-level annotation takes precedence; otherwise use the one on the method.
        MultiTransactionManager multiTransactionManager =
                joinPoint.getTarget().getClass().getAnnotation(MultiTransactionManager.class);
        if (multiTransactionManager == null) {
            multiTransactionManager = method.getAnnotation(MultiTransactionManager.class);
        }
        // If neither lookup finds the annotation, run without transactions instead of failing
        // with a NullPointerException.
        String[] transactionManagerNames = multiTransactionManager == null
                ? new String[0]
                : multiTransactionManager.transactionManagerNames();

        // Parallel stacks: the manager and the status of each started transaction.
        Stack<DataSourceTransactionManager> transactionManagerStack = new Stack<>();
        Stack<TransactionStatus> transactionStatusStack = new Stack<>();
        // Remember the flag so that nested annotated calls and pooled threads are left as found.
        Boolean previousFlag = ENABLE_MULTI_DS.get();
        try {
            if (transactionManagerNames.length < 1) {
                log.info("事务管理器名称数组为空, methodName={}", method.getName());
            } else {
                // Inside the try block: if a later manager fails to begin, the transactions
                // already started are rolled back by the catch below instead of leaking.
                beginTransactions(transactionManagerNames, transactionManagerStack, transactionStatusStack);
            }
            if (transactionManagerStack.isEmpty()) {
                log.info("事务管理器为空, methodName={}", method.getName());
            } else {
                ENABLE_MULTI_DS.set(true);
            }

            Object obj = joinPoint.proceed();

            // No exception: commit in reverse order of creation.
            while (!transactionManagerStack.isEmpty()) {
                DataSourceTransactionManager dataSourceTransactionManager = transactionManagerStack.pop();
                TransactionStatus transactionStatus = transactionStatusStack.pop();
                dataSourceTransactionManager.commit(transactionStatus);
            }
            log.info("事务管理器切面提交, methodName={}", method.getName());
            return obj;
        } catch (Throwable e) {
            log.error("事务执行异常, methodName={}", method.getName(), e);
            rollbackRemaining(transactionManagerStack, transactionStatusStack, e);
            log.info("事务管理器切面回滚, methodName={}", method.getName());
            throw e;
        } finally {
            // Always reset the thread-local; otherwise a pooled thread would keep reporting an
            // active multi-data-source transaction to later, unrelated requests.
            if (previousFlag == null) {
                ENABLE_MULTI_DS.remove();
            } else {
                ENABLE_MULTI_DS.set(previousFlag);
            }
            log.info("事务管理器切面end, methodName={}", method.getName());
        }
    }

    /** Begins one transaction per distinct manager name and pushes manager and status onto the stacks. */
    private void beginTransactions(String[] transactionManagerNames,
                                   Stack<DataSourceTransactionManager> transactionManagerStack,
                                   Stack<TransactionStatus> transactionStatusStack) {
        List<String> distinctNames = Arrays.stream(transactionManagerNames)
                .distinct()
                .collect(Collectors.toList());
        for (String transactionManagerName : distinctNames) {
            // getBean throws when the bean is missing or has the wrong type; it never returns null.
            DataSourceTransactionManager dataSourceTransactionManager = applicationContext.getBean(transactionManagerName,
                    DataSourceTransactionManager.class);
            TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(new DefaultTransactionDefinition());
            transactionManagerStack.push(dataSourceTransactionManager);
            transactionStatusStack.push(transactionStatus);
        }
    }

    /** Rolls back every transaction still on the stacks; a failing rollback does not stop the others. */
    private void rollbackRemaining(Stack<DataSourceTransactionManager> transactionManagerStack,
                                   Stack<TransactionStatus> transactionStatusStack,
                                   Throwable failure) {
        while (!transactionManagerStack.isEmpty()) {
            DataSourceTransactionManager dataSourceTransactionManager = transactionManagerStack.pop();
            TransactionStatus transactionStatus = transactionStatusStack.pop();
            try {
                dataSourceTransactionManager.rollback(transactionStatus);
            } catch (RuntimeException rollbackFailure) {
                // Keep going so the remaining transactions are still rolled back; attach the
                // problem to the original failure instead of replacing it.
                log.error("rollback failed", rollbackFailure);
                failure.addSuppressed(rollbackFailure);
            }
        }
    }
}

4.总结

  1. 在无事务的业务方法中,可以在mapper层切面中随时切换数据源。(因为每次都会获取新的连接。)
  2. 在有事务的业务方法中,使用SqlSession可以使之正确地处理每个Mapper层接口的方法的执行。