added unit testing, and started implementing unit tests...phew

This commit is contained in:
Josh Burman
2019-03-12 22:28:02 -04:00
parent 74aad4a957
commit e8c2539f1b
3489 changed files with 464813 additions and 88 deletions

View File

@ -0,0 +1,204 @@
'use strict';
var _Object$setPrototypeO;
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
var finished = require('./end-of-stream');
var kLastResolve = Symbol('lastResolve');
var kLastReject = Symbol('lastReject');
var kError = Symbol('error');
var kEnded = Symbol('ended');
var kLastPromise = Symbol('lastPromise');
var kHandlePromise = Symbol('handlePromise');
var kStream = Symbol('stream');
function createIterResult(value, done) {
return {
value: value,
done: done
};
}
function readAndResolve(iter) {
var resolve = iter[kLastResolve];
if (resolve !== null) {
var data = iter[kStream].read(); // we defer if data is null
// we can be expecting either 'end' or
// 'error'
if (data !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(createIterResult(data, false));
}
}
}
function onReadable(iter) {
// we wait for the next tick, because it might
// emit an error with process.nextTick
process.nextTick(readAndResolve, iter);
}
function wrapForNext(lastPromise, iter) {
return function (resolve, reject) {
lastPromise.then(function () {
iter[kHandlePromise](resolve, reject);
}, reject);
};
}
var AsyncIteratorPrototype = Object.getPrototypeOf(function () {});
var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPrototypeO = {
get stream() {
return this[kStream];
},
next: function next() {
var _this = this;
// if we have detected an error in the meanwhile
// reject straight away
var error = this[kError];
if (error !== null) {
return Promise.reject(error);
}
if (this[kEnded]) {
return Promise.resolve(createIterResult(null, true));
}
if (this[kStream].destroyed) {
// We need to defer via nextTick because if .destroy(err) is
// called, the error will be emitted via nextTick, and
// we cannot guarantee that there is no error lingering around
// waiting to be emitted.
return new Promise(function (resolve, reject) {
process.nextTick(function () {
if (_this[kError]) {
reject(_this[kError]);
} else {
resolve(createIterResult(null, true));
}
});
});
} // if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
// where next() is only called once at a time
var lastPromise = this[kLastPromise];
var promise;
if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
} else {
// fast path needed to support multiple this.push()
// without triggering the next() queue
var data = this[kStream].read();
if (data !== null) {
return Promise.resolve(createIterResult(data, false));
}
promise = new Promise(this[kHandlePromise]);
}
this[kLastPromise] = promise;
return promise;
}
}, _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () {
return this;
}), _defineProperty(_Object$setPrototypeO, "return", function _return() {
var _this2 = this;
// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
return new Promise(function (resolve, reject) {
_this2[kStream].destroy(null, function (err) {
if (err) {
reject(err);
return;
}
resolve(createIterResult(null, true));
});
});
}), _Object$setPrototypeO), AsyncIteratorPrototype);
var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
var _Object$create;
var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, {
value: stream,
writable: true
}), _defineProperty(_Object$create, kLastResolve, {
value: null,
writable: true
}), _defineProperty(_Object$create, kLastReject, {
value: null,
writable: true
}), _defineProperty(_Object$create, kError, {
value: null,
writable: true
}), _defineProperty(_Object$create, kEnded, {
value: stream._readableState.endEmitted,
writable: true
}), _defineProperty(_Object$create, kLastPromise, {
value: null,
writable: true
}), _defineProperty(_Object$create, kHandlePromise, {
value: function value(resolve, reject) {
var data = iterator[kStream].read();
if (data) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(data, false));
} else {
iterator[kLastResolve] = resolve;
iterator[kLastReject] = reject;
}
},
writable: true
}), _Object$create));
finished(stream, function (err) {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise
// returned by next() and store the error
if (reject !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
reject(err);
}
iterator[kError] = err;
return;
}
var resolve = iterator[kLastResolve];
if (resolve !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(null, true));
}
iterator[kEnded] = true;
});
stream.on('readable', onReadable.bind(null, iterator));
return iterator;
};
module.exports = createReadableStreamAsyncIterator;

View File

@ -0,0 +1,189 @@
'use strict';
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } return target; }
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
var _require = require('buffer'),
Buffer = _require.Buffer;
var _require2 = require('util'),
inspect = _require2.inspect;
var custom = inspect && inspect.custom || 'inspect';
function copyBuffer(src, target, offset) {
Buffer.prototype.copy.call(src, target, offset);
}
module.exports =
/*#__PURE__*/
function () {
function BufferList() {
this.head = null;
this.tail = null;
this.length = 0;
}
var _proto = BufferList.prototype;
_proto.push = function push(v) {
var entry = {
data: v,
next: null
};
if (this.length > 0) this.tail.next = entry;else this.head = entry;
this.tail = entry;
++this.length;
};
_proto.unshift = function unshift(v) {
var entry = {
data: v,
next: this.head
};
if (this.length === 0) this.tail = entry;
this.head = entry;
++this.length;
};
_proto.shift = function shift() {
if (this.length === 0) return;
var ret = this.head.data;
if (this.length === 1) this.head = this.tail = null;else this.head = this.head.next;
--this.length;
return ret;
};
_proto.clear = function clear() {
this.head = this.tail = null;
this.length = 0;
};
_proto.join = function join(s) {
if (this.length === 0) return '';
var p = this.head;
var ret = '' + p.data;
while (p = p.next) {
ret += s + p.data;
}
return ret;
};
_proto.concat = function concat(n) {
if (this.length === 0) return Buffer.alloc(0);
var ret = Buffer.allocUnsafe(n >>> 0);
var p = this.head;
var i = 0;
while (p) {
copyBuffer(p.data, ret, i);
i += p.data.length;
p = p.next;
}
return ret;
} // Consumes a specified amount of bytes or characters from the buffered data.
;
_proto.consume = function consume(n, hasStrings) {
var ret;
if (n < this.head.data.length) {
// `slice` is the same for buffers and strings.
ret = this.head.data.slice(0, n);
this.head.data = this.head.data.slice(n);
} else if (n === this.head.data.length) {
// First chunk is a perfect match.
ret = this.shift();
} else {
// Result spans more than one buffer.
ret = hasStrings ? this._getString(n) : this._getBuffer(n);
}
return ret;
};
_proto.first = function first() {
return this.head.data;
} // Consumes a specified amount of characters from the buffered data.
;
_proto._getString = function _getString(n) {
var p = this.head;
var c = 1;
var ret = p.data;
n -= ret.length;
while (p = p.next) {
var str = p.data;
var nb = n > str.length ? str.length : n;
if (nb === str.length) ret += str;else ret += str.slice(0, n);
n -= nb;
if (n === 0) {
if (nb === str.length) {
++c;
if (p.next) this.head = p.next;else this.head = this.tail = null;
} else {
this.head = p;
p.data = str.slice(nb);
}
break;
}
++c;
}
this.length -= c;
return ret;
} // Consumes a specified amount of bytes from the buffered data.
;
_proto._getBuffer = function _getBuffer(n) {
var ret = Buffer.allocUnsafe(n);
var p = this.head;
var c = 1;
p.data.copy(ret);
n -= p.data.length;
while (p = p.next) {
var buf = p.data;
var nb = n > buf.length ? buf.length : n;
buf.copy(ret, ret.length - n, 0, nb);
n -= nb;
if (n === 0) {
if (nb === buf.length) {
++c;
if (p.next) this.head = p.next;else this.head = this.tail = null;
} else {
this.head = p;
p.data = buf.slice(nb);
}
break;
}
++c;
}
this.length -= c;
return ret;
} // Make sure the linked list only shows the minimal necessary information.
;
_proto[custom] = function (_, options) {
return inspect(this, _objectSpread({}, options, {
// Only inspect one level.
depth: 0,
// It should not recurse.
customInspect: false
}));
};
return BufferList;
}();

View File

@ -0,0 +1,85 @@
'use strict'; // undocumented cb() API, needed for core, not for public API
function destroy(err, cb) {
var _this = this;
var readableDestroyed = this._readableState && this._readableState.destroyed;
var writableDestroyed = this._writableState && this._writableState.destroyed;
if (readableDestroyed || writableDestroyed) {
if (cb) {
cb(err);
} else if (err && (!this._writableState || !this._writableState.errorEmitted)) {
process.nextTick(emitErrorNT, this, err);
}
return this;
} // we set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks
if (this._readableState) {
this._readableState.destroyed = true;
} // if this is a duplex stream mark the writable part as destroyed as well
if (this._writableState) {
this._writableState.destroyed = true;
}
this._destroy(err || null, function (err) {
if (!cb && err) {
process.nextTick(emitErrorAndCloseNT, _this, err);
if (_this._writableState) {
_this._writableState.errorEmitted = true;
}
} else if (cb) {
process.nextTick(emitCloseNT, _this);
cb(err);
} else {
process.nextTick(emitCloseNT, _this);
}
});
return this;
}
function emitErrorAndCloseNT(self, err) {
emitErrorNT(self, err);
emitCloseNT(self);
}
function emitCloseNT(self) {
if (self._writableState && !self._writableState.emitClose) return;
if (self._readableState && !self._readableState.emitClose) return;
self.emit('close');
}
function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;
this._readableState.reading = false;
this._readableState.ended = false;
this._readableState.endEmitted = false;
}
if (this._writableState) {
this._writableState.destroyed = false;
this._writableState.ended = false;
this._writableState.ending = false;
this._writableState.finalCalled = false;
this._writableState.prefinished = false;
this._writableState.finished = false;
this._writableState.errorEmitted = false;
}
}
function emitErrorNT(self, err) {
self.emit('error', err);
}
module.exports = {
destroy: destroy,
undestroy: undestroy
};

View File

@ -0,0 +1,91 @@
// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).
'use strict';
var ERR_STREAM_PREMATURE_CLOSE = require('../../../errors').codes.ERR_STREAM_PREMATURE_CLOSE;
function noop() {}
function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}
function once(callback) {
var called = false;
return function (err) {
if (called) return;
called = true;
callback.call(this, err);
};
}
function eos(stream, opts, callback) {
if (typeof opts === 'function') return eos(stream, null, opts);
if (!opts) opts = {};
callback = once(callback || noop);
var ws = stream._writableState;
var rs = stream._readableState;
var readable = opts.readable || opts.readable !== false && stream.readable;
var writable = opts.writable || opts.writable !== false && stream.writable;
var onlegacyfinish = function onlegacyfinish() {
if (!stream.writable) onfinish();
};
var onfinish = function onfinish() {
writable = false;
if (!readable) callback.call(stream);
};
var onend = function onend() {
readable = false;
if (!writable) callback.call(stream);
};
var onerror = function onerror(err) {
callback.call(stream, err);
};
var onclose = function onclose() {
if (readable && !(rs && rs.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !(ws && ws.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
};
var onrequest = function onrequest() {
stream.req.on('finish', onfinish);
};
if (isRequest(stream)) {
stream.on('complete', onfinish);
stream.on('abort', onclose);
if (stream.req) onrequest();else stream.on('request', onrequest);
} else if (writable && !ws) {
// legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}
stream.on('end', onend);
stream.on('finish', onfinish);
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);
return function () {
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
stream.removeListener('request', onrequest);
if (stream.req) stream.req.removeListener('finish', onfinish);
stream.removeListener('end', onlegacyfinish);
stream.removeListener('close', onlegacyfinish);
stream.removeListener('finish', onfinish);
stream.removeListener('end', onend);
stream.removeListener('error', onerror);
stream.removeListener('close', onclose);
};
}
module.exports = eos;

View File

@ -0,0 +1,97 @@
// Ported from https://github.com/mafintosh/pump with
// permission from the author, Mathias Buus (@mafintosh).
'use strict';
var eos;
function once(callback) {
var called = false;
return function () {
if (called) return;
called = true;
callback.apply(void 0, arguments);
};
}
var _require$codes = require('../../../errors').codes,
ERR_MISSING_ARGS = _require$codes.ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED;
function noop(err) {
// Rethrow the error if it exists to avoid swallowing it
if (err) throw err;
}
function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}
function destroyer(stream, reading, writing, callback) {
callback = once(callback);
var closed = false;
stream.on('close', function () {
closed = true;
});
if (eos === undefined) eos = require('./end-of-stream');
eos(stream, {
readable: reading,
writable: writing
}, function (err) {
if (err) return callback(err);
closed = true;
callback();
});
var destroyed = false;
return function (err) {
if (closed) return;
if (destroyed) return;
destroyed = true; // request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (typeof stream.destroy === 'function') return stream.destroy();
callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}
function call(fn) {
fn();
}
function pipe(from, to) {
return from.pipe(to);
}
function popCallback(streams) {
if (!streams.length) return noop;
if (typeof streams[streams.length - 1] !== 'function') return noop;
return streams.pop();
}
function pipeline() {
for (var _len = arguments.length, streams = new Array(_len), _key = 0; _key < _len; _key++) {
streams[_key] = arguments[_key];
}
var callback = popCallback(streams);
if (Array.isArray(streams[0])) streams = streams[0];
if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}
var error;
var destroys = streams.map(function (stream, i) {
var reading = i < streams.length - 1;
var writing = i > 0;
return destroyer(stream, reading, writing, function (err) {
if (!error) error = err;
if (err) destroys.forEach(call);
if (reading) return;
destroys.forEach(call);
callback(error);
});
});
return streams.reduce(pipe);
}
module.exports = pipeline;

View File

@ -0,0 +1,27 @@
'use strict';
var ERR_INVALID_OPT_VALUE = require('../../../errors').codes.ERR_INVALID_OPT_VALUE;
function highWaterMarkFrom(options, isDuplex, duplexKey) {
return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null;
}
function getHighWaterMark(state, options, duplexKey, isDuplex) {
var hwm = highWaterMarkFrom(options, isDuplex, duplexKey);
if (hwm != null) {
if (!(isFinite(hwm) && Math.floor(hwm) === hwm) || hwm < 0) {
var name = isDuplex ? duplexKey : 'highWaterMark';
throw new ERR_INVALID_OPT_VALUE(name, hwm);
}
return Math.floor(hwm);
} // Default value
return state.objectMode ? 16 : 16 * 1024;
}
module.exports = {
getHighWaterMark: getHighWaterMark
};

View File

@ -0,0 +1 @@
module.exports = require('events').EventEmitter;

View File

@ -0,0 +1 @@
module.exports = require('stream');