async/concurrency.ts

/**
 * Asynchronous tools.
 *
 * @module async
 * @license Apache-2.0
 * @copyright Mat. 2018-present
 */

/* eslint-disable @typescript-eslint/array-type */
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/prefer-function-type */

import { race } from "../async/tools";
import { random } from "../string/gen";
import { timeUnit } from "../utils/misc";




/**
 * Mutual exclusion for asynchronous functions.
 *
 * Example:
 *
 * ```
 * const mutex = async.createMutex()
 *
 * let f = async m => {
 *     let val = await m.lock()
 *     return `Freed with val: ${val}`
 * }
 *
 * f(mutex).then(utils.to_("success")).catch(utils.to_("failure"))
 *
 * mutex.resolve(42)  //  mutex.reject("ERROR")
 * ```
 *
 * @function createMutex
 * @returns {Object} lock(), resolve(), reject()
 */
export const createMutex = <T>(): {
    lock: () => Promise<T>;
    resolve: (value: T | PromiseLike<T>) => void;
    reject: (reason?: any) => void;
} => {
    let
        resolve = (_v: T | PromiseLike<T>): void => { /* no-op */ },
        reject = (_r?: any): void => {/* no-op */ };
    const promise = new Promise<T>(
        (res, rej) => { resolve = res; reject = rej; },
    );

    return {
        lock: () => promise,
        resolve, reject,
    };
};




/**
 * Create mutex with watchdog (10 minutes by default).
 *
 * `barrier`, in contrast to `mutex`, is preventing node process from exiting.
 *
 * Example:
 * ```
 * const barrier = createBarrier<void>();
 *
 * setTimeout(() => { barrier.resolve("Released!"); }, 2000);
 *
 * await barrier.lock();
 * ```
 *
 * @function createTimedBarrier
 * @param [releaseTimeout=10*timeUnit.minute]
 * @returns {Object} lock(), resolve(), reject()
 */
export const createTimedBarrier = <T>(
    releaseTimeout = 10 * timeUnit.minute,
): ReturnType<typeof createMutex<T>> => {
    const mutex = createMutex<T>();
    let watchdog: ReturnType<typeof setTimeout> | undefined = undefined;
    return {
        ...mutex,
        lock: () => {
            watchdog = setTimeout(() => {
                mutex.reject(new Error("timeout"));
            }, releaseTimeout);
            return mutex.lock();
        },
        resolve: (v) => {
            if (watchdog) clearTimeout(watchdog);
            return mutex.resolve(v);
        },
        reject: (r) => {
            if (watchdog) clearTimeout(watchdog);
            return mutex.reject(r);
        },
    };
};




/**
 * PromisePool fulfilled result type.
 */
export interface PromisePoolFulfilledResult<T> {
    status: "fulfilled";
    value: T;
}




/**
 * PromisePool rejected result type.
 */
export interface PromisePoolRejectedResult<E> {
    status: "rejected";
    reason: E;
}




/**
 * PromisePool empty result type.
 */
export interface PromisePoolEmptyResult {
    status: "empty";
}




/**
 * PromisePool result type.
 */
export type PromisePoolResult<T, E = any> =
    | PromisePoolFulfilledResult<T>
    | PromisePoolRejectedResult<E>
    | PromisePoolEmptyResult;




/**
 * PromisePool properties type.
 */
export interface PromisePoolProps {
    id: string;
}




/**
 * PromisePool internal interface.
 */
interface PromisePoolSlotType<T> {
    id: string;
    value: T;
}




/**
 * Imperative promise pool.
 *
 * ```
 * const pool = promisePool(128);
 * for (element of dataSeries) {
 *     let r = await pool.exec(async () => {
 *         // ...
 *     });
 *     if (r.status === "fulfilled") {
 *         // ...
 *     } else if (r.status === "rejected") {
 *         // ...
 *     }
 * }
 * let rest = await pool.finish();
 * ```
 *
 * @function promisePool
 * @param [poolSize=64]
 * @returns {Object} exec(t), finish()
 */
export const promisePool = <T, E = any>(poolSize = 64): {
    exec: (
        task: { (props: PromisePoolProps): Promise<T> },
    ) => Promise<PromisePoolResult<T, E>>;
    finish: () => Promise<Array<PromisePoolResult<T, E>>>;
} => {
    const slots = {} as Record<string, Promise<PromisePoolSlotType<T>>>;

    return {
        // execute task in an "empty slot"
        // awaits (suspends caller) if there are no more empty slots available
        exec: async (task) => {
            const
                m = createMutex<PromisePoolSlotType<T>>(),
                id = random(8) + String(Date.now());

            // create new slot with locked mutex
            slots[id] = m.lock();

            // convert slots object to array
            const locks = Object.values(slots);

            // run the task (don't wait for it to finish in this context)
            Promise.resolve()
                .then(() => task({ id }))
                .then(
                    (value) => m.resolve({ id, value }),
                    (reason) => m.reject({ id, reason }),
                );

            // if poolSize is filled up then await for some resources
            if (locks.length === poolSize) {
                try {
                    const winner = await race(...locks);
                    delete slots[winner.id];
                    return { status: "fulfilled", value: winner.value };
                } catch (winningErr: any) {
                    delete slots[winningErr.id];
                    return { status: "rejected", reason: winningErr.reason };
                }
            }

            // resolve immediately otherwise
            return { status: "empty" };
        },

        // wait until all tasks are finished
        finish: async () =>
            Object.values(slots).length > 0 ?
                (await Promise.allSettled(
                    Object.values(slots),
                )).map((r) =>
                    r.status === "fulfilled" ? ({
                        status: r.status,
                        value: r.value.value,
                    }) : ({
                        status: r.status,
                        reason: r.reason.reason,
                    }),
                ) as PromisePoolResult<T, E>[] : [],
    };
};