首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >MessagePort postMessage背压

MessagePort postMessage背压
EN

Stack Overflow用户
提问于 2021-12-21 03:53:27
回答 1查看 28关注 0票数 0

我有一大堆工作要做。执行率将是CPU限制的(序列化/反序列化等)如果运行单线程。我希望通过将作业划分到一组节点工作线程来提高吞吐量。问题是,比工人消耗的工作更容易创造就业机会。作业集很大,无法放入内存中。因此,如果生产者领先于消费者,那么进程就会因为内存不足而失败。

下面是一个有用的例子。当节点配置为最小堆--max-old-space-size=4时,它会在我的计算机上产生大约3000个作业后崩溃。

代码语言:javascript
复制
// node 16, 8 cpus
// node --max-old-space-size=4 oom.js
// crashes after producing 3000 jobs

const os = require("os");
const { Worker, parentPort, isMainThread, threadId } = require("worker_threads");

// Produce jobs quickly
async function main() {
  const workers = os.cpus().map(() => new Worker(__filename));

  let id = 0;
  async function* generateJobs() {
    while (id < 100_000) {
      const message = {
        id,
        data: "0".repeat(100_000)
      };
      yield message;
      if (id % 1000 === 0) console.log("produced", message.id);

      id++;
      await new Promise(r => setTimeout(r, 1));
    }
  }

  for await (const history of generateJobs()) {
    workers[0].postMessage(history);

    // Rotate workers
    workers.push(workers.shift());
  }

  workers.forEach(w => w.postMessage("exit"));
  const workerExits = workers.map(w => new Promise(resolve => w.on("exit", resolve)));
  await Promise.all(workerExits);
}

// Consume jobs slowly
function worker() {
  parentPort.on("message", message => {
    if (message === "exit") {
      console.log("finish", threadId);
      parentPort.close();
      return;
    }
    if (message.id % 1000 === 0) console.log("worker", threadId, "started", message.id);
    for (let i = 0; i < 1e8; i++) {} // Simulate a executing a CPU-bound job.
    if (message.id % 1000 === 0) console.log("worker", threadId, "finished", message.id);
  });
}

if (isMainThread) {
  main();
} else {
  worker();
}

是否有处理节点工作线程的进程内队列?关键是要对生产者施加压力,将排队的作业数量限制在一个合理的数量。

EN

回答 1

Stack Overflow用户

发布于 2021-12-22 00:48:39

这里有一个解决问题的方法。当员工请求作业时,生产者生成作业并将其发送给请求的工人。每个员工在开始工作时都会缓冲固定数量的作业。完成每个作业后,员工将请求下一个作业。系统有最大数量的在建作业.因此所需的内存也有类似的限制。

代码语言:javascript
复制
// node 16, 8 cpus
// node --max-old-space-size=4 rr
// finishes: 5444.60s user 13.14s system 735% cpu 12:21.98 total

const os = require("os");
const { Worker, parentPort, isMainThread, threadId } = require("worker_threads");

// Produce jobs quickly
async function main() {
  const workers = os.cpus().map(() => new Worker(__filename));

  let id = 0;
  async function* generateJobs() {
    while (id < 100_000) {
      const message = {
        id,
        data: "0".repeat(100_000)
      };
      yield message;
      if (id % 1000 === 0) console.log("produced", message.id);

      id++;
      await new Promise(r => setTimeout(r, 1));
    }
  }

  const generator = generateJobs();
  workers.forEach(w =>
    w.on("message", async () => {
      const { value, done } = await generator.next();
      if (done) {
        w.postMessage("exit");
      } else {
        w.postMessage(value);
      }
    })
  );

  const workerExits = workers.map(w => new Promise(resolve => w.on("exit", resolve)));
  await Promise.all(workerExits);
}

// Consume jobs slowly
function worker() {
  parentPort.on("message", message => {
    if (message === "exit") {
      console.log("exit", threadId);
      parentPort.close();
      return;
    }
    if (message.id % 1000 === 0) console.log("worker", threadId, "started", message.id);
    for (let i = 0; i < 1e8; i++) {} // Simulate a executing a CPU-bound job.
    if (message.id % 1000 === 0) console.log("worker", threadId, "finished", message.id);
    parentPort.postMessage("ready"); // after one job finishes, ask for the next job
  });

  // Buffer 10 unstarted jobs, to make sure the worker stays busy.
  for (let i = 0; i < 10; i++) {
    parentPort.postMessage("ready");
  }
}

if (isMainThread) {
  main();
} else {
  worker();
}

解决这个问题的另一种方法是使用SharedArrayBuffer实现队列。在这种情况下,Atomics等待并通知将取代MessagePort postMessage。SharedArrayBuffer队列的开销可能会更小。另一方面,它的实现将更加复杂,并且对消息对象可能会有更严格的限制。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70430494

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档