Перед каждым разработчиком Node.js рано или поздно встаёт задача: обработать огромный CSV-файл, прочитать сотни тысяч записей из базы данных или обработать поток реального времени. Классический подход "загрузить всё в память" терпит неудачу при масштабировании - потребление памяти растёт линейно с размером данных. Как решить эту проблему?
Проблема памяти в обработке данных
Рассмотрим типичный неправильный подход:
// Плохой пример: загрузка всего файла в память
const fs = require('fs');
async function processFile() {
const data = await fs.promises.readFile('huge-file.csv', 'utf-8');
const lines = data.split('\n');
for (const line of lines) {
// Обработка каждой строки
transformData(line);
}
}
При размере файла в 5 ГБ скрипт потребляет минимум 5 ГБ оперативной памяти, плюс дополнительные ресурсы на разбивку строк и их хранение. В экосистеме Node.js есть более эффективные решения - потоки (Streams) и асинхронные итераторы.
Асинхронные итераторы: Элегантная абстракция
Асинхронные итераторы дают узнаваемый интерфейс для работы с асинхронными источниками данных:
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
async function processStream() {
const fileStream = createReadStream('huge-file.csv');
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
// Асинхронная итерация по строкам
for await (const line of rl) {
transformData(line);
}
console.log('Файл полностью обработан');
}
Эта реализация имеет постоянное потребление памяти и эффективно обрабатывает файлы любого размера. Но что скрывается под капотом for await...of
? На помощь приходят генераторы для полного контроля.
Генераторы: За кулисами асинхронности
Генераторы - особый тип функций, способных приостанавливать и возобновлять своё выполнение. В контексте обработки данных они позволяют создавать кастомные итераторы:
async function* chunkedRead(filePath, chunkSize = 1024 * 1024) { // 1MB по умолчанию
const stats = await fs.promises.stat(filePath);
const fileSize = stats.size;
let position = 0;
const fd = await fs.promises.open(filePath, 'r');
try {
while (position < fileSize) {
const length = Math.min(chunkSize, fileSize - position);
const { buffer } = await fd.read(Buffer.alloc(length), 0, length, position);
position += length;
// Приостанавливаем выполнение и возвращаем данные
yield buffer.toString();
}
} finally {
await fd.close();
}
}
Применение этого генератора:
for await (const chunk of chunkedRead('massive-dataset.bin')) {
processChunk(chunk);
}
Преимущество этого подхода - полный контроль над процессом чтения. Мы можем реализовать сложную логику: пропускать заголовки, обрабатывать бинарные данные или реализовать повторные попытки при ошибках.
Реальный пример: Обработка JSON-потоков
Рассмотрим проблему обработки огромных JSON-файлов - распространённая задача в ETL-процессах.
// Пример генератора для потокового парсинга JSON
async function* streamJsonParse(iterable) {
let buffer = '';
for await (const chunk of iterable) {
buffer += chunk;
// Простой конечный автомат сбалансированных скобок
let start = 0;
let depth = 0;
let inString = false;
for (let i = 0; i < buffer.length; i++) {
const char = buffer[i];
// Игнорируем символы внутри строк
if (char === '"' && buffer[i-1] !== '\\') {
inString = !inString;
continue;
}
if (!inString) {
if (char === '{' || char === '[') depth++;
if ((char === '}' && buffer[i-1] !== '\\') || char === ']') depth--;
// Нашли полный объект
if (depth === 0 && char === '}') {
const objectStr = buffer.substring(start, i + 1);
start = i + 1;
try {
yield JSON.parse(objectStr);
} catch (e) {
console.error(`Ошибка парсинга JSON: ${e.message}`);
}
}
}
}
buffer = buffer.substring(start);
}
}
Применение:
// Читаем файл как поток
const fileStream = fs.createReadStream('big-data.json', 'utf8');
for await (const jsonObj of streamJsonParse(fileStream)) {
saveToDatabase(jsonObj); // Сохраняем объекты по мере поступления
}
Эта реализация обрабатывает JSON произвольного размера с постоянным потреблением памяти, что невозможно при использовании JSON.parse()
на целом файле.
Когда использовать генераторы?
- Обработка файлов больше доступной памяти - нет необходимости загружать весь файл
- Операции реального времени - постепенная обработка данных по мере поступления
- Работа с бесконечными потоками - обработка логов, данных с сенсоров
- Кастомные протоколы передачи - реализация специфичной логики обработки
- Управление памятью - предотвращение утечек при длительных операциях
Производительность и ограничения
Хотя генераторы предлагают преимущества в управлении памятью, важно учитывать:
- Рециркуляция памяти: Избегайте накопления ссылок в генераторах
- Конкурентное выполнение: Генераторы не параллельны по своей природе
- Обработка ошибок: Всегда используйте
try/catch
внутри и снаружи генератора - Совместимость: Асинхронные генераторы требуют Node.js 10+ или современный браузер
// Оптимизация: рециркуляция буфера
async function* optimizedBinaryRead(filePath) {
const fd = await fs.promises.open(filePath, 'r');
const buffer = Buffer.alloc(1024 * 1024); // Переиспользуемый буфер
try {
let bytesRead;
do {
bytesRead = (await fd.read(buffer)).bytesRead;
if (bytesRead > 0) {
yield buffer.slice(0, bytesRead);
}
} while (bytesRead > 0);
} finally {
await fd.close();
}
}
За счет переиспользования буфера эта реализация минимизирует аллокации памяти и снижает нагрузку на сборщик мусора.
Альтернативы и дополнения
Для многих задач подходят стандартные инструменты:
- Streams API: Готовая реализация с обработкой бэкпресса
- Node.js worker_threads: Для CPU-интенсивных задач
- Базы данных с курсорами: Потоковая выгрузка результатов
Однако кастомные генераторы незаменимы, когда требуется:
- Обработка специфичных форматов данных
- Сложная предобработка информации
- Точный контроль над потоком выполнения
Итог: Эволюция подхода
Генераторы в JavaScript прошли путь от экзотической возможности до фундамента строительных блоков асинхронной обработки данных. Комбинируя их с for-await-of
, мы получаем мощный инструмент для работы с потоками данных любой величины.
Ключевые преимущества:
- Детерминированное управление памятью - отсутствие внезапных OOM-ошибок
- Чистый конкурентный код без callback hell или сложных цепочек promise
- Возможность прерывания длительных операций
- Естественный интерфейс для потребителя данных
В следующий раз перед загрузкой огромного файла в память спросите себя: не пора ли перейти на потоковую обработку? Ваши бедные серверы скажут вам спасибо.