跳到主要内容

'`Atomics.wait`, `Atomics.notify`, `Atomics.waitAsync`

· 阅读需 8 分钟
[Marja Hölttä](https://twitter.com/marjakh), 一个非阻塞的博主

Atomics.waitAtomics.notify 是底层同步原语,适用于实现互斥锁和其他同步手段。然而,由于Atomics.wait是阻塞的,无法在主线程上调用(尝试这样做会抛出 TypeError)。

从版本8.7开始,V8支持非阻塞版本Atomics.waitAsync,也可用于主线程。

在本文中,我们解释如何使用这些底层API来实现一个可以在工作线程同步使用,也可以在主线程异步使用的互斥锁。

Atomics.waitAtomics.waitAsync 接受以下参数:

  • buffer: 一个由 SharedArrayBuffer 支持的 Int32ArrayBigInt64Array
  • index: 数组内的有效索引
  • expectedValue: 我们期望存在于由 (buffer, index) 描述的内存位置的值
  • timeout: 超时时间,单位为毫秒(可选,默认值为 Infinity

Atomics.wait 的返回值是一个字符串。如果内存位置不包含期望值,Atomics.wait 会立即返回值 not-equal。否则,线程会被阻塞,直到另一个线程在相同内存位置调用 Atomics.notify 或达到超时时间。在前一种情况下,Atomics.wait 返回值为 ok,而在后一种情况下返回值为 timed-out

Atomics.notify 接受以下参数:

  • 一个由 SharedArrayBuffer 支持的 Int32ArrayBigInt64Array
  • 一个数组内部有效的索引
  • 要通知的等待者数量(可选,默认值为 Infinity

它按照FIFO顺序通知在由 (buffer, index) 描述的内存位置等待的等待者。如果有多个与同一位置相关联的 Atomics.wait 调用或 Atomics.waitAsync 调用,它们都在同一个FIFO队列中。

Atomics.wait 相比,Atomics.waitAsync 总是立即返回。返回值如下之一:

  • { async: false, value: 'not-equal' }(如果内存位置未包含期望值)
  • { async: false, value: 'timed-out' }(仅在立即超时0时)
  • { async: true, value: promise }

该promise稍后可能会以字符串值ok(如果在相同内存位置调用了Atomics.notify)或timed-out(如果达到超时时间)解析。Promise绝不会被拒绝。

以下示例展示了Atomics.waitAsync的基本用法:

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ 超时(可选)
// | ^ 期望值
// ^ 索引

if (result.value === 'not-equal') {
// SharedArrayBuffer中的值不是期望值。
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* 已通知 */ }
else { /* 值为 'timed-out' */ }
});
}

// 在该线程或其他线程中:
Atomics.notify(i32a, 0);

接下来,我们将展示如何实现一个可以同步和异步使用的互斥锁。同步版本的互斥锁实现已在例如本文 中讨论过。

在此示例中,我们未在Atomics.waitAtomics.waitAsync中使用timeout参数。此参数可用于实现具有超时的条件变量。

我们的互斥锁类AsyncLock基于SharedArrayBuffer操作,并实现以下方法:

  • lock — 阻塞线程直到我们能够锁定互斥锁(仅适用于工作线程)
  • unlock — 解锁互斥锁(lock的对应方法)
  • executeLocked(callback) — 非阻塞锁定,可由主线程使用;计划在锁定后执行callback

我们来看看如何实现这些功能。类定义包括常量和构造函数,构造函数以SharedArrayBuffer作为参数。

class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;

constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}

lock() {
/* … */
}

unlock() {
/* … */
}

executeLocked(f) {
/* … */
}
}

这里,i32a[0]包含值LOCKEDUNLOCKED之一。它也是Atomics.waitAtomics.waitAsync的等待位置。AsyncLock类确保以下不变量:

  1. 如果i32a[0] == LOCKED,且线程开始在i32a[0]等待(通过Atomics.waitAtomics.waitAsync),它最终会被通知。
  2. 在收到通知后,线程尝试获取锁。如果获取了锁,它将在释放锁时再次通知。

同步锁与解锁

接下来我们展示只能从工作线程调用的阻塞式lock方法:

lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* 旧值 >>> */ AsyncLock.UNLOCKED,
/* 新值 >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< 初始预期值
}
}

当线程调用lock()时,它首先尝试通过使用Atomics.compareExchange将锁状态从UNLOCKED更改为LOCKED来获取锁。Atomics.compareExchange尝试以原子方式进行状态更改,并返回内存位置的原始值。如果原始值为UNLOCKED,我们知道状态更改成功,线程获取了锁,不需要其他操作。

如果Atomics.compareExchange未能更改锁状态,则另一个线程必须持有锁。因此,该线程尝试调用Atomics.wait以等待其他线程释放锁。如果内存位置仍然包含预期值(在这种情况下为AsyncLock.LOCKED),调用Atomics.wait将阻塞线程,并且只有当另一个线程调用Atomics.notify时,Atomics.wait调用才会返回。

unlock方法将锁设置为UNLOCKED状态,并调用Atomics.notify,唤醒一个等待锁的线程。状态更改应始终成功,因为此线程持有锁,而其他线程在此期间不应该调用unlock()

unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* 旧值 >>> */ AsyncLock.LOCKED,
/* 新值 >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('尝试解锁时未持有互斥锁');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}

以下是简单情况:锁是空闲的,线程T1通过使用Atomics.compareExchange更改锁状态来获取锁。线程T2尝试通过调用Atomics.compareExchange获取锁,但未能成功更改锁状态。然后T2调用Atomics.wait,阻塞线程。某个时候,T1释放锁并调用Atomics.notify。这使得T2中的Atomics.wait调用返回'ok',唤醒T2。然后T2再次尝试获取锁,并成功。

还有2种可能的边界情况——这些说明了为什么Atomics.waitAtomics.waitAsync需要在索引处检查特定值:

  • T1持有锁,T2尝试获取锁。首先,T2尝试以Atomics.compareExchange更改锁状态,但未成功。但在T2调用Atomics.wait之前,T1释放了锁。当T2调用Atomics.wait时,它会立即返回值'not-equal'。在这种情况下,T2继续下一次循环迭代,尝试再次获取锁。
  • T1持有锁,T2正在使用Atomics.wait等待。T1释放锁——T2被唤醒(Atomics.wait调用返回),并尝试使用Atomics.compareExchange获取锁,但另一个线程T3抢先获取了锁。因此,调用Atomics.compareExchange未能获取锁,T2再次调用Atomics.wait,阻塞直到T3释放锁。

由于后一种边界情况,互斥锁并不“公平”。可能出现的情况是T2一直在等待锁释放,但T3立即获取了锁。更实际的锁实现可能会使用多个状态来区分“锁定”和“锁定但存在争用”。

异步锁

非阻塞式executeLocked方法可以从主线程调用,而阻塞式lock方法不能。它获取一个回调函数作为其唯一参数,并安排回调在成功获取锁后执行。

executeLocked(f) {
const self = this;

async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* 旧值 >>> */ AsyncLock.UNLOCKED,
/* 新值 >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ 开始时的期望值
await result.value;
}
}

tryGetLock();
}

内部函数 tryGetLock 试图首先通过 Atomics.compareExchange 获取锁,如前所述。如果成功更改了锁状态,它可以执行回调、解锁并返回。

如果 Atomics.compareExchange 未能获取锁,我们需要在锁可能变为可用时重新尝试。我们不能阻塞并等待锁变为可用——相反,我们使用 Atomics.waitAsync 以及它返回的 Promise 来安排新的尝试。

如果我们成功启动了 Atomics.waitAsync,当占有锁的线程执行 Atomics.notify 时,返回的 Promise 会被解析。然后,等待锁的线程会像之前一样再次尝试获取锁。

异步版本中也可能存在同样的边界情况(在 Atomics.compareExchange 调用和 Atomics.waitAsync 调用之间,锁被释放,或者在 Promise 解析与 Atomics.compareExchange 调用之间,锁再次被占有),因此代码需要以健壮的方式处理它们。

结论

在本文中,我们展示了如何使用同步原语 Atomics.waitAtomics.waitAsyncAtomics.notify 来实现一个可同时用于主线程和工作线程的互斥锁。

功能支持

Atomics.waitAtomics.notify

Atomics.waitAsync