首頁 > 軟體

Java多執行緒實現FTP批次上傳檔案

2022-06-23 14:00:19

本文範例為大家分享了Java多執行緒實現FTP批次上傳檔案的具體程式碼,供大家參考,具體內容如下

1、構建FTP使用者端

package cn.com.pingtech.common.ftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;

import java.io.*;
import java.net.UnknownHostException;

@Slf4j
public class  FtpConnection {

    private FTPClient ftp = new FTPClient();

    private boolean is_connected = false;

    /**
     * 建構函式
     */
    public FtpConnection() {
        is_connected = false;
        ftp.setDefaultTimeout(FtpConfig.defaultTimeoutSecond * 1000);
        ftp.setConnectTimeout(FtpConfig.connectTimeoutSecond * 1000);
        ftp.setDataTimeout(FtpConfig.dataTimeoutSecond * 1000);
        try {
            initConnect(FtpConfig.host, FtpConfig.port, FtpConfig.user, FtpConfig.password);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 初始化連線
     *
     * @param host
     * @param port
     * @param user
     * @param password
     * @throws IOException
     */
    private void initConnect(String host, int port, String user, String password) throws IOException {
        try {
            ftp.connect(host, port);
        } catch (UnknownHostException ex) {
            throw new IOException("Can't find FTP server '" + host + "'");
        }
        int reply = ftp.getReplyCode();//220 連線成功
        if (!FTPReply.isPositiveCompletion(reply)) {
            disconnect();
            throw new IOException("Can't connect to server '" + host + "'");

        }
        if (!ftp.login(user, password)) {
            is_connected = false;
            disconnect();
            throw new IOException("Can't login to server '" + host + "'");
        } else {
            is_connected = true;
        }
    }

    /**
     * 上傳檔案
     *
     * @param path
     * @param ftpFileName
     * @param localFile
     * @throws IOException
     */
    public boolean upload(String path, String ftpFileName, File localFile) throws IOException {
        boolean is  = false;
        //檢查本地檔案是否存在
        if (!localFile.exists()) {
            throw new IOException("Can't upload '" + localFile.getAbsolutePath() + "'. This file doesn't exist.");
        }
        //設定工作路徑
        setWorkingDirectory(path);
        //上傳
        InputStream in = null;
        try {
            //被動模式
            ftp.enterLocalPassiveMode();
            in = new BufferedInputStream(new FileInputStream(localFile));
            //儲存檔案
            is = ftp.storeFile(ftpFileName, in);
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            try {
                in.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
        return is;
    }

    /**
     * 關閉連線
     *
     * @throws IOException
     */
    public void disconnect() throws IOException {
        if (ftp.isConnected()) {
            try {
                ftp.logout();
                ftp.disconnect();
                is_connected = false;
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * 設定工作路徑
     *
     * @param dir
     * @return
     */
    private boolean setWorkingDirectory(String dir) {
        if (!is_connected) {
            return false;
        }
        //如果目錄不存在建立目錄
        try {
            if (createDirecroty(dir)) {
                return ftp.changeWorkingDirectory(dir);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;

    }

    /**
     * 是否連線
     *
     * @return
     */
    public boolean isConnected() {
        return is_connected;
    }

    /**
     * 建立目錄
     *
     * @param remote
     * @return
     * @throws IOException
     */
    private boolean createDirecroty(String remote) throws IOException {
        boolean success = true;
        String directory = remote.substring(0, remote.lastIndexOf("/") + 1);
        // 如果遠端目錄不存在,則遞迴建立遠端伺服器目錄
        if (!directory.equalsIgnoreCase("/") && !ftp.changeWorkingDirectory(new String(directory))) {
            int start = 0;
            int end = 0;
            if (directory.startsWith("/")) {
                start = 1;
            } else {
                start = 0;
            }
            end = directory.indexOf("/", start);
            while (true) {
                String subDirectory = new String(remote.substring(start, end));
                if (!ftp.changeWorkingDirectory(subDirectory)) {
                    if (ftp.makeDirectory(subDirectory)) {
                        ftp.changeWorkingDirectory(subDirectory);
                    } else {
                        log.error("mack directory error :/" + subDirectory);
                        return false;
                    }
                }
                start = end + 1;
                end = directory.indexOf("/", start);
                // 檢查所有目錄是否建立完畢
                if (end <= start) {
                    break;
                }
            }
        }
        return success;
    }

}

2、FTP連線工廠

package cn.com.pingtech.common.ftp;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;


/**
 * 連線工廠

 */
@Slf4j
public class FtpFactory {

    //有界佇列
    private static final ArrayBlockingQueue<FtpConnection> arrayBlockingQueue = new ArrayBlockingQueue<>(FtpConfig.ftpConnectionSize);


    protected FtpFactory(){
        log.info("init ftpConnectionSize "+FtpConfig.ftpConnectionSize);
        for(int i = 0; i< FtpConfig.ftpConnectionSize; i++){
            //表示如果可能的話,將 e 加到 BlockingQueue 裡,即如果 BlockingQueue 可以容納,則返回 true,否則返回 false
            arrayBlockingQueue.offer(new FtpConnection());
        }
    }

    /**
     * 獲取連線
     *
     * @return
     */

    public FtpConnection getFtp() {
        FtpConnection poll = null;
        try {
            //取走 BlockingQueue 裡排在首位的物件,若 BlockingQueue 為空,阻斷進入等待狀態直到 Blocking 有新的物件被加入為止
            poll = arrayBlockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return poll;
    }

    /**
     * 釋放連線
     * @param ftp
     * @return
     */
    public boolean relase(FtpConnection ftp){
        return arrayBlockingQueue.offer(ftp);
    }

    /**
     * 刪除連線
     *
     * @param ftp
     */

    public void remove(FtpConnection ftp) {
        arrayBlockingQueue.remove(ftp);
    }

    /**
     * 關閉連線
     */
    public void close() {
        for (FtpConnection connection : arrayBlockingQueue) {
            try {
                connection.disconnect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


}

3、FTP設定

package cn.com.pingtech.common.ftp;

/**
 * ftp 設定類
 */

public class FtpConfig {

    public static int defaultTimeoutSecond = 10;
    public static int connectTimeoutSecond = 10;
    public static int dataTimeoutSecond = 10;
    public static String host = "127.0.0.1";
    public static int port =9999;
    public static String user = "Administrator";
    public static String password ="Yp886611";
    public static int threadPoolSize = 1;
    public static int ftpConnectionSize = 1;
    
}

4、構建多執行緒FTP上傳任務

package cn.com.pingtech.common.ftp;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;


/**
 * 上傳任務
 */
public class UploadTask implements Callable{
    private File file;

    private FtpConnection ftp;

    private String path;

    private String fileName;

    private FtpFactory factory;

    public UploadTask(FtpFactory factory,FtpConnection ftp, File file, String path, String fileName){

        this.factory = factory;

        this.ftp = ftp;

        this.file = file;

        this.path = path;

        this.fileName = fileName;

    }

    @Override
    public UploadResult call() throws Exception {
        UploadResult result = null;
        try {
            if (ftp == null) {
                result = new UploadResult(file.getAbsolutePath(), false);
                return result;
            }
            //如果連線未開啟 重新獲取連線
            if (!ftp.isConnected()) {
                factory.remove(ftp);
                ftp = new FtpConnection();
            }

            //開始上傳
            result = new UploadResult(file.getName(), ftp.upload(path, fileName, file));
        } catch (IOException ex) {
            result = new UploadResult(file.getName(), false);
            ex.printStackTrace();
        } finally {
            factory.relase(ftp);//釋放連線
        }
        return result;

    }
}
package cn.com.pingtech.common.ftp;
/**
 * 上傳結果
 */
public class UploadResult {
    private String fileName; //檔名稱
    private boolean result; //是否上傳成功

    public UploadResult(String fileName, boolean result) {
        this.fileName = fileName;
        this.result = result;
    }

    public String getFileName() {
        return fileName;

    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public boolean isResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }

    public String toString() {
        return "[fileName=" + fileName + " , result=" + result + "]";
    }
}

注意:實現Callable介面的任務執行緒能返回執行結果
Callable介面支援返回執行結果,此時需要呼叫FutureTask.get()方法實現,此方法會阻塞執行緒直到獲取“將來”的結果,當不呼叫此方法時,主執行緒不會阻塞

5、FTP上傳工具類

package cn.com.pingtech.common.ftp;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


/**
 * ftp上傳工具包
 */

public class FtpUtil {

    /**
     * 上傳檔案
     *
     * @param ftpPath
     * @param listFiles
     * @return
     */

    public static synchronized List upload(String ftpPath, File[] listFiles) {
        //構建執行緒池
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(FtpConfig.threadPoolSize);
        List<Future> results = new ArrayList<>();
        //建立n個ftp連結
        FtpFactory factory = new FtpFactory();
        for (File file : listFiles) {
            FtpConnection ftp = factory.getFtp();//獲取ftp con
            UploadTask upload = new UploadTask(factory,ftp, file, ftpPath, file.getName());
            Future submit = newFixedThreadPool.submit(upload);
            results.add(submit);
        }

        List listResults = new ArrayList<>();
        for (Future result : results) {
            try {
                //獲取執行緒結果
                UploadResult uploadResult = (UploadResult)result.get(30, TimeUnit.MINUTES);
                listResults.add(uploadResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        factory.close();
        newFixedThreadPool.shutdown();
        return listResults;
    }

}

6、測試上傳

package cn.com.pingtech.common.ftp


class Client {
    public static void main(String[] args) throws IOException {
        String loalPath = "C:\Users\Administrator\Desktop\test\0";
        String ftpPath = "/data/jcz/";
        File parentFile = new File(loalPath);
        List <UploadResult> list = FtpUtil.upload(ftpPath,parentFile.listFiles());
        for(UploadResult vo:list){
            System.out.println(vo);
        }
        
    }
}

注意:FTP協定裡面,規定檔名編碼為iso-8859-1,所以目錄名或檔名需要轉碼

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com