`Atomics.wait`, `Atomics.notify`, `Atomics.waitAsync`
Atomics.wait
und Atomics.notify
sind Low-Level-Synchronisationsprimitiven, die nützlich für die Implementierung von Mutexen und anderen Synchronisationsmethoden sind. Da Atomics.wait
blockierend ist, ist es jedoch nicht möglich, es im Haupt-Thread aufzurufen (ein solcher Versuch löst einen TypeError
aus).
Ab Version 8.7 unterstützt V8 eine nicht-blockierende Version, Atomics.waitAsync
, die auch im Haupt-Thread verwendet werden kann.
In diesem Beitrag erklären wir, wie man diese Low-Level-APIs verwendet, um einen Mutex zu implementieren, der sowohl synchron (für Worker-Threads) als auch asynchron (für Worker-Threads oder den Haupt-Thread) funktioniert.
Atomics.wait
und Atomics.waitAsync
nehmen die folgenden Parameter:
buffer
: eineInt32Array
oderBigInt64Array
, die von einemSharedArrayBuffer
unterstützt wirdindex
: ein gültiger Index innerhalb des ArraysexpectedValue
: ein Wert, den wir in der Speicheradresse(buffer, index)
erwartentimeout
: ein Timeout in Millisekunden (optional, Standardwert istInfinity
)
Der Rückgabewert von Atomics.wait
ist ein String. Wenn die Speicheradresse nicht den erwarteten Wert enthält, gibt Atomics.wait
sofort den Wert 'not-equal'
zurück. Andernfalls wird der Thread blockiert, bis ein anderer Thread Atomics.notify
mit derselben Speicheradresse aufruft oder das Timeout erreicht wird. Im ersten Fall gibt Atomics.wait
den Wert 'ok'
zurück, im zweiten Fall den Wert 'timed-out'
.
Atomics.notify
nimmt die folgenden Parameter:
- eine
Int32Array
oderBigInt64Array
, die von einemSharedArrayBuffer
unterstützt wird - einen Index (gültig innerhalb des Arrays)
- wie viele wartende Threads benachrichtigt werden sollen (optional, Standardwert ist
Infinity
)
Es benachrichtigt die angegebene Anzahl wartender Threads in FIFO-Reihenfolge, die an der Speicheradresse (buffer, index)
warten. Wenn es mehrere anstehende Atomics.wait
- oder Atomics.waitAsync
-Aufrufe für dieselbe Speicheradresse gibt, befinden sich alle in derselben FIFO-Warteschlange.
Im Gegensatz zu Atomics.wait
gibt Atomics.waitAsync
immer sofort zurück. Der Rückgabewert ist einer der folgenden:
{ async: false, value: 'not-equal' }
(wenn die Speicheradresse nicht den erwarteten Wert enthielt){ async: false, value: 'timed-out' }
(nur bei sofortigem Timeout0
){ async: true, value: promise }
Das Versprechen kann später mit dem String-Wert 'ok'
(wenn Atomics.notify
mit derselben Speicheradresse aufgerufen wurde) oder 'timed-out'
(wenn das Timeout erreicht wurde) aufgelöst werden. Das Versprechen wird niemals abgelehnt.
Das folgende Beispiel zeigt die Grundnutzung von Atomics.waitAsync
:
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ Timeout (optional)
// | ^ erwarteter Wert
// ^ Index
if (result.value === 'not-equal') {
// Der Wert im SharedArrayBuffer war nicht der erwartete.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* benachrichtigt */ }
else { /* Wert ist 'timed-out' */ }
});
}
// In diesem Thread oder in einem anderen Thread:
Atomics.notify(i32a, 0);
Als Nächstes zeigen wir, wie man einen Mutex implementiert, der sowohl synchron als auch asynchron verwendet werden kann. Die Implementierung der synchronen Version des Mutexes wurde zuvor diskutiert, z. B. in diesem Blogbeitrag.
Im Beispiel verwenden wir den Timeout-Parameter in Atomics.wait
und Atomics.waitAsync
nicht. Der Parameter kann verwendet werden, um Bedingungsvariablen mit einem Timeout zu implementieren.
Unsere Mutex-Klasse, AsyncLock
, arbeitet auf einem SharedArrayBuffer
und implementiert die folgenden Methoden:
lock
— blockiert den Thread, bis wir den Mutex sperren können (nur auf einem Worker-Thread verwendbar)unlock
— entsperrt den Mutex (Gegenstück zulock
)executeLocked(callback)
— nicht-blockierende Sperre, kann von der Hauptthread verwendet werden; plant dencallback
, sobald wir es schaffen, die Sperre zu erhalten
Schauen wir uns an, wie jeder dieser Schritte implementiert werden kann. Die Klassendefinition enthält Konstanten und einen Konstruktor, der den SharedArrayBuffer
als Parameter übernimmt.
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
i32a[0]
enthält entweder den Wert LOCKED
oder UNLOCKED
. Es dient auch als Wartebereich für Atomics.wait
und Atomics.waitAsync
. Die Klasse AsyncLock
gewährleistet die folgenden Invarianten:
- Wenn
i32a[0] == LOCKED
ist und ein Thread beginnt zu warten (entweder mitAtomics.wait
oderAtomics.waitAsync
) aufi32a[0]
, wird er schließlich benachrichtigt. - Nach der Benachrichtigung versucht der Thread, die Sperre zu greifen. Wenn er die Sperre erhält, benachrichtigt er beim Freigeben der Sperre erneut.
Synchrones Sperren und Freigeben
Im Folgenden zeigen wir die blockierende lock
-Methode, die nur vom Worker-Thread aufgerufen werden kann:
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* alter Wert >>> */ AsyncLock.UNLOCKED,
/* neuer Wert >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< erwarteter Wert zu Beginn
}
}
Wenn ein Thread lock()
aufruft, versucht er zuerst, die Sperre durch Atomics.compareExchange
zu erhalten, wodurch der Sperrzustand von UNLOCKED
auf LOCKED
geändert wird. Atomics.compareExchange
versucht, den Zustand atomar zu ändern, und gibt den Originalwert des Speicherorts zurück. Wenn der Originalwert UNLOCKED
war, wissen wir, dass die Zustandsänderung erfolgreich war und der Thread die Sperre erhalten hat. Es ist keine weitere Aktion erforderlich.
Wenn Atomics.compareExchange
es nicht schafft, den Sperrzustand zu ändern, hält ein anderer Thread die Sperre. Daher versucht dieser Thread Atomics.wait
, um auf die Freigabe der Sperre durch den anderen Thread zu warten. Wenn der Speicherort weiterhin den erwarteten Wert enthält (in diesem Fall AsyncLock.LOCKED
), blockiert der Aufruf von Atomics.wait
den Thread, und der Aufruf von Atomics.wait
gibt nur zurück, wenn ein anderer Thread Atomics.notify
aufruft.
unlock
ist eine Methode, die die Sperre in den UNLOCKED
Zustand setzt und Atomics.notify
aufruft, um einen wartenden Thread zu wecken, der auf die Sperre wartet. Die Zustandsänderung soll immer erfolgreich sein, da dieser Thread die Sperre hält und niemand anderes inzwischen unlock()
aufrufen sollte.
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* alter Wert >>> */ AsyncLock.LOCKED,
/* neuer Wert >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Versucht, die Sperre freizugeben, während das Mutex nicht gehalten wird');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
Der einfache Fall ist wie folgt: Die Sperre ist frei, und Thread T1 erhält sie, indem er den Sperrzustand mit Atomics.compareExchange
ändert. Thread T2 versucht, die Sperre durch Atomics.compareExchange
zu erhalten, schafft es jedoch nicht, den Sperrzustand zu ändern. T2 ruft dann Atomics.wait
auf, wodurch der Thread blockiert wird. Zu einem bestimmten Zeitpunkt gibt T1 die Sperre frei und ruft Atomics.notify
auf. Dadurch gibt der Atomics.wait
-Aufruf in T2 'ok'
zurück und weckt T2 auf. T2 versucht dann erneut, die Sperre zu erhalten, und schafft es diesmal.
Es gibt auch zwei mögliche Eckfälle – sie demonstrieren den Grund, warum Atomics.wait
und Atomics.waitAsync
einen bestimmten Wert am Index überprüfen:
- T1 hält die Sperre, und T2 versucht, sie zu erhalten. Zunächst versucht T2, den Sperrzustand mit
Atomics.compareExchange
zu ändern, schafft es jedoch nicht. Aber dann gibt T1 die Sperre frei, bevor T2Atomics.wait
aufrufen kann. Wenn T2Atomics.wait
aufruft, gibt es sofort mit dem Wert'nicht-gleich'
zurück. In diesem Fall macht T2 mit der nächsten Schleifeniteration weiter und versucht erneut, die Sperre zu erhalten. - T1 hält die Sperre, und T2 wartet darauf mit
Atomics.wait
. T1 gibt die Sperre frei – T2 wird aufgeweckt (derAtomics.wait
-Aufruf gibt zurück) und versucht,Atomics.compareExchange
auszuführen, um die Sperre zu erhalten. Aber ein anderer Thread T3 war schneller und hat die Sperre bereits erhalten. Der Aufruf vonAtomics.compareExchange
schlägt fehl, die Sperre zu erhalten, und T2 ruft erneutAtomics.wait
auf, wodurch der Thread blockiert wird, bis T3 die Sperre freigibt.
Aufgrund des letzteren Eckfalls ist das Mutex nicht „fair“. Es ist möglich, dass T2 darauf gewartet hat, dass die Sperre freigegeben wird, aber T3 kommt und erhält sie sofort. Eine realistischere Sperrimplementierung könnte mehrere Zustände verwenden, um zwischen „gesperrt“ und „gesperrt mit Konflikt“ zu unterscheiden.
Asynchrones Sperren
Die nicht blockierende Methode executeLocked
kann vom Hauptthread aufgerufen werden, im Gegensatz zur blockierenden lock
-Methode. Sie erhält eine Callback-Funktion als einzigen Parameter und plant die Ausführung des Callbacks, sobald sie die Sperre erfolgreich erhalten hat.
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* alter Wert >>> */ AsyncLock.UNLOCKED,
/* neuer Wert >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ erwarteter Wert zu Beginn
await result.value;
}
}
tryGetLock();
}
Die innere Funktion tryGetLock
versucht zunächst, das Lock mit Atomics.compareExchange
zu erhalten, wie zuvor. Wenn dies erfolgreich den Zustand des Locks ändert, kann sie den Callback ausführen, das Lock entriegeln und zurückkehren.
Falls Atomics.compareExchange
das Lock nicht bekommt, müssen wir es erneut versuchen, wenn das Lock wahrscheinlich frei ist. Wir können nicht blockieren und warten, bis das Lock frei wird – stattdessen planen wir den neuen Versuch mit Atomics.waitAsync
und dem zurückgegebenen Promise.
Falls es uns gelingt, Atomics.waitAsync
zu starten, wird das zurückgegebene Promise aufgelöst, wenn der Thread, der das Lock hält, Atomics.notify
ausführt. Dann versucht der Thread, der auf das Lock gewartet hat, erneut das Lock zu erhalten, wie zuvor.
Die gleichen Randfälle (das Lock wird zwischen dem Atomics.compareExchange
-Aufruf und dem Atomics.waitAsync
-Aufruf freigegeben sowie das Lock wird erneut zwischen dem Auflösen des Promise und dem Atomics.compareExchange
-Aufruf erworben) sind auch in der asynchronen Version möglich, daher muss der Code sie robust behandeln.
Fazit
In diesem Beitrag haben wir gezeigt, wie man die Synchronisationsprimitive Atomics.wait
, Atomics.waitAsync
und Atomics.notify
nutzen kann, um einen Mutex zu implementieren, der sowohl im Hauptthread als auch in Worker-Threads verwendet werden kann.