最近遇到了一個麻煩的需求,我們需要一個微服務應用同時訪問兩個不同的 Redis 叢集。一般我們不會這麼使用 Redis,但是這兩個 Redis 本來是不同的業務叢集,現在需要一個微服務同
2021-09-02 03:07:29
最近遇到了一個麻煩的需求,我們需要一個微服務應用同時訪問兩個不同的 Redis 叢集。一般我們不會這麼使用 Redis,但是這兩個 Redis 本來是不同的業務叢集,現在需要一個微服務同時訪問。
其實我們在實際業務開發的時候,可能還會遇到類似的場景。例如 Redis 讀寫分離,這個也是 spring-data-redis 沒有提供的功能,底層連線池例如 Lettuce 或者 Jedis 提供了獲取只讀連線的 API,但是缺陷有兩個:
因此,我們需要在 spring-data-redis 在此基礎上實現一個動態切換 Redis 連線的機制。
spring-data-redis 的配置類為:org.springframework.boot.autoconfigure.data.redis.RedisProperties,可以配置單個 Redis 例項或者 Redis 叢集的連線配置。根據這些配置,會生成統一的 Redis 連線工廠 RedisConnectionFactory
spring-data-redis 核心介面與背後的連線相關抽象關係為:
通過這個圖,我們可以知道,我們實現一個可以動態返回不同 Redis 連線的 RedisConnectionFactory 即可,並且根據 spring-data-redis 的自動裝載源碼可以知道,框架內的所有 RedisConnectionFactory 是 @ConditionalOnMissingBean 的,即我們可以使用我們自己實現的 RedisConnectionFactory 進行替換。
項目地址:github.com/JoJoTec/spr…
我們可以給 RedisProperties 配置外層封裝一個多 Redis 連線的配置,即MultiRedisProperties:
@Data@NoArgsConstructor@ConfigurationProperties(prefix = "spring.redis")public class MultiRedisProperties { /** * 預設連線必須配置,配置 key 為 default */ public static final String DEFAULT = "default"; private boolean enableMulti = false; private Map<String, RedisProperties> multi;}複製程式碼
這個配置是在原有配置基礎上的,也就是使用者可以使用原有配置,也可以使用這種多 Redis 配置,就是需要配置 spring.redis.enable-multi=true。multi 這個 Map 中放入的 key 是資料來源名稱,使用者可以在使用 RedisTemplate 或者 ReactiveRedisTemplate 之前,通過這個資料來源名稱指定用哪個 Redis。
接下來我們來實現 MultiRedisLettuceConnectionFactory,即可以動態切換 Redis 連線的 RedisConnectionFactory,我們的項目採用的 Redis 客戶端是 Lettuce:
public class MultiRedisLettuceConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory { private final Map<String, LettuceConnectionFactory> connectionFactoryMap; private static final ThreadLocal<String> currentRedis = new ThreadLocal<>(); public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) { this.connectionFactoryMap = connectionFactoryMap; } public void setCurrentRedis(String currentRedis) { if (!connectionFactoryMap.containsKey(currentRedis)) { throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exists in configuration"); } MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis); } @Override public void destroy() throws Exception { connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy); } @Override public void afterPropertiesSet() throws Exception { connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet); } private LettuceConnectionFactory currentLettuceConnectionFactory() { String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get(); if (StringUtils.isNotBlank(currentRedis)) { MultiRedisLettuceConnectionFactory.currentRedis.remove(); return connectionFactoryMap.get(currentRedis); } return connectionFactoryMap.get(MultiRedisProperties.DEFAULT); } @Override public ReactiveRedisConnection getReactiveConnection() { return currentLettuceConnectionFactory().getReactiveConnection(); } @Override public ReactiveRedisClusterConnection getReactiveClusterConnection() { return currentLettuceConnectionFactory().getReactiveClusterConnection(); } @Override public RedisConnection getConnection() { return currentLettuceConnectionFactory().getConnection(); } @Override public RedisClusterConnection getClusterConnection() { return currentLettuceConnectionFactory().getClusterConnection(); } @Override public boolean getConvertPipelineAndTxResults() { return currentLettuceConnectionFactory().getConvertPipelineAndTxResults(); } @Override public RedisSentinelConnection getSentinelConnection() { return currentLettuceConnectionFactory().getSentinelConnection(); } @Override public DataAccessException translateExceptionIfPossible(RuntimeException ex) { return currentLettuceConnectionFactory().translateExceptionIfPossible(ex); }}複製程式碼
邏輯非常簡單,就是提供了設定 Redis 資料來源的介面,並且放入了 ThreadLocal 中,並且僅對當前一次有效,讀取後就清空。
然後,將 MultiRedisLettuceConnectionFactory 作為 Bean 註冊到我們的 ApplicationContext 中:
@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)@Configuration(proxyBeanMethods = false)public class RedisCustomizedConfiguration { /** * @param builderCustomizers * @param clientResources * @param multiRedisProperties * @return * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration */ @Bean public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory( ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers, ClientResources clientResources, MultiRedisProperties multiRedisProperties, ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider, ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider ) { //讀取配置 Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap(); Map<String, RedisProperties> multi = multiRedisProperties.getMulti(); multi.forEach((k, v) -> { //這個其實就是框架中原有的源碼使用 RedisProperties 的方式,我們其實就是在 RedisProperties 外面包裝了一層而已 LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration( v, sentinelConfigurationProvider, clusterConfigurationProvider ); LettuceConnectionFactory lettuceConnectionFactory = lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources); connectionFactoryMap.put(k, lettuceConnectionFactory); }); return new MultiRedisLettuceConnectionFactory(connectionFactoryMap); }}複製程式碼
我們來測試下,使用 embedded-redis 來啟動本地 redis,從而實現單元測試。我們啟動兩個 Redis,在兩個 Redis 中放入不同的 Key,驗證是否存在,並且測試同步介面,多執行緒呼叫同步介面,和多次非同步介面無等待訂閱從而測試有效性。:
import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;import org.junit.jupiter.api.AfterAll;import org.junit.jupiter.api.Assertions;import org.junit.jupiter.api.BeforeAll;import org.junit.jupiter.api.Test;import org.junit.jupiter.api.extension.ExtendWith;import org.redisson.api.RedissonClient;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.EnableAutoConfiguration;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.core.ReactiveStringRedisTemplate;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.test.context.junit.jupiter.SpringExtension;import reactor.core.publisher.Mono;import redis.embedded.RedisServer;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;@ExtendWith(SpringExtension.class)@SpringBootTest(properties = { "spring.redis.enable-multi=true", "spring.redis.multi.default.host=127.0.0.1", "spring.redis.multi.default.port=6379", "spring.redis.multi.test.host=127.0.0.1", "spring.redis.multi.test.port=6380",})public class MultiRedisTest { //啟動兩個 redis private static RedisServer redisServer; private static RedisServer redisServer2; @BeforeAll public static void setUp() throws Exception { System.out.println("start redis"); redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build(); redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build(); redisServer.start(); redisServer2.start(); System.out.println("redis started"); } @AfterAll public static void tearDown() throws Exception { System.out.println("stop redis"); redisServer.stop(); redisServer2.stop(); System.out.println("redis stopped"); } @EnableAutoConfiguration @Configuration public static class App { } @Autowired private StringRedisTemplate redisTemplate; @Autowired private ReactiveStringRedisTemplate reactiveRedisTemplate; @Autowired private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory; private void testMulti(String suffix) { //使用預設連線,設定 "testDefault" + suffix, "testDefault" 鍵值對 redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault"); //使用 test 連線,設定 "testSecond" + suffix, "testDefault" 鍵值對 multiRedisLettuceConnectionFactory.setCurrentRedis("test"); redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond"); //使用預設連線,驗證 "testDefault" + suffix 存在,"testSecond" + suffix 不存在 Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix)); Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix)); //使用 test 連線,驗證 "testDefault" + suffix 不存在,"testSecond" + suffix 存在 multiRedisLettuceConnectionFactory.setCurrentRedis("test"); Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix)); multiRedisLettuceConnectionFactory.setCurrentRedis("test"); Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix)); } //單次驗證 @Test public void testMultiBlock() { testMulti(""); } //多執行緒驗證 @Test public void testMultiBlockMultiThread() throws InterruptedException { Thread thread[] = new Thread[50]; AtomicBoolean result = new AtomicBoolean(true); for (int i = 0; i < thread.length; i++) { int finalI = i; thread[i] = new Thread(() -> { try { testMulti("" + finalI); } catch (Exception e) { e.printStackTrace(); result.set(false); } }); } for (int i = 0; i < thread.length; i++) { thread[i].start(); } for (int i = 0; i < thread.length; i++) { thread[i].join(); } Assertions.assertTrue(result.get()); } //reactive 介面驗證 private Mono<Boolean> reactiveMulti(String suffix) { return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault") .flatMap(b -> { multiRedisLettuceConnectionFactory.setCurrentRedis("test"); return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond"); }).flatMap(b -> { return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix); }).map(b -> { Assertions.assertTrue(b); System.out.println(Thread.currentThread().getName()); return b; }).flatMap(b -> { return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix); }).map(b -> { Assertions.assertFalse(b); System.out.println(Thread.currentThread().getName()); return b; }).flatMap(b -> { multiRedisLettuceConnectionFactory.setCurrentRedis("test"); return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix); }).map(b -> { Assertions.assertFalse(b); System.out.println(Thread.currentThread().getName()); return b; }).flatMap(b -> { multiRedisLettuceConnectionFactory.setCurrentRedis("test"); return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix); }).map(b -> { Assertions.assertTrue(b); return b; }); } //多次呼叫 reactive 驗證,並且 subscribe,這本身就是多執行緒的 @Test public void testMultiReactive() throws InterruptedException { for (int i = 0; i < 10000; i++) { reactiveMulti("" + i).subscribe(System.out::println); } TimeUnit.SECONDS.sleep(10); }}複製程式碼
運行測試,通過。
相關文章
最近遇到了一個麻煩的需求,我們需要一個微服務應用同時訪問兩個不同的 Redis 叢集。一般我們不會這麼使用 Redis,但是這兩個 Redis 本來是不同的業務叢集,現在需要一個微服務同
2021-09-02 03:07:29
在日常工作中經常會使用excel製作表格資料,有時需要計算不同表格中的資料,比如兩個表中的資料相加得出新的資料,這樣的跨表格計算怎麼操作呢,今天小編來就來分享下excel中跨表格
2021-09-02 03:07:23
11代酷睿11600K/KF處理器應該搭配什麼主機板更合理,嚴格意義上講似乎並沒有標準答案,因為不論是搭配B560還是Z590主機板,都有它的合理性。B560主機板釋放了記憶體超頻能力,同時
2021-09-02 03:06:42
忙忙碌碌中,2021年已過了三分之二。初秋是個思念的季節,遠方的朋友,雖然距離讓我們疏遠,但心裡的牽掛卻一直沒減,天涼了,再忙也一定要記得照顧好自己。以下是芝麻科技訊帶來#八月
2021-09-02 03:06:38
根據多方訊息,蘋果可能將在9月14日舉行新品釋出會,屆時,萬眾期待的iPhone13系列手機將正式亮相。不僅如此,iPhone13系列的開售時間也被確定,爆料訊息顯示,該機將在9月24日正式開售
2021-09-02 03:06:30
你用上5G了嗎?自從2019年三大運營商獲得了5G商用許可後,5G就成為了大家經常關注的熱點,而國家也出臺了各種各樣的政策來支援5G發展。不過在2019年時,當三大運營商第一次推出5G套
2021-09-02 03:06:21