我有一大堆作业要处理。如果单线程运行,执行速度将受 CPU 限制(序列化/反序列化等)。我希望通过把作业分发到一组 Node 工作线程(worker threads)来提高吞吐量。问题在于,生产作业的速度远快于工作线程消费作业的速度。作业集很大,无法全部放入内存。因此,一旦生产者领先消费者太多,进程就会因内存不足而崩溃。
下面是一个可运行的示例。当 Node 以最小堆配置 --max-old-space-size=4 运行时,它在我的机器上产生大约 3000 个作业后就会崩溃。
// node 16, 8 cpus
// node --max-old-space-size=4 oom.js
// crashes after producing 3000 jobs
const os = require("os");
const { Worker, parentPort, isMainThread, threadId } = require("worker_threads");
// Produce jobs quickly
async function main() {
  // One worker per CPU core; each re-runs this file in worker mode.
  const pool = os.cpus().map(() => new Worker(__filename));

  // Emits up to 100k jobs, one per timer tick, each carrying ~100 KB.
  async function* generateJobs() {
    let id = 0;
    while (id < 100_000) {
      const message = { id, data: "0".repeat(100_000) };
      yield message;
      if (id % 1000 === 0) console.log("produced", message.id);
      id += 1;
      // Yield to the event loop so postMessage traffic can be flushed.
      await new Promise(r => setTimeout(r, 1));
    }
  }

  // Round-robin dispatch: job i goes to worker (i mod pool size).
  // Nothing throttles the producer, so jobs pile up in each worker's
  // message queue and memory grows without bound.
  let next = 0;
  for await (const history of generateJobs()) {
    pool[next % pool.length].postMessage(history);
    next += 1;
  }

  // Tell every worker to shut down, then wait for all of them to exit.
  pool.forEach(w => w.postMessage("exit"));
  await Promise.all(pool.map(w => new Promise(resolve => w.on("exit", resolve))));
}
// Consume jobs slowly
function worker() {
  parentPort.on("message", message => {
    // The main thread sends the string "exit" as a shutdown sentinel.
    if (message === "exit") {
      console.log("finish", threadId);
      parentPort.close();
      return;
    }
    const shouldLog = message.id % 1000 === 0;
    if (shouldLog) console.log("worker", threadId, "started", message.id);
    // Simulate executing a CPU-bound job with a busy loop.
    for (let i = 0; i < 1e8; i++) {}
    if (shouldLog) console.log("worker", threadId, "finished", message.id);
  });
}
if (isMainThread) {
main();
} else {
worker();
}
是否有现成的、面向 Node 工作线程的进程内作业队列?关键是要对生产者施加背压,把排队中的作业数量限制在一个合理的上限内。
发布于 2021-12-22 00:48:39
这里有一个解决该问题的方法:由工作线程主动请求作业,生产者在收到请求时才生成作业,并发送给发出请求的工作线程。每个工作线程在启动时会预先缓冲固定数量的作业;每完成一个作业,就再请求下一个。这样系统中同时在处理的作业数量有上限,因此所需内存也有相应的上限。
// node 16, 8 cpus
// node --max-old-space-size=4 rr
// finishes: 5444.60s user 13.14s system 735% cpu 12:21.98 total
const os = require("os");
const { Worker, parentPort, isMainThread, threadId } = require("worker_threads");
// Produce jobs quickly
async function main() {
  const pool = os.cpus().map(() => new Worker(__filename));

  // Same job source as the broken version: up to 100k jobs, ~100 KB each.
  async function* generateJobs() {
    let id = 0;
    while (id < 100_000) {
      const message = { id, data: "0".repeat(100_000) };
      yield message;
      if (id % 1000 === 0) console.log("produced", message.id);
      id += 1;
      await new Promise(r => setTimeout(r, 1));
    }
  }

  const generator = generateJobs();

  // Pull-based dispatch: each "ready" message from a worker requests
  // exactly one job, so at most (workers x per-worker buffer) jobs are in
  // flight at any time. Concurrent next() calls on an async generator are
  // queued internally, so overlapping "ready" messages are safe.
  for (const w of pool) {
    w.on("message", async () => {
      const { value, done } = await generator.next();
      w.postMessage(done ? "exit" : value);
    });
  }

  // Resolve once every worker has exited.
  await Promise.all(pool.map(w => new Promise(resolve => w.on("exit", resolve))));
}
// Consume jobs slowly
function worker() {
  parentPort.on("message", message => {
    // The main thread replies "exit" once the job generator is exhausted.
    if (message === "exit") {
      console.log("exit", threadId);
      parentPort.close();
      return;
    }
    const shouldLog = message.id % 1000 === 0;
    if (shouldLog) console.log("worker", threadId, "started", message.id);
    // Simulate executing a CPU-bound job with a busy loop.
    for (let i = 0; i < 1e8; i++) {}
    if (shouldLog) console.log("worker", threadId, "finished", message.id);
    parentPort.postMessage("ready"); // after one job finishes, ask for the next job
  });
  // Prime the pump: request 10 jobs up front so the worker never starves
  // while waiting on the round trip to the producer.
  for (let i = 0; i < 10; i++) {
    parentPort.postMessage("ready");
  }
}
if (isMainThread) {
main();
} else {
worker();
}
解决这个问题的另一种方法是基于 SharedArrayBuffer 实现队列。在这种方案中,用 Atomics.wait 和 Atomics.notify 取代 MessagePort 的 postMessage。SharedArrayBuffer 队列的开销可能更小;另一方面,它实现起来更复杂,并且对消息对象的类型可能有更严格的限制。
https://stackoverflow.com/questions/70430494
复制相似问题