Перейти к основному содержимому

Atomics.wait, Atomics.notify, Atomics.waitAsync

· 7 мин. чтения
[Марья Хёльтта](https://twitter.com/marjakh), неблокирующий блогер

Atomics.wait и Atomics.notify являются низкоуровневыми примитивами синхронизации, полезными для реализации мьютексов и других способов синхронизации. Однако, поскольку Atomics.wait блокирует выполнение, его невозможно вызвать в основном потоке (попытка сделать это вызывает TypeError).

Начиная с версии 8.7, V8 поддерживает неблокирующую версию, Atomics.waitAsync, которая также доступна в основном потоке.

В этом посте мы объясняем, как использовать эти низкоуровневые API для реализации мьютекса, который работает как синхронно (для рабочих потоков), так и асинхронно (для рабочих потоков или основного потока).

Atomics.wait и Atomics.waitAsync принимают следующие параметры:

  • buffer: массив Int32Array или BigInt64Array, поддерживаемый SharedArrayBuffer
  • index: допустимый индекс внутри массива
  • expectedValue: значение, которого мы ожидаем в указанной области памяти (buffer, index)
  • timeout: тайм-аут в миллисекундах (необязательный, по умолчанию Infinity)

Возвращаемое значение Atomics.wait — строка. Если в области памяти не содержится ожидаемого значения, Atomics.wait немедленно возвращает значение 'not-equal'. В противном случае поток блокируется до тех пор, пока другой поток не вызовет Atomics.notify с той же областью памяти или пока не истечет тайм-аут. В первом случае Atomics.wait возвращает значение 'ok', во втором — значение 'timed-out'.

Atomics.notify принимает следующие параметры:

  • массив Int32Array или BigInt64Array, поддерживаемый SharedArrayBuffer
  • индекс (допустимый внутри массива)
  • количество ожидающих потоков, которых нужно уведомить (необязательный, по умолчанию Infinity)

Уведомляет указанное количество ожиданий в порядке FIFO, ожидающих на области памяти, описанной (buffer, index). Если существует несколько ожидающих вызовов Atomics.wait или Atomics.waitAsync для одной и той же области, все они находятся в одной очереди FIFO.

В отличие от Atomics.wait, Atomics.waitAsync всегда возвращает значение немедленно. Возвращаемое значение — одно из следующих:

  • { async: false, value: 'not-equal' } (если в области памяти не содержится ожидаемого значения)
  • { async: false, value: 'timed-out' } (только для немедленного тайм-аута 0)
  • { async: true, value: promise }

Promise позже может быть разрешен строковым значением 'ok' (если Atomics.notify был вызван для той же области памяти) или 'timed-out' (если тайм-аут истек). Promise никогда не отклоняется.

Следующий пример демонстрирует базовое использование Atomics.waitAsync:

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ тайм-аут (необязательный)
// | ^ ожидаемое значение
// ^ индекс

if (result.value === 'not-equal') {
// Значение в SharedArrayBuffer не соответствует ожидаемому.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* уведомление получено */ }
else { /* значение равно 'timed-out' */ }
});
}

// В этом потоке или в другом потоке:
Atomics.notify(i32a, 0);

Далее мы покажем, как реализовать мьютекс, который можно использовать как синхронно, так и асинхронно. Реализация синхронной версии мьютекса ранее обсуждалась, например, в этом посте.

В примере мы не используем параметр тайм-аута в Atomics.wait и Atomics.waitAsync. Этот параметр можно использовать для реализации переменных условия с тайм-аутом.

Наш класс мьютекса, AsyncLock, работает с SharedArrayBuffer и реализует следующие методы:

  • lock — блокирует поток до тех пор, пока не удастся захватить мьютекс (используется только в рабочем потоке)
  • unlock — освобождает мьютекс (аналог lock)
  • executeLocked(callback) — неблокирующая блокировка, может использоваться основным потоком; планирует выполнение callback, как только удастся захватить блокировку

Посмотрим, как можно реализовать каждый из них. Определение класса включает константы и конструктор, который принимает SharedArrayBuffer в качестве параметра.

class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;

constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}

lock() {
/* … */
}

unlock() {
/* … */
}

executeLocked(f) {
/* … */
}
}

Здесь i32a[0] содержит либо значение LOCKED, либо UNLOCKED. Это также место ожидания для Atomics.wait и Atomics.waitAsync. Класс AsyncLock гарантирует следующие инварианты:

  1. Если i32a[0] == LOCKED, и поток начинает ожидание (либо через Atomics.wait, либо через Atomics.waitAsync) на i32a[0], он будет уведомлен в конечном итоге.
  2. После получения уведомления поток пытается захватить блокировку. Если ему удается захватить блокировку, он снова уведомляет, когда освобождает ее.

Синхронная блокировка и разблокировка

Затем мы покажем блокирующий метод lock, который можно вызывать только из рабочего потока:

lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* старое значение >>> */ AsyncLock.UNLOCKED,
/* новое значение >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< ожидаемое значение в начале
}
}

Когда поток вызывает lock(), сначала он пытается установить блокировку, используя Atomics.compareExchange, чтобы изменить состояние блокировки с UNLOCKED на LOCKED. Atomics.compareExchange пытается выполнить изменение состояния атомарно и возвращает исходное значение в ячейке памяти. Если исходное значение было UNLOCKED, мы знаем, что изменение состояния прошло успешно, и поток получил блокировку. Ничего больше не требуется.

Если Atomics.compareExchange не смог изменить состояние блокировки, другой поток должен держать блокировку. Таким образом, этот поток пытается выполнить Atomics.wait, чтобы дождаться освобождения блокировки другим потоком. Если ячейка памяти все еще содержит ожидаемое значение (в данном случае AsyncLock.LOCKED), вызов Atomics.wait заблокирует поток, и вызов Atomics.wait завершится только тогда, когда другой поток вызовет Atomics.notify.

Метод unlock переводит блокировку в состояние UNLOCKED и вызывает Atomics.notify, чтобы разбудить одного ожидающего, который ожидал блокировку. Изменение состояния всегда ожидается успешным, так как этот поток держит блокировку, и никто другой не должен вызывать unlock() в это время.

unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* старое значение >>> */ AsyncLock.LOCKED,
/* новое значение >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Попытка разблокировки без удержания мьютекса');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}

Простой случай выглядит следующим образом: блокировка свободна, и поток T1 захватывает её, изменяя состояние блокировки с помощью Atomics.compareExchange. Поток T2 пытается захватить блокировку, вызывая Atomics.compareExchange, но не удается изменить состояние блокировки. Затем T2 вызывает Atomics.wait, что блокирует поток. В какой-то момент T1 освобождает блокировку и вызывает Atomics.notify. Это заставляет вызов Atomics.wait в T2 вернуть 'ok', пробуждая T2. T2 затем снова пытается захватить блокировку и на этот раз успешно.

Существуют также 2 возможных крайних случая — они демонстрируют причину проверки Atomics.wait и Atomics.waitAsync на конкретное значение по индексу:

  • T1 держит блокировку, а T2 пытается её получить. Сначала T2 пытается изменить состояние блокировки с помощью Atomics.compareExchange, но не удается. Затем T1 освобождает блокировку, прежде чем T2 успевает вызвать Atomics.wait. Когда T2 вызывает Atomics.wait, он сразу возвращает значение 'not-equal'. В этом случае T2 продолжает следующий цикл итерации, пытаясь снова захватить блокировку.
  • T1 держит блокировку, а T2 ждет её с помощью Atomics.wait. T1 освобождает блокировку — T2 просыпается (вызов Atomics.wait возвращается) и пытается выполнить Atomics.compareExchange, чтобы захватить блокировку, но другой поток T3 оказался быстрее и уже захватил блокировку. Таким образом, вызов Atomics.compareExchange не удается захватить блокировку, и T2 снова вызывает Atomics.wait, блокируясь до освобождения блокировки T3.

Из-за последнего крайнего случая мьютекс не является "справедливым". Возможно, T2 ожидал освобождения блокировки, но T3 пришел и сразу же ее получил. Более реалистичная реализация блокировки может использовать несколько состояний для различения между «заблокировано» и «заблокировано с конкуренцией».

Асинхронная блокировка

Неблокирующий метод executeLocked может быть вызван из главного потока, в отличие от блокирующего метода lock. Он принимает функцию обратного вызова в качестве единственного параметра и планирует выполнение этой функции после успешного захвата блокировки.

executeLocked(f) {
const self = this;

async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* старое значение >>> */ AsyncLock.UNLOCKED,
/* новое значение >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ ожидаемое значение в начале
await result.value;
}
}

tryGetLock();
}

Внутренняя функция tryGetLock сначала пытается получить блокировку с помощью Atomics.compareExchange, как и раньше. Если это успешно изменяет состояние блокировки, она может выполнить обратный вызов, разблокировать блокировку и вернуться.

Если Atomics.compareExchange не удается получить блокировку, мы должны попробовать снова, когда блокировка, возможно, станет свободной. Мы не можем блокировать и ждать, пока блокировка станет свободной — вместо этого мы назначаем новую попытку с помощью Atomics.waitAsync и возвращенного Promise.

Если нам удалось запустить Atomics.waitAsync, возвращенный Promise выполняется, когда поток, удерживающий блокировку, вызывает Atomics.notify. Затем поток, ожидавший блокировки, пытается снова получить блокировку, как и раньше.

Те же крайние случаи (освобождение блокировки между вызовом Atomics.compareExchange и вызовом Atomics.waitAsync, а также повторное получение блокировки между выполнением Promise и вызовом Atomics.compareExchange) возможны и в асинхронной версии, поэтому код должен справляться с ними надежным образом.

Заключение

В этом посте мы показали, как использовать примитивы синхронизации Atomics.wait, Atomics.waitAsync и Atomics.notify, чтобы реализовать мьютекс, который может использоваться как в главном потоке, так и в рабочих потоках.

Поддержка функций

Atomics.wait и Atomics.notify

Atomics.waitAsync