Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,40 @@ const isDuplexNodeStream = obj =>
typeof obj.on === 'function' &&
typeof obj.write === 'function';

const isNodeStream = obj => {
return (
obj &&
(obj._readableState ||
obj._writableState ||
(typeof obj.write === 'function' && typeof obj.on === 'function') ||
(typeof obj.pipe === 'function' && typeof obj.on === 'function'))
);
};

const isReadableWebStream = obj =>
obj && globalThis.ReadableStream && obj instanceof globalThis.ReadableStream;
!!(
obj &&
!isNodeStream(obj) &&
typeof obj.pipeThrough === 'function' &&
typeof obj.getReader === 'function' &&
typeof obj.cancel === 'function'
);

const isWritableWebStream = obj =>
obj && globalThis.WritableStream && obj instanceof globalThis.WritableStream;
!!(
obj &&
!isNodeStream(obj) &&
typeof obj.getWriter === 'function' &&
typeof obj.abort === 'function'
);

const isDuplexWebStream = obj =>
obj &&
globalThis.ReadableStream &&
obj.readable instanceof globalThis.ReadableStream &&
globalThis.WritableStream &&
obj.writable instanceof globalThis.WritableStream;
!!(
obj &&
!isNodeStream(obj) &&
typeof obj.readable === 'object' &&
typeof obj.writable === 'object'
);

const groupFunctions = (output, fn, index, fns) => {
if (
Expand Down Expand Up @@ -239,7 +261,7 @@ module.exports.normalizeMany = defs.normalizeMany;
module.exports.combineMany = defs.combineMany;
module.exports.combineManyMut = defs.combineManyMut;

module.exports.chain = chain; // for compatibility with 2.x
module.exports.chain = chain; // for compatibility with 2.x
module.exports.chainUnchecked = chain; // for TypeScript to bypass type checks
module.exports.gen = gen;
module.exports.asStream = asStream;
Expand Down