首頁 > 軟體

Java實現多執行緒大批次同步資料(分頁)

2022-08-10 14:03:53

背景

最近遇到個功能,兩個月有300w+的資料,之後還在累加,因一開始該資料就全部儲存在mysql表,現需要展示在頁面,還需要關聯另一張表的資料,而且產品要求頁面的查詢條件多達20個條件,最終,這個功能卡的要死,基本查不出來資料。

最後是打算把這兩張表的資料同時儲存到MongoDB中去,以提高查詢效率。

一開始同步的時候,採用單執行緒,迴圈以分頁的模式去同步這兩張表資料,結果是…一晚上,只同步了30w資料,特慢!!!

最後,改造了一番,2小時,就成功同步了300w+資料。

以下是主要邏輯。

執行緒的個數請根據你自己的伺服器效能酌情設定。

思路

先通過count查出結果集的總條數,設定每個執行緒分頁查詢的條數,通過總條數和單次條數得到執行緒數量,通過改變limit的下標實現分批查詢。

程式碼實現

package com.github.admin.controller.loans;

import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MongodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
 * 多執行緒同步歷史資料
 * @author songfayuan
 * @date 2019-09-26 15:38
 */
@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {

    private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController");  //newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大並行數,超出的執行緒會在佇列中等待。

    @Value("${spring.profiles.active}")
    private String profile;
    @Autowired
    private DuyanCallRecordDetailService duyanCallRecordDetailService;
    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private CaseCallRemarkRecordService caseCallRemarkRecordService;

    /**
     * 多執行緒同步通話記錄歷史資料
     * @param params
     * @return
     * @throws Exception
     */
    @GetMapping("/syncHistoryData")
    public Response syncHistoryData(Map<String, Object> params) throws Exception {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    logicHandler(params);
                } catch (Exception e) {
                    log.warn("多執行緒同步稽查通話記錄歷史資料才處理異常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系統訊息】" + profile + "環境,多執行緒同步稽查通話記錄歷史資料才處理異常,errMsg = "+e);
                }
            }
        });
        return Response.success("請求成功");
    }

    /**
     * 處理資料邏輯
     * @param params
     * @throws Exception
     */
    private void logicHandler(Map<String, Object> params) throws Exception {
        /******返回結果:多執行緒處理完的最終資料******/
        List<DuyanCallRecordDetail> result = new ArrayList<>();

        /******查詢資料庫總的資料條數******/
        int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1));
        DingDingMsgSendUtils.sendDingDingGroupMsg("【系統訊息】" + profile + "環境,本次需要同步" + count + "條歷史稽查通話記錄資料。");

//        int count = 2620266;
        /******限制每次查詢的條數******/
        int num = 1000;

        /******計算需要查詢的次數******/
        int times = count / num;
        if (count % num != 0) {
            times = times + 1;
        }

        /******每個執行緒開始查詢的行數******/
        int offset = 0;

        /******新增任務******/
        List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);
            tasks.add(qfe);
            offset = offset + num;
        }

        /******為避免太多工的最終資料全部存在list導致記憶體溢位,故將任務再次拆分單獨處理******/
        List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10);
        for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) {
            if (CollectionUtils.isNotEmpty(callableList)) {
//                executor.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        log.info("任務拆分執行開始:執行緒{}拆分處理開始...", Thread.currentThread().getName());
//
//                        log.info("任務拆分執行結束:執行緒{}拆分處理開始...", Thread.currentThread().getName());
//                    }
//                });

                try {
                    List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList);
                    /******處理執行緒返回結果******/
                    if (futures != null && futures.size() > 0) {
                        for (Future<List<DuyanCallRecordDetail>> future : futures) {
                            List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get();
                            if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){
                                executor.execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        /******非同步儲存******/
                                        log.info("非同步儲存MongoDB開始:執行緒{}拆分處理開始...", Thread.currentThread().getName());
                                        saveMongoDB(duyanCallRecordDetailList);
                                        log.info("非同步儲存MongoDB結束:執行緒{}拆分處理開始...", Thread.currentThread().getName());
                                    }
                                });
                            }
                            //result.addAll(future.get());
                        }
                    }
                } catch (Exception e) {
                    log.warn("任務拆分執行異常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系統訊息】" + profile + "環境,任務拆分執行異常,errMsg = "+e);
                }
            }
        }
    }

    /**
     * 資料儲存MongoDB
     * @param duyanCallRecordDetailList
     */
    private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {
        for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {
            /******重複資料不同步MongoDB******/
            org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
            query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));
            List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);
            if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {
                log.warn("call_uuid = {}在MongoDB已經存在資料,後面資料將被捨棄...", duyanCallRecordDetail.getCallUuid());
                continue;
            }

            /******關聯填寫的記錄******/
            CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()
                    .eq("is_delete", 0)
                    .eq("call_uuid", duyanCallRecordDetail.getCallUuid()));

            CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();
            BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);
            //補充
            caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());
            
            if (caseCallRemarkRecord != null) {
                //補充
                caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());            
            }
            log.info("正在儲存資料到MongoDB:{}", caseCheckCallRecord.toString());
            this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);
        }
    }

    @Override
    public void destroy() throws Exception {
        executor.shutdown();
    }
}


class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {
    /******需要通過構造方法把對應的業務service傳進來 實際用的時候把型別變為對應的型別******/
    private DuyanCallRecordDetailService myService;
    /******查詢條件 根據條件來定義該類的屬性******/
    private Map<String, Object> params;

    /******分頁index******/
    private int offset;
    /******數量******/
    private int num;

    public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {
        this.myService = myService;
        this.params = params;
        this.offset = offset;
        this.num = num;
    }

    @Override
    public List<DuyanCallRecordDetail> call() throws Exception {
        /******通過service查詢得到對應結果******/
        List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1)
                .last("limit "+offset+", "+num));
        return duyanCallRecordDetailList;
    }
}

ListUtils工具

package com.github.common.util;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

/**
 * 描述:List工具類
 * @author songfayuan
 * 2018年7月22日下午2:23:22
 */
@Slf4j
public class ListUtils {
    
    /**
     * 描述:list集合深拷貝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年7月22日下午2:35:23
     */
    public static <T> List<T> deepCopy(List<T> src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            List<T> dest = (List<T>) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 描述:物件深拷貝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年12月14日
     */
    public static <T> T objDeepCopy(T src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            T dest = (T) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            log.error("errMsg = {}", e);
            return null;
        } catch (IOException e) {
            log.error("errMsg = {}", e);
            return null;
        }
    }

    /**
     * 將一個list均分成n個list,主要通過偏移量來實現的
     * @author songfayuan
     * 2018年12月14日
     */
    public static <T> List<List<T>> averageAssign(List<T> source, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        int remaider = source.size() % n;  //(先計算出餘數)
        int number = source.size() / n;  //然後是商
        int offset = 0;//偏移量
        for (int i = 0; i < n; i++) {
            List<T> value = null;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }

    /**
     * List按指定長度分割
     * @param list the list to return consecutive sublists of (需要分隔的list)
     * @param size the desired size of each sublist (the last may be smaller) (分隔的長度)
     * @author songfayuan
     * @date 2019-07-07 21:37
     */
    public static <T> List<List<T>> partition(List<T> list, int size){
        return  Lists.partition(list, size); // 使用guava
    }

    /**
     * 測試
     * @param args
     */
    public static void main(String[] args) {
        List<Integer> bigList = new ArrayList<>();
        for (int i = 0; i < 101; i++){
            bigList.add(i);
        }
        log.info("bigList長度為:{}", bigList.size());
        log.info("bigList為:{}", bigList);
        List<List<Integer>> smallists = partition(bigList, 20);
        log.info("smallists長度為:{}", smallists.size());
        for (List<Integer> smallist : smallists) {
            log.info("拆分結果:{},長度為:{}", smallist, smallist.size());
        }
    }

}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com