
Multi-object tracking with dlib in Python and OpenCV

2022-03-14 13:01:37

In this tutorial, you will learn how to use the dlib library to efficiently track multiple objects in real-time video.

We can certainly track multiple objects with dlib; however, to obtain the best performance possible, we need to leverage multiprocessing and distribute the object trackers across multiple cores of our processor.

Using multiprocessing correctly allows us to improve our dlib multi-object tracking frames per second (FPS) by over 45%!

1. Multi-object tracking with dlib

In the first part of this guide, I'll demonstrate how to implement a simple, naive dlib multi-object tracking script. This program will track multiple objects in a video; however, we'll notice that the script runs a bit slowly. To increase our FPS, I'll then show you a faster, more efficient implementation of the dlib multi-object tracker. Finally, I'll discuss some improvements and suggestions to enhance our multi-object tracking implementation.

2. Project structure

You can inspect our project structure with the tree command.
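The listing below is reconstructed from the files referenced throughout this post (the two race_output_*.avi files only appear once you run the scripts with --output), so your copy may differ slightly:

$ tree . --dirsfirst
.
├── mobilenet_ssd
│   ├── MobileNetSSD_deploy.caffemodel
│   └── MobileNetSSD_deploy.prototxt
├── multi_object_tracking_fast.py
├── multi_object_tracking_slow.py
├── race.mp4
├── race_output_fast.avi
└── race_output_slow.avi
1 directory, 7 files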

The mobilenet_ssd/ directory contains our MobileNet + SSD Caffe model files, which allow us to detect people (as well as other objects). Today we will review two Python scripts:

  • multi_object_tracking_slow.py: a simple "naive" approach to multi-object tracking with dlib.
  • multi_object_tracking_fast.py: an advanced, faster method that leverages multiprocessing.

3. A simple "naive" dlib multi-object tracking method

The first dlib multi-object tracking implementation we'll cover today is "naive" in that it will:

1. Use a simple list of tracker objects.

2. Update each tracker sequentially, using only a single core of our processor.

For some object tracking tasks this implementation will be more than sufficient; however, to optimize our FPS, we should distribute the object trackers across multiple processes.

We'll start with the simple implementation in this section and then move on to the faster method in the next one. First, open the multi_object_tracking_slow.py script and insert the following code:

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

Let's parse our command line arguments:

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

Our script handles the following command line arguments at runtime:

  • --prototxt: path to the Caffe 'deploy' prototxt file.
  • --model: path to the model file that accompanies the prototxt.
  • --video: path to the input video file. We'll perform multi-object tracking with dlib on this video.
  • --output: an optional path to an output video file. If no path is specified, no video will be written to disk. I recommend writing to an .avi or .mp4 file.
  • --confidence: the object detection confidence threshold, 0.2 by default; this is the minimum probability used to filter out weak detections from the object detector.

Let's define the list of classes this model supports and load our model from disk:

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

We only care about the "person" class for today's footrace example, but you can easily modify this to track other classes. We then load our pre-trained object detector model; we'll use this pre-trained SSD to detect the presence of objects in the video, and we'll create a dlib object tracker for each detected object.

We have a few more initializations to perform:

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None
# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []
# start the frames per second throughput estimator
fps = FPS().start()

We initialize our video stream; we'll read frames from the input video one at a time. Subsequently, our video writer is initialized to None; we'll work with the video writer in the upcoming while loop. We then initialize our lists of trackers and labels. Finally, we start our frames-per-second counter. We're all set to begin processing the video:

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()
    # check to see if we have reached the end of the video file
    if frame is None:
        break
    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

The frame is resized to a width of 600 pixels, preserving the aspect ratio. The frame is then converted to RGB channel ordering for dlib compatibility (OpenCV's default is BGR, while dlib expects RGB).

Let's begin the object detection phase:

    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

To perform object tracking, we must first perform object detection, either:

  • Manually, by stopping the video stream and hand-selecting the bounding box of each object.
  • Programmatically, using a trained object detector to detect the presence of objects (which is what we do here).

If there are no object trackers, then we know we have yet to perform object detection.

We create a blob and pass it through the SSD network to detect objects.

Next, we loop over the detections to find objects belonging to the person class, since our input video is a human footrace:

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

We begin looping over the detections, where we:

  • Filter out weak detections.
  • Ensure each detection is a person. You can, of course, remove this line of code or customize it to your own filtering needs.

Now that we have located each person in the frame, let's instantiate our trackers and draw our initial bounding boxes + class labels:

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)
                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)
                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

To begin tracking objects, we:

  • Compute the bounding box of each detected object.
  • Instantiate the bounding box coordinates and pass them to the tracker. The bounding box is especially important here: we need to create a dlib.rectangle from it and pass that to the start_track method, after which dlib can begin tracking the object.
  • Finally, we populate the trackers list with the individual tracker.

Consequently, in the next code block we'll handle the case where the trackers have already been created and only need their positions updated. We perform two additional tasks in this initial detection step:

  • Append the class label to the labels list. If you're tracking multiple types of objects (for example, dog + person), you may wish to know what type each object is.
  • Draw each bounding box rectangle and class label around the object.

If our list of trackers is non-empty, we know we are in the object tracking phase:

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

In the object tracking phase, we loop over all trackers and their corresponding labels. We then proceed to update the position of each object; to update the position, we simply pass in the rgb image.

After extracting the bounding box coordinates, we can draw a bounding box rectangle and label for each tracked object.

The remaining steps in the frame processing loop involve writing to the output video (if necessary) and displaying the result:

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # update the FPS counter
    fps.update()

Here, we:

  • Write the frame to video, if necessary.
  • Display the output frame and capture key presses. If the q (quit) key is pressed, we break out of the loop. Finally, we update our frames-per-second information for benchmarking.

The remaining steps are to print the FPS information to the terminal and release pointers:

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

Let's evaluate accuracy and performance. Open a terminal and execute the following command:

$ python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_slow.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 24.51
[INFO] approx. FPS: 13.87

It looks like our multi-object tracker is working!

But as you can see, we're only obtaining about 13 FPS.

For some applications that FPS may be sufficient; however, if you need faster FPS, I'd suggest taking a look at our more efficient dlib multi-object tracker below. Second, understand that the tracking accuracy is not perfect.

4. A fast, efficient dlib multi-object tracking implementation

If you run the dlib multi-object tracking script from the previous section and open your system's activity monitor at the same time, you'll notice that only one core of your processor is being utilized.

Leveraging processes enables our operating system to perform better process scheduling, mapping the processes to particular processor cores on our machine (most modern operating systems are able to efficiently schedule CPU-heavy processes in a parallel manner).

Go ahead and open multi_object_tracking_fast.py and insert the following code:

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

We'll be using the Python Process class to spawn new processes; each new process is independent of the original one.

To spawn a process, we need to provide a function that Python can call; Python will then take this function, create a brand new process, and execute it:

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)

The first three parameters to start_tracker include:

  • box: the bounding box coordinates of the object we want to track, presumably returned by some kind of object detector, whether manual or programmatic.
  • label: the human-readable label of the object.
  • rgb: the RGB image we'll use to start the initial dlib object tracker.

Keep in mind how Python multiprocessing works: Python will call this function and then create a brand new interpreter to execute the code within it. As a result, each spawned start_tracker process will be independent of its parent. To communicate with the Python driver script, we need to use Pipes or Queues. Both types of objects are thread/process safe, accomplished using locks and semaphores.

Essentially, we're creating a simple producer/consumer relationship:

  • Our parent process will produce new frames and add them to the queue of a particular object tracker.
  • The child process will then consume the frame, apply object tracking, and return the updated bounding box coordinates.

I decided to use Queue objects in this post; however, keep in mind that you could use a Pipe instead if you prefer.
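For reference, here is a minimal, self-contained sketch of the Pipe alternative; the names and dummy values below are placeholders of mine and are not part of the tracking scripts:

import multiprocessing

def worker(conn):
    # block until the parent sends a frame, then send a dummy result back
    frame = conn.recv()
    conn.send(("person", (10, 20, 110, 220)))
    conn.close()

if __name__ == "__main__":
    # Pipe() returns two connected endpoints; hand one end to the child
    parentConn, childConn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(childConn,))
    p.start()
    parentConn.send("rgb-frame-placeholder")
    print(parentConn.recv())  # -> ("person", (10, 20, 110, 220))
    p.join()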

Now let's begin an infinite loop that will run inside the process:

    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()
        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))

We loop indefinitely here; this function will be called as a daemon process, so we don't need to worry about joining it.

First, we attempt to grab a new frame from the inputQueue. If the frame is not empty, we grab it and then update the object tracker, giving us the updated bounding box coordinates.

Finally, we write the label and bounding box to the outputQueue so the parent process can use them in the main loop of the script.

Back in the parent process, we parse our command line arguments:

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

The command line arguments for this script are exactly the same as for our slower, non-multiprocessing script.

Let's initialize our input and output queues:

# initialize our lists of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

These lists will hold the queues for the objects we are tracking. Each process we spawn needs two Queue objects:

  • One to read input frames from
  • Another to write results to

The next code block is identical to our previous script:

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None
# start the frames per second throughput estimator
fps = FPS().start()

We define the model's CLASSES and load the model itself.

Now let's begin looping over the frames in the video stream:

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()
    # check to see if we have reached the end of the video file
    if frame is None:
        break
    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

Now let's handle the case where we have no inputQueues:

    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)
        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

If there are no inputQueues, then we need to apply object detection before object tracking. We apply object detection and then loop over the results; we grab the confidence value and filter out weak detections. If our confidence meets the threshold established by our command line argument, we consider the detection, but we further filter it by class label; in this case we are only looking for person objects. Assuming we have found a person, we create the queues and spawn the tracking process:

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)
                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)
                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()
                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

We first compute the bounding box coordinates. From there we create two new queues, iq and oq, appending them to inputQueues and outputQueues, respectively. We then spawn a new start_tracker process, passing it the bounding box, label, rgb image, and the iq + oq.

We also draw the detected object's bounding box rectangle and class label.

Otherwise, we've already performed object detection, so we need to apply each of the dlib object trackers to the frame:

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input ques and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)
        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()
            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

Looping over each of the inputQueues, we add the rgb image to them. Then we loop over each of the outputQueues, grabbing the bounding box coordinates from each independent object tracker. Finally, we draw the bounding box + associated class label.

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # update the FPS counter
    fps.update()
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

If necessary, we write the frame to the output video, and we display the frame on screen. If the q key is pressed, we quit, breaking out of the loop. If we continue processing frames, our FPS calculator is updated and we start processing again at the top of the while loop. Otherwise, we're done processing frames, so we display the FPS information, release pointers, and close windows.

Open a terminal and execute the following command:

$ python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_fast.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 14.01
[INFO] approx. FPS: 24.26

As you can see, our faster, more efficient multi-object tracker runs at 24 FPS, an improvement of over 45% compared to our previous implementation! Furthermore, if you open your activity monitor while this script is running, you'll see that more of your system's CPU is being utilized. This speedup is obtained by allowing each of the dlib object trackers to run in a separate process, which in turn enables your operating system to perform more efficient scheduling of CPU resources.

5. Complete code

multi_object_tracking_slow.py

# USAGE
# python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
# 	--model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
	help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
	help="path to Caffe pre-trained model")
# ap.add_argument("-v", "--video", required=True,
# 	help="path to input video file")
ap.add_argument("-v", "--video",
	help="path to input video file")
ap.add_argument("-o", "--output", type=str,
	help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
	help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
	"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
	"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
	"sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
# vs = cv2.VideoCapture(args["video"])
vs = cv2.VideoCapture(0)
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
	# grab the next frame from the video file
	(grabbed, frame) = vs.read()

	# check to see if we have reached the end of the video file
	if frame is None:
		break

	# resize the frame for faster processing and then convert the
	# frame from BGR to RGB ordering (dlib needs RGB ordering)
	frame = imutils.resize(frame, width=600)
	rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

	# if we are supposed to be writing a video to disk, initialize
	# the writer
	if args["output"] is not None and writer is None:
		fourcc = cv2.VideoWriter_fourcc(*"MJPG")
		writer = cv2.VideoWriter(args["output"], fourcc, 30,
			(frame.shape[1], frame.shape[0]), True)

	# if there are no object trackers we first need to detect objects
	# and then create a tracker for each object
	if len(trackers) == 0:
		# grab the frame dimensions and convert the frame to a blob
		(h, w) = frame.shape[:2]
		blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

		# pass the blob through the network and obtain the detections
		# and predictions
		net.setInput(blob)
		detections = net.forward()

		# loop over the detections
		for i in np.arange(0, detections.shape[2]):
			# extract the confidence (i.e., probability) associated
			# with the prediction
			confidence = detections[0, 0, i, 2]

			# filter out weak detections by requiring a minimum
			# confidence
			if confidence > args["confidence"]:
				# extract the index of the class label from the
				# detections list
				idx = int(detections[0, 0, i, 1])
				label = CLASSES[idx]

				# if the class label is not a person, ignore it
				if CLASSES[idx] != "person":
					continue

				# compute the (x, y)-coordinates of the bounding box
				# for the object
				box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
				(startX, startY, endX, endY) = box.astype("int")

				# construct a dlib rectangle object from the bounding
				# box coordinates and start the correlation tracker
				t = dlib.correlation_tracker()
				rect = dlib.rectangle(startX, startY, endX, endY)
				t.start_track(rgb, rect)

				# update our set of trackers and corresponding class
				# labels
				labels.append(label)
				trackers.append(t)

				# grab the corresponding class label for the detection
				# and draw the bounding box
				cv2.rectangle(frame, (startX, startY), (endX, endY),
					(0, 255, 0), 2)
				cv2.putText(frame, label, (startX, startY - 15),
					cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# otherwise, we've already performed detection so let's track
	# multiple objects
	else:
		# loop over each of the trackers
		for (t, l) in zip(trackers, labels):
			# update the tracker and grab the position of the tracked
			# object
			t.update(rgb)
			pos = t.get_position()

			# unpack the position object
			startX = int(pos.left())
			startY = int(pos.top())
			endX = int(pos.right())
			endY = int(pos.bottom())

			# draw the bounding box from the correlation object tracker
			cv2.rectangle(frame, (startX, startY), (endX, endY),
				(0, 255, 0), 2)
			cv2.putText(frame, l, (startX, startY - 15),
				cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# check to see if we should write the frame to disk
	if writer is not None:
		writer.write(frame)

	# show the output frame
	cv2.imshow("Frame", frame)
	key = cv2.waitKey(1) & 0xFF

	# if the `q` key was pressed, break from the loop
	if key == ord("q"):
		break

	# update the FPS counter
	fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
	writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

multi_object_tracking_fast.py

# USAGE
# python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#	--model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

def start_tracker(box, label, rgb, inputQueue, outputQueue):
	# construct a dlib rectangle object from the bounding box
	# coordinates and then start the correlation tracker
	t = dlib.correlation_tracker()
	rect = dlib.rectangle(box[0], box[1], box[2], box[3])
	t.start_track(rgb, rect)

	# loop indefinitely -- this function will be called as a daemon
	# process so we don't need to worry about joining it
	while True:
		# attempt to grab the next frame from the input queue
		rgb = inputQueue.get()

		# if there was an entry in our queue, process it
		if rgb is not None:
			# update the tracker and grab the position of the tracked
			# object
			t.update(rgb)
			pos = t.get_position()

			# unpack the position object
			startX = int(pos.left())
			startY = int(pos.top())
			endX = int(pos.right())
			endY = int(pos.bottom())

			# add the label + bounding box coordinates to the output
			# queue
			outputQueue.put((label, (startX, startY, endX, endY)))

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
	help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
	help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
	help="path to input video file")
ap.add_argument("-o", "--output", type=str,
	help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
	help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize our list of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
	"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
	"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
	"sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
	# grab the next frame from the video file
	(grabbed, frame) = vs.read()

	# check to see if we have reached the end of the video file
	if frame is None:
		break

	# resize the frame for faster processing and then convert the
	# frame from BGR to RGB ordering (dlib needs RGB ordering)
	frame = imutils.resize(frame, width=600)
	rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

	# if we are supposed to be writing a video to disk, initialize
	# the writer
	if args["output"] is not None and writer is None:
		fourcc = cv2.VideoWriter_fourcc(*"MJPG")
		writer = cv2.VideoWriter(args["output"], fourcc, 30,
			(frame.shape[1], frame.shape[0]), True)

	# if our list of queues is empty then we know we have yet to
	# create our first object tracker
	if len(inputQueues) == 0:
		# grab the frame dimensions and convert the frame to a blob
		(h, w) = frame.shape[:2]
		blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

		# pass the blob through the network and obtain the detections
		# and predictions
		net.setInput(blob)
		detections = net.forward()

		# loop over the detections
		for i in np.arange(0, detections.shape[2]):
			# extract the confidence (i.e., probability) associated
			# with the prediction
			confidence = detections[0, 0, i, 2]

			# filter out weak detections by requiring a minimum
			# confidence
			if confidence > args["confidence"]:
				# extract the index of the class label from the
				# detections list
				idx = int(detections[0, 0, i, 1])
				label = CLASSES[idx]

				# if the class label is not a person, ignore it
				if CLASSES[idx] != "person":
					continue

				# compute the (x, y)-coordinates of the bounding box
				# for the object
				box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
				(startX, startY, endX, endY) = box.astype("int")
				bb = (startX, startY, endX, endY)

				# create two brand new input and output queues,
				# respectively
				iq = multiprocessing.Queue()
				oq = multiprocessing.Queue()
				inputQueues.append(iq)
				outputQueues.append(oq)

				# spawn a daemon process for a new object tracker
				p = multiprocessing.Process(
					target=start_tracker,
					args=(bb, label, rgb, iq, oq))
				p.daemon = True
				p.start()

				# grab the corresponding class label for the detection
				# and draw the bounding box
				cv2.rectangle(frame, (startX, startY), (endX, endY),
					(0, 255, 0), 2)
				cv2.putText(frame, label, (startX, startY - 15),
					cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# otherwise, we've already performed detection so let's track
	# multiple objects
	else:
		# loop over each of our input ques and add the input RGB
		# frame to it, enabling us to update each of the respective
		# object trackers running in separate processes
		for iq in inputQueues:
			iq.put(rgb)

		# loop over each of the output queues
		for oq in outputQueues:
			# grab the updated bounding box coordinates for the
			# object -- the .get method is a blocking operation so
			# this will pause our execution until the respective
			# process finishes the tracking update
			(label, (startX, startY, endX, endY)) = oq.get()

			# draw the bounding box from the correlation object
			# tracker
			cv2.rectangle(frame, (startX, startY), (endX, endY),
				(0, 255, 0), 2)
			cv2.putText(frame, label, (startX, startY - 15),
				cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

	# check to see if we should write the frame to disk
	if writer is not None:
		writer.write(frame)

	# show the output frame
	cv2.imshow("Frame", frame)
	key = cv2.waitKey(1) & 0xFF

	# if the `q` key was pressed, break from the loop
	if key == ord("q"):
		break

	# update the FPS counter
	fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
	writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()

Download link: https://pan.baidu.com/s/1WhJr-Qxh5Wu3TsXKRiTHRg (extraction code: 1234)

6. Improvements and suggestions

The dlib multi-object tracking Python scripts I'm sharing with you today will work just fine for shorter video streams; however, if you intend to use this implementation in a long-running production environment (on the order of hours to days of video), there are two primary improvements I would suggest:

The first improvement would be to utilize a process pool rather than spawning a brand new process for each object to be tracked. The implementation covered here constructs a brand new Queue and Process for every object we need to track.

That's fine for today's purposes, but consider what would happen if you wanted to track 50 objects in a video: you would spawn 50 processes, one per object. At that point, the overhead of your system managing all of those processes will destroy any gain in FPS. Instead, you would want to use a process pool.

If your system has N processor cores, then you would want to create a pool with N - 1 processes, leaving one core for your operating system to handle system operations. Each of these processes should perform multiple object tracking, maintaining a list of object trackers, similar to the first multi-object tracking implementation we covered today.

This improvement will allow you to take advantage of all the cores of your processor without the overhead of spawning many independent processes.
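A rough sketch of that idea follows; this is my own illustration (the message protocol and function name are assumptions, not code from this post), in which each worker process owns a list of trackers rather than a single one:

import multiprocessing
import dlib

def track_multiple_objects(inputQueue, outputQueue):
    # each worker process maintains its *own* list of trackers and labels
    trackers = []
    labels = []
    while True:
        # the parent sends either ("add", (box, label, rgb)) to register a
        # new object, or ("track", rgb) to update every tracker in this worker
        (msg, payload) = inputQueue.get()
        if msg == "add":
            (box, label, rgb) = payload
            t = dlib.correlation_tracker()
            t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))
            trackers.append(t)
            labels.append(label)
        elif msg == "track":
            rgb = payload
            results = []
            for (t, l) in zip(trackers, labels):
                t.update(rgb)
                pos = t.get_position()
                results.append((l, (int(pos.left()), int(pos.top()),
                    int(pos.right()), int(pos.bottom()))))
            # return all boxes handled by this worker in a single message
            outputQueue.put(results)

# in the parent you would create the pool once, for example:
#   numWorkers = max(1, multiprocessing.cpu_count() - 1)
# then spawn numWorkers daemon processes running track_multiple_objects and
# assign new detections to the workers in round-robin fashion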

The second improvement I would make is to clean up the processes and queues. If dlib reports an object as "lost" or "disappeared", we do not return from the start_tracker function, which means the process will live for the lifetime of the parent script and will only be killed when the parent exits.

Again, that's fine for our purposes today, but if you intend to use this code in a production environment, you should:

  • Update the start_tracker function to return once dlib reports the object as lost.
  • Delete the inputQueue and outputQueue of the corresponding process as well.

Failing to perform this cleanup will lead to needless computational expense and memory overhead for long-running jobs. A sketch of an early-returning start_tracker is shown below.
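Here is one way the first bullet could look. dlib's correlation_tracker.update() returns a peak-to-side-lobe ratio; the 7.0 cutoff below is an assumption of mine and would need tuning for your footage:

import dlib

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct the dlib rectangle and start the correlation tracker
    t = dlib.correlation_tracker()
    t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))
    while True:
        rgb = inputQueue.get()
        if rgb is None:
            continue
        # update() returns the peak-to-side-lobe ratio; a low score suggests
        # the tracker has lost the object (7.0 is an assumed cutoff)
        psr = t.update(rgb)
        if psr < 7.0:
            # tell the parent this tracker is done, then exit the process;
            # the parent should then discard this inputQueue/outputQueue pair
            outputQueue.put((label, None))
            return
        pos = t.get_position()
        outputQueue.put((label, (int(pos.left()), int(pos.top()),
            int(pos.right()), int(pos.bottom()))))

The parent loop would, in turn, check for a None box in each result it reads and drop that tracker's queues from inputQueues and outputQueues.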

A third improvement is to increase tracking accuracy by running the object detector every N frames (rather than only once at the start).

I actually demonstrated this in my post on counting with OpenCV. It requires more logic and thought, but yields a much more accurate tracker. I chose to forgo it in this script's implementation so that I could teach you the multiprocessing method concisely. Ideally, you would use this third improvement in addition to multiprocessing; a rough sketch of the idea follows.
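This is a hedged sketch of the change for multi_object_tracking_slow.py; SKIP_FRAMES is an assumed tunable value, and the elided blocks are exactly the detection and tracker-update blocks already shown above:

SKIP_FRAMES = 30   # re-run the object detector every 30 frames (tune this)
totalFrames = 0

# inside the existing "while True:" frame loop, replace the
# "if len(trackers) == 0:" check with a periodic re-detection:
if totalFrames % SKIP_FRAMES == 0:
    # throw away the stale trackers and labels, then run the SSD detection
    # block from the original script to rebuild them from fresh detections
    trackers = []
    labels = []
    # (detection + tracker creation code from the original script goes here)
else:
    # (the existing tracker-update code from the original script goes here)
    pass

# at the bottom of the frame loop, count the processed frame
totalFrames += 1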

That concludes this detailed walkthrough of multi-object tracking with dlib in Python and OpenCV.

