首頁 > 軟體

Java實現一個簡單的長輪詢的範例程式碼

2022-08-29 22:05:00

分析一下長輪詢的實現方式

現在各大中介軟體都使用了長輪詢的資料互動方式,目前比較流行的例如Nacos的設定中心,RocketMQ Pull(拉模式)訊息等,它們都是採用了長輪詢方的式實現。就例如Nacos的設定中心,如何做到伺服器端感知設定變化實時推播給使用者端的呢?

長輪詢與短輪詢

說到長輪詢,肯定存在和它相對立的,我們暫且叫它短輪詢吧,我們簡單介紹一下短輪詢:

短輪詢也是拉模式。是指不管伺服器端資料有無更新,使用者端每隔定長時間請求拉取一次資料,可能有更新資料返回,也可能什麼都沒有。如果設定中心使用這樣的方式,會存在以下問題:

由於設定資料並不會頻繁變更,若是一直髮請求,勢必會對伺服器端造成很大壓力。還會造成推播資料的延遲,比如:每10s請求一次設定,如果在第11s時設定更新了,那麼推播將會延遲9s,等待下一次請求;

無法在推播延遲和伺服器端壓力兩者之間中和。降低輪詢的間隔,延遲降低,壓力增加;增加輪詢的間隔,壓力降低,延遲增高。

長輪詢為了解決短輪詢存在的問題,使用者端發起長輪詢,如果伺服器端的資料沒有發生變更,會hold住請求,直到伺服器端的資料發生變化,或者等待一定時間超時才會返回。返回後,使用者端再發起下一次長輪詢請求監聽。

這樣設計的好處:

  • 相對於低延時,使用者端發起長輪詢,伺服器端感知到資料發生變更後,能立刻返回響應給使用者端。
  • 伺服器端的壓力減小,使用者端發起長輪詢,如果資料沒有發生變更,伺服器端會hold住此次使用者端的請求,hold住請求的時間一般會設定到30s或者60s,並且伺服器端hold住請求不會消耗太多伺服器端的資源。

下面借用圖片來說明一下流程:

  • 首先使用者端發起長輪詢請求,伺服器端收到使用者端的請求,這時會掛起使用者端的請求,如果在伺服器端設計的30s之內都沒有發生變更,伺服器端會響應回用戶端資料沒有變更,使用者端會繼續傳送請求。
  • 如果在30s之內服務資料發生了變更,伺服器端會推播變更的資料到使用者端。

設定中心長輪詢設計

上面我們已經介紹了整個思路,下面我們用程式碼實現一下:

  • 首先使用者端傳送一個HTTP請求到伺服器端;伺服器端會開啟一個非同步執行緒,如果一直沒有資料變更會掛起當前請求(一個 Tomcat 也就 200 個執行緒,長輪詢也不應該阻塞 Tomcat 的業務執行緒,所以需要設定中心在實現長輪詢時往往採用非同步響應的方式來實現,而比較方便實現非同步 HTTP 的常見手段便是 Servlet3.0 提供的 AsyncContext 機制。)
  • 在伺服器端設定的超時時間內仍然沒有資料變更,那就返回使用者端一個沒有變更的標識。例如響應304狀態碼;
  • 在伺服器端設定的超時時間內有資料變更了,就返回使用者端變更的內容;

設定中心長輪詢實現

下面用程式碼實現長輪詢:

使用者端實現

 @Slf4j
 public class ConfigClientWorker {
 ​
     private final CloseableHttpClient httpClient;
 ​
     private final ScheduledExecutorService executorService;
 ​
     public ConfigClientWorker(String url, String dataId) {
         this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
             Thread thread = new Thread(runnable);
             thread.setName("client.worker.executor-%d");
             thread.setDaemon(true);
             return thread;
         });
 ​
         // ① httpClient 使用者端超時時間要大於長輪詢約定的超時時間
         RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
         this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
 ​
         executorService.execute(new LongPollingRunnable(url, dataId));
     }
 ​
     class LongPollingRunnable implements Runnable {
 ​
         private final String url;
         private final String dataId;
 ​
         public LongPollingRunnable(String url, String dataId) {
             this.url = url;
             this.dataId = dataId;
         }
 ​
         @SneakyThrows
         @Override
         public void run() {
             String endpoint = url + "?dataId=" + dataId;
             log.info("endpoint: {}", endpoint);
             HttpGet request = new HttpGet(endpoint);
             CloseableHttpResponse response = httpClient.execute(request);
             switch (response.getStatusLine().getStatusCode()) {
                 case 200: {
                     BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
                             .getContent()));
                     StringBuilder result = new StringBuilder();
                     String line;
                     while ((line = rd.readLine()) != null) {
                         result.append(line);
                     }
                     response.close();
                     String configInfo = result.toString();
                     log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
                     break;
                 }
                 // ② 304 響應碼標記設定未變更
                 case 304: {
                     log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
                     break;
                 }
                 default: {
                     throw new RuntimeException("unExcepted HTTP status code");
                 }
             }
             executorService.execute(this);
         }
     }
 ​
     public static void main(String[] args) throws IOException {
 ​
         new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
         System.in.read();
     }
 }
  • httpClient 使用者端超時時間要大於長輪詢約定的超時時間,不然還沒等到伺服器端返回,使用者端自己就超時了。
  • 304 響應碼標記設定未變更;
  • http://127.0.0.1:8080/listener 是伺服器端地址;

伺服器端實現

 @RestController
 @Slf4j
 @SpringBootApplication
 public class ConfigServer {
 ​
     @Data
     private static class AsyncTask {
         // 長輪詢請求的上下文,包含請求和響應體
         private AsyncContext asyncContext;
         // 超時標記
         private boolean timeout;
 ​
         public AsyncTask(AsyncContext asyncContext, boolean timeout) {
             this.asyncContext = asyncContext;
             this.timeout = timeout;
         }
     }
 ​
     // guava 提供的多值 Map,一個 key 可以對應多個 value
     private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
 ​
     private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
             .build();
     private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
 ​
     // 設定監聽接入點
     @RequestMapping("/listener")
     public void addListener(HttpServletRequest request, HttpServletResponse response) {
 ​
         String dataId = request.getParameter("dataId");
 ​
         // 開啟非同步!!!
         AsyncContext asyncContext = request.startAsync(request, response);
         AsyncTask asyncTask = new AsyncTask(asyncContext, true);
 ​
         // 維護 dataId 和非同步請求上下文的關聯
         dataIdContext.put(dataId, asyncTask);
 ​
         // 啟動定時器,30s 後寫入 304 響應
         timeoutChecker.schedule(() -> {
             if (asyncTask.isTimeout()) {
                 dataIdContext.remove(dataId, asyncTask);
                 response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
               // 標誌此次非同步執行緒完成結束!!!
                 asyncContext.complete();
             }
         }, 30000, TimeUnit.MILLISECONDS);
     }
 ​
     // 設定釋出接入點
     @RequestMapping("/publishConfig")
     @SneakyThrows
     public String publishConfig(String dataId, String configInfo) {
         log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
         Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
         for (AsyncTask asyncTask : asyncTasks) {
             asyncTask.setTimeout(false);
             HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
             response.setStatus(HttpServletResponse.SC_OK);
             response.getWriter().println(configInfo);
             asyncTask.getAsyncContext().complete();
         }
         return "success";
     }
 ​
     public static void main(String[] args) {
         SpringApplication.run(ConfigServer.class, args);
     }
 }
  • 使用者端請求過來,首先開啟一個非同步執行緒request.startAsync(request, response);保證不佔用Tomcat執行緒。此時Tomcat執行緒以及釋放。配合asyncContext.complete()使用。
  • dataIdContext.put(dataId, asyncTask);會將 dataId 和非同步請求上下文給關聯起來,方便設定釋出時,拿到對應的上下文
  • Multimap<String, AsyncTask> dataIdContext它是一個多值 Map,一個 key 可以對應多個 value,你也可以理解為 Map<String,List<AsyncTask>>
  • timeoutChecker.schedule() 啟動定時器,30s 後寫入 304 響應
  • @RequestMapping("/publishConfig") ,設定釋出的入口。設定變更後,根據 dataId 一次拿出所有的長輪詢,為之寫入變更的響應。
  • asyncTask.getAsyncContext().complete();表示這次非同步請求結束了。

啟動設定監聽

先啟動 ConfigServer,再啟動 ConfigClient。30s之後控制檯列印第一次超時之後收到伺服器端304的狀態碼

 16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again

請求一下設定釋出,請求localhost:8080/publishConfig?dataId=user&configInfo=helloworld

伺服器端列印紀錄檔:

 2022-08-25 16:45:56.663  INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer         : publish configInfo dataId: [user], configInfo: helloworld

到此這篇關於Java實現一個簡單的長輪詢的範例程式碼的文章就介紹到這了,更多相關Java長輪詢內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com