diff --git a/src/__tests__/caching.test.mjs b/src/__tests__/caching.test.mjs new file mode 100644 index 00000000..5dc306e8 --- /dev/null +++ b/src/__tests__/caching.test.mjs @@ -0,0 +1,110 @@ +import { deepStrictEqual, strictEqual } from 'node:assert'; +import { describe, it } from 'node:test'; + +import { collectAsyncGenerator, createCache } from '../caching.mjs'; + +describe('caching', () => { + describe('collectAsyncGenerator', () => { + it('should collect all chunks into a flat array', async () => { + async function* gen() { + yield [1, 2]; + yield [3, 4]; + yield [5]; + } + + const result = await collectAsyncGenerator(gen()); + + deepStrictEqual(result, [1, 2, 3, 4, 5]); + }); + + it('should return empty array for empty generator', async () => { + async function* gen() { + // empty generator + } + + const result = await collectAsyncGenerator(gen()); + + deepStrictEqual(result, []); + }); + + it('should handle empty chunks', async () => { + async function* gen() { + yield []; + yield [1]; + yield []; + yield [2, 3]; + } + + const result = await collectAsyncGenerator(gen()); + + deepStrictEqual(result, [1, 2, 3]); + }); + }); + + describe('createCache', () => { + it('should report whether a generator is stored', () => { + const cache = createCache(); + + strictEqual(cache.has('a'), false); + cache.store('a', Promise.resolve(1)); + strictEqual(cache.has('a'), true); + }); + + it('should pass through non-streaming results', async () => { + const cache = createCache(); + + cache.store('a', Promise.resolve({ value: 42 })); + + deepStrictEqual(await cache.consume('a'), { value: 42 }); + }); + + it('should collect a streaming result only once for all consumers', async () => { + const cache = createCache(); + + let iterations = 0; + + async function* gen() { + iterations++; + yield [1, 2]; + yield [3]; + } + + cache.store('a', gen()); + + const first = await cache.consume('a'); + const second = await cache.consume('a'); + + deepStrictEqual(first, [1, 2, 3]); + // The same collected array is shared, collection happened a single time + strictEqual(first, second); + strictEqual(iterations, 1); + }); + + it('should count consumers across a dependency closure', async () => { + const cache = createCache(); + + // graph: a -> b -> c and d -> b; requested targets are a and d + const graph = { a: 'b', b: 'c', d: 'b' }; + + cache.populateConsumerCounts(['a', 'd'], name => graph[name]); + + // `b` is depended on by both `a` and `d`, so it needs two reads before + // it is evicted. + cache.store('b', Promise.resolve('B')); + await cache.consume('b'); + strictEqual(cache.has('b'), true); + await cache.consume('b'); + strictEqual(cache.has('b'), false); + + // `c` is depended on only by `b`, so a single read evicts it. + cache.store('c', Promise.resolve('C')); + await cache.consume('c'); + strictEqual(cache.has('c'), false); + + // `a` is a requested target, consumed exactly once by the final read. + cache.store('a', Promise.resolve('A')); + await cache.consume('a'); + strictEqual(cache.has('a'), false); + }); + }); +}); diff --git a/src/__tests__/generators.test.mjs b/src/__tests__/generators.test.mjs new file mode 100644 index 00000000..bf503b4f --- /dev/null +++ b/src/__tests__/generators.test.mjs @@ -0,0 +1,140 @@ +import assert from 'node:assert/strict'; +import { describe, it, mock, beforeEach } from 'node:test'; + +// Tracks how many times each synthetic generator actually executed so we can +// assert that a shared dependency runs exactly once even after its result is +// evicted from the cache once its last consumer has read it. +const runs = {}; + +const record = name => { + runs[name] = (runs[name] ?? 0) + 1; +}; + +// Yields a single chunk as an async generator (mirrors streaming generators) +const streamOf = chunk => + (async function* () { + yield chunk; + })(); + +mock.module('../generators/index.mjs', { + namedExports: { + allGenerators: { + // Root streaming generator (no dependency) + ast: { + name: 'ast', + hasParallelProcessor: true, + generate: async () => { + record('ast'); + return streamOf([{ ast: true }]); + }, + }, + // Streaming generator shared by multiple consumers + metadata: { + name: 'metadata', + dependsOn: 'ast', + hasParallelProcessor: true, + generate: async input => { + record('metadata'); + return streamOf([{ meta: input.length }]); + }, + }, + // Two non-streaming consumers of the shared `metadata` result + 'gen-a': { + name: 'gen-a', + dependsOn: 'metadata', + generate: async input => { + record('gen-a'); + return { a: input }; + }, + }, + 'gen-b': { + name: 'gen-b', + dependsOn: 'metadata', + generate: async input => { + record('gen-b'); + return { b: input }; + }, + }, + // A target that is itself depended upon by another target + 'gen-c': { + name: 'gen-c', + dependsOn: 'metadata', + hasParallelProcessor: true, + generate: async () => { + record('gen-c'); + return streamOf([{ c: true }]); + }, + }, + 'gen-c-all': { + name: 'gen-c-all', + dependsOn: 'gen-c', + generate: async input => { + record('gen-c-all'); + return { all: input }; + }, + }, + }, + }, +}); + +mock.module('../threading/index.mjs', { + defaultExport: () => ({ + run: async () => undefined, + destroy: async () => undefined, + }), +}); + +mock.module('../threading/parallel.mjs', { + defaultExport: () => ({ + async *stream() { + // Unused: the mocked generators return their own async generators + }, + }), +}); + +const createGenerator = (await import('../generators.mjs')).default; + +describe('createGenerator orchestration', () => { + beforeEach(() => { + for (const key of Object.keys(runs)) { + delete runs[key]; + } + }); + + it('runs a shared dependency exactly once and returns correct results', async () => { + const { runGenerators } = createGenerator(); + + const results = await runGenerators({ + target: ['gen-a', 'gen-b'], + threads: 1, + }); + + // ast -> metadata are shared and must each execute a single time + assert.equal(runs.ast, 1); + assert.equal(runs.metadata, 1); + assert.equal(runs['gen-a'], 1); + assert.equal(runs['gen-b'], 1); + + // metadata collected one entry whose `meta` is the ast array length (1) + assert.deepStrictEqual(results, [ + { a: [{ meta: 1 }] }, + { b: [{ meta: 1 }] }, + ]); + }); + + it('does not re-run a target that is also another target dependency', async () => { + const { runGenerators } = createGenerator(); + + const results = await runGenerators({ + target: ['gen-c', 'gen-c-all'], + threads: 1, + }); + + // gen-c is both a requested target and a dependency of gen-c-all; eviction + // must not cause it to be scheduled (and run) twice. + assert.equal(runs['gen-c'], 1); + assert.equal(runs['gen-c-all'], 1); + + assert.deepStrictEqual(results, [[{ c: true }], { all: [{ c: true }] }]); + }); +}); diff --git a/src/__tests__/streaming.test.mjs b/src/__tests__/streaming.test.mjs deleted file mode 100644 index 6b952f54..00000000 --- a/src/__tests__/streaming.test.mjs +++ /dev/null @@ -1,170 +0,0 @@ -import { deepStrictEqual, ok, strictEqual } from 'node:assert'; -import { describe, it } from 'node:test'; - -import { - isAsyncGenerator, - collectAsyncGenerator, - createStreamingCache, -} from '../streaming.mjs'; - -describe('streaming utilities', () => { - describe('isAsyncGenerator', () => { - it('should return true for async generators', () => { - async function* asyncGen() { - yield 1; - } - - const gen = asyncGen(); - - strictEqual(isAsyncGenerator(gen), true); - }); - - it('should return false for regular generators', () => { - function* syncGen() { - yield 1; - } - - const gen = syncGen(); - - strictEqual(isAsyncGenerator(gen), false); - }); - - it('should return false for plain objects', () => { - strictEqual(isAsyncGenerator({}), false); - strictEqual(isAsyncGenerator([]), false); - strictEqual(isAsyncGenerator({ async: true }), false); - }); - - it('should return false for null and undefined', () => { - strictEqual(isAsyncGenerator(null), false); - strictEqual(isAsyncGenerator(undefined), false); - }); - - it('should return false for primitives', () => { - strictEqual(isAsyncGenerator(42), false); - strictEqual(isAsyncGenerator('string'), false); - strictEqual(isAsyncGenerator(true), false); - }); - - it('should return true for objects with Symbol.asyncIterator', () => { - const asyncIterable = { - [Symbol.asyncIterator]() { - return { - next: async () => ({ done: true, value: undefined }), - }; - }, - }; - - strictEqual(isAsyncGenerator(asyncIterable), true); - }); - }); - - describe('collectAsyncGenerator', () => { - it('should collect all chunks into a flat array', async () => { - async function* gen() { - yield [1, 2]; - yield [3, 4]; - yield [5]; - } - - const result = await collectAsyncGenerator(gen()); - - deepStrictEqual(result, [1, 2, 3, 4, 5]); - }); - - it('should return empty array for empty generator', async () => { - async function* gen() { - // empty generator - } - - const result = await collectAsyncGenerator(gen()); - - deepStrictEqual(result, []); - }); - - it('should handle single chunk', async () => { - async function* gen() { - yield [1, 2, 3]; - } - - const result = await collectAsyncGenerator(gen()); - - deepStrictEqual(result, [1, 2, 3]); - }); - - it('should handle empty chunks', async () => { - async function* gen() { - yield []; - yield [1]; - yield []; - yield [2, 3]; - } - - const result = await collectAsyncGenerator(gen()); - - deepStrictEqual(result, [1, 2, 3]); - }); - - it('should handle objects in chunks', async () => { - async function* gen() { - yield [{ a: 1 }, { b: 2 }]; - yield [{ c: 3 }]; - } - - const result = await collectAsyncGenerator(gen()); - - deepStrictEqual(result, [{ a: 1 }, { b: 2 }, { c: 3 }]); - }); - }); - - describe('createStreamingCache', () => { - it('should create a cache with required methods', () => { - const cache = createStreamingCache(); - - ok(cache); - strictEqual(typeof cache.getOrCollect, 'function'); - }); - - it('should return same promise for same key', async () => { - const cache = createStreamingCache(); - - async function* gen() { - yield [1, 2, 3]; - } - - const promise1 = cache.getOrCollect('test', gen()); - - // Create a new generator (which shouldn't be used due to caching) - async function* gen2() { - yield [4, 5, 6]; - } - - const promise2 = cache.getOrCollect('test', gen2()); - - // Both should resolve to the same result (from first generator) - const result1 = await promise1; - const result2 = await promise2; - - deepStrictEqual(result1, [1, 2, 3]); - strictEqual(result1, result2); - }); - - it('should return different results for different keys', async () => { - const cache = createStreamingCache(); - - async function* gen1() { - yield [1, 2]; - } - - async function* gen2() { - yield [3, 4]; - } - - const result1 = await cache.getOrCollect('key1', gen1()); - const result2 = await cache.getOrCollect('key2', gen2()); - - deepStrictEqual(result1, [1, 2]); - deepStrictEqual(result2, [3, 4]); - }); - }); -}); diff --git a/src/caching.mjs b/src/caching.mjs new file mode 100644 index 00000000..47dc47da --- /dev/null +++ b/src/caching.mjs @@ -0,0 +1,183 @@ +'use strict'; + +import logger from './logger/index.mjs'; +import { isAsyncIterable } from './utils/misc.mjs'; + +const cachingLogger = logger.child('caching'); + +/** + * Collects all values from an async generator into a flat array. + * Each yielded chunk is spread into the results array. + * + * @template T + * @param {AsyncGenerator} generator - Async generator yielding arrays + * @returns {Promise} Flattened array of all yielded items + */ +export const collectAsyncGenerator = async generator => { + const results = []; + + let chunkCount = 0; + + for await (const chunk of generator) { + chunkCount++; + + results.push(...chunk); + + cachingLogger.debug(`Collected chunk ${chunkCount}`, { + itemsInChunk: chunk.length, + }); + } + + cachingLogger.debug(`Collection complete`, { + totalItems: results.length, + chunks: chunkCount, + }); + + return results; +}; + +/** + * Creates the cache that backs the generator pipeline. It is the single owner + * of every piece of caching the orchestrator needs. + * + * Generator caching: each generator's pending result, so a generator and its + * dependencies are only ever scheduled once. + * + * Stream caching: collecting an async generator's chunks into a single array + * exactly once, shared by every consumer. + * + * Dependency caching & eviction: tracking how many consumers still need each + * result and dropping it once the last one has read it, so intermediate results + * can be garbage collected while later generators keep running. + */ +export const createCache = () => { + /** @type {{ [key: string]: Promise | AsyncGenerator }} */ + const results = {}; + + /** @type {Map>} */ + const collections = new Map(); + + /** + * Number of consumers that still need to read each result. Once it reaches + * zero the result is evicted. + * + * @type {{ [key: string]: number }} + */ + const remaining = {}; + + /** + * Collects an async generator's chunks, ensuring a single collection is + * shared across all consumers of the same key. + * + * @param {string} key - Cache key (the generator name) + * @param {AsyncGenerator} generator - Generator to collect + * @returns {Promise} Promise resolving to the collected items + */ + const collect = (key, generator) => { + const hasKey = collections.has(key); + + if (!hasKey) { + collections.set(key, collectAsyncGenerator(generator)); + } + + cachingLogger.debug( + hasKey + ? `Using cached collection for "${key}"` + : `Starting collection for "${key}"` + ); + + return collections.get(key); + }; + + return { + /** + * Whether a generator has already been scheduled. + * + * @param {string} name - Generator name + * @returns {boolean} + */ + has: name => name in results, + + /** + * Registers a generator's pending result. + * + * @param {string} name - Generator name + * @param {Promise | AsyncGenerator} result - Pending result + */ + store: (name, result) => { + results[name] = result; + }, + + /** + * Computes and records consumer counts across the dependency closure of the + * requested generators. A generator is consumed once per dependent + * generator, plus once more when it is a requested target (read by the + * final collection). This drives eviction without coupling the cache to the + * generator registry: callers supply how to resolve a dependency. + * + * @param {string[]} targets - Requested generator names + * @param {(name: string) => string | undefined} dependsOn - Resolves a + * generator's dependency name (if any) + */ + populateConsumerCounts: (targets, dependsOn) => { + const scheduled = new Set(); + const stack = [...targets]; + + // Walk the dependency closure of the requested targets + while (stack.length > 0) { + const name = stack.pop(); + + if (scheduled.has(name)) { + continue; + } + + scheduled.add(name); + + const dependency = dependsOn(name); + + if (dependency) { + stack.push(dependency); + } + } + + // Each scheduled generator consumes its dependency exactly once + for (const name of scheduled) { + const dependency = dependsOn(name); + + if (dependency) { + remaining[dependency] = (remaining[dependency] ?? 0) + 1; + } + } + + // Each requested target is consumed once more by the final collection + for (const name of targets) { + remaining[name] = (remaining[name] ?? 0) + 1; + } + }, + + /** + * Reads a generator's result (collecting it first if it streams) and + * evicts it once every expected consumer has read it. + * + * @param {string} name - Generator name to consume + * @returns {Promise} + */ + consume: async name => { + const result = await results[name]; + + const value = isAsyncIterable(result) + ? await collect(name, result) + : result; + + // Evict once the last consumer has retrieved the result + if (name in remaining && --remaining[name] <= 0) { + delete results[name]; + collections.delete(name); + + cachingLogger.debug(`Evicted "${name}" after final consumer`); + } + + return value; + }, + }; +}; diff --git a/src/generators.mjs b/src/generators.mjs index 451c39b5..4f87ec29 100644 --- a/src/generators.mjs +++ b/src/generators.mjs @@ -1,10 +1,11 @@ 'use strict'; +import { createCache } from './caching.mjs'; import { allGenerators } from './generators/index.mjs'; import logger from './logger/index.mjs'; -import { isAsyncGenerator, createStreamingCache } from './streaming.mjs'; import createWorkerPool from './threading/index.mjs'; import createParallelWorker from './threading/parallel.mjs'; +import { isAsyncIterable } from './utils/misc.mjs'; const generatorsLogger = logger.child('generators'); @@ -14,10 +15,9 @@ const generatorsLogger = logger.child('generators'); * processing and streaming results. */ const createGenerator = () => { - /** @type {{ [key: string]: Promise | AsyncGenerator }} */ - const cachedGenerators = {}; - - const streamingCache = createStreamingCache(); + // Owns all caching: generator results, stream collection, and the + // consumer-count bookkeeping that drives eviction. + const cache = createCache(); /** @type {import('piscina').Piscina} */ let pool; @@ -33,13 +33,7 @@ const createGenerator = () => { return undefined; } - const result = await cachedGenerators[dependsOn]; - - if (isAsyncGenerator(result)) { - return streamingCache.getOrCollect(dependsOn, result); - } - - return result; + return cache.consume(dependsOn); }; /** @@ -49,7 +43,7 @@ const createGenerator = () => { * @param {import('./utils/configuration/types').Configuration} configuration - Runtime options */ const scheduleGenerator = async (generatorName, configuration) => { - if (generatorName in cachedGenerators) { + if (cache.has(generatorName)) { return; } @@ -57,7 +51,7 @@ const createGenerator = () => { allGenerators[generatorName]; // Schedule dependency first - if (dependsOn && !(dependsOn in cachedGenerators)) { + if (dependsOn && !cache.has(dependsOn)) { await scheduleGenerator(dependsOn, configuration); } @@ -67,26 +61,29 @@ const createGenerator = () => { }); // Schedule the generator - cachedGenerators[generatorName] = (async () => { - const dependencyInput = await getDependencyInput(dependsOn); + cache.store( + generatorName, + (async () => { + const dependencyInput = await getDependencyInput(dependsOn); - generatorsLogger.debug(`Starting "${generatorName}"`); + generatorsLogger.debug(`Starting "${generatorName}"`); - // Create parallel worker for streaming generators - const worker = hasParallelProcessor - ? createParallelWorker(generatorName, pool, configuration) - : Promise.resolve(null); + // Create parallel worker for streaming generators + const worker = hasParallelProcessor + ? createParallelWorker(generatorName, pool, configuration) + : Promise.resolve(null); - const result = await generate(dependencyInput, await worker); + const result = await generate(dependencyInput, await worker); - // For streaming generators, "Completed" is logged when collection finishes - // (in streamingCache.getOrCollect), not here when the generator returns - if (!isAsyncGenerator(result)) { - generatorsLogger.debug(`Completed "${generatorName}"`); - } + // For streaming generators, "Completed" is logged when the cache + // finishes collecting, not here when the generator returns + if (!isAsyncIterable(result)) { + generatorsLogger.debug(`Completed "${generatorName}"`); + } - return result; - })(); + return result; + })() + ); }; /** @@ -103,6 +100,13 @@ const createGenerator = () => { threads, }); + // Compute consumer counts up front so dependencies can be evicted as soon + // as their last consumer runs (must be ready before any generator starts). + cache.populateConsumerCounts( + generators, + name => allGenerators[name].dependsOn + ); + // Create worker pool pool = createWorkerPool(threads); @@ -111,18 +115,11 @@ const createGenerator = () => { await scheduleGenerator(name, configuration); } - // Start all collections in parallel (don't await sequentially) - const resultPromises = generators.map(async name => { - let result = await cachedGenerators[name]; - - if (isAsyncGenerator(result)) { - result = await streamingCache.getOrCollect(name, result); - } - - return result; - }); - - const results = await Promise.all(resultPromises); + // Start all collections in parallel (don't await sequentially). Consuming + // through the shared path lets the final read also trigger eviction. + const results = await Promise.all( + generators.map(name => cache.consume(name)) + ); await pool.destroy(); diff --git a/src/generators/json-simple/generate.mjs b/src/generators/json-simple/generate.mjs index 82d06a99..116b9362 100644 --- a/src/generators/json-simple/generate.mjs +++ b/src/generators/json-simple/generate.mjs @@ -16,28 +16,18 @@ import { UNIST } from '../../utils/queries/index.mjs'; export async function generate(input) { const config = getConfig('json-simple'); - // Iterates the input (MetadataEntry) and performs a few changes - const mappedInput = input.map(node => { - // Deep clones the content nodes to avoid affecting upstream nodes - const content = JSON.parse(JSON.stringify(node.content)); - - // Removes numerous nodes from the content that should not be on the "body" - // of the JSON version of the API docs as they are already represented in the metadata - remove(content, [UNIST.isStabilityNode, UNIST.isHeading]); - - return { ...node, content }; - }); + input.forEach(node => + remove(node.content, [UNIST.isStabilityNode, UNIST.isHeading]) + ); if (config.output) { // Writes all the API docs stringified content into one file // Note: The full JSON generator in the future will create one JSON file per top-level API doc file await writeFile( join(config.output, 'api-docs.json'), - config.minify - ? JSON.stringify(mappedInput) - : JSON.stringify(mappedInput, null, 2) + config.minify ? JSON.stringify(input) : JSON.stringify(input, null, 2) ); } - return mappedInput; + return input; } diff --git a/src/generators/jsx-ast/utils/buildContent.mjs b/src/generators/jsx-ast/utils/buildContent.mjs index c4ed4832..3502b637 100644 --- a/src/generators/jsx-ast/utils/buildContent.mjs +++ b/src/generators/jsx-ast/utils/buildContent.mjs @@ -252,25 +252,22 @@ export const transformHeadingNode = async (entry, node, index, parent) => { * @param {import('../../metadata/types').MetadataEntry} entry - The API metadata entry to process */ export const processEntry = entry => { - // Deep copy content to avoid mutations on original - const content = structuredClone(entry.content); - // Visit and transform stability nodes - visit(content, UNIST.isStabilityNode, transformStabilityNode); + visit(entry.content, UNIST.isStabilityNode, transformStabilityNode); // Visit and transform headings with metadata and links - visit(content, UNIST.isHeading, (...args) => + visit(entry.content, UNIST.isHeading, (...args) => transformHeadingNode(entry, ...args) ); // Transform typed lists into property tables visit( - content, + entry.content, UNIST.isStronglyTypedList, (node, idx, parent) => (parent.children[idx] = createSignatureTable(node)) ); - return content; + return entry.content; }; /** diff --git a/src/generators/legacy-html/utils/buildContent.mjs b/src/generators/legacy-html/utils/buildContent.mjs index 22e4a0e2..73545d7f 100644 --- a/src/generators/legacy-html/utils/buildContent.mjs +++ b/src/generators/legacy-html/utils/buildContent.mjs @@ -223,11 +223,8 @@ export default (headNodes, metadataEntries) => { 'root', // Parses the metadata pieces of each node and the content metadataEntries.map(entry => { - // Deep clones the content nodes to avoid affecting upstream nodes - const content = structuredClone(entry.content); - // Parses the Heading nodes into Heading elements - visit(content, UNIST.isHeading, (node, index, parent) => + visit(entry.content, UNIST.isHeading, (node, index, parent) => buildHeading( node, index, @@ -239,14 +236,14 @@ export default (headNodes, metadataEntries) => { // Parses the Blockquotes into Stability elements // This is treated differently as we want to preserve the position of a Stability Index // within the content, so we can't just remove it and append it to the metadata - visit(content, UNIST.isStabilityNode, buildStability); + visit(entry.content, UNIST.isStabilityNode, buildStability); // Parses the type references that got replaced into Markdown links (raw) // into actual HTML links, these then get parsed into HAST nodes on `runSync` - visit(content, UNIST.isHtmlWithType, buildHtmlTypeLink); + visit(entry.content, UNIST.isHtmlWithType, buildHtmlTypeLink); // Splits the content into the Heading node and the rest of the content - const [headingNode, ...restNodes] = content.children; + const [headingNode, ...restNodes] = entry.content.children; // Concatenates all the strings and parses with remark into an AST tree return createElement('section', [ diff --git a/src/generators/legacy-json/utils/buildSection.mjs b/src/generators/legacy-json/utils/buildSection.mjs index dd5dea05..cf4992b2 100644 --- a/src/generators/legacy-json/utils/buildSection.mjs +++ b/src/generators/legacy-json/utils/buildSection.mjs @@ -174,7 +174,7 @@ export const createSectionBuilder = () => { * @param {import('../types.d.ts').Section} parent - The parent section. */ const handleEntry = (entry, parent) => { - const [headingNode, ...nodes] = structuredClone(entry.content.children); + const [headingNode, ...nodes] = entry.content.children; const section = createSection(entry, headingNode); parseStability(section, nodes, entry); diff --git a/src/streaming.mjs b/src/streaming.mjs deleted file mode 100644 index a275a037..00000000 --- a/src/streaming.mjs +++ /dev/null @@ -1,82 +0,0 @@ -'use strict'; - -import logger from './logger/index.mjs'; - -const streamingLogger = logger.child('streaming'); - -/** - * Checks if a value is an async generator/iterable. - * - * @param {unknown} obj - Value to check - * @returns {obj is AsyncGenerator} True if the value is an async iterable - */ -export const isAsyncGenerator = obj => - obj !== null && - typeof obj === 'object' && - typeof obj[Symbol.asyncIterator] === 'function'; - -/** - * Collects all values from an async generator into a flat array. - * Each yielded chunk is spread into the results array. - * - * @template T - * @param {AsyncGenerator} generator - Async generator yielding arrays - * @returns {Promise} Flattened array of all yielded items - */ -export const collectAsyncGenerator = async generator => { - const results = []; - - let chunkCount = 0; - - for await (const chunk of generator) { - chunkCount++; - - results.push(...chunk); - - streamingLogger.debug(`Collected chunk ${chunkCount}`, { - itemsInChunk: chunk.length, - }); - } - - streamingLogger.debug(`Collection complete`, { - totalItems: results.length, - chunks: chunkCount, - }); - - return results; -}; - -/** - * Creates a cache for async generator collection results. - * Ensures that when multiple consumers request the same async generator, - * only one collection happens and all consumers share the result. - */ -export const createStreamingCache = () => { - /** @type {Map>} */ - const cache = new Map(); - - return { - /** - * Gets the collected result for a generator, starting collection if needed. - * - * @param {string} key - Cache key (usually generator name) - * @param {AsyncGenerator} generator - The async generator to collect - * @returns {Promise} Promise resolving to collected results - */ - getOrCollect(key, generator) { - const hasKey = cache.has(key); - - if (!hasKey) { - cache.set(key, collectAsyncGenerator(generator)); - } - - streamingLogger.debug( - hasKey - ? `Using cached result for "${key}"` - : `Starting collection for "${key}"` - ); - - return cache.get(key); - }, - }; -}; diff --git a/src/utils/__tests__/misc.test.mjs b/src/utils/__tests__/misc.test.mjs index d6a1006c..e673ec78 100644 --- a/src/utils/__tests__/misc.test.mjs +++ b/src/utils/__tests__/misc.test.mjs @@ -3,7 +3,13 @@ import assert from 'node:assert/strict'; import { describe, it } from 'node:test'; -import { lazy, isPlainObject, omitKeys, deepMerge } from '../misc.mjs'; +import { + lazy, + isPlainObject, + isAsyncIterable, + omitKeys, + deepMerge, +} from '../misc.mjs'; describe('lazy', () => { it('should call the function only once and cache the result', () => { @@ -38,6 +44,47 @@ describe('isPlainObject', () => { }); }); +describe('isAsyncIterable', () => { + it('should return true for async generators', () => { + async function* asyncGen() { + yield 1; + } + + assert.strictEqual(isAsyncIterable(asyncGen()), true); + }); + + it('should return true for objects with Symbol.asyncIterator', () => { + const asyncIterable = { + [Symbol.asyncIterator]() { + return { next: async () => ({ done: true, value: undefined }) }; + }, + }; + + assert.strictEqual(isAsyncIterable(asyncIterable), true); + }); + + it('should return false for regular generators', () => { + function* syncGen() { + yield 1; + } + + assert.strictEqual(isAsyncIterable(syncGen()), false); + }); + + it('should return false for plain objects, arrays and primitives', () => { + assert.strictEqual(isAsyncIterable({}), false); + assert.strictEqual(isAsyncIterable([]), false); + assert.strictEqual(isAsyncIterable({ async: true }), false); + assert.strictEqual(isAsyncIterable(42), false); + assert.strictEqual(isAsyncIterable('string'), false); + }); + + it('should return false for null and undefined', () => { + assert.strictEqual(isAsyncIterable(null), false); + assert.strictEqual(isAsyncIterable(undefined), false); + }); +}); + describe('omitKeys', () => { it('should return all properties when no keys are excluded', () => { const obj = { a: 'hello', b: 42, c: true }; diff --git a/src/utils/misc.mjs b/src/utils/misc.mjs index 353e2971..7b257b64 100644 --- a/src/utils/misc.mjs +++ b/src/utils/misc.mjs @@ -19,6 +19,16 @@ export const lazy = fn => export const isPlainObject = value => value !== null && typeof value === 'object' && !Array.isArray(value); +/** + * Checks if a value is an async generator/iterable. + * @param {unknown} obj - Value to check + * @returns {obj is AsyncGenerator} True if the value is an async iterable + */ +export const isAsyncIterable = obj => + obj !== null && + typeof obj === 'object' && + typeof obj[Symbol.asyncIterator] === 'function'; + /** * Returns a shallow copy of `obj` without the specified keys. * @param {Record} obj