首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >WorkIt结构化并发与任务取消机制

WorkIt结构化并发与任务取消机制

原创
作者头像
用户11764306
发布2026-05-18 11:18:12
发布2026-05-18 11:18:12
930
举报

别再为应该取消的异步工作付费

上次我们展示了 work(items).inParallel(8).withRetry(3).withTimeout("5s").do(fn) —— 用于处理列表的一行流畅接口。这覆盖了80%的场景。

如果你错过了,请查看《TypeScript中的托管异步工作》—— Promise.race 并不会取消你的工作,以及《用户关闭标签页后你的AI仍在计费》。

本文讨论另外20%:编排异构任务,包括竞速、降级、对冲和重试 —— 并带有所有权

打开足够多AI代码库中的 package.json,你会看到类似这样的组合 —— 或者完全不同的辅助库,取决于代码栈的作者:

代码语言:json
复制
"p-limit":      "^5.0.0",
"p-map":        "^7.0.0",
"p-retry":      "^6.2.0",
"p-timeout":    "^6.1.2",
"p-queue":      "^8.0.1",
"bottleneck":   "^2.19.5",
"async-retry":  "^1.3.3"

六个库只是一个例子。有些团队自己编写 Promise.race 循环。另一些则使用 lodash 或 bluebird 的异步辅助函数。模式是一样的:分散的并发原语,没有共享的所有权。

当兄弟任务抛出异常、超时触发、用户点击停止时,你必须自己拼接队列状态、重试延迟、超时包装器、底层 I/O、清理和错误形状。

这就是本文的比较点:不是“那些工具没用”,而是“它们是独立的原语”。WorkIt 的主张是所有权和组合性。每个章节末尾的可运行基准测试会在你的机器上验证 WorkIt 的不变性。

WorkIt 有五个核心可组合组件,都共享一个运行时契约:

  • run.all // 真正的 Promise.all,在首次失败时取消失败者
  • run.race // 真正的 Promise.race,取消失败者
  • run.any // 真正的 Promise.any,取消剩余任务
  • run.pool // p-limit + p-map,但子任务属于同一个作用域
  • run.series // 顺序执行,带共享取消能力

再加上四个可与它们组合的组件:

  • run.retry // 带信号感知睡眠的回退算法
  • run.timeout // 返回 TaskFn 的截止时间
  • run.fallback // 主任务 -> 备选任务,类型安全
  • run.hedge // 用于控制尾部延迟的有界推测执行

相同的熟悉名称。不同的运行时契约:调用之下的所有内容都属于一个作用域,该作用域拥有取消权。

所有九个组件的组合特性:每个 WorkIt 弹性辅助函数接收一个 TaskFn<T> 并返回一个 TaskFn<T>。这使得代数封闭 —— run.timeout(run.retry(callProvider, 3), "5s") 就是简单的函数组合。Promise 辅助函数通常返回 Promise 或独立的包装函数,因此从超时到重试再到竞速的交叉意味着你要自己负责粘合代码和信号传递。

为什么 run.allPromise.all 更安全

代码语言:typescript
复制
import { run } from "@workit/core";

const [profile, plan, sources] = await run.all([
  (ctx) => fetchProfile({ signal: ctx.signal }),
  (ctx) => planLLM(question, { signal: ctx.signal }),
  (ctx) => retrieveContext(question, { signal: ctx.signal }),
]);

Promise.all 在首次失败时拒绝,并让其他两个请求继续运行,除非每个分支都有自己的取消逻辑。它们的 .then 处理程序可能在你的错误处理程序已经返回 500 之后触发,产生不再依附于原始请求的完成事件。

run.all 在首次失败时拒绝并取消另外两个ctx.signal 会中止。defer 清理会运行。原因是有类型的:CancelReason { kind: "sibling_failed", siblingId, error }

你可以基于此来调整仪表盘。你无法基于 Error: AggregateError 来调整。

基准测试 01-run-all-vs-promise-all.mjs:A 在 50 毫秒成功。B 在 30 毫秒失败。C 在 100 毫秒成功。

实现

外部拒绝时间

A 在拒绝后仍运行

C 在拒绝后仍运行

对失败者执行 defer

Promise.all

t=35 毫秒

+16 毫秒

+79 毫秒

不适用

run.all

t=32 毫秒

0 毫秒 (在 +1 毫秒时取消)

0 毫秒 (在 +1 毫秒时取消)

是,在外部拒绝前

run.race —— 真正竞速的 race

代码语言:typescript
复制
const winner = await run.race([callOpenAI, callAnthropic, callGemini]);

你用 Promise.race 写了六个词。不同的运行时契约:

  • 每个函数体接收一个与 race 关联的 ctx.signal
  • 首次决议在 AbortSignal 边界处取消其余任务,在 TCP 完成之前
  • 每个失败者看到 CancelReason { kind: "race_lost", winnerId } —— 有类型的,可穷尽缩窄
  • await run.race(...) 仅在失败者完成清理后返回

基准测试 02-run-race-vs-promise-race.mjs:Anthropic 10 毫秒,OpenAI 50 毫秒,Gemini 80 毫秒。

实现

胜出者时间

OpenAI 失败者仍运行

Gemini 失败者仍运行

失败者原因

Promise.race

t=14 毫秒

+47 毫秒 (总计 61 毫秒)

+77 毫秒 (总计 91 毫秒)

run.race

t=17 毫秒

0 毫秒 (在 t=16 毫秒取消)

0 毫秒 (在 t=16 毫秒取消)

race_lost

失败者运行时间 × N 个并行代理 × P 每秒请求数,就是你账单上没有人写的那一行。

run.any —— 首次成功,其余取消

代码语言:typescript
复制
const cheapest = await run.any([callExpensive, callCheap, callCheaper]);

Promise.any 以第一个成功决议并忽略其余任务。较慢的兄弟任务继续运行。较快失败的任务被记录然后被遗忘。run.any 做同样的事 —— 只是较慢的兄弟任务实际会停止

基准测试 03-run-any-vs-promise-any.mjs:A 在 30 毫秒失败。B 在 50 毫秒成功。C 在 100 毫秒成功。

实现

决议时间

C 继续运行

对 C 执行 defer

Promise.any

t=61 毫秒

+47 毫秒 (总计 108 毫秒)

不适用

run.any

t=65 毫秒

0 毫秒 (在 t=65 毫秒取消)

run.pool —— 可取消的有界并发

代码语言:typescript
复制
const results = await run.pool(8, files.map((file) => async (ctx) => {
  return uploadOne(file, { signal: ctx.signal });
}));

p-limit(8) 是一个信号量。这很有用,当前版本可以在你要求时清除待处理的队列项。但它不是一个结构化作用域:它不会自动将兄弟失败转化为进行中的取消、类型化的取消原因、清理和部分结果契约。

run.pool(8, tasks) 是一个信号量 + 一个作用域。默认策略是 Promise.all 风格的快速失败:首次抛出会取消队列中和进行中的任务。通过一行代码切换策略,返回类型会改变,这样你就不能忽略失败:

代码语言:typescript
复制
const out = await work(files).inParallel(8).onError("collect").do(uploadOne);

if (out.mode === "collect") {
  for (const r of out.results) {
    if (r.status === "rejected") logFailure(r.reason);
  }
}

WorkOutput<R> 是一个可辨识联合类型 —— mode: "fail" | "continue" | "collect"。改为 .onError("continue"),返回类型会强制你处理 errors[]。编译器就是你的审计日志。

基准测试 04-pool-vs-semaphore.mjs:10 个项目,并发数 4。项目 3 在 20 毫秒时抛出;其余每个需要 100 毫秒。

实现

外部拒绝时间

已启动

拒绝后完成

已取消

从未启动

拒绝后最长运行时间

本地 pLimitLike(4) 信号量基线

t=31 毫秒

10

9

0

0

+295 毫秒

run.pool(4, ...)

t=33 毫秒

4

0

3

6

0 毫秒

295 毫秒的拒绝后工作,乘以整个集群,就变成了可避免的运行时间和供应商成本。

run.retry —— 可组合、可感知取消的回退

代码语言:typescript
复制
const callWithRetry = run.retry(callProvider, {
  times: 4,
  backoff: "exponential",
  initialDelay: "200ms",
  maxDelay: "5s",
  jitter: true,
  retryIf: (err) => isTransient(err),
});

const answer = await callWithRetry(ctx);

WorkIt 使三件事成为重试契约的一部分:

  1. 在作用域取消时停止重试:当父作用域在尝试中途取消时,run.retry 不会排队另一次尝试。任务以 cancelled 状态结束,而不是 failed
  2. 在边界验证输入run.retry({ times: 1e9 }) 会创建一个无界的重试策略。run.retry 会拒绝它:RangeError: retry attempts must be an integer between 1 and 1000。上限是 MAX_RETRY_ATTEMPTS
  3. 使用作用域信号进行睡眠:回退睡眠是可中断的 —— 中止信号,睡眠拒绝,循环退出。下面的基准测试比较了不了解信号的重试循环;当前的重试库可能会暴露它们自己的中止钩子,但它们仍然不拥有 WorkIt 的作用域树、清理和取消原因契约。

基准测试 05-retry-on-cancel.mjs:函数体在每次尝试时都抛出。外部取消在大约 50 毫秒时触发。最多 8 次重试,间隔 50 毫秒回退。

实现

观察到取消

外部已决议

取消延迟

取消后的额外尝试

最终状态

不了解信号的 retry 循环

t=63 毫秒

t=701 毫秒

638 毫秒

7

rejected

run.retry

t=61 毫秒

t=61 毫秒

0 毫秒

0

cancelled (kind: manual)

用户已经取消后,638 毫秒浪费的重试工作。每个请求如此。乘以代理扇出数量。

run.timeout —— 可与 retry、race 和 pool 组合

代码语言:typescript
复制
const fastest = await run.race([
  run.timeout(callPrimary,   "800ms"),
  run.timeout(callSecondary, "800ms"),
]);

run.timeout(task, "800ms") 返回一个 TaskFn。它是可组合的。你可以用 run.retry 包装它。你可以把它放在 run.race 内部。你可以把它传给 run.pool。签名在组合下是封闭的。

Promise 超时辅助函数返回 Promise 或装饰过的 Promise。有些暴露 AbortSignal 支持。但它们仍然不返回 WorkIt 的 TaskFn,因此跨越超时、重试、竞速、池和清理意味着你要自己负责组合边界。

run.fallback —— 主任务、备选任务、类型安全

代码语言:typescript
复制
const callWithFallback = run.fallback(
  run.retry(callProvider, 3),
  callBackupProvider,
);

主任务失败(重试之后)-> 运行备选任务。相同的 ctx.signal。相同的作用域。如果父任务停止,相同的取消原因。没有嵌套的 try/catch。没有凌晨 2 点的“我是否忘记 await 备选任务了”的 Slack 消息。

run.supervise —— 长期运行任务的重启策略

run.retry 适用于可能暂时失败然后成功的单次操作。run.supervise 适用于长期运行的任务 —— 心跳、队列消费者、连接监视器、代理保活 —— 可能需要带有界回退的重启语义。

代码语言:typescript
复制
import { run } from "@workit/core";

const result = await run.supervise(async () => {
  attempts++;
  if (attempts < 3) throw new Error("transient worker failure");
  return "stable";
}, {
  restartOn:    "error",
  maxRestarts:  3,
  backoff:      () => 1,
});

被监控的函数体失败两次,每次在策略下重启,并在第三次尝试时稳定下来。父作用域仍然可以一次性取消所有内容,取消原因会通过监控包装器传递下去。重启策略在 resetWindow 内以 maxRestarts 为上限,因此永久损坏的函数体不会无限循环。

决策规则:对可能出故障的单次调用使用 run.retry;对应该持续运行的进程使用 run.supervise

run.hedge —— 有界推测请求

代码语言:typescript
复制
const ranked = await run.hedge(
  (ctx) => reranker.rank(question, sources, { signal: ctx.signal }),
  { after: "2s", max: 2 },
);

如果第一次调用在 2 秒内未返回,则触发第二次调用。首次成功胜出;其余取消。以 max 为界,这是一种在不支付每次推测扇出成本的情况下减少尾部延迟的计量方式。

基准测试 06-hedge-tied-requests.mjs:两种场景,opts { after: "50ms", max: 3 }

场景

函数体延迟

触发的尝试(时间戳)

胜出者

已取消的失败者

取消原因

200 毫秒

3 (t=2 毫秒, 62 毫秒, 107 毫秒)

id=1 在 217 毫秒

2

race_lost

30 毫秒

1 (未触发对冲)

id=1 在 31 毫秒

0

不适用

快速路径根本不为对冲付出代价。慢速路径以 max 为界。每个失败者都标记有 race_lost

并排对比 —— 谁真正取消了

代码语言:typescript
复制
// 3 个任务。B 在 30 毫秒失败。A 在 50 毫秒成功。C 在 100 毫秒成功。

await Promise.all([A, B, C]);     // 在 30 毫秒拒绝。A 和 C 继续运行约 16/63 毫秒。
await Promise.race([A, B, C]);    // 在 30 毫秒拒绝。A 和 C 继续运行。
await Promise.any([A, B, C]);     // 在 50 毫秒决议。C 继续运行约 44 毫秒。

await run.all([A, B, C]);         // 在 30 毫秒拒绝。A 和 C 在 1 毫秒内取消,defer 运行。
await run.race([A, B, C]);        // 在 30 毫秒拒绝。A 和 C 在 0-1 毫秒内取消。
await run.any([A, B, C]);         // 在 50 毫秒决议。C 在 0 毫秒内取消,defer 运行。

相同的形状。不同的契约。原生原语返回一个值。WorkIt 原语拥有值之下的树。

与其他库的对比

工具

兄弟失败时取消

信号感知重试

可组合超时(返回 TaskFn)

对冲请求

打包大小

WorkIt

内置

14,175 B / 4,835 B gz(所有九个组件)

Promise.all / race / any

不适用

不适用

0

p-limit + p-retry + p-timeout

部分/手动接线

部分/手动接线

独立抽象

三个依赖

RxJS

是(unsubscribe)

部分(通过操作符)

是(通过操作符)

Effection

是(生成器操作)

中等

Effect-TS

是(纤程 + 类型化 Cause)

对于没有其他失败可能的简单数组处理,p-limit 就足够了。对于需要更广泛效果或操作模型的全栈应用,Effection 和 Effect-TS 很扎实。WorkIt 的区别更窄:在保持 async/await 的同时进行结构化并发组合,在一个所有权树中提供九个可组合组件,以及上面显示的打包大小。

验证依据

WorkIt 运行时的上述声明通过 npm run verify(生产门禁)或 npm run bench:articles(生成本文代表性时序表的并排套件)进行验证。

代码语言:bash
复制
npm run bench:articles
# 完整文章套件:19 通过,0 失败

本文引用了完整套件中的基准测试 01-06。每个基准脚本约 100 行,零外部依赖,并内联断言 WorkIt 的不变性。时序是代表性的捕获运行;断言保护的是语义不变性,而不是精确毫秒数。

下一步

九个可组合组件共享一个引擎。明天我们将打开引擎。我们将探讨协作取消能做什么、不能做什么,以及硬边界从哪里开始。然后,我们将一个忽略所有信号的 CPU 自旋循环放在 offload({ timeout: "200ms" }) 前面,并验证工作线程的终止能阻止一个迟到的标记文件出现。CI 门禁会对其执行 stat()

AbortController 无法抢占 CPU 循环。WorkIt 无法改变这一语言边界,但工作线程可以被其宿主终止。

源代码、基准测试和验证

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 别再为应该取消的异步工作付费
    • 为什么 run.all 比 Promise.all 更安全
    • run.race —— 真正竞速的 race
    • run.any —— 首次成功,其余取消
    • run.pool —— 可取消的有界并发
    • run.retry —— 可组合、可感知取消的回退
    • run.timeout —— 可与 retry、race 和 pool 组合
    • run.fallback —— 主任务、备选任务、类型安全
    • run.supervise —— 长期运行任务的重启策略
    • run.hedge —— 有界推测请求
    • 并排对比 —— 谁真正取消了
    • 与其他库的对比
    • 验证依据
    • 下一步
    • 源代码、基准测试和验证
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档