JavaScript 中的异步生成器与异步迭代在数据处理管道中的应用
字数 1143 2025-12-14 18:35:41
JavaScript 中的异步生成器与异步迭代在数据处理管道中的应用
一、异步生成器与异步迭代的基本概念
1.1 什么是异步生成器?
异步生成器是结合了 async 函数和生成器(Generator)特性的特殊函数,使用 `async function*` 语法声明:函数体内既可以用 `await` 等待异步操作,也可以用 `yield` 逐个产出值。调用它会返回一个异步生成器对象,其 `next()` 方法返回 Promise。
// Async generator example: each value is awaited, then yielded in order (1, 2, 3).
async function* asyncGenerator() {
  for (const n of [1, 2, 3]) {
    yield await Promise.resolve(n);
  }
}
1.2 异步迭代协议
异步迭代协议包含两个关键部分:
- 异步可迭代对象:拥有 `[Symbol.asyncIterator]()` 方法的对象,调用该方法会返回一个异步迭代器;
- 异步迭代器:拥有 `next()` 方法的对象,`next()` 返回一个 Promise,该 Promise 解析为 `{ value, done }` 形式的结果对象。
二、异步生成器的核心特性
2.1 语法与基本使用
/**
 * Fetch JSON from `urls` in batches of `batchSize` parallel requests and
 * yield each parsed body individually, in the original URL order.
 *
 * @param {string[]} urls - Endpoints to fetch.
 * @param {number} batchSize - Requests issued in parallel per batch; must be >= 1.
 * @param {number} [delayMs=100] - Pause between consecutive batches, to avoid
 *   hammering the server. No pause follows the final batch.
 * @yields {*} The parsed JSON body of each URL.
 * @throws {RangeError} If `batchSize` is not >= 1 (0 would never advance `i`).
 */
async function* fetchInBatches(urls, batchSize, delayMs = 100) {
  if (!(batchSize >= 1)) {
    throw new RangeError(`batchSize must be >= 1, got ${batchSize}`);
  }
  for (let i = 0; i < urls.length; i += batchSize) {
    const batch = urls.slice(i, i + batchSize);
    // Fetch the current batch in parallel; Promise.all preserves input order.
    // NOTE: fetch() does not reject on HTTP error statuses; bodies are parsed regardless.
    const results = await Promise.all(batch.map((url) => fetch(url).then((r) => r.json())));
    // Yield the results one by one.
    for (const result of results) {
      yield result;
    }
    // Throttle between batches, but don't delay completion after the last one.
    if (i + batchSize < urls.length) {
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}
2.2 异步生成器与普通生成器的区别
| 特性 | 普通生成器 | 异步生成器 |
|---|---|---|
| 函数声明 | `function*` | `async function*` |
| `yield` 产出的值 | 任意值(Promise 会被原样产出) | 若产出 Promise,会先被自动 await,消费者拿到的是解析后的值 |
| `next()` 返回 | `{ value, done }` | `Promise<{ value, done }>` |
| 迭代方式 | `for...of` | `for await...of` |
三、异步迭代器的手动实现
3.1 实现异步可迭代对象
/**
 * Async iterable over the integers `start`..`end` (inclusive), pausing
 * `delay` milliseconds before producing each value.
 */
class AsyncRange {
  /**
   * @param {number} start - First value produced.
   * @param {number} end - Last value produced (inclusive).
   * @param {number} [delay=100] - Pause in ms before each value.
   */
  constructor(start, end, delay = 100) {
    Object.assign(this, { start, end, delay });
  }

  // Required by the async iteration protocol. Each call returns a fresh,
  // independent iterator whose bounds are captured when it is created.
  [Symbol.asyncIterator]() {
    const { start, end: last, delay: pause } = this;
    let cursor = start;
    return {
      async next() {
        if (!(cursor <= last)) {
          return { value: undefined, done: true };
        }
        // Simulated asynchronous work.
        await new Promise((wake) => setTimeout(wake, pause));
        return { value: cursor++, done: false };
      },
    };
  }
}
3.2 使用for await...of遍历
/**
 * Demo consumer: walks AsyncRange(1, 5, 200) with `for await...of`,
 * logging and awaiting `processValue` for each value in turn.
 * NOTE(review): `processValue` is not defined in this snippet — presumably supplied elsewhere.
 */
async function processAsyncRange() {
  for await (const n of new AsyncRange(1, 5, 200)) {
    console.log(`Processing value: ${n}`);
    // Per-value async processing goes here.
    await processValue(n);
  }
}
四、数据处理管道的构建
4.1 创建数据处理阶段的异步生成器
/**
 * Stage 1 (source): fetch each URL sequentially and yield its parsed JSON body.
 * @param {Iterable<string>} urls
 */
async function* dataSource(urls) {
  for (const endpoint of urls) {
    console.log(`Fetching from: ${endpoint}`);
    const res = await fetch(endpoint);
    yield await res.json();
  }
}
/**
 * Stage 2 (transform): yield the awaited result of `transformFn` for every item.
 * @param {AsyncIterable} source
 * @param {(item: *) => *} transformFn - Sync or async mapping function.
 */
async function* transformData(source, transformFn) {
  for await (const record of source) {
    yield await transformFn(record);
  }
}
/**
 * Stage 3 (filter): pass through only items for which the awaited
 * `filterFn` result is truthy.
 * @param {AsyncIterable} source
 * @param {(item: *) => *} filterFn - Sync or async predicate.
 */
async function* filterData(source, filterFn) {
  for await (const record of source) {
    const keep = await filterFn(record);
    if (!keep) continue;
    yield record;
  }
}
/**
 * Stage 4 (batch): group items into arrays of `batchSize`; a final, shorter
 * batch holds any leftovers. Every yielded array is a new array.
 * @param {AsyncIterable} source
 * @param {number} batchSize
 */
async function* batchProcessor(source, batchSize) {
  let pending = [];
  for await (const entry of source) {
    pending.push(entry);
    if (!(pending.length >= batchSize)) continue;
    yield pending;
    pending = [];
  }
  // Flush the remainder.
  if (pending.length !== 0) {
    yield pending;
  }
}
4.2 组合数据处理管道
/**
 * Demo pipeline: fetch (dataSource) → stamp (transformData) → drop items
 * whose id is null (filterData) → group by 10 (batchProcessor) → processBatch.
 * NOTE(review): `processBatch` is not defined in this snippet — presumably supplied elsewhere.
 */
async function processDataPipeline(urls) {
  const stamp = async (data) => ({
    ...data,
    processed: true,
    timestamp: Date.now(),
  });
  // NOTE(review): items whose id is `undefined` pass this check; use
  // `item.id != null` if missing ids should also be dropped — confirm intent.
  const hasId = async (item) => item.id !== null;

  // Stages are lazy: nothing is fetched until the for-await below pulls.
  const batches = batchProcessor(
    filterData(transformData(dataSource(urls), stamp), hasId),
    10,
  );

  for await (const batch of batches) {
    console.log(`Processing batch of ${batch.length} items`);
    await processBatch(batch);
  }
}
五、高级应用:并发控制与错误处理
5.1 带并发控制的异步生成器
/**
 * Map each item of an async iterable with `mapper`, keeping at most
 * `concurrency` mapper calls in flight, and yield the mapped results in
 * completion order (not source order).
 *
 * In-flight mapper calls cannot be cancelled: if the consumer stops early,
 * calls already started keep running, but their results are discarded.
 *
 * @param {AsyncIterable} source - Items to map.
 * @param {(value: *) => *} mapper - Sync or async mapping function.
 * @param {number} [concurrency=3] - Maximum in-flight mapper calls; must be >= 1.
 * @yields {*} Each mapper result, as soon as it is available.
 * @throws {RangeError} If `concurrency` is not >= 1.
 * @throws The first mapper rejection observed by the race; the source is then closed.
 */
async function* concurrentMap(
  source,
  mapper,
  concurrency = 3
) {
  if (!(concurrency >= 1)) {
    throw new RangeError(`concurrency must be >= 1, got ${concurrency}`);
  }
  const iterator = source[Symbol.asyncIterator]();
  // Each task resolves to { task, result } so the race winner can be
  // identified. A task leaves the set only after its result is yielded,
  // so results that complete while another is being consumed are not lost.
  const inFlight = new Set();
  let sourceDone = false;

  // Start one mapper call and track it.
  const launch = (value) => {
    const task = Promise.resolve()
      .then(() => mapper(value))
      .then((result) => ({ task, result }));
    // Mark as handled so a task that rejects after this generator has
    // already exited does not become an unhandled rejection. The race
    // below still observes the rejection.
    task.catch(() => {});
    inFlight.add(task);
  };

  try {
    while (true) {
      // Fill free concurrency slots from the source.
      while (!sourceDone && inFlight.size < concurrency) {
        const { value, done } = await iterator.next();
        if (done) {
          sourceDone = true;
        } else {
          launch(value);
        }
      }
      if (inFlight.size === 0) {
        return;
      }
      // Wait for whichever task finishes first.
      const { task, result } = await Promise.race(inFlight);
      inFlight.delete(task);
      yield result;
    }
  } finally {
    // Close the source on early exit (consumer break or mapper error).
    if (!sourceDone && typeof iterator.return === 'function') {
      await iterator.return();
    }
  }
}
5.2 错误处理与重试机制
/**
 * Re-deliver items from `source` when the consumer reports a failure.
 *
 * Retries are driven by the consumer calling `throw(error)` on this
 * generator while it is paused on an item. The error is caught at the
 * `yield`, and after an exponential-backoff delay the SAME item is yielded
 * again as the result of that `throw()` call. A plain `for await...of` loop
 * never calls `throw()`, so errors raised in its body — or by `source`
 * itself — are NOT retried by this helper.
 *
 * @param {AsyncIterable} source - Items to deliver.
 * @param {number} [maxRetries=3] - Re-deliveries allowed per item.
 * @param {(error: *) => boolean} [shouldRetry] - Return false to fail immediately.
 * @param {{baseDelayMs?: number, maxDelayMs?: number}} [options] - Backoff
 *   tuning: the delay before retry k (1-based) is
 *   min(baseDelayMs * 2^(k-1), maxDelayMs). Defaults 1000 / 10000 ms.
 * @throws {Error} When retries are exhausted or `shouldRetry` returns false.
 *   The consumer's original error is attached as `cause`.
 */
async function* withRetry(
  source,
  maxRetries = 3,
  shouldRetry = (error) => true,
  { baseDelayMs = 1000, maxDelayMs = 10000 } = {}
) {
  for await (const item of source) {
    let retryCount = 0;
    while (true) {
      try {
        yield item;
        break; // Consumer moved on: this item succeeded.
      } catch (error) {
        const message = error?.message ?? String(error);
        // Negated `<` so a NaN maxRetries fails fast instead of retrying forever.
        if (!shouldRetry(error) || !(retryCount < maxRetries)) {
          throw new Error(`Failed after ${retryCount} retries: ${message}`, { cause: error });
        }
        retryCount++;
        console.log(`Retry ${retryCount}/${maxRetries} for error: ${message}`);
        // Exponential backoff.
        const delay = Math.min(baseDelayMs * 2 ** (retryCount - 1), maxDelayMs);
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }
}
六、实际应用场景
6.1 实时数据流处理
/**
 * WebSocket message stream: open a socket to `url` and yield each incoming
 * message parsed with JSON.parse.
 *
 * The stream finishes normally when the socket's `close` event fires. If an
 * `error` event fires, it rejects after any already-buffered messages have
 * been yielded. The socket is closed when iteration ends for any reason,
 * including a consumer `break`.
 *
 * NOTE: messages are buffered without limit while the consumer is busy; there
 * is no backpressure toward the server.
 * NOTE(review): a message that is not valid JSON makes JSON.parse throw inside
 * the onmessage handler. That error is not delivered to the consumer, and the
 * message is dropped — confirm the desired handling.
 *
 * @param {string} url - WebSocket endpoint.
 * @throws {Error} If the socket reports an error.
 */
async function* websocketDataStream(url) {
  const ws = new WebSocket(url);
  const messageQueue = [];
  let resolver = null;
  let closed = false;
  let errored = false;

  // Resume the generator if it is parked waiting for an event.
  const wake = () => {
    if (resolver) {
      const resume = resolver;
      resolver = null;
      resume();
    }
  };

  ws.onmessage = (event) => {
    messageQueue.push(JSON.parse(event.data));
    wake();
  };
  ws.onerror = () => {
    errored = true;
    wake();
  };
  // Without this, the loop below would wait forever once the socket closed.
  ws.onclose = () => {
    closed = true;
    wake();
  };

  try {
    while (true) {
      // Yield everything buffered so far.
      while (messageQueue.length > 0) {
        yield messageQueue.shift();
      }
      if (errored) {
        throw new Error(`WebSocket error on ${url}`);
      }
      if (closed) {
        return;
      }
      // Park until a message, error, or close event arrives.
      await new Promise((resolve) => {
        resolver = resolve;
      });
    }
  } finally {
    ws.close();
  }
}
6.2 大文件分块读取
/**
 * Read a Blob/File sequentially in fixed-size chunks.
 *
 * @param {Blob} file - Object exposing `size` and `slice()`, whose slices
 *   support `arrayBuffer()` (e.g. File, Blob).
 * @param {number} [chunkSize=1048576] - Bytes per chunk (1 MiB); must be >= 1.
 * @yields {{data: ArrayBuffer, offset: number, size: number}} The chunk's
 *   bytes, its start offset, and its byte length (the last chunk may be shorter).
 * @throws {RangeError} If `chunkSize` is not >= 1 (0 would never advance).
 */
async function* readFileInChunks(file, chunkSize = 1024 * 1024) {
  if (!(chunkSize >= 1)) {
    throw new RangeError(`chunkSize must be >= 1, got ${chunkSize}`);
  }
  const fileSize = file.size;
  for (let offset = 0; offset < fileSize; offset += chunkSize) {
    const chunk = file.slice(offset, offset + chunkSize);
    // Blob.arrayBuffer() replaces the FileReader callback wrapper and rejects
    // with a real error (FileReader's onerror passed a ProgressEvent to reject).
    const data = await chunk.arrayBuffer();
    yield {
      data,
      offset,
      size: chunk.size,
    };
  }
}
七、性能优化与最佳实践
7.1 内存优化技巧
/**
 * Stream processing that avoids accumulating memory: map each item of
 * `source` through `processor` and yield results one at a time, so this
 * generator never holds more than the current item and its result.
 *
 * @param {AsyncIterable} source - Items to process.
 * @param {(item: *) => *} processor - Sync or async mapping function.
 * @yields {*} Each processed result, in source order.
 */
async function* processLargeDataset(source, processor) {
  for await (const item of source) {
    // `processed` is scoped to this iteration, so the generator drops its
    // reference automatically when the loop advances. Nulling it manually
    // is unnecessary, and assigning to a const throws a TypeError.
    const processed = await processor(item);
    yield processed;
  }
  // Optional manual GC for demos. `gc` is only exposed when Node runs with
  // --expose-gc (non-standard). Use globalThis rather than the Node-only
  // `global`, so this also runs in browsers.
  if (typeof globalThis.gc === 'function') {
    globalThis.gc();
  }
}
7.2 管道性能监控
/**
 * Pass items from `source` through unchanged, logging throughput after every
 * `interval` items the consumer has finished with.
 *
 * "last item took" is the time between yielding an item and the consumer
 * requesting the next one, i.e. the downstream processing time for that item.
 *
 * @param {AsyncIterable} source - Items to monitor.
 * @param {string} name - Label prefixed to each log line.
 * @param {number} [interval=100] - Log once every `interval` consumed items.
 * @yields {*} The items of `source`, unchanged.
 */
async function* withMetrics(source, name, interval = 100) {
  let processed = 0;
  const startTime = Date.now();
  for await (const item of source) {
    const itemStart = Date.now();
    yield item;
    // Reached only when the consumer asks for the next item. An early
    // `break` therefore neither counts the item nor emits a spurious log
    // line (previously a first-item break logged "Processed 0 items").
    processed++;
    if (processed % interval === 0) {
      const duration = Date.now() - itemStart;
      // Clamp to 1 ms so a very fast stage doesn't report Infinity items/sec.
      const totalDuration = Math.max(Date.now() - startTime, 1);
      const itemsPerSecond = (processed / totalDuration) * 1000;
      console.log(`${name}: Processed ${processed} items, ` +
        `${itemsPerSecond.toFixed(2)} items/sec, ` +
        `last item took ${duration}ms`);
    }
  }
}
八、总结与注意事项
异步生成器与异步迭代在数据处理管道中的主要优势:
- 内存高效:支持流式处理,避免一次性加载所有数据
- 代码简洁:用同步的编码风格处理异步数据流
- 组合灵活:可以像乐高积木一样组合不同的处理阶段
- 错误隔离:每个处理阶段可以独立处理错误
注意事项:
- 使用 `for await...of` 时,确保外层函数是 async 函数(或位于支持顶层 await 的 ES 模块中)
- 注意异步迭代器的资源清理:在生成器中使用 `try...finally` 确保资源释放(消费者提前 `break` 时 `finally` 同样会执行)
- 考虑并发控制,避免同时发起太多异步操作
- 监控内存使用,特别是处理大量数据时
- 合理使用 `Promise.race` 和 `Promise.all` 来优化性能
异步生成器和异步迭代为JavaScript中的异步数据处理提供了强大的抽象能力,特别是在处理流式数据、大文件、实时数据流等场景中,能够显著提高代码的可读性和可维护性。