`Atomics.wait`, `Atomics.notify`, `Atomics.waitAsync`
Atomics.wait
et Atomics.notify
sont des primitives de synchronisation bas-niveau utiles pour implémenter des mutex et d’autres moyens de synchronisation. Cependant, étant donné que Atomics.wait
est bloquant, il est impossible de l’appeler sur le thread principal (une tentative entraîne une TypeError
).
À partir de la version 8.7, V8 prend en charge une version non bloquante, Atomics.waitAsync
, qui est également utilisable sur le thread principal.
Dans cet article, nous expliquons comment utiliser ces API bas-niveau pour implémenter un mutex qui fonctionne à la fois de manière synchrone (pour les threads de travail) et asynchrone (pour les threads de travail ou le thread principal).
Atomics.wait
et Atomics.waitAsync
prennent les paramètres suivants :
buffer
: unInt32Array
ouBigInt64Array
basé sur unSharedArrayBuffer
index
: un index valide dans le tableauexpectedValue
: une valeur que nous attendons de trouver à l'emplacement mémoire décrit par(buffer, index)
timeout
: un délai en millisecondes (optionnel, par défaut àInfinity
)
La valeur de retour de Atomics.wait
est une chaîne de caractères. Si l'emplacement mémoire ne contient pas la valeur attendue, Atomics.wait
retourne immédiatement avec la valeur 'not-equal'
. Sinon, le thread est bloqué jusqu'à ce qu'un autre thread appelle Atomics.notify
avec le même emplacement mémoire ou que le délai soit expiré. Dans le premier cas, Atomics.wait
retourne la valeur 'ok'
, dans le dernier cas, Atomics.wait
retourne la valeur 'timed-out'
.
Atomics.notify
prend les paramètres suivants :
- un
Int32Array
ouBigInt64Array
basé sur unSharedArrayBuffer
- un index (valide dans le tableau)
- combien d'attente doivent être notifiés (optionnel, par défaut à
Infinity
)
Il notifie le nombre spécifié d'attenteurs, dans l'ordre FIFO, qui attendent à l'emplacement mémoire décrit par (buffer, index)
. S'il existe plusieurs appels en attente à Atomics.wait
ou Atomics.waitAsync
liés au même emplacement, ils sont tous dans la même file FIFO.
Contrairement à Atomics.wait
, Atomics.waitAsync
retourne toujours immédiatement. La valeur de retour est l'une des suivantes :
{ async: false, value: 'not-equal' }
(si l'emplacement mémoire ne contenait pas la valeur attendue){ async: false, value: 'timed-out' }
(uniquement pour un délai immédiat de 0){ async: true, value: promise }
La promesse peut être résolue ultérieurement avec une chaîne de caractères 'ok'
(si Atomics.notify
a été appelé avec le même emplacement mémoire) ou 'timed-out'
(si le délai a été atteint). La promesse n'est jamais rejetée.
L'exemple suivant démontre l'utilisation basique de Atomics.waitAsync
:
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ délai (opt)
// | ^ valeur attendue
// ^ index
if (result.value === 'not-equal') {
// La valeur dans le SharedArrayBuffer n'était pas celle attendue.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* notifié */ }
else { /* la valeur est 'timed-out' */ }
});
}
// Dans ce thread, ou dans un autre thread :
Atomics.notify(i32a, 0);
Ensuite, nous montrerons comment implémenter un mutex qui peut être utilisé à la fois de manière synchrone et asynchrone. L'implémentation de la version synchrone du mutex a été discutée précédemment, par exemple dans cet article de blog.
Dans l'exemple, nous n'utilisons pas le paramètre de délai dans Atomics.wait
et Atomics.waitAsync
. Le paramètre peut être utilisé pour implémenter des variables conditionnelles avec un délai.
Notre classe de mutex, AsyncLock
, fonctionne sur un SharedArrayBuffer
et implémente les méthodes suivantes :
lock
— bloque le thread jusqu'à ce que nous puissions verrouiller le mutex (utilisable uniquement sur un thread de travail)unlock
— déverrouille le mutex (contrepartie delock
)executeLocked(callback)
— verrou non bloquant, peut être utilisé par le thread principal ; programmecallback
pour qu'il soit exécuté une fois que nous parvenons à obtenir le verrou
Voyons comment chacun d'eux peut être mis en œuvre. La définition de la classe inclut des constantes et un constructeur qui prend le SharedArrayBuffer
comme paramètre.
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
Ici, i32a[0]
contient soit la valeur LOCKED
soit UNLOCKED
. C'est également l'emplacement d'attente pour Atomics.wait
et Atomics.waitAsync
. La classe AsyncLock
garantit les invariants suivants :
- Si
i32a[0] == LOCKED
, et qu'un thread commence à attendre (soit viaAtomics.wait
ouAtomics.waitAsync
) suri32a[0]
, il sera finalement notifié. - Après avoir été notifié, le thread essaie de prendre le verrou. S'il réussit, il notifie à nouveau lors de sa libération.
Verrouillage et déverrouillage synchrones
Ensuite, nous montrons la méthode de verrouillage bloquante lock
qui ne peut être appelée que depuis un thread travailleur :
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* ancienne valeur >>> */ AsyncLock.UNLOCKED,
/* nouvelle valeur >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< valeur attendue au départ
}
}
Lorsqu'un thread appelle lock()
, il tente d'abord d'obtenir le verrou en utilisant Atomics.compareExchange
pour changer l'état du verrou de UNLOCKED
à LOCKED
. Atomics.compareExchange
essaie de réaliser ce changement d'état de manière atomique et retourne la valeur originale de l'emplacement mémoire. Si la valeur originale était UNLOCKED
, on sait que le changement d'état a réussi et le thread a acquis le verrou. Rien de plus n'est nécessaire.
Si Atomics.compareExchange
n'arrive pas à changer l'état du verrou, un autre thread doit détenir le verrou. Ainsi, ce thread essaie Atomics.wait
afin d'attendre que l'autre thread libère le verrou. Si l'emplacement mémoire contient encore la valeur attendue (dans ce cas, AsyncLock.LOCKED
), l'appel à Atomics.wait
bloquera le thread et l'appel Atomics.wait
ne retournera que lorsqu'un autre thread appelle Atomics.notify
.
La méthode unlock
met le verrou dans l'état UNLOCKED
et appelle Atomics.notify
pour réveiller un autre thread en attente du verrou. Le changement d'état est toujours censé réussir, puisque ce thread détient le verrou, et personne d'autre ne devrait appeler unlock()
entre-temps.
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* ancienne valeur >>> */ AsyncLock.LOCKED,
/* nouvelle valeur >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tentative de déverrouillage sans posséder le mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
Le cas simple se déroule comme suit : le verrou est libre et le thread T1 l'acquiert en changeant l'état du verrou avec Atomics.compareExchange
. Le thread T2 essaie d'acquérir le verrou en appelant Atomics.compareExchange
, mais il n'arrive pas à changer l'état du verrou. T2 appelle alors Atomics.wait
, ce qui bloque le thread. À un moment donné, T1 libère le verrou et appelle Atomics.notify
. Cela fait que l'appel Atomics.wait
dans T2 retourne 'ok'
, réveillant T2. T2 essaie à nouveau d'acquérir le verrou, et cette fois-ci réussit.
Il existe également 2 cas particuliers possibles — qui démontrent la raison pour laquelle Atomics.wait
et Atomics.waitAsync
vérifient une valeur spécifique à l'index :
- T1 détient le verrou et T2 essaie de l'obtenir. Tout d'abord, T2 essaie de changer l'état du verrou avec
Atomics.compareExchange
, mais échoue. Mais ensuite, T1 libère le verrou avant que T2 ne parvienne à appelerAtomics.wait
. Lorsque T2 appelleAtomics.wait
, il retourne immédiatement avec la valeur'not-equal'
. Dans ce cas, T2 continue avec l'itération suivante de la boucle, essayant à nouveau d'acquérir le verrou. - T1 détient le verrou et T2 attend avec
Atomics.wait
. T1 libère le verrou — T2 se réveille (l'appelAtomics.wait
retourne) et essaie d'appelerAtomics.compareExchange
pour acquérir le verrou, mais un autre thread T3 a été plus rapide et l'a déjà obtenu. Donc, l'appel àAtomics.compareExchange
échoue, et T2 appelle à nouveauAtomics.wait
, attendant que T3 libère le verrou.
En raison de ce dernier cas particulier, le mutex n'est pas « équitable ». Il est possible que T2 ait attendu que le verrou soit libéré, mais que T3 arrive et l'obtienne immédiatement. Une implémentation plus réaliste de verrou pourrait utiliser plusieurs états pour différencier entre « verrouillé » et « verrouillé avec contention ».
Verrouillage asynchrone
La méthode non bloquante executeLocked
est appelable depuis le thread principal, contrairement à la méthode bloquante lock
. Elle reçoit une fonction de rappel comme son seul paramètre et planifie l'exécution de cette fonction une fois qu'elle a acquis le verrou avec succès.
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* ancienne valeur >>> */ AsyncLock.UNLOCKED,
/* nouvelle valeur >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ valeur attendue au départ
await result.value;
}
}
tryGetLock();
}
La fonction interne tryGetLock
essaie d'abord d'obtenir le verrou avec Atomics.compareExchange
, comme auparavant. Si cela change avec succès l'état du verrou, il peut exécuter le callback, déverrouiller le verrou et retourner.
Si Atomics.compareExchange
ne parvient pas à obtenir le verrou, nous devons réessayer lorsque le verrou est probablement libre. Nous ne pouvons pas bloquer et attendre que le verrou devienne libre - à la place, nous programmons un nouvel essai en utilisant Atomics.waitAsync
et la Promesse qu'il renvoie.
Si nous avons réussi à démarrer Atomics.waitAsync
, la Promesse renvoyée se résout lorsque le thread détenant le verrou fait Atomics.notify
. Ensuite, le thread qui attendait le verrou essaie d'obtenir à nouveau le verrou, comme auparavant.
Les mêmes cas particuliers (le verrou se libérant entre l'appel de Atomics.compareExchange
et l'appel de Atomics.waitAsync
, ainsi que le verrou se réacquérant entre la résolution de la Promesse et l'appel de Atomics.compareExchange
) sont également possibles dans la version asynchrone, donc le code doit les gérer de manière robuste.
Conclusion
Dans cet article, nous avons montré comment utiliser les primitives de synchronisation Atomics.wait
, Atomics.waitAsync
, et Atomics.notify
, pour implémenter un mutex utilisable à la fois dans le thread principal et dans les threads de travail.