首頁 > 軟體

SkyWalking 自定義外掛(Spring RabbitMQ)具體分析過程

2022-02-14 16:00:20

SkyWalking 自定義外掛(Spring RabbitMQ) 官方

RabbitMQ外掛問題

skywalking官方提供的RabbitMQ外掛存在缺陷,其只針對RabbitMQ官方原生Client實現擴充套件,但我們在專案中一般不直接使用原生Client,而是使用Spring RabitMQ Client,因Spring RabitMQ Consumer中存在跨執行緒操作,導致跟蹤ID斷鏈。

具體分析過程

1.官方外掛原始碼的攔截點是原生Consumer的handleDelivery方法,原始碼如下:

2.而Spring RabbitMQ消費者的預設實現是BlockingQueueConsumer, handleDelivery核心邏輯是把訊息放到內部的BlockingQueue佇列,不做真正的消費處理,因此攔截此處無法關聯到消費者邏輯,原始碼如下

@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
				byte[] body) {
		...
			try {
				if (BlockingQueueConsumer.this.abortStarted > 0) {
					if (!BlockingQueueConsumer.this.queue.offer(
							new Delivery(consumerTag, envelope, properties, body, this.queueName),
							BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {

						Channel channelToClose = super.getChannel();
						RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
						// Defensive - should never happen
						BlockingQueueConsumer.this.queue.clear();
						if (!this.canceled) {
							RabbitUtils.cancel(channelToClose, consumerTag);
						}
						try {
							channelToClose.close();
						catch (@SuppressWarnings("unused") TimeoutException e) {
							// no-op
					}
				}
				else {
					BlockingQueueConsumer.this.queue
							.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
			}
			catch (@SuppressWarnings("unused") InterruptedException e) {
				Thread.currentThread().interrupt();
			catch (Exception e) {
				BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
		}

3.真正的消費處理在SimpleMessageListenerContainer,SimpleMessageListenerContainer繼承Runnable介面,在其run方法中while迴圈呼叫mainLoop方法,整體呼叫鏈路為

4.SimpleMessageListenerContainer.run() -> SimpleMessageListenerContainer.mainLoop() -> SimpleMessageListenerContainer.receiveAndExecute() -> SimpleMessageListenerContainer.doReceiveAndExecute() -> AbstractMessageListenerContainer.executeListener()。最終在executeListener中執行消費邏輯

protected void executeListener(Channel channel, Object data) {
	...
		try {
      // 執行消費邏輯
			doExecuteListener(channel, data);
			if (sample != null) {
				this.micrometerHolder.success(sample, data instanceof Message
						? ((Message) data).getMessageProperties().getConsumerQueue()
						: queuesAsListString());
			}
		}
		catch (RuntimeException ex) {
		....
		}
	}

實現自定義外掛

從上面可以分析出,AbstractMessageListenerContainer.executeListener()是最佳的攔截點
實現原始碼已放到碼雲倉庫:https://gitee.com/eureka-gitee/apm-sniffer-pro/tree/v7.0.0.0/

效果展示

SkyWalking呼叫鏈路

logback紀錄檔

到此這篇關於SkyWalking 自定義外掛(Spring RabbitMQ)的文章就介紹到這了,更多相關SkyWalking 自定義外掛內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com