首頁 > 軟體

Python實現從網路攝像頭拉流的方法分享

2023-01-31 06:02:03

摘要

本文介紹幾種從攝像頭拉流的方法。

1、直接使用OpenCV

直接使用opencv的cv2.VideoCapture直接讀取rtsp視訊流,但是這樣做的缺點是延遲嚴重、出現掉幀、花屏現象等,原因在於opencv自己有一個快取,每次會順序從自己的快取中讀取,而不是直接讀取最新幀。

程式碼如下:

import cv2
import datetime
def time_str(fmt=None):
    if fmt is None:
        fmt = '%Y_%m_%d_%H_%M_%S'
    return datetime.datetime.today().strftime(fmt)

user_name, user_pwd = "admin", "1234"
ca_ip="192.168.1.100"
channel=2
cap = cv2.VideoCapture("rtsp://%s:%s@%s//Streaming/Channels/%d" 
                           % (user_name, user_pwd, ca_ip, channel))
if cap.isOpened():
    print("Opened")
while cap.isOpened():
        ret, frame = cap.read()
        cv2.imwrite("opencv_"+time_str() + ".jpg", frame)

2、使用ffmpeg

FFmpeg是一套強大的視訊、音訊處理程式,也是很多視訊處理軟體的基礎 。但是FFmpeg的命令列使用起來有一定的學習成本。而ffmpeg-python就是解決FFmpeg學習成本的問題,讓開發者使用python就可以呼叫FFmpeg的功能,既減少了學習成本,也增加了程式碼的可讀性。

github地址:https://github.com/kkroening/ffmpeg-python

2.1、安裝方法 

2.1.1、安裝ffmpeg-python 

ffmpeg-python可以通過典型的 pip 安裝獲取最新版本(注意:是ffmpeg-python,不要寫成了python-ffmpeg):

pip install ffmpeg-python

或者可以從本地克隆和安裝源:

git clone git@github.com:kkroening/ffmpeg-python.git
pip install -e ./ffmpeg-python

2.1.2、安裝FFmpeg 

使用該庫,需要自行安裝FFmpeg,如果電腦已經安裝了,可以忽略本步驟。這裡推薦直接使用conda進行安裝,可以省下很多麻煩,其他的安裝方式自行百度。

conda install ffmpeg

2.2、程式碼實現

使用ffmpeg讀取rtsp流並轉換成numpy array,並使用cv2.imwrite儲存。

import ffmpeg
import numpy as np
import cv2
import datetime

def main(source):
    args = {
        "rtsp_transport": "tcp",
        "fflags": "nobuffer",
        "flags": "low_delay"
    }    # 新增引數
    probe = ffmpeg.probe(source)
    cap_info = next(x for x in probe['streams'] if x['codec_type'] == 'video')
    print("fps: {}".format(cap_info['r_frame_rate']))
    width = cap_info['width']           # 獲取視訊流的寬度
    height = cap_info['height']         # 獲取視訊流的高度
    up, down = str(cap_info['r_frame_rate']).split('/')
    fps = eval(up) / eval(down)
    print("fps: {}".format(fps))    # 讀取可能會出錯錯誤
    process1 = (
        ffmpeg
        .input(source, **args)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .overwrite_output()
        .run_async(pipe_stdout=True)
    )
    while True:
        in_bytes = process1.stdout.read(width * height * 3)     # 讀取圖片
        if not in_bytes:
            break
        # 轉成ndarray
        in_frame = (
            np
            .frombuffer(in_bytes, np.uint8)
            .reshape([height, width, 3])
        )
        frame = cv2.cvtColor(in_frame, cv2.COLOR_RGB2BGR)  # 轉成BGR
        # cv2.imshow(time_str(), frame)
        cv2.imwrite(time_str()+".jpg", frame)
        # if cv2.waitKey(1) == ord('q'):
        #     break
    process1.kill()             # 關閉

def time_str(fmt=None):
    if fmt is None:
        fmt = '%Y_%m_%d_%H_%M_%S'
    return datetime.datetime.today().strftime(fmt)

if __name__ == "__main__":
    # rtsp流需要換成自己的
    user_name, user_pwd = "admin", "1234"
    ca_ip = "192.168.1.168"
    channel = 2
    alhua_rtsp="rtsp://%s:%s@%s//Streaming/Channels/%d" 
                           % (user_name, user_pwd, ca_ip, channel)

    main(alhua_rtsp)

3、多執行緒的方式讀取圖片

採用多執行緒的方式,新開一個執行緒,利用變數、佇列等方式儲存最新幀,使得每次都讀取最新幀,而不是opencv自己快取中的順序幀,不會延遲,不會花屏了,程式碼如下:

import cv2
import threading
import sys
import  datetime
def time_str(fmt=None):
    if fmt is None:
        fmt = '%Y_%m_%d_%H_%M_%S'
    return datetime.datetime.today().strftime(fmt)

class RTSCapture(cv2.VideoCapture):
    _cur_frame = None
    _reading = False
    schemes = ["rtsp://","rtmp://"]
    @staticmethod
    def create(url, *schemes):
        rtscap = RTSCapture(url)
        rtscap.frame_receiver = threading.Thread(target=rtscap.recv_frame, daemon=True)
        rtscap.schemes.extend(schemes)
        if isinstance(url, str) and url.startswith(tuple(rtscap.schemes)):
            rtscap._reading = True
        elif isinstance(url, int):
            pass
        return rtscap

    def isStarted(self):
        ok = self.isOpened()
        if ok and self._reading:
            ok = self.frame_receiver.is_alive()
        return ok

    def recv_frame(self):
        while self._reading and self.isOpened():
            ok, frame = self.read()
            if not ok: break
            self._cur_frame = frame
        self._reading = False

    def read2(self):
        frame = self._cur_frame
        self._cur_frame = None
        return frame is not None, frame

    def start_read(self):
        self.frame_receiver.start()
        self.read_latest_frame = self.read2 if self._reading else self.read

    def stop_read(self):
        self._reading = False
        if self.frame_receiver.is_alive(): self.frame_receiver.join()


if __name__ == '__main__':
    user_name, user_pwd = "admin", "1234"
    ca_ip = "192.168.1.100"
    channel = 2
    alhua_rtsp="rtsp://%s:%s@%s//Streaming/Channels/%d" 
                           % (user_name, user_pwd, ca_ip, channel)

    rtscap = RTSCapture.create(alhua_rtsp)
    rtscap.start_read()

    while rtscap.isStarted():
        ok, frame = rtscap.read_latest_frame()
        # if cv2.waitKey(100) & 0xFF == ord('q'):
        #     break
        if not ok:
            continue


        # inhere
        # cv2.imshow(time_str(), frame)
        cv2.imwrite(time_str() + ".jpg", frame)


    rtscap.stop_read()
    rtscap.release()
    cv2.destroyAllWindows()

執行結果:

4、多程序的方式拉流

使用Python3自帶的多程序模組,建立一個佇列,程序A從通過rtsp協定從視訊流中讀取出每一幀,並放入佇列中,程序B從佇列中將圖片取出,處理後進行顯示。程序A如果發現佇列裡有兩張圖片(證明程序B的讀取速度跟不上程序A),那麼程序A主動將佇列裡面的舊圖片刪掉,換上新圖片。通過多執行緒的方法:

程式碼如下:

import cv2
import multiprocessing as mp
import time
import datetime


def time_str(fmt=None):
    if fmt is None:
        fmt = '%Y_%m_%d_%H_%M_%S'
    return datetime.datetime.today().strftime(fmt)

def image_put(q, user, pwd, ip, channel=1):
    cap = cv2.VideoCapture("rtsp://%s:%s@%s//Streaming/Channels/%d" % (user, pwd, ip, channel))
    if cap.isOpened():
        print('HIKVISION')
    else:
        cap = cv2.VideoCapture("rtsp://%s:%s@%s/cam/realmonitor?channel=%d&subtype=0" % (user, pwd, ip, channel))
        print('DaHua')

    while True:
        q.put(cap.read()[1])
        q.get() if q.qsize() > 1 else time.sleep(0.01)


def image_get(q, window_name):
    # cv2.namedWindow(window_name, flags=cv2.WINDOW_FREERATIO)
    while True:
        frame = q.get()
        # cv2.imshow(window_name, frame)
        # cv2.waitKey(1)
        cv2.imwrite("opencv_"+time_str() + ".jpg", frame)
        cv2.waitKey(1)

def run_single_camera():
    user_name, user_pwd, camera_ip = "admin", "admin123456", "192.168.35.121"

    mp.set_start_method(method='spawn')  # init
    queue = mp.Queue(maxsize=2)
    processes = [mp.Process(target=image_put, args=(queue, user_name, user_pwd, camera_ip)),
                 mp.Process(target=image_get, args=(queue, camera_ip))]

    [process.start() for process in processes]
    [process.join() for process in processes]

def run_multi_camera():
    # user_name, user_pwd = "admin", "password"
    user_name, user_pwd = "admin", "1234"
    camera_ip_l = [
        "192.168.1.XX3",  # ipv4
        "192.168.1.XX2",
        "192.168.1.XX1",
    ]

    mp.set_start_method(method='spawn')  # init
    queues = [mp.Queue(maxsize=90) for _ in camera_ip_l]

    processes = []
    for queue, camera_ip in zip(queues, camera_ip_l):
        processes.append(mp.Process(target=image_put, args=(queue, user_name, user_pwd, camera_ip)))
        processes.append(mp.Process(target=image_get, args=(queue, camera_ip)))

    for process in processes:
        process.daemon = True
        process.start()
    for process in processes:
        process.join()


if __name__ == '__main__':
    # run_single_camera()
    run_multi_camera()
    pass

到此這篇關於Python實現從網路攝像頭拉流的方法分享的文章就介紹到這了,更多相關Python網路攝像頭拉流內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com