
spring-data-redis: Dynamically Switching Data Sources

2021-09-02 03:07:29

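Recently I ran into a troublesome requirement: a single microservice has to access two different Redis clusters at the same time. We would not normally use Redis this way, but the two clusters originally belonged to different business domains, and now one microservice needs to talk to both.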

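Similar scenarios come up in everyday development. Redis read/write splitting is one example: spring-data-redis does not provide it. The underlying clients, Lettuce and Jedis, do expose APIs for obtaining read-only connections, but relying on them has two drawbacks:

  • The upper layer, spring-data-redis, does not wrap these APIs.
  • They are tied to the Redis deployment architecture: sentinel mode requires the sentinel addresses to be configured, and cluster mode requires awareness of the cluster topology. In cloud-native environments the cloud provider hides all of this by default, and only dynamic VIP domain names are exposed.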

Therefore, we need to build a mechanism on top of spring-data-redis that can switch Redis connections dynamically.

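The configuration class for spring-data-redis (provided by Spring Boot's auto-configuration) is org.springframework.boot.autoconfigure.data.redis.RedisProperties. It holds the connection settings for a single Redis instance or for a Redis cluster. From these settings, a single unified Redis connection factory, RedisConnectionFactory, is created.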

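The relationship between the core spring-data-redis interfaces and the connection abstractions behind them is as follows:

[Figure: core spring-data-redis interfaces and the underlying connection abstractions]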

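From this diagram we can see that all we need is a RedisConnectionFactory that can dynamically return different Redis connections. The auto-configuration source code of spring-data-redis also shows that every RedisConnectionFactory registered by the framework is @ConditionalOnMissingBean, so we can replace it with our own RedisConnectionFactory implementation.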
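The delegation idea can be sketched without Spring. The snippet below is only an illustration under stated assumptions: `ConnectionFactory` and `RoutingConnectionFactory` are hypothetical stand-ins, not the real spring-data-redis types, and a "connection" is just a string label. It shows one factory forwarding each call to one of several named factories.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for RedisConnectionFactory; returns a label instead of a real connection.
interface ConnectionFactory {
    String getConnection();
}

// Forwards every call to one of several named delegates.
final class RoutingConnectionFactory implements ConnectionFactory {
    private final Map<String, ConnectionFactory> delegates;
    private final Supplier<String> keySupplier; // decides which delegate serves this call
    private final String defaultKey;

    RoutingConnectionFactory(Map<String, ConnectionFactory> delegates,
                             Supplier<String> keySupplier,
                             String defaultKey) {
        if (!delegates.containsKey(defaultKey)) {
            throw new IllegalArgumentException("missing default delegate: " + defaultKey);
        }
        this.delegates = new HashMap<>(delegates); // defensive copy
        this.keySupplier = keySupplier;
        this.defaultKey = defaultKey;
    }

    @Override
    public String getConnection() {
        String key = keySupplier.get();
        // No explicit selection means the default delegate is used.
        ConnectionFactory target = delegates.get(key == null ? defaultKey : key);
        if (target == null) {
            throw new IllegalStateException("unknown data source: " + key);
        }
        return target.getConnection();
    }
}
```

Because callers only ever see the single routing factory, a template built on top of it would not need to know how many Redis targets exist.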

Project repository: github.com/JoJoTec/spr…

We can wrap RedisProperties in an outer configuration class that describes multiple Redis connections, MultiRedisProperties:

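```java
import java.util.Map;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "spring.redis")
public class MultiRedisProperties {
    /**
     * The default connection must be configured; its key is "default".
     */
    public static final String DEFAULT = "default";

    private boolean enableMulti = false;
    private Map<String, RedisProperties> multi;
}
```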

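This configuration builds on the original one: users can keep the original single-Redis configuration, or switch to the multi-Redis configuration by setting spring.redis.enable-multi=true. The keys of the multi map are data source names. Before using RedisTemplate or ReactiveRedisTemplate, the caller can use one of these names to specify which Redis to talk to.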
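To make the shape of the multi map concrete, here is a small plain-Java sketch that groups flat property keys of the form spring.redis.multi.<name>.<field> by data source name. `MultiRedisKeys` and `groupByDataSource` are hypothetical names introduced for illustration; the real binding is done by Spring Boot's configuration property binder, which this sketch does not try to reproduce.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; Spring Boot performs the real binding.
final class MultiRedisKeys {
    static final String DEFAULT = "default";
    private static final String PREFIX = "spring.redis.multi.";

    private MultiRedisKeys() {
    }

    // Groups "spring.redis.multi.<name>.<field>" entries by <name>; other keys are ignored.
    static Map<String, Map<String, String>> groupByDataSource(Map<String, String> flat) {
        Map<String, Map<String, String>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flat.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(PREFIX)) {
                continue;
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0 || dot == rest.length() - 1) {
                continue; // no data source name or no field name
            }
            String name = rest.substring(0, dot);
            String field = rest.substring(dot + 1);
            grouped.computeIfAbsent(name, n -> new LinkedHashMap<>()).put(field, entry.getValue());
        }
        return grouped;
    }
}
```

With the properties spring.redis.multi.default.host, spring.redis.multi.default.port, spring.redis.multi.test.host and spring.redis.multi.test.port, the result has two entries, "default" and "test", which are the names a caller would later pass when selecting a data source.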

Next we implement MultiRedisLettuceConnectionFactory, a RedisConnectionFactory that can switch Redis connections dynamically. Our project uses Lettuce as the Redis client:

```java
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisSentinelConnection;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

// RedisRelatedException is the project's own exception type.
public class MultiRedisLettuceConnectionFactory
        implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
    private final Map<String, LettuceConnectionFactory> connectionFactoryMap;

    // Name of the Redis data source selected for the next call on this thread
    private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();

    public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
        this.connectionFactoryMap = connectionFactoryMap;
    }

    public void setCurrentRedis(String currentRedis) {
        if (!connectionFactoryMap.containsKey(currentRedis)) {
            throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exist in configuration");
        }
        MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
    }

    @Override
    public void destroy() throws Exception {
        connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
    }

    // Returns the selected factory and clears the selection, or falls back to the default factory
    private LettuceConnectionFactory currentLettuceConnectionFactory() {
        String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
        if (StringUtils.isNotBlank(currentRedis)) {
            MultiRedisLettuceConnectionFactory.currentRedis.remove();
            return connectionFactoryMap.get(currentRedis);
        }
        return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
    }

    @Override
    public ReactiveRedisConnection getReactiveConnection() {
        return currentLettuceConnectionFactory().getReactiveConnection();
    }

    @Override
    public ReactiveRedisClusterConnection getReactiveClusterConnection() {
        return currentLettuceConnectionFactory().getReactiveClusterConnection();
    }

    @Override
    public RedisConnection getConnection() {
        return currentLettuceConnectionFactory().getConnection();
    }

    @Override
    public RedisClusterConnection getClusterConnection() {
        return currentLettuceConnectionFactory().getClusterConnection();
    }

    @Override
    public boolean getConvertPipelineAndTxResults() {
        return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
    }

    @Override
    public RedisSentinelConnection getSentinelConnection() {
        return currentLettuceConnectionFactory().getSentinelConnection();
    }

    @Override
    public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
        return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
    }
}
```

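The logic is very simple: the class exposes a method for selecting the Redis data source and stores the selection in a ThreadLocal. The selection is valid for one use only; it is cleared as soon as it has been read.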
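The read-once behavior described above can be isolated in a few lines of plain Java. This is a minimal sketch, not the project's code: `OneShotSelector`, `select` and `consume` are hypothetical names, and a plain blank check stands in for StringUtils.isNotBlank.

```java
// Hypothetical illustration of a "select once, then fall back to default" ThreadLocal.
final class OneShotSelector {
    static final String DEFAULT = "default";

    // Each thread sees only its own pending selection.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private OneShotSelector() {
    }

    // Selects the data source to use for the next lookup on the calling thread.
    static void select(String name) {
        CURRENT.set(name);
    }

    // Returns the pending selection and clears it; returns DEFAULT when nothing is pending.
    static String consume() {
        String name = CURRENT.get();
        if (name != null && !name.trim().isEmpty()) {
            CURRENT.remove();
            return name;
        }
        return DEFAULT;
    }
}
```

After `OneShotSelector.select("test")`, the first `consume()` on that thread yields "test" and the next one yields "default" again, so a caller that wants the non-default data source for several consecutive operations has to select it before each one. A selection made on one thread is not visible to other threads.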

Then we register MultiRedisLettuceConnectionFactory as a bean in our ApplicationContext:

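```java
import java.util.Map;

import com.google.common.collect.Maps;
import io.lettuce.core.resource.ClientResources;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.data.redis.LettuceClientConfigurationBuilderCustomizer;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
@Configuration(proxyBeanMethods = false)
public class RedisCustomizedConfiguration {

    /**
     * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
     */
    @Bean
    public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
            ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
            ClientResources clientResources,
            MultiRedisProperties multiRedisProperties,
            ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
            ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
    ) {
        // Read the configuration
        Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
        Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
        multi.forEach((k, v) -> {
            // This is how the framework's own source code uses RedisProperties;
            // we simply wrap RedisProperties in one more layer.
            LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
                    v,
                    sentinelConfigurationProvider,
                    clusterConfigurationProvider
            );
            LettuceConnectionFactory lettuceConnectionFactory =
                    lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
            connectionFactoryMap.put(k, lettuceConnectionFactory);
        });
        return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
    }
}
```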

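Let's test it. We use embedded-redis to start local Redis servers so that this can run as a unit test. We start two Redis servers, put different keys into each, and verify which keys exist where. The tests cover the synchronous API, the synchronous API called from multiple threads, and the asynchronous (reactive) API subscribed to many times without waiting: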

```java
import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
import redis.embedded.RedisServer;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        "spring.redis.enable-multi=true",
        "spring.redis.multi.default.host=127.0.0.1",
        "spring.redis.multi.default.port=6379",
        "spring.redis.multi.test.host=127.0.0.1",
        "spring.redis.multi.test.port=6380",
})
public class MultiRedisTest {
    // Start two Redis servers
    private static RedisServer redisServer;
    private static RedisServer redisServer2;

    @BeforeAll
    public static void setUp() throws Exception {
        System.out.println("start redis");
        redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
        redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
        redisServer.start();
        redisServer2.start();
        System.out.println("redis started");
    }

    @AfterAll
    public static void tearDown() throws Exception {
        System.out.println("stop redis");
        redisServer.stop();
        redisServer2.stop();
        System.out.println("redis stopped");
    }

    @EnableAutoConfiguration
    @Configuration
    public static class App {
    }

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private ReactiveStringRedisTemplate reactiveRedisTemplate;
    @Autowired
    private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;

    private void testMulti(String suffix) {
        // Default connection: set the pair ("testDefault" + suffix, "testDefault")
        redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
        // "test" connection: set the pair ("testSecond" + suffix, "testSecond")
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
        // Default connection: "testDefault" + suffix exists, "testSecond" + suffix does not
        Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
        Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
        // "test" connection: "testDefault" + suffix does not exist, "testSecond" + suffix does.
        // The selection is cleared after each use, so it is set again before every call.
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
        multiRedisLettuceConnectionFactory.setCurrentRedis("test");
        Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
    }

    // Single-threaded check
    @Test
    public void testMultiBlock() {
        testMulti("");
    }

    // Multi-threaded check
    @Test
    public void testMultiBlockMultiThread() throws InterruptedException {
        Thread[] threads = new Thread[50];
        AtomicBoolean result = new AtomicBoolean(true);
        for (int i = 0; i < threads.length; i++) {
            int finalI = i;
            threads[i] = new Thread(() -> {
                try {
                    testMulti("" + finalI);
                } catch (Throwable e) {
                    // Catch Throwable: failed assertions throw AssertionError, which is not an Exception
                    e.printStackTrace();
                    result.set(false);
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        Assertions.assertTrue(result.get());
    }

    // Check through the reactive API
    private Mono<Boolean> reactiveMulti(String suffix) {
        return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
                .flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
                })
                .flatMap(b -> reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix))
                .map(b -> {
                    Assertions.assertTrue(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                })
                .flatMap(b -> reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix))
                .map(b -> {
                    Assertions.assertFalse(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                })
                .flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
                })
                .map(b -> {
                    Assertions.assertFalse(b);
                    System.out.println(Thread.currentThread().getName());
                    return b;
                })
                .flatMap(b -> {
                    multiRedisLettuceConnectionFactory.setCurrentRedis("test");
                    return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
                })
                .map(b -> {
                    Assertions.assertTrue(b);
                    return b;
                });
    }

    // Call the reactive check many times and subscribe without waiting; this is inherently multi-threaded
    @Test
    public void testMultiReactive() throws InterruptedException {
        for (int i = 0; i < 10000; i++) {
            reactiveMulti("" + i).subscribe(System.out::println);
        }
        TimeUnit.SECONDS.sleep(10);
    }
}
```

Run the tests; they pass.

