In this tutorial, you will learn how to use the dlib library to efficiently track multiple objects in real-time video.
We can certainly track multiple objects with dlib; however, to obtain the best possible performance we need to take advantage of multiprocessing and distribute the object trackers across multiple cores of our processor.
Using multiprocessing correctly allows us to improve our dlib multi-object tracking throughput by over 45% frames per second (FPS)!
In the first part of this guide, I'll demonstrate how to implement a simple, naive dlib multi-object tracking script. This program will track multiple objects in a video; however, we'll notice that the script runs a bit slow. To increase our FPS, I'll then show you a faster, more efficient dlib multi-object tracker implementation. Finally, I'll discuss some improvements and suggestions to enhance our multi-object tracking implementation.
You can use the tree command to view our project structure:
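Based on the files referenced throughout this tutorial, the layout likely resembles the following sketch (the race.mp4 input and the output .avi names are taken from the commands shown later in the post; treat the exact listing as an assumption):

$ tree
.
├── mobilenet_ssd
│   ├── MobileNetSSD_deploy.caffemodel
│   └── MobileNetSSD_deploy.prototxt
├── multi_object_tracking_slow.py
├── multi_object_tracking_fast.py
├── race.mp4
├── race_output_slow.avi
└── race_output_fast.avi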
The mobilenet_ssd/ directory contains our MobileNet + SSD Caffe model files, which allow us to detect people (along with other objects). Today we will review two Python scripts: multi_object_tracking_slow.py, the simple "naive" implementation, and multi_object_tracking_fast.py, the faster implementation that uses multiprocessing.
The first dlib multi-object tracking implementation we're covering today is "naive" in that it will:
1. Use a simple list of tracker objects.
2. Update each tracker sequentially, using only a single core of our processor.
For some object tracking tasks this implementation will be more than sufficient; however, to optimize our FPS we should distribute the object trackers across multiple processes.
We'll start with the simple implementation in this section and then move on to the faster method in the next section. First, open up the multi_object_tracking_slow.py script and insert the following code:
# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2
Let's parse our command-line arguments:
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())
Our script handles the following command-line arguments at runtime: --prototxt (path to the Caffe 'deploy' prototxt file), --model (path to the pre-trained Caffe model), --video (path to the input video file), --output (optional path to an output video file), and --confidence (minimum probability to filter weak detections, default 0.2).
Let's define the list of classes this model supports and load our model from disk:
# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
We only care about the "person" class for today's footrace example, but you could easily modify the script to track other classes. We load our pre-trained object detector model; we'll use this pre-trained SSD to detect the presence of objects in the video and then create a dlib object tracker for each object that is detected.
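For instance, a minimal sketch of tracking several classes at once (TRACK_CLASSES is a hypothetical name, not part of the tutorial's scripts) would only change the class check inside the detection loop that appears later in the script:

# hypothetical: track people, dogs, and cars instead of only "person"
TRACK_CLASSES = {"person", "dog", "car"}

# loop over the detections (same structure as in the script)
for i in np.arange(0, detections.shape[2]):
    confidence = detections[0, 0, i, 2]
    if confidence > args["confidence"]:
        idx = int(detections[0, 0, i, 1])
        label = CLASSES[idx]
        # skip any detection whose class we are not interested in
        if label not in TRACK_CLASSES:
            continue
        # ...create and start a dlib correlation tracker as before...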
We have a few more initializations to perform:
# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None
# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []
# start the frames per second throughput estimator
fps = FPS().start()
We initialize our video stream; we'll read one frame at a time from the input video. Our video writer is then initialized to None, and we'll work with it more in the upcoming while loop. We then initialize our tracker and label lists, and finally start our frames-per-second counter. We're now ready to begin processing the video:
# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()
    # check to see if we have reached the end of the video file
    if frame is None:
        break
    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)
The frame is resized to 600 pixels wide while maintaining the aspect ratio. The frame is then converted to RGB channel ordering for dlib compatibility (OpenCV's default is BGR, whereas dlib expects RGB).
Let's begin the object detection phase:
    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()
In order to perform object tracking we must first perform object detection. If there are no object trackers yet, then we know we have not run detection, so we create a blob from the frame and pass it through the SSD network to detect objects.
Next, we loop over the detections to find objects belonging to the "person" class, since our input video is a footrace:
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue
We begin looping over the detections, where we: (1) extract the confidence (i.e., probability) associated with the prediction, (2) filter out weak detections that don't meet our minimum confidence, (3) extract the index of the class label, and (4) ignore the detection if the class label is not "person".
Now that we've located each "person" in the frame, let's instantiate our trackers and draw our initial bounding boxes + class labels:
                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)
                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)
                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
To begin tracking an object, we: (1) compute the bounding box (x, y)-coordinates of the object, (2) construct a dlib rectangle object from those coordinates, (3) instantiate a dlib correlation_tracker, and (4) start tracking by passing the RGB frame and the rectangle to start_track.
Thus, in the next code block we'll handle the case where trackers have already been created and we simply need to update positions. We also perform two additional tasks during this initial detection step: (1) append the class label and tracker to our labels and trackers lists, respectively, and (2) draw the bounding box and class label on the frame.
Once our list of trackers is non-empty, we know we're in the object tracking phase:
    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
In the object tracking phase, we loop over all trackers and corresponding labels. We then update the position of each object; to do so, we simply pass in the rgb image.
After extracting the bounding box coordinates, we can draw a bounding box rectangle and label for each tracked object.
The remaining steps in the frame processing loop involve writing to the output video (if necessary) and displaying the results:
    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # update the FPS counter
    fps.update()
Here we: (1) write the frame to disk if a video writer was instantiated, (2) display the output frame on screen, (3) break out of the loop if the "q" key was pressed, and (4) update our FPS counter.
The remaining steps are to print FPS information in the terminal and release pointers:
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
Let's evaluate accuracy and performance. Open up a terminal and execute the following command:
$ python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4 --output race_output_slow.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 24.51
[INFO] approx. FPS: 13.87
It looks like our multi-object tracker is working!
But as you can see, we're only obtaining roughly 13 FPS.
For some applications that FPS may be sufficient; however, if you need faster throughput, I suggest taking a look at our more efficient dlib multi-object tracker below. Secondly, understand that the tracking accuracy isn't perfect.
If you run the dlib multi-object tracking script from the previous section and open up your system's activity monitor at the same time, you'll notice that only one core of your processor is being utilized.
Utilizing processes enables our operating system to perform better process scheduling, mapping processes to particular processor cores on our machine (most modern operating systems can efficiently schedule CPU-hungry processes in a parallel manner).
Go ahead and open up multi_object_tracking_fast.py and insert the following code:
# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2
We'll be using the Python multiprocessing Process class to spawn new processes; each new process is independent of the original one.
To spawn a process, we need to provide a function that Python can call; Python will then take this function, create a brand new process, and execute it:
def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)
The first three parameters of start_tracker are: box, the bounding box coordinates of the object we are going to track (as reported by the object detector); label, the human-readable class label of the object; and rgb, the RGB-ordered frame used to start the initial dlib object tracker.
Keep in mind how Python multiprocessing works: Python will call this function and then create a brand new interpreter to execute the code within it. Therefore, each spawned start_tracker process will be independent of its parent. To communicate with the Python driver script, we need to leverage Pipes or Queues; both types of objects are thread/process safe, accomplished using locks and semaphores.
Essentially, we're creating a simple producer/consumer relationship: the parent process produces new frames and adds them to the input queue of a particular object tracker, while the child process consumes the frame, applies object tracking, and returns the updated bounding box coordinates to the parent via the output queue.
I decided to use Queue objects in this post; however, keep in mind that you could use a Pipe if you wish.
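As a rough sketch (not part of the tutorial's scripts; worker, parent_conn, and child_conn are illustrative names), the same frame-in, box-out exchange could be done with a Pipe, which gives you a pair of connected endpoints instead of two queues:

# a minimal sketch of the producer/consumer exchange using multiprocessing.Pipe
import multiprocessing

def worker(conn):
    # block until the parent sends a frame, then send a result back
    frame = conn.recv()
    conn.send(("person", (10, 20, 110, 220)))

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.daemon = True
    p.start()
    parent_conn.send("dummy frame")   # producer side
    print(parent_conn.recv())         # consumer side: ('person', (10, 20, 110, 220))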
Now let's begin an infinite loop that will run inside the process:
    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()
        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))
We loop indefinitely here; this function will be called as a daemon process, so we don't need to worry about joining it.
First, we attempt to grab a new frame from the inputQueue. If the frame is not empty, we grab it, update the object tracker, and obtain the updated bounding box coordinates.
Finally, we write the label and bounding box to the outputQueue so the parent process can utilize them in the main loop of the script.
Back in the parent process, we'll parse our command-line arguments:
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())
The command-line arguments for this script are exactly the same as for our slower, non-multiprocessing script.
Let's initialize our input and output queues:
# initialize our lists of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []
These queues will hold the objects we are tracking. Each spawned process needs two Queue objects: one inputQueue from which it reads incoming frames, and one outputQueue to which it writes its results.
The next code block is identical to our previous script:
# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None
# start the frames per second throughput estimator
fps = FPS().start()
We define the model's CLASSES and load the model itself.
Now let's begin looping over frames from the video stream:
# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()
    # check to see if we have reached the end of the video file
    if frame is None:
        break
    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)
Now let's handle the case where we have no inputQueues yet:
    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue
If there are no inputQueues, then we need to apply object detection before object tracking. We apply object detection and then loop over the detections, grabbing the confidence value and filtering out weak detections. If the confidence meets the threshold established by our command-line argument, we consider the detection but further filter it by class label; in this case, we're only looking for "person" objects. Assuming we have found a person, we create queues and spawn tracking processes:
                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)
                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)
                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()
                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
We first compute the bounding box coordinates. From there we create two new queues, iq and oq, appending them to inputQueues and outputQueues, respectively. We then spawn a new start_tracker process, passing the bounding box, label, rgb image, and the iq + oq.
We also draw the detected object's bounding box rectangle and class label.
Otherwise, we've already performed object detection, so we need to apply each of the dlib object trackers to the frame:
    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input queues and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)
        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()
            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
Looping over each of the inputQueues, we add the rgb image to them. Then we loop over each of the outputQueues, obtaining the updated bounding box coordinates from each independent object tracker. Finally, we draw the bounding box + associated class label.
    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # update the FPS counter
    fps.update()
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
If necessary, we write the frame to the output video and display the frame on screen. If the "q" key is pressed, we quit, breaking out of the loop. If we continue processing frames, our FPS calculator is updated and we start processing again at the top of the while loop. Otherwise, we're done processing frames, so we display the FPS information, release pointers, and close windows.
Open up a terminal and execute the following command:
$ python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4 --output race_output_fast.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 14.01
[INFO] approx. FPS: 24.26
As you can see, our faster, more efficient multi-object tracker runs at 24 FPS, an improvement of over 45% compared to our previous implementation! Furthermore, if you open your activity monitor while this script is running, you'll see that more of your system's CPU is being utilized. This speedup is obtained by allowing each dlib object tracker to run in a separate process, which in turn enables your operating system to perform more efficient scheduling of CPU resources.
multi_object_tracking_slow.py
# USAGE
# python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt
# --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
# ap.add_argument("-v", "--video", required=True,
#     help="path to input video file")
ap.add_argument("-v", "--video",
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
# vs = cv2.VideoCapture(args["video"])
vs = cv2.VideoCapture(0)
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)

                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
multi_object_tracking_fast.py
# USAGE
# python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt
# --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)

    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()

        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize our list of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)

                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)

                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input queues and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)

        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()

            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
Link: https://pan.baidu.com/s/1WhJr-Qxh5Wu3TsXKRiTHRg (extraction code: 1234)
The dlib multi-object tracking Python scripts I'm sharing with you today will work just fine for shorter video streams; however, if you intend to use this implementation in long-running production environments (on the order of hours to days of video), I suggest two primary improvements:
The first improvement is to utilize a process pool rather than spawning a brand new process for each object to be tracked. The implementation covered here constructs a brand new Queue and Process for every object we need to track.
That's fine for today's purposes, but consider if you wanted to track 50 objects in a video; that would mean spawning 50 processes, one per object. At that point, the overhead of the system managing all of those processes would destroy any gains in FPS. Instead, you would want to utilize a process pool.
If your system has N processor cores, you would create a pool with N - 1 processes, leaving one core for your operating system to perform system operations. Each of these processes should perform multiple object tracking, maintaining a list of object trackers, similar to the first multi-object tracking implementation we covered today, as shown in the sketch below.
This improvement will allow you to utilize all cores of your processor without the overhead of spawning many independent processes.
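As a rough sketch of that idea (not part of the tutorial's scripts; worker_loop, NUM_WORKERS, and the message format are illustrative assumptions), each worker process could own a slice of the trackers and update them sequentially, exactly like the naive implementation, while the parent fans frames out to a fixed pool of N - 1 processes:

# a minimal sketch, assuming each worker owns several trackers
# (illustrative names; not part of multi_object_tracking_fast.py)
import multiprocessing
import dlib

NUM_WORKERS = max(multiprocessing.cpu_count() - 1, 1)

def worker_loop(inputQueue, outputQueue):
    # each worker maintains its own list of (label, tracker) pairs
    trackers = []
    while True:
        msg = inputQueue.get()
        # a ("track", label, box, rgb) message asks the worker to adopt a new object
        if msg[0] == "track":
            _, label, box, rgb = msg
            t = dlib.correlation_tracker()
            t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))
            trackers.append((label, t))
        # an ("update", rgb) message asks for updated boxes for all owned objects
        elif msg[0] == "update":
            rgb = msg[1]
            results = []
            for (label, t) in trackers:
                t.update(rgb)
                pos = t.get_position()
                results.append((label, (int(pos.left()), int(pos.top()),
                    int(pos.right()), int(pos.bottom()))))
            outputQueue.put(results)

# the parent would create NUM_WORKERS queue pairs and processes once,
# assign new detections to workers round-robin, and then, per frame,
# put an ("update", rgb) message on every input queue and collect the
# results from every output queue.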
The second improvement I would make is to clean up processes and queues. If dlib reports an object as "lost" or "disappeared", we never return from the start_tracker function, meaning that process will live for the lifetime of the parent script and only be killed when the parent exits.
Again, that's fine for our purposes today, but if you intend to use this code in a production environment, you should: (1) detect when a tracker has lost the object it is following and return from the start_tracker function so the process can exit, and (2) remove the corresponding inputQueue and outputQueue from their lists so the parent stops feeding frames to a dead tracker.
Failing to perform this cleanup will lead to needless computational consumption and memory overhead for long-running jobs.
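One way to sketch that cleanup (not from the original post; the 7.0 threshold is an assumed value you would need to tune) is to use the confidence score that dlib's correlation_tracker.update returns, its peak-to-side-lobe ratio, and exit the worker loop when it drops too low:

# a sketch of how start_tracker could be modified to exit when tracking
# confidence drops (the 7.0 threshold below is an assumption to be tuned)
import dlib

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    t = dlib.correlation_tracker()
    t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))

    while True:
        rgb = inputQueue.get()
        if rgb is None:
            continue

        # dlib's update() returns a confidence score (the peak-to-side-lobe
        # ratio); a low value suggests the object has been lost
        conf = t.update(rgb)
        if conf < 7.0:
            # tell the parent this tracker is done, then let the process exit
            outputQueue.put((label, None))
            return

        pos = t.get_position()
        outputQueue.put((label, (int(pos.left()), int(pos.top()),
            int(pos.right()), int(pos.bottom()))))

# the parent would watch for a None bounding box, remove the matching
# inputQueue/outputQueue from its lists, and let the daemon process die.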
A third improvement is to boost tracking accuracy by running the object detector every N frames (rather than only once at the start).
I actually demonstrated this in my article on counting with OpenCV. It requires more logic and thought, but yields a much more accurate tracker. I chose to forgo implementing it in this script so that I could teach you the multiprocessing method concisely. Ideally, you would use this third improvement in addition to multiprocessing, along the lines of the sketch below.
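A minimal sketch of how this could be bolted onto the naive (single-process) script, assuming a hypothetical DETECT_EVERY constant and run_detector helper (neither appears in the tutorial's code), might look like the following; it reuses vs, net, args, CLASSES, trackers, and labels from multi_object_tracking_slow.py:

# hypothetical helper: run the SSD and return (label, box) pairs for people
def run_detector(net, frame, min_conf):
    (h, w) = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
    net.setInput(blob)
    detections = net.forward()
    results = []
    for i in np.arange(0, detections.shape[2]):
        if detections[0, 0, i, 2] > min_conf:
            idx = int(detections[0, 0, i, 1])
            if CLASSES[idx] != "person":
                continue
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            results.append((CLASSES[idx], box.astype("int")))
    return results

DETECT_EVERY = 30  # assumed value: re-detect roughly every 30 frames
totalFrames = 0

while True:
    (grabbed, frame) = vs.read()
    if frame is None:
        break
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    if totalFrames % DETECT_EVERY == 0:
        # throw away the old trackers and re-seed from fresh detections,
        # correcting any drift that accumulated while tracking
        trackers, labels = [], []
        for (label, (startX, startY, endX, endY)) in run_detector(
                net, frame, args["confidence"]):
            t = dlib.correlation_tracker()
            t.start_track(rgb, dlib.rectangle(startX, startY, endX, endY))
            trackers.append(t)
            labels.append(label)
    else:
        # cheap frames: only update the existing correlation trackers
        for t in trackers:
            t.update(rgb)

    totalFrames += 1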
That concludes this detailed guide to multi-object tracking with dlib in Python and OpenCV. For more material on OpenCV and dlib multi-object tracking, please see the other related articles on it145.com!