首頁 > 軟體

使用p-limit 限制並行數原始碼解析

2022-12-27 14:01:34

前言

並行是指在同一時間內處理的任務數量。例如,在一臺伺服器上同時執行多個 Web 伺服器執行緒,可以同時處理多個使用者端的請求。有時為了程式的穩定執行,我們需要限制並行的數量,p-limit 就是一個用js實現的控制並行數的庫。

原始碼地址:sindresorhus/p-limit

使用

下面是官方提供的使用範例:

import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
	limit(() => fetchSomething('foo')),
	limit(() => fetchSomething('bar')),
	limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);

在程式碼的第一行,使用了 pLimit(1) 來建立一個 p-limit 範例,並將並行限制設為 1。這意味著,在任意時刻,只能有一個 Promise 在執行。

在第四行,使用了 limit(() => fetchSomething('foo')) 來包裝一個非同步函數,並返回一個 Promise。同樣的方式,在第五、六行也包裝了其他兩個非同步函數。

最後,使用 Promise.all 方法來等待所有 Promise 的完成,並在完成後將結果輸出到控制檯。由於 p-limit 的限制,這些 Promise 只會按順序一個一個地執行,保證了並行的數量不會超過 1。

原始碼分析

import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
	if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
		throw new TypeError('Expected `concurrency` to be a number from 1 and up');
	}
	const queue = new Queue();
	let activeCount = 0;
}

yocto-queue 是一種允許高效儲存和檢索資料的資料結構。前邊的章節分析過它的原始碼,詳情參見: 原始碼共讀|yocto-queue 佇列 連結串列

pLimit 函數接受一個引數,並行數,首先函數判斷引數是否是陣列型別,或者是否能夠轉換成數位型別,如果不能,丟擲一個錯誤。

之後定義了一個佇列來儲存待執行的函數,並使用一個計數器來記錄當前正在執行的函數的數量。

const next = () => {
		activeCount--;
		if (queue.size > 0) {
			queue.dequeue()();
		}
	};
	const run = async (fn, resolve, args) => {
		activeCount++;
		const result = (async () => fn(...args))();
		resolve(result);
		try {
			await result;
		} catch {}
		next();
	};

在程式碼的 next 函數中,如果佇列不為空,則從佇列中取出一個函數並執行。這個函數的執行會導致計數器的值減 1。

在程式碼的 run 函數中,使用了 async/await 語法來執行傳入的函數 fn。它還使用了 resolve 函數將函數的返回值包裝成一個 Promise,並將這個 Promise 返回給呼叫者。在函數執行完成後,呼叫 next 函數來執行下一個函數。

	const enqueue = (fn, resolve, args) => {
		queue.enqueue(run.bind(undefined, fn, resolve, args));
		(async () => {
			// This function needs to wait until the next microtask before comparing
			// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
			// when the run function is dequeued and called. The comparison in the if-statement
			// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
			await Promise.resolve();
			if (activeCount < concurrency && queue.size > 0) {
				queue.dequeue()();
			}
		})();
	};

在程式碼的 enqueue 函數中,使用了 queue.enqueue 方法將傳入的函數 fn 加入佇列。然後,它使用了 async/await 語法來在下一個微任務中檢查當前的並行數量是否小於設定的並行限制。如果是,則從佇列中取出一個函數並執行。

	const generator = (fn, ...args) => new Promise(resolve => {
		enqueue(fn, resolve, args);
	});
	Object.defineProperties(generator, {
		activeCount: {
			get: () => activeCount,
		},
		pendingCount: {
			get: () => queue.size,
		},
		clearQueue: {
			value: () => {
				queue.clear();
			},
		},
	});
	return generator;

在程式碼的 generator 函數中,使用了 new Promise 語法來生成一個新的 Promise,並在其中呼叫了 enqueue 函數。這樣,每次呼叫生成的函數時,都會生成一個新的 Promise,並將函數加入佇列。

最後,使用了 Object.defineProperties 方法來為生成的函數新增屬性。這些屬性可以用來查詢當前的並行數量和等待佇列的大小,以及清空等待佇列。

總結

控制並行的實現方式有很多種。例如,可以使用號誌或佇列來控制並行請求的數量。也可以使用計數器或令牌桶演演算法來限制請求的頻率。還可以使用拒絕服務(DoS)防護系統來檢測異常請求流量並進行限制

以上就是使用p-limit 限制並行數原始碼解析的詳細內容,更多關於p-limit限制並行數的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com