Управление параллелизмом задач в JavaScript: разумные подходы вместо `Promise.all`

javascript
// Неоптимизированный подход, который может вызвать проблемы:
const urls = [/* 1000 URL */];
const results = await Promise.all(urls.map(fetchData)); // Риск перегрузки системы

Проблематика слепого параллелизма

Promise.all - идеальный инструмент для параллельного выполнения независимых задач... пока их количество измеряется десятками. При работе с большими наборами операций (500+ запросов API, обработка тысяч файлов, массовые операции с БД) вы рискуете:

  1. Исчерпать лимиты соединений (в браузерах обычно 6-8 параллельных запросов на домен)
  2. Перегрузить сервер резкими скачками нагрузки
  3. Вызвать переполнение памяти из-за одномоментной обработки огромных данных
  4. Нарушить политики API (rate limits)
  5. Полностью блокировать продвижение потоков выполнения при одной ошибке

Стандартные решения начинающих разработчиков типа "разбить на пачки и выполнять последовательно" часто неоптимальны:

javascript
// Неудачное решение: последовательные пачки (-производительность)
for (const batch of chunk(urls, 10)) {
  await Promise.all(batch.map(fetchData));
}

Здесь мы теряем время на ожидание завершения всей пачки целиком, вместо грамотного параллелизма.

Разумный параллелизм: контролируемое исполнение

Классический паттерн с пулом промисов

Основная идея — предоставить разработчику контроль над максимальным количеством одновременно выполняемых операций:

javascript
async function promisePool(tasks, concurrency = 10) {
  const results = [];
  const executing = new Set();
  
  for (const [index, task] of tasks.entries()) {
    const promise = task()
      .then(result => ({ index, result }))
      .catch(error => ({ index, error }));
    
    executing.add(promise);
    const clean = () => executing.delete(promise);
    promise.then(clean).catch(clean);
    
    if (executing.size >= concurrency) {
      await Promise.race(executing);
    }
  }
  
  // Дождаться оставшихся
  const settled = await Promise.allSettled(executing);
  return settled
    .filter(({status}) => status === 'fulfilled')
    .map(({value}) => value)
    .sort((a, b) => a.index - b.index)
    .map(({result}) => result);
}

Разберём ключевые элементы реализации:

  1. executing Set отслеживает выполняющиеся промисы
  2. Promise.race в цикле ожидает завершения хотя бы одной задачи перед запуском новой
  3. Делегирование обработки ошибок через catch гарантирует стабильность работы цикла
  4. Индексирование результатов сохраняет порядок оригинального массива
  5. Заключительный Promise.allSettled обрабатывает остаток после завершения основного цикла

Бенчмарк на 1000 задач с задержкой 100мс:

  • Promise.all: полное выполнение за ~100мс, пиковая нагрузка 1000 одновременных операций
  • Пулинг с concurrency=10: ~10 секунд, стабильная нагрузка ~10 операций

Усовершенствованный пул для сложных сценариев

Оригинальное решение имеет недостатки в реальной работе:

  • Нет обработки отказа запуска при таймауте
  • Отсутствует контроль прогресса
  • Невозможность динамической регулировки скорости

Улучшенная версия с дополнительными возможностями:

javascript
class PromisePool {
  constructor(tasks, options = {}) {
    this.tasks = [...tasks];
    this.concurrency = options.concurrency || 10;
    this.timeout = options.timeout || 0;
    this.active = 0;
    this.results = new Array(tasks.length);
    this.pending = new Set();
    this.shouldStop = false;
  }

  async start(onProgress) {
    if (this.started) throw new Error('Pool already started');
    this.started = true;

    let index = 0;
    const timers = new Map();

    const next = async () => {
      if (this.shouldStop || index >= this.tasks.length) return null;

      if (this.active >= this.concurrency) {
        return;
      }

      const i = index++;
      this.active++;
      
      const task = this.tasks[i];
      let timerID;

      const timerPromise = new Promise((_, reject) => {
        if (this.timeout > 0) {
          timerID = setTimeout(() => {
            reject(new Error(`Task ${i} timed out after ${this.timeout}ms`));
          }, this.timeout);
        }
      });

      const taskPromise = task().then(result => ({ i, result }));
      
      try {
        const result = await Promise.race([taskPromise, timerPromise]);
        clearTimeout(timerID);
        this.results[result.i] = { value: result.result };
      } catch (error) {
        clearTimeout(timerID);
        this.results[i] = { reason: error };
      } finally {
        this.active--;
        if (onProgress) onProgress(i + 1, this.tasks.length);
        await next();
      }
    };

    const workers = Array(Math.min(this.concurrency, this.tasks.length))
      .fill()
      .map(() => next());

    await Promise.all(workers);
    return this.results;
  }

  stop() {
    this.shouldStop = true;
  }
}

Новые возможности:

  • Прогресс выполнения через колбэк onProgress
  • Контроль таймаутов для отдельных задач
  • Отслеживание активных операций через active
  • Возможность досрочной отмены выполнения (stop())
  • Сохранение оригинального порядка результатов

Когда выбирать нативные реализации

Создание собственной реализации имеет смысл при:

  • Нежелании добавлять зависимости
  • Специфичных требованиях к стратегии ожидания
  • Необходимости тонкой настройки таймаутов
  • Интеграции с экосистемой приложения (логирование, метрики)

Пример использования усовершенствованного пула:

javascript
const tasks = Array(100).fill(null).map((_, i) => () => fetchResource(++i));

const pool = new PromisePool(tasks, {
  concurrency: 5, 
  timeout: 2500
});

pool.start((completed, total) => {
  console.log(`Progress: ${Math.round(completed / total * 100)}%`);
})
.then(results => {
  console.log('Completed:', results.filter(r => r.value));
})
.catch(err => console.error('Pool fatal error:', err));

Альтернативные решения: проверенные библиотеки

p-limit: минимализм и элегантность

Установка: npm install p-limit

javascript
import pLimit from 'p-limit';

const limiter = pLimit(5); // максимум 5 одновременных промисов

const tasks = urls.map(url => 
  limiter(() => 
    fetch(url).then(res => res.json())
  )
);

const results = await Promise.all(tasks); // ВАЖНО: параллелизм контролируется!

Особенности:

  • Миниатюрная (600Б gzipped)
  • Без зависимостей
  • Поддержка async/await и традиционных промисов
  • Чистое Flow API

Async-Pool: функциональный подход

Установка: npm install async-pool

javascript
import asyncPool from 'async-pool';

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

const promises = Array(100).fill(null).map((_, i) => 
  () => timeout(200).then(() => i * 2)
);

const results = await asyncPool(3, promises, fn => fn());

Особенности:

  • Отдельные параметры для промисов и генераторов
  • Поддержка итераторов вместо только массивов
  • Фиксированные размеры очереди для контроля памяти
  • Расширенные таймауты в enterprise-версии

Key Differences: Why Not Just Promise.allSettled?

Важное архитектурное отличие от стандартного Promise.allSettled:

javascript
// Разрушительный паттерн:
const promises = urls.map(url => fetchData(url));
const settled = await Promise.allSettled(promises); // Создаёт всё сразу!

// Здоровый подход с пулингом:
/*
- Контролируемое количество одновременно созданных промисов
- Оптимальное распределение ресурсов
- Динамическое создание промисов по мере освобождения слотов
*/

Рекомендации по использованию

Оптимальная конфигурация зависит от типа операций:

Тип задачConcurrencyВремя таймаута
CPU-bound операцииядра × 1.5редкий
File I/O10-2010–30s
Сетевая задача HTTP(S)5-102–5s
WebSockets40-10030–60s
Работа с БДпо лимиту БДнастройки БД

Продвинутые стратегии:

  1. Динамический параллелизм: использование обратных вызовов для увеличения конкуренции при высокой пропускной способности
  2. Инкрементальное увеличение: старт с консервативных значений и постепенный разгон системы
  3. Эластичный пул: автоматическое восстановление пула вторичных задач после ошибок первичных

Важные наблюдения:

  • В Node.js для файловых операций используйте worker_threads вместо пулинг-IO
  • Для запросов к несколькими доменам распределяйте их по пулам с разными настройками
  • Мониторьте HTTP-статусы 429 и 503 для динамического регулирования скорости
  • Добавляйте экспоненциальные задержки и стратегии повторов

Проверка на практике:

javascript
// Боевой пример для Express API
router.post('/batch-request', async (req, res) => {
  try {
    const { urls, maxConnections = 5 } = req.body;
    
    const limiter = pLimit(maxConnections);
    const sanitized = urls.slice(0, MAX_URLS_PER_REQUEST);
    
    const tasks = sanitized.map(url => 
      limiter(() => safeFetch(url, { timeout: 3000 }))
    );
    
    const data = await Promise.all(tasks);
    res.json({ success: true, data });
  } catch (error) {
    sentryCaptureException(error);
    res.status(500).json({ error: 'Batch processing failed' });
  }
});

Резюме мыслей

Управление параллелизмом требует баланса между двумя крайностями: "стрельба из всех орудий" с помощью Promise.all и искусственное замедление работы через последовательное выполнение.

Ключевые выводы:

  1. Пулинг промисов сохраняет сокеты, память и стабильность API
  2. Стандарт ES6 не предоставляет встроенных решений для управляемого параллелизма
  3. Для большинства проектов достаточно p-limit или async-pool
  4. При сложных workflow кастомные реализации окупаются гибкостью
  5. Настройка без мониторинга — это привилегия стабильных систем, а не новых продуктов

Финальный совет: внедряйте пулы до возникновения проблем — стоимость решения в 130 символов кода ничтожна по сравнению с инцидентами потери доступности в продакшне.