首頁 > 軟體

Socket結合執行緒池使用實現使用者端和伺服器端通訊demo

2022-03-10 19:20:51

引導語

Socket 面試最終題一般都是讓你寫一個簡單的使用者端和伺服器端通訊的例子,本文就帶大家一起來寫這個 demo。

1、要求

  • 可以使用 Socket 和 ServiceSocket 以及其它 API;
  • 寫一個使用者端和伺服器端之間 TCP 通訊的例子;
  • 伺服器端處理任務需要非同步處理;
  • 因為伺服器端處理能力很弱,只能同時處理 5 個請求,當第六個請求到達伺服器時,需要伺服器返回明確的錯誤資訊:伺服器太忙了,請稍後重試~。

需求比較簡單,唯一複雜的地方在於第四點,我們需要對使用者端的請求量進行控制,首先我們需要確認的是,我們是無法控制使用者端傳送的請求數的,所以我們只能從伺服器端進行改造,比如從伺服器端進行限流。

有的同學可能很快想到,我們應該使用 ServerSocket 的 backlog 的屬性,把其設定成 5,但我們在上一章中說到 backlog 並不能準確代表限制的使用者端連線數,而且我們還要求伺服器端返回具體的錯誤資訊,即使 backlog 生效,也只會返回固定的錯誤資訊,不是我們客製化的錯誤資訊。

我們好好想想,執行緒池似乎可以做這個事情,我們可以把執行緒池的 coreSize 和 maxSize 都設定成 4,把佇列大小設定成 1,這樣伺服器端每次收到請求後,會先判斷一下執行緒池中的佇列有沒有資料,如果有的話,說明當前伺服器已經馬上就要處理第五個請求了,當前請求就是第六個請求,應該被拒絕。

正好執行緒池的加入也可以滿足第三點,伺服器端的任務可以非同步執行。

2、使用者端程式碼

使用者端的程式碼比較簡單,直接向伺服器請求資料即可,程式碼如下:

public class SocketClient {
  private static final Integer SIZE = 1024;
  private static final ThreadPoolExecutor socketPoll = new ThreadPoolExecutor(50, 50,
                                                                               365L,
                                                                               TimeUnit.DAYS,
                                                                               new LinkedBlockingQueue<>(400));
  @Test
  public void test() throws InterruptedException {
    // 模擬使用者端同時向伺服器端傳送 6 條訊息
    for (int i = 0; i < 6; i++) {
      socketPoll.submit(() -> {
        send("localhost", 7007, "nihao");
      });
    }
    Thread.sleep(1000000000);
  }
  /**
   * 傳送tcp
   *
   * @param domainName 域名
   * @param port       埠
   * @param content    傳送內容
   */
  public static String send(String domainName, int port, String content) {
    log.info("使用者端開始執行");
    Socket socket = null;
    OutputStream outputStream = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    InputStream is = null;
    StringBuffer response = null;
    try {
      if (StringUtils.isBlank(domainName)) {
        return null;
      }
      // 無參構造器初始化 Socket,預設底層協定是 TCP
      socket = new Socket();
      socket.setReuseAddress(true);
      // 使用者端準備連線伺服器端,設定超時時間 10 秒
      socket.connect(new InetSocketAddress(domainName, port), 10000);
      log.info("TCPClient 成功和伺服器端建立連線");
      // 準備傳送訊息給伺服器端
      outputStream = socket.getOutputStream();
      // 設定 UTF 編碼,防止亂碼
      byte[] bytes = content.getBytes(Charset.forName("UTF-8"));
      // 輸出位元組碼
      segmentWrite(bytes, outputStream);
      // 關閉輸出
      socket.shutdownOutput();
      log.info("TCPClient 傳送內容為 {}",content);
      // 等待伺服器端的返回
      socket.setSoTimeout(50000);//50秒還沒有得到資料,直接斷開連線
      // 得到伺服器端的返回流
      is = socket.getInputStream();
      isr = new InputStreamReader(is);
      br = new BufferedReader(isr);
      // 從流中讀取返回值
      response = segmentRead(br);
      // 關閉輸入流
      socket.shutdownInput();
      //關閉各種流和通訊端
      close(socket, outputStream, isr, br, is);
      log.info("TCPClient 接受到伺服器端返回的內容為 {}",response);
      return response.toString();
    } catch (ConnectException e) {
      log.error("TCPClient-send socket連線失敗", e);
      throw new RuntimeException("socket連線失敗");
    } catch (Exception e) {
      log.error("TCPClient-send unkown errror", e);
      throw new RuntimeException("socket連線失敗");
    } finally {
      try {
        close(socket, outputStream, isr, br, is);
      } catch (Exception e) {
        // do nothing
      }
    }
  }
  /**
   * 關閉各種流
   *
   * @param socket
   * @param outputStream
   * @param isr
   * @param br
   * @param is
   * @throws IOException
   */
  public static void close(Socket socket, OutputStream outputStream, InputStreamReader isr,
                           BufferedReader br, InputStream is) throws IOException {
    if (null != socket && !socket.isClosed()) {
      try {
        socket.shutdownOutput();
      } catch (Exception e) {
      }
      try {
        socket.shutdownInput();
      } catch (Exception e) {
      }
      try {
        socket.close();
      } catch (Exception e) {
      }
    }
    if (null != outputStream) {
      outputStream.close();
    }
    if (null != br) {
      br.close();
    }
    if (null != isr) {
      isr.close();
    }
    if (null != is) {
      is.close();
    }
  }
  /**
   * 分段讀
   *
   * @param br
   * @throws IOException
   */
  public static StringBuffer segmentRead(BufferedReader br) throws IOException {
    StringBuffer sb = new StringBuffer();
    String line;
    while ((line = br.readLine()) != null) {
      sb.append(line);
    }
    return sb;
  }
  /**
   * 分段寫
   *
   * @param bytes
   * @param outputStream
   * @throws IOException
   */
  public static void segmentWrite(byte[] bytes, OutputStream outputStream) throws IOException {
    int length = bytes.length;
    int start, end = 0;
    for (int i = 0; end != bytes.length; i++) {
      start = i == 0 ? 0 : i * SIZE;
      end = length > SIZE ? start + SIZE : bytes.length;
      length -= SIZE;
      outputStream.write(bytes, start, end - start);
      outputStream.flush();
    }
  }
}

使用者端程式碼中我們也用到了執行緒池,主要是為了並行模擬使用者端一次性傳送 6 個請求,按照預期伺服器端在處理第六個請求的時候,會返回特定的錯誤資訊給使用者端。

以上程式碼主要方法是 send 方法,主要處理像伺服器端傳送資料,並處理伺服器端的響應。

3、伺服器端程式碼

伺服器端的邏輯分成兩個部分,第一部分是控制使用者端的請求個數,當超過伺服器端的能力時,拒絕新的請求,當伺服器端能力可響應時,放入新的請求,第二部分是伺服器端任務的執行邏輯。

3.1、對使用者端請求進行控制

public class SocketServiceStart {
  /**
   * 伺服器端的執行緒池,兩個作用
   * 1:讓伺服器端的任務可以非同步執行
   * 2:管理可同時處理的伺服器端的請求數
   */
  private static final ThreadPoolExecutor collectPoll = new ThreadPoolExecutor(4, 4,
                                                                               365L,
                                                                               TimeUnit.DAYS,
                                                                               new LinkedBlockingQueue<>(
                                                                                   1));
  @Test
  public void test(){
    start();
  }
  /**
   * 啟動伺服器端
   */
  public static final void start() {
    log.info("SocketServiceStart 伺服器端開始啟動");
    try {
      // backlog  serviceSocket處理阻塞時,使用者端最大的可建立連線數,超過使用者端連線不上
      // 當執行緒池能力處理滿了之後,我們希望儘量阻塞使用者端的連線
//      ServerSocket serverSocket = new ServerSocket(7007,1,null);
      // 初始化伺服器端
      ServerSocket serverSocket = new ServerSocket();
      serverSocket.setReuseAddress(true);
//      serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 80));
      serverSocket.bind(new InetSocketAddress("localhost", 7007));
      log.info("SocketServiceStart 伺服器端啟動成功");
      // 自旋,讓使用者端一直在取使用者端的請求,如果使用者端暫時沒有請求,會一直阻塞
      while (true) {
        // 接受使用者端的請求
        Socket socket = serverSocket.accept();
        // 如果佇列中有資料了,說明伺服器端已經到了並行處理的極限了,此時需要返回使用者端有意義的資訊
        if (collectPoll.getQueue().size() >= 1) {
          log.info("SocketServiceStart 伺服器端處理能力到頂,需要控制使用者端的請求");
          //返回處理結果給使用者端
          rejectRequest(socket);
          continue;
        }
        try {
          // 非同步處理使用者端提交上來的任務
          collectPoll.submit(new SocketService(socket));
        } catch (Exception e) {
          socket.close();
        }
      }
    } catch (Exception e) {
      log.error("SocketServiceStart - start error", e);
      throw new RuntimeException(e);
    } catch (Throwable e) {
      log.error("SocketServiceStart - start error", e);
      throw new RuntimeException(e);
    }
  }
	// 返回特定的錯誤碼給使用者端
  public static void rejectRequest(Socket socket) throws IOException {
    OutputStream outputStream = null;
    try{
      outputStream = socket.getOutputStream();
      byte[] bytes = "伺服器太忙了,請稍後重試~".getBytes(Charset.forName("UTF-8"));
      SocketClient.segmentWrite(bytes, outputStream);
      socket.shutdownOutput();
    }finally {
      //關閉流
      SocketClient.close(socket,outputStream,null,null,null);
    }
  }
}

我們使用 collectPoll.getQueue().size() >= 1 來判斷目前伺服器端是否已經到達處理的極限了,如果佇列中有一個任務正在排隊,說明當前伺服器端已經超負荷執行了,新的請求應該拒絕掉,如果佇列中沒有資料,說明伺服器端還可以接受新的請求。

以上程式碼註釋詳細,就不累贅說了。

3.2、伺服器端任務的處理邏輯

伺服器端的處理邏輯比較簡單,主要步驟是:從使用者端的 Socket 中讀取輸入,進行處理,把響應返回給使用者端。

我們使用執行緒沉睡 2 秒來模擬伺服器端的處理邏輯,程式碼如下:

public class SocketService implements Runnable {
  private Socket socket;
  public SocketService() {
  }
  public SocketService(Socket socket) {
    this.socket = socket;
  }
  @Override
  public void run() {
    log.info("SocketService 伺服器端任務開始執行");
    OutputStream outputStream = null;
    InputStream is = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    try {
      //接受訊息
      socket.setSoTimeout(10000);// 10秒還沒有得到資料,直接斷開連線
      is = socket.getInputStream();
      isr = new InputStreamReader(is,"UTF-8");
      br = new BufferedReader(isr);
      StringBuffer sb = SocketClient.segmentRead(br);
      socket.shutdownInput();
      log.info("SocketService accept info is {}", sb.toString());
      //伺服器端處理 模擬伺服器端處理耗時
      Thread.sleep(2000);
      String response  = sb.toString();
      //返回處理結果給使用者端
      outputStream = socket.getOutputStream();
      byte[] bytes = response.getBytes(Charset.forName("UTF-8"));
      SocketClient.segmentWrite(bytes, outputStream);
      socket.shutdownOutput();
      //關閉流
      SocketClient.close(socket,outputStream,isr,br,is);
      log.info("SocketService 伺服器端任務執行完成");
    } catch (IOException e) {
      log.error("SocketService IOException", e);
    } catch (Exception e) {
      log.error("SocketService Exception", e);
    } finally {
      try {
        SocketClient.close(socket,outputStream,isr,br,is);
      } catch (IOException e) {
        log.error("SocketService IOException", e);
      }
    }
  }
}

4、測試

測試的時候,我們必須先啟動伺服器端,然後再啟動使用者端,首先我們啟動伺服器端,列印紀錄檔如下:

接著我們啟動使用者端,列印紀錄檔如下:

我們最後看一下伺服器端的執行紀錄檔: 

 從以上執行結果中,我們可以看出得出的結果是符合我們預期的,伺服器端在請求高峰時,能夠並行處理5個請求,其餘請求可以用正確的提示進行拒絕。

5、總結

所以程式碼集中在 SocketClient、SocketServiceStart、SocketService 中,啟動的順序為先啟動 SocketServiceStart,後執行 SocketClient,感興趣的同學可以自己 debug 下,加深印象。

以上就是Socket結合執行緒池實現使用者端和伺服器端通訊實戰demo的詳細內容,更多關於Socket執行緒池使用者端與伺服器端通訊demo的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com