メインコンテンツまでスキップ

`Atomics.wait`, `Atomics.notify`, `Atomics.waitAsync`

· 約10分
[マリヤ・ホルタ](https://twitter.com/marjakh)、ノンブロッキングブロガー

Atomics.wait および Atomics.notify は、ミューテックスやその他の同期手段を実装するために便利な低レベルの同期プリミティブです。しかし、Atomics.wait はブロッキングであるため、メインスレッドで呼び出すことはできません(試みると TypeError が投げられます)。

バージョン 8.7 から、V8 は非同期版である Atomics.waitAsync をサポートしており、メインスレッドでも使用できます。

この記事では、これらの低レベル API を使用して、同期的(ワーカースレッド用)および非同期的(ワーカースレッドまたはメインスレッド用)の両方で動作するミューテックスを実装する方法を説明します。

Atomics.waitAtomics.waitAsync は以下のパラメータを取ります:

  • buffer: SharedArrayBuffer に基づく Int32Array または BigInt64Array
  • index: 配列内の有効なインデックス
  • expectedValue: メモリ位置 (buffer, index) にあると期待される値
  • timeout: ミリ秒単位のタイムアウト (オプション、デフォルトは Infinity)

Atomics.wait の戻り値は文字列です。メモリ位置が期待値を含んでいない場合、Atomics.wait はすぐに not-equal を返します。それ以外の場合、タイムアウトに達するか、別のスレッドが同じメモリ位置で Atomics.notify を呼び出すまでスレッドはブロックされます。前者の場合、Atomics.waitok を返し、後者の場合、timed-out を返します。

Atomics.notify は以下のパラメータを取ります:

  • SharedArrayBuffer に基づいた Int32Array または BigInt64Array
  • 配列内の有効なインデックス
  • 通知する待機者の数 (オプション、デフォルトは Infinity)

指定された数の待機者を FIFO 順で通知し、メモリ位置 (buffer, index) に対応します。同じ場所に関連する複数の Atomics.wait 呼び出しまたは Atomics.waitAsync 呼び出しがある場合、それらはすべて同じ FIFO キューに存在します。

Atomics.wait と対照的に、Atomics.waitAsync は常にすぐに戻ります。戻り値は以下のいずれかです:

  • { async: false, value: 'not-equal' }(メモリ位置が期待値を含んでいない場合)
  • { async: false, value: 'timed-out' }(即時タイムアウト 0 の場合のみ)
  • { async: true, value: promise }

Promise は後で文字列値 okAtomics.notify が同じメモリ位置で呼び出された場合)または timed-out(タイムアウトが到達した場合)で解決されることがあります。Promise が拒否されることはありません。

以下の例では、Atomics.waitAsync の基本的な使用方法を示しています:

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ タイムアウト (オプション)
// | ^ 期待値
// ^ インデックス

if (result.value === 'not-equal') {
// SharedArrayBuffer の値が期待値ではありません。
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* 通知済み */ }
else { /* 値は 'timed-out' */ }
});
}

// このスレッド内または他のスレッド内で:
Atomics.notify(i32a, 0);

次に、同期的にも非同期的にも使用できるミューテックスを実装する方法を示します。同期版のミューテックス実装については、この記事などで以前に議論されています。

例では、Atomics.waitAtomics.waitAsync のタイムアウトパラメータを使用していません。このパラメータはタイムアウト付きの条件変数を実装するために使用できます。

ミューテックスクラス AsyncLockSharedArrayBuffer に基づき、以下のメソッドを実装します:

  • lock — ミューテックスをロックできるまでスレッドをブロックします(ワーカースレッドでのみ使用可能)
  • unlock — ミューテックスをアンロックします(lock の対応方法)
  • executeLocked(callback) — ノンブロッキングロック、メインスレッドで使用可能。ロックを取得できた時点で callback を実行するようにスケジュールされます。

それぞれの実装方法を見ていきましょう。クラス定義には定数と、SharedArrayBufferをパラメータとして受け取るコンストラクタが含まれています。

class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;

constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}

lock() {
/* … */
}

unlock() {
/* … */
}

executeLocked(f) {
/* … */
}
}

ここで、i32a[0]LOCKEDまたはUNLOCKEDのいずれかの値を含んでいます。また、それはAtomics.waitAtomics.waitAsyncの待機場所でもあります。このAsyncLockクラスは以下の不変条件を保証します:

  1. i32a[0] == LOCKEDの場合、スレッドがi32a[0]Atomics.waitまたはAtomics.waitAsyncを使用して待機を開始すると、最終的には通知を受け取ることになります。
  2. 通知を受けた後、スレッドはロックを取得しようとします。ロックを取得できた場合、ロック解除時に再度通知を行います。

同期ロックとアンロック

次に、ワーカースレッドからのみ呼び出せるブロッキングlockメソッドを示します:

lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< 初期値として期待する値
}
}

スレッドがlock()を呼び出すと、まずAtomics.compareExchangeを使用してロック状態をUNLOCKEDからLOCKEDに変更することでロックを取得しようとします。Atomics.compareExchangeは状態変更を原子的に試み、メモリ位置の元の値を返します。元の値がUNLOCKEDであれば、状態変更が成功し、スレッドがロックを取得したことを示します。それ以上の操作は必要ありません。

もしAtomics.compareExchangeがロック状態の変更に失敗した場合、別のスレッドがロックを保持しているはずです。この場合、このスレッドは他のスレッドがロックを解放するのを待つためにAtomics.waitを試みます。そのメモリ位置が依然として期待する値(この場合、AsyncLock.LOCKED)を保持している場合、Atomics.waitを呼び出すとスレッドがブロックされ、別のスレッドがAtomics.notifyを呼び出すまでAtomics.wait呼び出しは戻りません。

unlockメソッドはロックをUNLOCKED状態に設定し、ロックを待機していた1つのスレッドを起こすためにAtomics.notifyを呼び出します。この状態変更は常に成功することが想定されています。なぜなら、このスレッドはロックを保持しており、その間に他の誰もunlock()を呼び出さないはずだからです。

unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.LOCKED,
/* new value >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('ミューテックスを保持していない状態でアンロックしようとしました');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}

シンプルなケースの流れは次の通りです: ロックが空いており、スレッドT1がAtomics.compareExchangeを使用してロック状態を変更することでロックを取得します。スレッドT2がロックを取得しようとしてAtomics.compareExchangeを呼び出しますが、ロック状態の変更に成功しません。T2は次にAtomics.waitを呼び出し、スレッドをブロックします。ある時点でT1がロックを解放し、Atomics.notifyを呼び出します。それにより、T2でのAtomics.wait呼び出しが'ok'を返し、T2を起こします。T2は再びロックを取得しようと試み、今回は成功します。

また、2つの角ケースがあります — これらはAtomics.waitおよびAtomics.waitAsyncが特定のインデックス値を確認する理由を示しています。

  • T1がロックを保持しており、T2がそれを取得しようとしています。まず、T2はAtomics.compareExchangeを使用してロック状態を変更しようとしますが、成功しません。しかし、T2がAtomics.waitを呼び出す前にT1がロックを解放します。T2がAtomics.waitを呼び出すと、それはすぐに'not-equal'の値を返します。その場合、T2は次のループ反復を続行し、再びロックを取得しようとします。
  • T1がロックを保持しており、T2はAtomics.waitを使用してそれを待っています。T1がロックを解放すると、T2が起きます(Atomics.wait呼び出しが戻ります)そして、Atomics.compareExchangeを試みてロックを取得します。ただし、別のスレッドT3がより早くロックを取得してしまいます。その結果、Atomics.compareExchange呼び出しがロックの取得に失敗し、T2は再びAtomics.waitを呼び出してT3がロックを解放するのを待ちます。

この後者の角ケースのため、ミューテックスは“公平”ではありません。T2がロックが解放されるのを待っていたにもかかわらず、T3が来てすぐにそれを取得する可能性があります。より現実的なロック実装では、“ロック済み”と“競合によるロック済み”を区別するためにいくつかの状態を使用することがあります。

非同期ロック

ブロッキングしないexecuteLockedメソッドは、ブロッキングするlockメソッドとは異なり、メインスレッドから呼び出すことができます。このメソッドはコールバック関数を唯一のパラメータとして受け取り、ロックの取得に成功した後にコールバックを実行するようスケジュールします。

executeLocked(f) {
const self = this;

async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* 古い値 >>> */ AsyncLock.UNLOCKED,
/* 新しい値 >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ 開始時の期待値
await result.value;
}
}

tryGetLock();
}

内部関数tryGetLockは、以前と同様にまずAtomics.compareExchangeを使用してロックを取得しようとします。そのロック状態が正常に変更された場合、コールバックを実行し、ロックを解除してリターンします。

Atomics.compareExchangeがロックを取得するのに失敗した場合、ロックが解放されたと思われるときに再試行する必要があります。ただし、ロックが解放されるのを待つためにブロックすることはできません。その代わりに、Atomics.waitAsyncとそれが返すPromiseを使用して新しい試行をスケジュールします。

Atomics.waitAsyncを正常に開始できた場合、ロックを保持しているスレッドがAtomics.notifyを実行すると、返されたPromiseが解決されます。その後、ロックを待っていたスレッドは以前のように再びロックを取得しようとします。

非同期バージョンでも、Atomics.compareExchange呼び出しとAtomics.waitAsync呼び出しの間でロックが解放される、またはPromiseが解決される間にロックが再び取得されるなど、同じコーナーケースが発生する可能性があります。そのため、コードはそれらを堅牢に処理する必要があります。

結論

この投稿では、Atomics.waitAtomics.waitAsync、およびAtomics.notifyの同期プリミティブを使用して、メインスレッドおよびワーカースレッドの両方で使用可能なミューテックスを実装する方法を説明しました。

機能サポート

Atomics.waitAtomics.notify

Atomics.waitAsync