首頁 > 軟體

BlockingQueue佇列處理高並行下的紀錄檔

2022-03-08 13:04:14

前言

當系統流量負載比較高時,業務紀錄檔的寫入操作也要納入系統效能考量之內,如若處理不當,將影響系統的正常業務操作,之前寫過一篇《spring boot通過MQ消費log4j2的紀錄檔》的博文,採用了RabbitMQ訊息中介軟體來儲存抗高並行下的紀錄檔,因為引入了中介軟體,操作使用起來可能沒那麼簡便,今天分享使用多執行緒消費阻塞佇列的方式來處理我們的海量紀錄檔

what阻塞佇列?

阻塞佇列(BlockingQueue)是區別於普通佇列多了兩個附加操作的執行緒安全的佇列。這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

1.宣告儲存固定訊息的佇列

/**
 * Created by kl on 2017/3/20.
 * Content :銷售操作紀錄檔佇列
 */
public class SalesLogQueue{
    //佇列大小
    public static final int QUEUE_MAX_SIZE    = 1000;
    private static SalesLogQueue alarmMessageQueue = new SalesLogQueue();
    //阻塞佇列
    private BlockingQueueblockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
    private SalesLogQueue(){}
    public static SalesLogQueue getInstance() {
        return alarmMessageQueue;
    }
    /**
     * 訊息入隊
     * @param salesLog
     * @return
     */
    public boolean push(SalesLog salesLog) {
        return this.blockingQueue.add(salesLog);//佇列滿了就丟擲異常,不阻塞
    }
    /**
     * 訊息出隊
     * @return
     */
    public SalesLog poll() {
        SalesLog result = null;
        try {
            result = this.blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
    /**
     * 獲取佇列大小
     * @return
     */
    public int size() {
        return this.blockingQueue.size();
    }
}

ps:因為業務原因,採用add的方式入隊,佇列滿了就拋異常,不阻塞

2.訊息入隊

訊息入隊可以在任何需要儲存紀錄檔的地方操作,如aop統一攔截紀錄檔處理,filter過濾請求紀錄檔處理,或者耦合的業務紀錄檔,記住,不阻塞入隊操作,不然將影響正常的業務操作,如下為filter統一處理請求紀錄檔:

/**
 * Created by kl on 2017/3/20.
 * Content :存取請求攔截,儲存操作紀錄檔
 */
public class SalesLogFilter implements Filter {
    private RoleResourceService resourceService;
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        ServletContext context = filterConfig.getServletContext();
        ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context);
        resourceService = ctx.getBean(RoleResourceService.class);
    }
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        try {
            HttpServletRequest request = (HttpServletRequest) servletRequest;
            String requestUrl = request.getRequestURI();
            String requestType=request.getMethod();
            String ipAddress = HttpClientUtil.getIpAddr(request);
            Map resource=resourceService.getResource();
            String context=resource.get(requestUrl);
            //動態url正則匹配
            if(StringUtil.isNull(context)){
                for(Map.Entry entry:resource.entrySet()){
                    String resourceUrl= entry.getKey();
                    if(requestUrl.matches(resourceUrl)){
                        context=entry.getValue();
                        break;
                    }
                }
            }
            SalesLog log=new SalesLog();
            log.setCreateDate(new Timestamp(System.currentTimeMillis()));
            log.setContext(context);
            log.setOperateUser(UserTokenUtil.currentUser.get().get("realname"));
            log.setRequestIp(ipAddress);
            log.setRequestUrl(requestUrl);
            log.setRequestType(requestType);
            SalesLogQueue.getInstance().push(log);
        }catch (Exception e){
            e.printStackTrace();
        }
        filterChain.doFilter(servletRequest, servletResponse);
    }
    @Override
    public void destroy() {
    }
}

3.訊息出隊被消費

BlockingQueue是執行緒安全的,所以可以放心的在多個執行緒中去處理佇列中的訊息,如下程式碼宣告了一個兩個大小的固定執行緒池,並新增了兩個執行緒去處理佇列中的訊息

/**
 * Created by kl on 2017/3/20.
 * Content :啟動消費操作紀錄檔佇列的執行緒
 */
@Component
public class ConsumeSalesLogQueue {
    @Autowired
    SalesLogService salesLogService;
    @PostConstruct
    public void startrtThread() {
        ExecutorService e = Executors.newFixedThreadPool(2);//兩個大小的固定執行緒池
        e.submit(new PollSalesLog(salesLogService));
        e.submit(new PollSalesLog(salesLogService));
    }
    class PollSalesLog implements Runnable {
        SalesLogService salesLogService;
        public PollSalesLog(SalesLogService salesLogService) {
            this.salesLogService = salesLogService;
        }
        @Override
        public void run() {
            while (true) {
                try {
                    SalesLog salesLog = SalesLogQueue.getInstance().poll();
                    if(salesLog!=null){
                        salesLogService.saveSalesLog(salesLog);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

參考博文如下,對BlockingQueue佇列更多瞭解,可讀一讀如下的博文:

詳細分析Java並行集合ArrayBlockingQueue的用法

詳解Java阻塞佇列(BlockingQueue)的實現原理

Java並行之BlockingQueue的使用

以上就是BlockingQueue佇列處理高並行下的紀錄檔的詳細內容,更多關於BlockingQueue佇列處理高並行紀錄檔的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com