<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Consumer
端可以設定重試次數,當訊息消費失敗的時候會進行重試。
底層使用Spring Retry
去重試,重試次數可自定義設定。
# 預設重試次數為3,設定大於1時才會生效 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3
Producer
傳送訊息出錯的情況下,可以設定錯誤處理,將錯誤資訊傳送給對應ID的MessageChannel
MessageChannel
。這個MessageChannel
會取ApplicationContext
中name為topic.errors
(topic
就是設定的destination
)的Bean。PublishSubscribeChannel
。BridgeHandler
訂閱這個MessageChannel
,同時再設定ApplicationContext
中name為errorChannel
的PublishSubscribeChannel
訊息通道為BridgeHandler
的outputChannel
。public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel" private SubscribableChannel registerErrorInfrastructure( ProducerDestination destination) { // destination.getName() + ".errors" String errorChannelName = errorsBaseName(destination); SubscribableChannel errorChannel; if (getApplicationContext().containsBean(errorChannelName)) { Object errorChannelObject = getApplicationContext().getBean(errorChannelName); if (!(errorChannelObject instanceof SubscribableChannel)) { throw new IllegalStateException("Error channel '" + errorChannelName + "' must be a SubscribableChannel"); } errorChannel = (SubscribableChannel) errorChannelObject; } else { errorChannel = new PublishSubscribeChannel(); ((GenericApplicationContext) getApplicationContext()).registerBean( errorChannelName, SubscribableChannel.class, () -> errorChannel); } MessageChannel defaultErrorChannel = null; if (getApplicationContext() .containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) { defaultErrorChannel = getApplicationContext().getBean( IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, MessageChannel.class); } if (defaultErrorChannel != null) { BridgeHandler errorBridge = new BridgeHandler(); errorBridge.setOutputChannel(defaultErrorChannel); errorChannel.subscribe(errorBridge); String errorBridgeHandlerName = getErrorBridgeName(destination); ((GenericApplicationContext) getApplicationContext()).registerBean( errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge); } return errorChannel; }
spring.cloud.stream.bindings.output.destination=test-output # 訊息傳送失敗的處理邏輯預設是關閉的 spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
@Bean("test-output.errors") MessageChannel testOutputErrorChannel() { return new PublishSubscribeChannel(); } @Service class ErrorProduceService { @ServiceActivator(inputChannel = "test-output.errors") public void receiveProduceError(Message receiveMsg) { System.out.println("receive error msg: " + receiveMsg); } }
Consumer
消費訊息出錯的情況下,可以設定錯誤處理,將錯誤資訊發給對應ID的MessageChannel
訊息錯誤處理與生產錯誤處理大致相同。錯誤的MessageChannel
對應的name為topic.group.errors
,還會加上多個MessageHandler
訂閱的一些判斷,使用ErrorMessageStrategy
建立錯誤訊息等內容。
spring.cloud.stream.bindings.input.destination=test-input spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT) public void receive(String receiveMsg) { throw new RuntimeException("Oops"); } @ServiceActivator(inputChannel = "test-input.test-input-group.errors") public void receiveConsumeError(Message receiveMsg) { System.out.println("receive error msg: " + receiveMsg); }
建議直接使用topic.group.errors
這個訊息通道,並設定傳送到單播模式的DirectChannel
訊息通道中(使用@ServiceActivator
註解接收會直接構成DirectChannel
),這樣會確保只會被唯一的一個訂閱了topic.group.errors
的MessageHandler
處理,否則可能會被多個MessageHandler
處理,導致出現一些意想不到的結果。
預設情況下,Output Binding
對應的MessageChannel
和Input Binding
對應的SubscribeChannel
會被構造成DirectChannel
。
SCS提供了BindingTargetFactory
介面進行擴充套件,比如可以擴充套件構造PublishSubscribeChannel
這種廣播型別的MessageChannel
。
BindingTargetFactory
介面只有兩個實現類
SubscribableChannelBindingTargetFactory
:針對Input Binding
和Output Binding
都會構造成DirectWithAttributesChannel
型別的MessageChannel
(一種帶有HashMap
屬性的DirectChannel
)。MessageSourceBindingTargetFactory
:不支援Output Binding
,Input Binding
會構造成DefaultPollableMessageSource
。DefaultPollableMessageSource
內部維護著MessageSource
屬性,該屬性用於拉取訊息。SCS提供了BindingsEndpoint
,可以獲取Binding
資訊或對Binding
生命週期進行修改,比如start
、stop
、pause
或resume
。
BindingsEndpoint
的ID是bindings,對外暴露了一下3個操作:
Binding
狀態,可以改成STARTED
、STOPPED
、PAUSED
和RESUMED
,對應Binding
介面的4個操作。Binding
的狀態資訊。Binding
的狀態資訊。@Endpoint(id = "bindings") public class BindingsEndpoint { ... @WriteOperation public void changeState(@Selector String name, State state) { Binding<?> binding = BindingsEndpoint.this.locateBinding(name); if (binding != null) { switch (state) { case STARTED: binding.start(); break; case STOPPED: binding.stop(); break; case PAUSED: binding.pause(); break; case RESUMED: binding.resume(); break; default: break; } } } @ReadOperation public List<?> queryStates() { List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings()); bindings.addAll(gatherOutputBindings()); return this.objectMapper.convertValue(bindings, List.class); } @ReadOperation public Binding<?> queryState(@Selector String name) { Assert.notNull(name, "'name' must not be null"); return this.locateBinding(name); } ... }
該功能自動與micrometer整合進行Metrics統計,可以通過字首spring.cloud.stream.metrics
進行相關設定,設定項spring.cloud.stream.bindings.applicationMetrics.destination
會構造MetersPublisherBinding
,將相關的metrics傳送到MQ中。
預設與Spring Cloud Function
整合。
可以使用Function處理訊息。組態檔需要加上function設定。
spring.cloud.stream.function.definition=uppercase | addprefix
@Bean public Function<String, String> uppercase() { return x -> x.toUpperCase(); } @Bean public Function<String, String> addprefix() { return x -> "prefix-" + x; }
SCS統一Partition
相關的設定,可以遮蔽不同MQ Partition的設定。
Producer Binding提供的ProducerProperties提供了一些Partition相關的設定:
partitionKeyExpression
:partition key提取表示式。partitionKeyExtractorName
:是一個實現PartitionKeyExtractorStrategy
介面的Bean name。PartitionKeyExtractorStrategy
是一個根據Message獲取partition key的介面。如果兩者都設定,優先順序高於partitionKeyExtractorName
。partitionSelectorName
:是一個實現PartitionSelectorStrategy
介面的Bean name。PartitionSelectorStrategy
是一個根據partition key決定選擇哪個partition 的介面。partitionSelectorExpression
:partition 選擇表示式,會根據表示式和partition key得到最終的partition。如果兩者都設定,優先partitionSelectorExpression
表示式解析partition。partitionCount
:partition 個數。該屬性不一定會生效,Kafka Binder 和RocketMQ Binder會使用topic上的partition 個數覆蓋該屬性。public final class PartitioningInterceptor implements ChannelInterceptor { ... @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) { int partition = this.partitionHandler.determinePartition(message); return MessageConverterConfigurer.this.messageBuilderFactory .fromMessage(message) .setHeader(BinderHeaders.PARTITION_HEADER, partition).build(); } else { return MessageConverterConfigurer.this.messageBuilderFactory .fromMessage(message) .setHeader(BinderHeaders.PARTITION_HEADER, message.getHeaders() .get(BinderHeaders.PARTITION_OVERRIDE)) .removeHeader(BinderHeaders.PARTITION_OVERRIDE).build(); } } } public class PartitionHandler { ... public int determinePartition(Message<?> message) { Object key = extractKey(message); int partition; if (this.producerProperties.getPartitionSelectorExpression() != null) { partition = this.producerProperties.getPartitionSelectorExpression() .getValue(this.evaluationContext, key, Integer.class); } else { partition = this.partitionSelectorStrategy.selectPartition(key, this.partitionCount); } // protection in case a user selector returns a negative. return Math.abs(partition % this.partitionCount); } private Object extractKey(Message<?> message) { Object key = invokeKeyExtractor(message); if (key == null && this.producerProperties.getPartitionKeyExpression() != null) { key = this.producerProperties.getPartitionKeyExpression() .getValue(this.evaluationContext, message); } Assert.notNull(key, "Partition key cannot be null"); return key; } ... }
實現MessageSource
進行polling
操作的Consumer
。
普通的Pub/Sub模式需要定義SubscribeableChannel
型別的返回值,Polling Consumer需要定義PollableMessageSource
型別的返回值。
public interface PollableSink { /** * Input channel name. */ String INPUT = "input"; /** * @return input channel. */ @Input(Sink.INPUT) PollableMessageSource input(); }
支援多個Binder
同時使用,在設定Binding
的時候需要指定對應的Binder
。
設定全域性預設的Binder
:spring.cloud.stream.default-binder=rocketmq
。
設定各個Binder內部的設定資訊:
spring.cloud.stream.binders.rocketmq.environment.<xx>=xx
spring.cloud.stream.binders.rocketmq.type=rocketmq
設定Binding
對應的Binder
:
spring.cloud.stream.bindings.<channelName>.binder=kafka
spring.cloud.stream.bindings.<channelName>.binder=rocketmq
spring.cloud.stream.bindings.<channelName>.binder=rabbit
比如,新建BindingCreateEvent
事件,使用者的應用就可以監聽該事件在建立Input Binding
或Output Binding
時做業務相關的處理。
以上就是Spring Cloud Stream 高階特性使用詳解的詳細內容,更多關於Spring Cloud Stream 高階特性的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45