/**
 * Anything that can be consumed with `for await (... of ...)`:
 * either a synchronous or an asynchronous iterable.
 */
export type ForAwaitable<T = unknown> = Iterable<T> | AsyncIterable<T>;
export type { ForAwaitable as ForOfAwaitable };

/** The iterator obtained from a {@link ForAwaitable}. */
export type ForAwaitableIterator<T = unknown> = AsyncIterator<T> | Iterator<T>;

/** Runtime check for the presence of `Symbol.iterator` on a non-null object. */
const isIterable = (x: unknown): x is Iterable<unknown> =>
  x != null && typeof x === 'object' && Symbol.iterator in x;

/** Runtime check for the presence of `Symbol.asyncIterator` on a non-null object. */
const isAsyncIterable = (x: unknown): x is AsyncIterable<unknown> =>
  x != null && typeof x === 'object' && Symbol.asyncIterator in x;

/**
 * Obtains an iterator from `x`, preferring the async protocol over the sync
 * one (the same precedence `for await` uses).
 *
 * @throws Error if `x` implements neither iteration protocol.
 */
const forAwaitableIterator = <T>(x: ForAwaitable<T>): ForAwaitableIterator<T> => {
  if (isAsyncIterable(x)) return x[Symbol.asyncIterator]();
  if (isIterable(x)) return x[Symbol.iterator]();
  throw new Error('Not for-awaitable. Neither Symbol.asyncIterator nor Symbol.iterator found.');
};

/**
 * Exposes the chunks of a `ReadableStream` as an async iterable.
 *
 * The stream is locked while iteration is in progress. The lock is released
 * when the stream is exhausted, when a read fails, and when the consumer
 * stops early (e.g. `break` inside `for await`).
 *
 * @param stream The stream to read from. It must not already be locked.
 * @returns An async generator yielding every chunk of `stream` in order.
 */
export async function* streamToAsyncIterable<T>(stream: ReadableStream<T>): AsyncIterableIterator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // `finally` also runs when the consumer calls `return()`/`throw()` on the
    // generator, so the stream never stays locked after iteration ends.
    reader.releaseLock();
  }
}

// Older version of Cloudflare Workers do not support the `ReadableStream` constructor,
// so we use a TransformStream instead:
/**
 * Pipes a for-awaitable into the writable side of an identity
 * `TransformStream` and returns its readable side.
 *
 * Every write is awaited, so the source is only advanced as fast as the
 * reader consumes chunks, and a failed write (e.g. because the readable side
 * was cancelled) stops the loop, which in turn closes the source iterator.
 */
function asyncIterableToStreamTS<T>(iterable: ForAwaitable<T>): ReadableStream<T> {
  const { readable, writable } = new TransformStream<T, T>();
  void (async () => {
    const writer = writable.getWriter();
    try {
      for await (const chunk of iterable) await writer.write(chunk);
      await writer.close();
    } catch (err) {
      // Best effort: forward the failure to the readable side. The stream may
      // already be errored, in which case abort itself can reject; there is
      // nobody left to report that to.
      try { await writer.abort(err); } catch { }
    }
  })();
  return readable;
}

/**
 * Wraps a for-awaitable in a pull-based `ReadableStream`: one item is taken
 * from the iterator per `pull`, so the source is consumed lazily.
 *
 * @throws Error synchronously if `iterable` implements neither iteration protocol.
 */
function asyncIterableToStreamRS<T>(iterable: ForAwaitable<T>): ReadableStream<T> {
  let iterator: ForAwaitableIterator<T>;
  return new ReadableStream<T>({
    start() {
      iterator = forAwaitableIterator(iterable);
    },
    async pull(controller) {
      const result = await iterator.next();
      if (result.done) controller.close();
      else controller.enqueue(result.value);
    },
    async cancel(reason) {
      // Best effort: let the source clean up. Prefer `throw` so the source can
      // observe the cancellation reason; fall back to `return` for iterators
      // that only implement the cleanup hook.
      try {
        if (iterator.throw) await iterator.throw(reason);
        else await iterator.return?.();
      } catch { }
    },
  });
}

/** Feature-detects whether the `ReadableStream` constructor is usable in this runtime. */
const tryReadableStream = (): boolean => {
  try {
    return !!new ReadableStream({});
  } catch {
    return false;
  }
};

/**
 * Converts a sync or async iterable into a `ReadableStream`.
 *
 * The implementation is chosen once, at module load: the `ReadableStream`
 * constructor when available, otherwise a `TransformStream`-based fallback.
 */
export const asyncIterableToStream: <T>(iterable: ForAwaitable<T>) => ReadableStream<T> =
  tryReadableStream() ? asyncIterableToStreamRS : asyncIterableToStreamTS;

export {
  streamToAsyncIterable as streamToAsyncIter,
  asyncIterableToStream as asyncIterToStream,
};