Atomics.wait, Atomics.notify, Atomics.waitAsync
Atomics.wait и Atomics.notify являются низкоуровневыми примитивами синхронизации, полезными для реализации мьютексов и других способов синхронизации. Однако, поскольку Atomics.wait блокирует выполнение, его невозможно вызвать в основном потоке (попытка сделать это вызывает TypeError).
Начиная с версии 8.7, V8 поддерживает неблокирующую версию, Atomics.waitAsync, которая также доступна в основном потоке.
В этом посте мы объясняем, как использовать эти низкоуровневые API для реализации мьютекса, который работает как синхронно (для рабочих потоков), так и асинхронно (для рабочих потоков или основного потока).
Atomics.wait и Atomics.waitAsync принимают следующие параметры:
buffer
: массивInt32Array
илиBigInt64Array
, поддерживаемый SharedArrayBufferindex
: допустимый индекс внутри массиваexpectedValue
: значение, которого мы ожидаем в указанной области памяти(buffer, index)
timeout
: тайм-аут в миллисекундах (необязательный, по умолчаниюInfinity
)
Возвращаемое значение Atomics.wait — строка. Если в области памяти не содержится ожидаемого значения, Atomics.wait немедленно возвращает значение 'not-equal'. В противном случае поток блокируется до тех пор, пока другой поток не вызовет Atomics.notify с той же областью памяти или пока не истечет тайм-аут. В первом случае Atomics.wait возвращает значение 'ok', во втором — значение 'timed-out'.
Atomics.notify принимает следующие параметры:
- массив
Int32Array
илиBigInt64Array
, поддерживаемый SharedArrayBuffer - индекс (допустимый внутри массива)
- количество ожидающих потоков, которых нужно уведомить (необязательный, по умолчанию
Infinity
)
Уведомляет указанное количество ожиданий в порядке FIFO, ожидающих на области памяти, описанной (buffer, index)
. Если существует несколько ожидающих вызовов Atomics.wait или Atomics.waitAsync для одной и той же области, все они находятся в одной очереди FIFO.
В отличие от Atomics.wait, Atomics.waitAsync всегда возвращает значение немедленно. Возвращаемое значение — одно из следующих:
{ async: false, value: 'not-equal' }
(если в области памяти не содержится ожидаемого значения){ async: false, value: 'timed-out' }
(только для немедленного тайм-аута 0){ async: true, value: promise }
Promise позже может быть разрешен строковым значением 'ok' (если Atomics.notify был вызван для той же области памяти) или 'timed-out' (если тайм-аут истек). Promise никогда не отклоняется.
Следующий пример демонстрирует базовое использование Atomics.waitAsync:
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ тайм-аут (необязательный)
// | ^ ожидаемое значение
// ^ индекс
if (result.value === 'not-equal') {
// Значение в SharedArrayBuffer не соответствует ожидаемому.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* уведомление получено */ }
else { /* значение равно 'timed-out' */ }
});
}
// В этом потоке или в другом потоке:
Atomics.notify(i32a, 0);
Далее мы покажем, как реализовать мьютекс, который можно использовать как синхронно, так и асинхронно. Реализация синхронной версии мьютекса ранее обсуждалась, например, в этом посте.
В примере мы не используем параметр тайм-аута в Atomics.wait и Atomics.waitAsync. Этот параметр можно использовать для реализации переменных условия с тайм-аутом.
Наш класс мьютекса, AsyncLock, работает с SharedArrayBuffer и реализует следующие методы:
lock
— блокирует поток до тех пор, пока не удастся захватить мьютекс (используется только в рабочем потоке)unlock
— освобождает мьютекс (аналогlock
)executeLocked(callback)
— неблокирующая блокировка, может использоваться основным потоком; планирует выполнениеcallback
, как только удастся захватить блокировку
Посмотрим, как можно реализовать каждый из них. Определение класса включает константы и конструктор, который принимает SharedArrayBuffer
в качестве параметра.
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
Здесь i32a[0]
содержит либо значение LOCKED
, либо UNLOCKED
. Это также место ожидания для Atomics.wait
и Atomics.waitAsync
. Класс AsyncLock
гарантирует следующие инварианты:
- Если
i32a[0] == LOCKED
, и поток начинает ожидание (либо черезAtomics.wait
, либо черезAtomics.waitAsync
) наi32a[0]
, он будет уведомлен в конечном итоге. - После получения уведомления поток пытается захватить блокировку. Если ему удается захватить блокировку, он снова уведомляет, когда освобождает ее.
Синхронная блокировка и разблокировка
Затем мы покажем блокирующий метод lock
, который можно вызывать только из рабочего потока:
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* старое значение >>> */ AsyncLock.UNLOCKED,
/* новое значение >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< ожидаемое значение в начале
}
}
Когда поток вызывает lock()
, сначала он пытается установить блокировку, используя Atomics.compareExchange
, чтобы изменить состояние блокировки с UNLOCKED
на LOCKED
. Atomics.compareExchange
пытается выполнить изменение состояния атомарно и возвращает исходное значение в ячейке памяти. Если исходное значение было UNLOCKED
, мы знаем, что изменение состояния прошло успешно, и поток получил блокировку. Ничего больше не требуется.
Если Atomics.compareExchange
не смог изменить состояние блокировки, другой поток должен держать блокировку. Таким образом, этот поток пытается выполнить Atomics.wait
, чтобы дождаться освобождения блокировки другим потоком. Если ячейка памяти все еще содержит ожидаемое значение (в данном случае AsyncLock.LOCKED
), вызов Atomics.wait
заблокирует поток, и вызов Atomics.wait
завершится только тогда, когда другой поток вызовет Atomics.notify
.
Метод unlock
переводит блокировку в состояние UNLOCKED
и вызывает Atomics.notify
, чтобы разбудить одного ожидающего, который ожидал блокировку. Изменение состояния всегда ожидается успешным, так как этот поток держит блокировку, и никто другой не должен вызывать unlock()
в это время.
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* старое значение >>> */ AsyncLock.LOCKED,
/* новое значение >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Попытка разблокировки без удержания мьютекса');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
Простой случай выглядит следующим образом: блокировка свободна, и поток T1 захватывает её, изменяя состояние блокировки с помощью Atomics.compareExchange
. Поток T2 пытается захватить блокировку, вызывая Atomics.compareExchange
, но не удается изменить состояние блокировки. Затем T2 вызывает Atomics.wait
, что блокирует поток. В какой-то момент T1 освобождает блокировку и вызывает Atomics.notify
. Это заставляет вызов Atomics.wait
в T2 вернуть 'ok'
, пробуждая T2. T2 затем снова пытается захватить блокировку и на этот раз успешно.
Существуют также 2 возможных крайних случая — они демонстрируют причину проверки Atomics.wait
и Atomics.waitAsync
на конкретное значение по индексу:
- T1 держит блокировку, а T2 пытается её получить. Сначала T2 пытается изменить состояние блокировки с помощью
Atomics.compareExchange
, но не удается. Затем T1 освобождает блокировку, прежде чем T2 успевает вызватьAtomics.wait
. Когда T2 вызываетAtomics.wait
, он сразу возвращает значение'not-equal'
. В этом случае T2 продолжает следующий цикл итерации, пытаясь снова захватить блокировку. - T1 держит блокировку, а T2 ждет её с помощью
Atomics.wait
. T1 освобождает блокировку — T2 просыпается (вызовAtomics.wait
возвращается) и пытается выполнитьAtomics.compareExchange
, чтобы захватить блокировку, но другой поток T3 оказался быстрее и уже захватил блокировку. Таким образом, вызовAtomics.compareExchange
не удается захватить блокировку, и T2 снова вызываетAtomics.wait
, блокируясь до освобождения блокировки T3.
Из-за последнего крайнего случая мьютекс не является "справедливым". Возможно, T2 ожидал освобождения блокировки, но T3 пришел и сразу же ее получил. Более реалистичная реализация блокировки может использовать несколько состояний для различения между «заблокировано» и «заблокировано с конкуренцией».
Асинхронная блокировка
Неблокирующий метод executeLocked
может быть вызван из главного потока, в отличие от блокирующего метода lock
. Он принимает функцию обратного вызова в качестве единственного параметра и планирует выполнение этой функции после успешного захвата блокировки.
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* старое значение >>> */ AsyncLock.UNLOCKED,
/* новое значение >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ ожидаемое значение в начале
await result.value;
}
}
tryGetLock();
}
Внутренняя функция tryGetLock
сначала пытается получить блокировку с помощью Atomics.compareExchange
, как и раньше. Если это успешно изменяет состояние блокировки, она может выполнить обратный вызов, разблокировать блокировку и вернуться.
Если Atomics.compareExchange
не удается получить блокировку, мы должны попробовать снова, когда блокировка, возможно, станет свободной. Мы не можем блокировать и ждать, пока блокировка станет свободной — вместо этого мы назначаем новую попытку с помощью Atomics.waitAsync
и возвращенного Promise.
Если нам удалось запустить Atomics.waitAsync
, возвращенный Promise выполняется, когда поток, удерживающий блокировку, вызывает Atomics.notify
. Затем поток, ожидавший блокировки, пытается снова получить блокировку, как и раньше.
Те же крайние случаи (освобождение блокировки между вызовом Atomics.compareExchange
и вызовом Atomics.waitAsync
, а также повторное получение блокировки между выполнением Promise и вызовом Atomics.compareExchange
) возможны и в асинхронной версии, поэтому код должен справляться с ними надежным образом.
Заключение
В этом посте мы показали, как использовать примитивы синхронизации Atomics.wait
, Atomics.waitAsync
и Atomics.notify
, чтобы реализовать мьютекс, который может использоваться как в главном потоке, так и в рабочих потоках.