<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在第三方支付中,例如支付寶、或者微信,對於訂單請求,第三方支付系統採用的是訊息同步返回、非同步通知+主動補償查詢的補償機制。
由於網際網路通訊的不可靠性,例如雙方網路、伺服器、應用等因素的影響,不管是同步返回、非同步通知、主動查詢報文都可能出現超時無響應、報文丟失等情況,所以像支付業務,對結果的通知一般採用幾種方案結合的補償機制,不能完全依賴某一種機制。
例如一個支付結果的通知,一方面會在支付頁面跳轉時候返回支付結果(一般只用作前端展示使用,非最終狀態),同時會採用後臺非同步通知機制(有前臺、後臺通知的,以後臺非同步通知結果為準),但由於前臺跳轉、後臺結果通知都可能失效,因此還以定時補單+請求方主動查詢介面作為輔助手段。
常見的補單操作,任務排程策略一般設定30秒、60秒、3分鐘、6分鐘、10分鐘排程多次(以自己業務需要),如果排程接收到響應確認報文,補單成功,則中止對應訂單的排程任務;如果超過補單上限次數,則停止補單,避免無謂的資源浪費。請求端隨時可以發起請求報文查詢對應訂單的狀態。在日常開發中,對於網站前端來說,支付計費中心對於訂單請求資訊的處理也是通過訊息同步返回、非同步通知+主動補償查詢相結合的機制,其中對於訂單的非同步通知,目前的通知策略為3s、30s、60s、120s、180、300s的階梯性通知。返回成功情況下就不繼續通知了,本來打算使用將失敗的訊息寫到資料庫等待傳送,然後每秒查詢資料庫獲取訊息通知前端。但覺得這樣的處理方式太粗暴。
存在以下缺點:
1 、每秒請求有點兒浪費資源;
2 、通知方式不穩定;
3 、無法承受巨量資料量等等
所以最終打算使用rabbitmq的訊息延遲+死信佇列來實現。訊息模型如下:
producer釋出訊息,通過exchangeA的訊息會被分發到QueueA,Consumer監聽queueA,一旦有訊息到來就被消費,這邊的消費業務就是通知前端,如果通知失敗,就建立一個延遲佇列declareQueue,設定每個訊息的ttl然後通過declare_exchange將訊息分發到declare_queue,因為declare_queue沒有consumer並且declare_queue中的訊息設定了ttl,當ttl到期後,將通過DEX路由到queueA,被重新消費。程式碼如下:DeclareQueue.java
package org.delayQueue; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class DeclareQueue { public static String EXCHANGE_NAME = "notifyExchange"; public static void init() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String routingKey = "AliPaynotify"; String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T¬ify_id=4ab9bed148d043d0bf75460706f7774a¬ify_time=2014-08-29+16%3A22%3A02¬ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent :" + message); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } public static void main(String args[]) { init(); }
DeclareConsumer.java
package org.delayQueue; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.http.HttpResponse; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.DefaultHttpClient; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class DeclareConsumer { public static String EXCHANGE_NAME = "notifyExchange"; public static String QU_declare_15S = "Qu_declare_15s"; public static String EX_declare_15S = "EX_declare_15s"; public static String ROUTINGKEY = "AliPaynotify"; public static Connection connection = null; public static Channel channel = null; public static Channel DECLARE_15S_CHANNEL = null; public static String declare_queue = "init"; public static String originalExpiration = "0"; public static void init() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); connection = factory.newConnection(); channel = connection.createChannel(); DECLARE_15S_CHANNEL = connection.createChannel(); } public static void consume() { try { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); final String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); Map<String, Object> headers = properties.getHeaders(); if (headers != null) { List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death"); System.out.println("xDeath--- > " + xDeath); if (xDeath != null && !xDeath.isEmpty()) { Map<String, Object> entrys = xDeath.get(0); // for(Entry<String, Object> // entry:entrys.entrySet()){ // System.out.println(entry.getKey()+":"+entry.getValue()); // } originalExpiration = entrys.get("original-expiration").toString(); } } System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis()); HttpClient httpClient = new DefaultHttpClient(); HttpPost post = new HttpPost(message); HttpResponse response = httpClient.execute(post); BufferedReader inreader = null; if (response.getStatusLine().getStatusCode() == 200) { inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8")); StringBuffer responseBody = new StringBuffer(); String line = null; while ((line = inreader.readLine()) != null) { responseBody.append(line); if (!responseBody.equals("success")) { // putDeclre15s(message); if (originalExpiration.equals("0")) { putDeclreQueue(message, 3000, QU_declare_15S); } if (originalExpiration.equals("3000")) { putDeclreQueue(message, 30000, QU_declare_15S); if (originalExpiration.equals("30000")) { putDeclreQueue(message, 60000, QU_declare_15S); if (originalExpiration.equals("60000")) { putDeclreQueue(message, 120000, QU_declare_15S); if (originalExpiration.equals("120000")) { putDeclreQueue(message, 180000, QU_declare_15S); if (originalExpiration.equals("180000")) { putDeclreQueue(message, 300000, QU_declare_15S); if (originalExpiration.equals("300000")) { // channel.basicConsume(QU_declare_300S,true, this); System.out.println("finish notify"); } else { System.out.println(response.getStatusLine().getStatusCode()); } }; channel.basicConsume(queueName, true, consumer); } catch (Exception e) { e.printStackTrace(); } finally { } static Map<String, Object> xdeathMap = new HashMap<String, Object>(); static List<Map<String, Object>> xDeath = new ArrayList<Map<String, Object>>(); static Map<String, Object> xdeathParam = new HashMap<String, Object>(); public static void putDeclre15s(String message) throws IOException { channel.exchangeDeclare(EX_declare_15S, "topic"); Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("3000").deliveryMode(2);// 設定訊息TTL AMQP.BasicProperties properties = builder.build(); channel.queueDeclare(QU_declare_15S, false, false, false, args); channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY); channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes()); System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis()); public static void putDeclreQueue(String message, int mis, String queue) throws IOException { builder.expiration(String.valueOf(mis)).deliveryMode(2);// 設定訊息TTL channel.queueDeclare(queue, false, false, false, args); channel.queueBind(queue, EX_declare_15S, ROUTINGKEY); System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis()); public static void main(String args[]) throws Exception { init(); consume(); }
訊息通過dlx轉發的情況下,header頭部會帶有x-death的一個陣列,裡面包含訊息的各項屬性,比如說訊息成為死信的原因reason,original-expiration這個欄位表示訊息在原來佇列中的過期時間,根據這個值來確定下一次通知的延遲時間應該是多少秒。執行結果如下:
到此這篇關於RabbitMQ 延遲佇列實現訂單支付結果非同步階梯性通知的文章就介紹到這了,更多相關RabbitMQ 延遲佇列實現訂單支付內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45