Implement an async scheduler with max concurrency
Maintain a counter of running tasks and a FIFO queue of pending ones. `schedule(task)` returns a Promise. If running < max, fire it; else push to queue. On task completion, drain one from the queue. Variants: per-key concurrency, retry, priority, AbortSignal cancellation.
Minimal implementation

```js
function makeScheduler(max) {
  let running = 0;
  const queue = [];

  function next() {
    if (running >= max || queue.length === 0) return;
    const { task, resolve, reject } = queue.shift();
    running++;
    Promise.resolve()
      .then(() => task())
      .then(resolve, reject)
      .finally(() => { running--; next(); });
  }

  return function schedule(task) {
    return new Promise((resolve, reject) => {
      queue.push({ task, resolve, reject });
      next();
    });
  };
}
```
```js
// Usage
const limit = makeScheduler(3);
const results = await Promise.all(items.map((i) => limit(() => fetch(`/api/${i}`))));
```

Walk-through
- `running` tracks active tasks.
- `queue` holds pending tasks with their outer Promise's resolve/reject.
- `schedule` returns a Promise, pushes the task, and tries to fire.
- `next` fires a task whenever fewer than `max` are running; each completion decrements `running` and drains the next entry from the queue.
The `Promise.resolve().then(() => task())` wrap ensures errors thrown synchronously become rejections of the returned Promise.
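A minimal illustration of the difference (the `safe`/`unsafe` helper names are just for this example):

```js
// Without the wrap: a synchronous throw escapes before any Promise exists.
function unsafe(task) { return task(); }

// With the wrap: the throw becomes a rejection of the returned Promise.
function safe(task) { return Promise.resolve().then(() => task()); }

safe(() => { throw new Error("boom"); })
  .catch((e) => console.log("caught as rejection:", e.message)); // logs "caught as rejection: boom"
```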
Priority variant

Replace FIFO with a priority queue (binary heap or sorted insert):

```js
queue.push({ task, priority, resolve, reject });
queue.sort((a, b) => b.priority - a.priority); // simple; a heap is better at scale
```

Per-key concurrency
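Per-key limiting keeps an independent counter and queue for each key. A hedged end-to-end sketch of that kernel (the `schedule(key, task)` signature is an assumption, not a standard API):

```js
function perKeyScheduler(maxPerKey) {
  const counts = new Map(); // key -> number of running tasks
  const queues = new Map(); // key -> pending entries

  function next(key) {
    const queue = queues.get(key);
    if (!queue || queue.length === 0 || (counts.get(key) ?? 0) >= maxPerKey) return;
    const { task, resolve, reject } = queue.shift();
    counts.set(key, (counts.get(key) ?? 0) + 1);
    Promise.resolve()
      .then(() => task())
      .then(resolve, reject)
      .finally(() => { counts.set(key, counts.get(key) - 1); next(key); });
  }

  return function schedule(key, task) {
    return new Promise((resolve, reject) => {
      if (!queues.has(key)) queues.set(key, []);
      queues.get(key).push({ task, resolve, reject });
      next(key);
    });
  };
}
```

Keying by hostname, for instance, gives each origin its own concurrency budget while requests to other origins proceed independently.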
```js
function perKeyScheduler(maxPerKey) {
  const counts = new Map();
  const queues = new Map();
  // identical kernel, keyed on a function arg
}
```

AbortSignal cancellation
```js
function schedule(task, { signal } = {}) {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) return reject(new DOMException("Aborted", "AbortError"));
    const entry = { task, resolve, reject };
    queue.push(entry);
    signal?.addEventListener("abort", () => {
      const i = queue.indexOf(entry);
      if (i >= 0) { queue.splice(i, 1); reject(new DOMException("Aborted", "AbortError")); }
    }, { once: true });
    next();
  });
}
```

In-flight tasks can't be canceled without cooperation; propagate the signal to the task itself if it accepts one.
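For cooperative in-flight cancellation, the task itself must watch the signal. A sketch with a cancellable `sleep` helper (illustrative; `fetch` accepts a signal the same way):

```js
// A cancellable task: resolves after ms, or rejects promptly when the signal fires.
function sleep(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) return reject(new DOMException("Aborted", "AbortError"));
    const timer = setTimeout(resolve, ms);
    signal?.addEventListener("abort", () => {
      clearTimeout(timer);
      reject(new DOMException("Aborted", "AbortError"));
    }, { once: true });
  });
}

const controller = new AbortController();
// The same signal goes to both the queue entry and the running task:
// schedule(() => sleep(5000, controller.signal), { signal: controller.signal });
controller.abort(); // queued entries are dropped; a running sleep rejects immediately
```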
Retry per task

```js
async function retry(fn, attempts = 3, baseMs = 200) {
  let last;
  for (let i = 0; i < attempts; i++) {
    try { return await fn(); }
    catch (e) {
      last = e;
      // exponential backoff; skip the sleep after the final attempt
      if (i < attempts - 1) await new Promise(r => setTimeout(r, baseMs * 2 ** i));
    }
  }
  throw last;
}

limit(() => retry(() => fetch(url)));
```

Compose retry inside the task; the scheduler doesn't need to know.
Edge cases
- Empty input — schedule never called; nothing happens.
- Synchronous throw inside task — caught by the Promise.resolve wrap.
- Recursive schedules (task that schedules more) — fine; queue grows.
- Backpressure — caller can wait on individual schedules; the queue itself is unbounded; bound it if input is from a stream.
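One way to bound the queue, sketched under the assumption that callers handle a "queue full" rejection (rejecting is one policy; blocking the producer or dropping the oldest entry are alternatives, and `makeBoundedScheduler`/`maxQueue` are illustrative names):

```js
function makeBoundedScheduler(max, maxQueue) {
  let running = 0;
  const queue = [];
  function next() {
    if (running >= max || queue.length === 0) return;
    const { task, resolve, reject } = queue.shift();
    running++;
    Promise.resolve().then(() => task()).then(resolve, reject)
      .finally(() => { running--; next(); });
  }
  return function schedule(task) {
    return new Promise((resolve, reject) => {
      // fail fast instead of letting the queue grow without limit
      if (queue.length >= maxQueue) return reject(new Error("queue full"));
      queue.push({ task, resolve, reject });
      next();
    });
  };
}
```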
Why interviewers ask
Tests Promises, queues, state machines, and edge cases (cancellation, errors). It's a distillation of how a real connection pool or job runner works.
Interview framing
"Counter of running tasks + FIFO queue. schedule(task) returns a Promise; pushes; tries to fire. next() fires up to max and on completion drains one from the queue. Wrap the task in Promise.resolve().then(() => task()) so a synchronous throw becomes a rejection. Variants: priority queue, per-key concurrency, AbortSignal for queue-only cancellation (in-flight needs the task to honor signal). Compose retry inside the task — the scheduler stays simple. Bound the queue if input is unbounded to avoid memory growth."
Follow-up questions
- How would you add priority?
- How would AbortSignal cancel an in-flight task?
- Compare with p-limit's behavior.
Common mistakes
- Forgetting to decrement on failure (queue stalls).
- Not catching sync throws.
- Unbounded queue from a stream.
Performance considerations
- `push` is O(1), but `Array.prototype.shift` is O(n) on a plain array; fine for small queues, use a ring buffer or linked list at scale. A priority queue is O(log n) per op with a heap.
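A sketch of such a heap, keyed on `priority` to match the sort-based snippet earlier (illustrative, not a library API):

```js
class MaxHeap {
  constructor() { this.a = []; }
  get size() { return this.a.length; }
  push(item) {
    const a = this.a;
    a.push(item);
    let i = a.length - 1;
    while (i > 0) { // sift up until the parent's priority is >= ours
      const p = (i - 1) >> 1;
      if (a[p].priority >= a[i].priority) break;
      [a[p], a[i]] = [a[i], a[p]];
      i = p;
    }
  }
  pop() {
    const a = this.a;
    const top = a[0];
    const last = a.pop();
    if (a.length > 0) {
      a[0] = last;
      let i = 0;
      for (;;) { // sift down toward the larger-priority child
        const l = 2 * i + 1, r = l + 1;
        let m = i;
        if (l < a.length && a[l].priority > a[m].priority) m = l;
        if (r < a.length && a[r].priority > a[m].priority) m = r;
        if (m === i) break;
        [a[i], a[m]] = [a[m], a[i]];
        i = m;
      }
    }
    return top;
  }
}
```

In the scheduler, `queue.push(entry)` / `queue.shift()` would become `heap.push(entry)` / `heap.pop()`.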
Edge cases
- Sync throw in task.
- Recursive schedules.
- Cancellation of in-flight.
Real-world examples
- p-limit, async-pool, BullMQ workers, browser fetch concurrency caps.