首頁 > 軟體

Java多執行緒實現第三方資料同步

2022-08-10 18:02:26

本文範例為大家分享了Java多執行緒實現第三方資料同步的具體程式碼,供大家參考,具體內容如下

一、場景

最近的一項開發任務是同步第三方資料,而第三方資料一般有存量資料和增量資料,存量資料有100w+。在得知此需求時,進行了一定的資訊檢索和工具學習,提前獲取存量資料到目標庫,再使用kettle進行存量資料轉換;增量資料則根據業務方規定的請求時間,通過定時任務去獲取增量資料並進行資料轉換。在資料獲取和轉換時,我們應該要記錄每一次的請求資訊,便於溯源和資料對賬!!!

二、獲取資料的方式

2.1 遞迴方式

使用遞迴方式時,要求資料量少,否則會出現棧溢位或堆溢位!!!並且遞迴方式是單執行緒,所以會導致同步速度很慢!!!

/**
     * 資料同步 - 遞迴方式
     * 此處存量資料只需要請求到資料並儲存資料庫即可,後期通過kettle進行轉換。
     * Data為自定義實體類,這裡僅做範例!!!
*/
    private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【資料同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex,pageSize);
        if (CollectionUtils.isNotEmpty(datas)) {
            dataService.saveOrUpdateBatch(datas);
            log.info("【資料同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【資料同步 - 存量】,第{}次同步,獲取資料小於每頁獲取條數,證明已全部同步完畢!!!", pageIndex);
                return;
            }
            // 遞迴操作-直到資料同步完畢
            fetchAndSaveDB(pageIndex + 1, pageSize);
        } else {
            log.info("【資料同步 - 存量】,第{}次同步,獲取資料為空,證明已全部同步完畢!!!", pageIndex);
            return;
        }
    }
    /** 
     * 獲取分頁資料,Data為自定義實體類,這裡僅做範例!!!
     */
    private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {
        //通過feign呼叫第三方介面獲取資料
        String data = dataFeignService.fetchAllData(pageSize, pageIndex);
        JSONObject jsonObject = JSONObject.parseObject(data);
        JSONArray datalist = jsonObject.getJSONArray("datalist");
        List<Data> datas = datalist.toJavaList(Data.class);
        return datas;
    }

2.2 多執行緒方式

由於遞迴方式是單執行緒,考慮到資料的龐大,且易造成記憶體溢位,因此將遞迴更換成多執行緒方式,不僅避免了記憶體溢位的情況,且速度大大的提升!!!

public void synAllData() {
         // 定義原子變數 - 頁數
        AtomicInteger pageIndex = new AtomicInteger(0);
         // 建立執行緒池
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        // 100萬資料
        int total = 1000000;//資料總量
        int times = total / 1000;
        if (total % 1000!= 0) {
            times = times + 1;
        }
        LocalDateTime beginLocalDateTime = LocalDateTime.now();
        log.info("【資料同步 - 存量】開始同步時間:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        for (int index = 1; index <= times; index++) {
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000);
                    } catch (Exception e) {
                        log.error("並行獲取並儲存資料異常:{}", e);
                    }
                }
            });
        }
        LocalDateTime endLocalDateTime = LocalDateTime.now();
        log.info("【資料同步 - 存量】同步結束時間:{},總共耗時:{}分鐘",
                endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());
    }
    /**
     * 資料同步 - 【多執行緒方式】
     *
     * @throws Exception
     */
    private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【資料同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1
        if (CollectionUtils.isNotEmpty(datas)) {
            log.info("【資料同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【資料同步 - 存量】,第{}次同步,獲取資料小於每頁獲取條數,證明已全部同步完畢!!!", pageIndex);
                return;
            }
        } else {
            log.info("【資料同步 - 存量】,第{}次同步,獲取資料為空,證明已全部同步完畢!!!", pageIndex);
            return;
        }

    }

三、增量資料如何對接

增量資料需要寫定時任務,可使用Scheduled註解,並需要將增量資料存放到目標庫中且進行資料轉換!!!此處就不再提供程式碼,可以參考上面的存量資料的方式編寫!!!

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com