<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
本文介紹幾種從攝像頭拉流的方法。
直接使用opencv的cv2.VideoCapture直接讀取rtsp視訊流,但是這樣做的缺點是延遲嚴重、出現掉幀、花屏現象等,原因在於opencv自己有一個快取,每次會順序從自己的快取中讀取,而不是直接讀取最新幀。
程式碼如下:
import cv2 import datetime def time_str(fmt=None): if fmt is None: fmt = '%Y_%m_%d_%H_%M_%S' return datetime.datetime.today().strftime(fmt) user_name, user_pwd = "admin", "1234" ca_ip="192.168.1.100" channel=2 cap = cv2.VideoCapture("rtsp://%s:%s@%s//Streaming/Channels/%d" % (user_name, user_pwd, ca_ip, channel)) if cap.isOpened(): print("Opened") while cap.isOpened(): ret, frame = cap.read() cv2.imwrite("opencv_"+time_str() + ".jpg", frame)
FFmpeg是一套強大的視訊、音訊處理程式,也是很多視訊處理軟體的基礎 。但是FFmpeg的命令列使用起來有一定的學習成本。而ffmpeg-python就是解決FFmpeg學習成本的問題,讓開發者使用python就可以呼叫FFmpeg的功能,既減少了學習成本,也增加了程式碼的可讀性。
github地址:https://github.com/kkroening/ffmpeg-python
2.1.1、安裝ffmpeg-python
ffmpeg-python可以通過典型的 pip 安裝獲取最新版本(注意:是ffmpeg-python,不要寫成了python-ffmpeg):
pip install ffmpeg-python
或者可以從本地克隆和安裝源:
git clone git@github.com:kkroening/ffmpeg-python.git pip install -e ./ffmpeg-python
2.1.2、安裝FFmpeg
使用該庫,需要自行安裝FFmpeg,如果電腦已經安裝了,可以忽略本步驟。這裡推薦直接使用conda進行安裝,可以省下很多麻煩,其他的安裝方式自行百度。
conda install ffmpeg
使用ffmpeg讀取rtsp流並轉換成numpy array,並使用cv2.imwrite儲存。
import ffmpeg import numpy as np import cv2 import datetime def main(source): args = { "rtsp_transport": "tcp", "fflags": "nobuffer", "flags": "low_delay" } # 新增引數 probe = ffmpeg.probe(source) cap_info = next(x for x in probe['streams'] if x['codec_type'] == 'video') print("fps: {}".format(cap_info['r_frame_rate'])) width = cap_info['width'] # 獲取視訊流的寬度 height = cap_info['height'] # 獲取視訊流的高度 up, down = str(cap_info['r_frame_rate']).split('/') fps = eval(up) / eval(down) print("fps: {}".format(fps)) # 讀取可能會出錯錯誤 process1 = ( ffmpeg .input(source, **args) .output('pipe:', format='rawvideo', pix_fmt='rgb24') .overwrite_output() .run_async(pipe_stdout=True) ) while True: in_bytes = process1.stdout.read(width * height * 3) # 讀取圖片 if not in_bytes: break # 轉成ndarray in_frame = ( np .frombuffer(in_bytes, np.uint8) .reshape([height, width, 3]) ) frame = cv2.cvtColor(in_frame, cv2.COLOR_RGB2BGR) # 轉成BGR # cv2.imshow(time_str(), frame) cv2.imwrite(time_str()+".jpg", frame) # if cv2.waitKey(1) == ord('q'): # break process1.kill() # 關閉 def time_str(fmt=None): if fmt is None: fmt = '%Y_%m_%d_%H_%M_%S' return datetime.datetime.today().strftime(fmt) if __name__ == "__main__": # rtsp流需要換成自己的 user_name, user_pwd = "admin", "1234" ca_ip = "192.168.1.168" channel = 2 alhua_rtsp="rtsp://%s:%s@%s//Streaming/Channels/%d" % (user_name, user_pwd, ca_ip, channel) main(alhua_rtsp)
採用多執行緒的方式,新開一個執行緒,利用變數、佇列等方式儲存最新幀,使得每次都讀取最新幀,而不是opencv自己快取中的順序幀,不會延遲,不會花屏了,程式碼如下:
import cv2 import threading import sys import datetime def time_str(fmt=None): if fmt is None: fmt = '%Y_%m_%d_%H_%M_%S' return datetime.datetime.today().strftime(fmt) class RTSCapture(cv2.VideoCapture): _cur_frame = None _reading = False schemes = ["rtsp://","rtmp://"] @staticmethod def create(url, *schemes): rtscap = RTSCapture(url) rtscap.frame_receiver = threading.Thread(target=rtscap.recv_frame, daemon=True) rtscap.schemes.extend(schemes) if isinstance(url, str) and url.startswith(tuple(rtscap.schemes)): rtscap._reading = True elif isinstance(url, int): pass return rtscap def isStarted(self): ok = self.isOpened() if ok and self._reading: ok = self.frame_receiver.is_alive() return ok def recv_frame(self): while self._reading and self.isOpened(): ok, frame = self.read() if not ok: break self._cur_frame = frame self._reading = False def read2(self): frame = self._cur_frame self._cur_frame = None return frame is not None, frame def start_read(self): self.frame_receiver.start() self.read_latest_frame = self.read2 if self._reading else self.read def stop_read(self): self._reading = False if self.frame_receiver.is_alive(): self.frame_receiver.join() if __name__ == '__main__': user_name, user_pwd = "admin", "1234" ca_ip = "192.168.1.100" channel = 2 alhua_rtsp="rtsp://%s:%s@%s//Streaming/Channels/%d" % (user_name, user_pwd, ca_ip, channel) rtscap = RTSCapture.create(alhua_rtsp) rtscap.start_read() while rtscap.isStarted(): ok, frame = rtscap.read_latest_frame() # if cv2.waitKey(100) & 0xFF == ord('q'): # break if not ok: continue # inhere # cv2.imshow(time_str(), frame) cv2.imwrite(time_str() + ".jpg", frame) rtscap.stop_read() rtscap.release() cv2.destroyAllWindows()
執行結果:
使用Python3自帶的多程序模組,建立一個佇列,程序A從通過rtsp協定從視訊流中讀取出每一幀,並放入佇列中,程序B從佇列中將圖片取出,處理後進行顯示。程序A如果發現佇列裡有兩張圖片(證明程序B的讀取速度跟不上程序A),那麼程序A主動將佇列裡面的舊圖片刪掉,換上新圖片。通過多執行緒的方法:
程式碼如下:
import cv2 import multiprocessing as mp import time import datetime def time_str(fmt=None): if fmt is None: fmt = '%Y_%m_%d_%H_%M_%S' return datetime.datetime.today().strftime(fmt) def image_put(q, user, pwd, ip, channel=1): cap = cv2.VideoCapture("rtsp://%s:%s@%s//Streaming/Channels/%d" % (user, pwd, ip, channel)) if cap.isOpened(): print('HIKVISION') else: cap = cv2.VideoCapture("rtsp://%s:%s@%s/cam/realmonitor?channel=%d&subtype=0" % (user, pwd, ip, channel)) print('DaHua') while True: q.put(cap.read()[1]) q.get() if q.qsize() > 1 else time.sleep(0.01) def image_get(q, window_name): # cv2.namedWindow(window_name, flags=cv2.WINDOW_FREERATIO) while True: frame = q.get() # cv2.imshow(window_name, frame) # cv2.waitKey(1) cv2.imwrite("opencv_"+time_str() + ".jpg", frame) cv2.waitKey(1) def run_single_camera(): user_name, user_pwd, camera_ip = "admin", "admin123456", "192.168.35.121" mp.set_start_method(method='spawn') # init queue = mp.Queue(maxsize=2) processes = [mp.Process(target=image_put, args=(queue, user_name, user_pwd, camera_ip)), mp.Process(target=image_get, args=(queue, camera_ip))] [process.start() for process in processes] [process.join() for process in processes] def run_multi_camera(): # user_name, user_pwd = "admin", "password" user_name, user_pwd = "admin", "1234" camera_ip_l = [ "192.168.1.XX3", # ipv4 "192.168.1.XX2", "192.168.1.XX1", ] mp.set_start_method(method='spawn') # init queues = [mp.Queue(maxsize=90) for _ in camera_ip_l] processes = [] for queue, camera_ip in zip(queues, camera_ip_l): processes.append(mp.Process(target=image_put, args=(queue, user_name, user_pwd, camera_ip))) processes.append(mp.Process(target=image_get, args=(queue, camera_ip))) for process in processes: process.daemon = True process.start() for process in processes: process.join() if __name__ == '__main__': # run_single_camera() run_multi_camera() pass
到此這篇關於Python實現從網路攝像頭拉流的方法分享的文章就介紹到這了,更多相關Python網路攝像頭拉流內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45