<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Lettuce連線redis叢集使用的都是叢集專用類,像RedisClusterClient、StatefulRedisClusterConnection、RedisAdvancedClusterCommands、StatefulRedisClusterPubSubConnection等等;
Lettuce對rediscluster的支援:
啟動時只需至少一個可以連線的叢集節點就可以,能夠自動拓撲出叢集全部節點;也可以使用ReadFrom設定讀取資料來源,跟主從模式一樣;
雖然redis本身的多鍵命令要求key必須都在同一個槽位,但Lettuce對一部分命令多了優化,可以對多鍵命令進行跨槽位執行,通過將對不同槽位鍵的操作命令分解為多條命令,單個命令以fork/join方式並行執行,最後將結果合併返回;
提供跨槽位命令的api:RedisAdvancedClusterCommands、RedisAdvancedClusterAsyncCommands、RedisAdvancedClusterReactiveCommands;
普通使用者空間的釋出訂閱,redis叢集會傳送到每個節點,釋出者和訂閱者不需要在同一個節點,普通訂閱釋出訊息可以在叢集拓撲改變時重新連線。對於鍵空間事件,只會發到自己的節點,不會擴散到其他節點,要訂閱鍵空間事件可以去適當的多個節點上訂閱,或者使用RedisClusterClient訊息傳播和NodeSelectionAPI獲得一個託管連線集合;
注意:由於主從同步,鍵會被複制到多個從節點上,特別是鍵過期事件,會在主從節點上都產生過期事件,如果訂閱從節點,可能會收到多條相同的過期事件;訂閱是通過NodeSelectionAPI或者單個節點呼叫subscribe(…)發出的,訂閱對於新增的節點無效;
測試Demo:(redis版本7.0.2,Lettuce版本6.1.8)
叢集節點:虛擬機器器 192.168.1.31,埠 9001-9006,叢集節點已設定notify-keyspace-events AK;
package testlettuce; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import io.lettuce.core.ClientOptions.DisconnectedBehavior; import io.lettuce.core.KeyScanCursor; import io.lettuce.core.KeyValue; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisURI; import io.lettuce.core.ScanCursor; import io.lettuce.core.SocketOptions; import io.lettuce.core.SslOptions; import io.lettuce.core.TimeoutOptions; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.Executions; import io.lettuce.core.cluster.api.sync.NodeSelection; import io.lettuce.core.cluster.api.sync.NodeSelectionCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection; import io.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands; import io.lettuce.core.protocol.DecodeBufferPolicies; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.pubsub.RedisPubSubListener; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; public class TestLettuceCluster { /** * @param args */ public static void main(String[] args) { List<RedisURI> nodeList = new ArrayList<>(); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9001).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9002).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9003).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9004).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9005).withAuthentication("default", "123456").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9006).withAuthentication("default", "123456").build()); RedisClusterClient clusterClient = RedisClusterClient.create(nodeList); ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5L))//設定自適應拓撲重新整理超時,每次超時重新整理一次,預設30s; .closeStaleConnections(false)//重新整理拓撲時是否關閉失效連線,預設true,isPeriodicRefreshEnabled()為true時生效; .dynamicRefreshSources(true)//從拓撲中發現新節點,並將新節點也作為拓撲的源節點,動態重新整理可以發現全部節點並計算每個使用者端的數量,設定false則只有初始節點為源和計算使用者端數量; .enableAllAdaptiveRefreshTriggers()//啟用全部觸發器自適應重新整理拓撲,預設關閉; .enablePeriodicRefresh(Duration.ofSeconds(5L))//開啟定時拓撲重新整理並設定週期; .refreshTriggersReconnectAttempts(3)//長連線重新連線嘗試n次才拓撲重新整理 .build(); ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() .autoReconnect(true)//在連線丟失時開啟或關閉自動重連,預設true; .cancelCommandsOnReconnectFailure(true)//允許在重連失敗取消排隊命令,預設false; .decodeBufferPolicy(DecodeBufferPolicies.always())//設定丟棄解碼緩衝區的策略,以回收記憶體;always:解碼後丟棄,最大記憶體效率;alwaysSome:解碼後丟棄一部分;ratio(n)基於比率丟棄,n/(1+n),通常用1-10對應50%-90%; .disconnectedBehavior(DisconnectedBehavior.DEFAULT)//設定連線斷開時命令的呼叫行為,預設啟用重連;DEFAULT:啟用時重連中接收命令,禁用時重連中拒絕命令;ACCEPT_COMMANDS:重連中接收命令;REJECT_COMMANDS:重連中拒絕命令; // .maxRedirects(5)//當鍵從一個節點遷移到另一個節點,叢集重定向次數,預設5; // .nodeFilter(nodeFilter)//設定節點過濾器 // .pingBeforeActivateConnection(true)//啟用連線前設定PING,預設true; // .protocolVersion(ProtocolVersion.RESP3)//設定協定版本,預設RESP3; // .publishOnScheduler(false)//使用專用的排程器發出響應訊號,預設false,啟用時資料訊號將使用服務的多執行緒發出; // .requestQueueSize(requestQueueSize)//設定每個連線請求佇列大小; // .scriptCharset(scriptCharset)//設定Lua指令碼編碼為byte[]的字元集,預設StandardCharsets.UTF_8; // .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).keepAlive(true).tcpNoDelay(true).build())//設定低階通訊端的屬性 // .sslOptions(SslOptions.builder().build())//設定ssl屬性 // .suspendReconnectOnProtocolFailure(false)//當重新連線遇到協定失敗時暫停重新連線(SSL驗證,連線失敗前PING),預設值為false; // .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))//設定超時來取消和終止命令; .topologyRefreshOptions(clusterTopologyRefreshOptions)//設定拓撲更新設定 .validateClusterNodeMembership(true)//在允許連線到叢集節點之前,驗證叢集節點成員關係,預設值為true; .build(); clusterClient.setDefaultTimeout(Duration.ofSeconds(5L)); clusterClient.setOptions(clusterClientOptions); StatefulRedisClusterConnection<String, String> clusterConn = clusterClient.connect(); clusterConn.setReadFrom(ReadFrom.ANY);//設定從哪些節點讀取資料; RedisAdvancedClusterCommands<String, String> clusterCmd = clusterConn.sync(); clusterCmd.set("a", "A"); clusterCmd.set("b", "B"); clusterCmd.set("c", "C"); clusterCmd.set("d", "D"); System.out.println("get a=" + clusterCmd.get("a")); System.out.println("get b=" + clusterCmd.get("b")); System.out.println("get c=" + clusterCmd.get("c")); System.out.println("get d=" + clusterCmd.get("d")); //跨槽位命令 Map<String, String> kvmap = new HashMap<>(); kvmap.put("a", "AA"); kvmap.put("b", "BB"); kvmap.put("c", "CC"); kvmap.put("d", "DD"); clusterCmd.mset(kvmap);//Lettuce做了優化,支援一些命令的跨槽位命令; System.out.println("Lettuce mget:" + clusterCmd.mget("a", "b", "c", "d")); //選定部分節點操作 NodeSelection<String, String> replicas = clusterCmd.replicas(); NodeSelectionCommands<String, String> replicaseCmd = replicas.commands(); Executions<KeyScanCursor<String>> executions = replicaseCmd.scan(ScanCursor.INITIAL); executions.forEach(s -> {System.out.println(s.getKeys());}); //訂閱釋出訊息 StatefulRedisClusterPubSubConnection<String, String> pubSubConn = clusterClient.connectPubSub(); pubSubConn.addListener(new RedisPubSubListener<String, String>() { @Override public void message(String channel, String message) { System.out.println("[message]ch:" + channel + ",msg:" + message); } @Override public void message(String pattern, String channel, String message) { } @Override public void subscribed(String channel, long count) { System.out.println("[subscribed]ch:" + channel); } @Override public void psubscribed(String pattern, long count) { } @Override public void unsubscribed(String channel, long count) { } @Override public void punsubscribed(String pattern, long count) { } }); pubSubConn.sync().subscribe("TEST_Ch");//(回撥內部使用阻塞呼叫或者lettuce同步api呼叫,需使用非同步訂閱) clusterCmd.publish("TEST_Ch", "MSGMSGMSG"); //響應式訂閱,可以監聽ChannelMessage和PatternMessage,使用鏈式過濾處理計算等操作 RedisClusterPubSubReactiveCommands<String, String> pubsubReactive = pubSubConn.reactive(); pubsubReactive.subscribe("TEST_Ch2").subscribe(); pubsubReactive.observeChannels() .filter(chmsg -> {return chmsg.getMessage().contains("tom");}) .doOnNext(chmsg -> {System.out.println("<tom>" + chmsg.getChannel() + ">>" + chmsg.getMessage());}) .subscribe(); clusterCmd.publish("TEST_Ch2", "send to jerry"); clusterCmd.publish("TEST_Ch", "tom MSG"); clusterCmd.publish("TEST_Ch2", "this is tom"); //keySpaceEvent事件 StatefulRedisClusterPubSubConnection<String, String> clusterPubSubConn = clusterClient.connectPubSub(); clusterPubSubConn.setNodeMessagePropagation(true);//啟用禁用節點訊息傳播到該listener,例如只能在本節點通知的鍵事件通知; RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() { @Override public void unsubscribed(String channel, long count) { System.out.println("unsubscribed_ch:" + channel); } @Override public void subscribed(String channel, long count) { System.out.println("subscribed_ch:" + channel); } @Override public void punsubscribed(String pattern, long count) { System.out.println("punsubscribed_pattern:" + pattern); } @Override public void psubscribed(String pattern, long count) { System.out.println("psubscribed_pattern:" + pattern); } @Override public void message(String pattern, String channel, String message) { System.out.println("message_pattern:" + pattern + " ch:" + channel + " msg:" + message); } @Override public void message(String channel, String message) { System.out.println("message_ch:" + channel + " msg:" + message); } }; clusterPubSubConn.addListener(listener); PubSubAsyncNodeSelection<String, String> allPubSubAsyncNodeSelection = clusterPubSubConn.async().all(); NodeSelectionPubSubAsyncCommands<String, String> pubsubAsyncCmd = allPubSubAsyncNodeSelection.commands(); clusterCmd.setex("a", 1, "A"); pubsubAsyncCmd.psubscribe("__keyspace@0__:*"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end"); } }
執行結果:
另外,還有一個cluster專用的Listener:RedisClusterPubSubListener,可以從listener裡獲得釋出訊息的節點資訊:
RedisClusterPubSubListener<String, String> clusterListener = new RedisClusterPubSubListener<String, String>() { @Override public void message(RedisClusterNode node, String channel, String message) { } @Override public void message(RedisClusterNode node, String pattern, String channel, String message) { } @Override public void subscribed(RedisClusterNode node, String channel, long count) { } @Override public void psubscribed(RedisClusterNode node, String pattern, long count) { } @Override public void unsubscribed(RedisClusterNode node, String channel, long count) { } @Override public void punsubscribed(RedisClusterNode node, String pattern, long count) { } };
到此這篇關於Redis Lettuce連線redis叢集實現過程詳細講解的文章就介紹到這了,更多相關Redis Lettuce連線redis叢集內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45