Merge pull request #8430 from mukulmishra18/sendWithStream
Adds sendWithStream method in MessageHandler.
This commit is contained in:
commit
9342054502
3
external/streams/streams-lib.js
vendored
3
external/streams/streams-lib.js
vendored
@ -1960,7 +1960,8 @@ function ReadableStreamClose(stream) {
|
|||||||
|
|
||||||
if (IsReadableStreamDefaultReader(reader) === true) {
|
if (IsReadableStreamDefaultReader(reader) === true) {
|
||||||
for (var i = 0; i < reader._readRequests.length; i++) {
|
for (var i = 0; i < reader._readRequests.length; i++) {
|
||||||
var _resolve = reader._readRequests[i];
|
var _resolve = reader._readRequests[i]._resolve;
|
||||||
|
|
||||||
_resolve(CreateIterResultObject(undefined, true));
|
_resolve(CreateIterResultObject(undefined, true));
|
||||||
}
|
}
|
||||||
reader._readRequests = [];
|
reader._readRequests = [];
|
||||||
|
@ -1214,24 +1214,50 @@ var createObjectURL = (function createObjectURLClosure() {
|
|||||||
};
|
};
|
||||||
})();
|
})();
|
||||||
|
|
||||||
|
function resolveCall(fn, args, thisArg = null) {
|
||||||
|
if (!fn) {
|
||||||
|
return Promise.resolve(undefined);
|
||||||
|
}
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
resolve(fn.apply(thisArg, args));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveOrReject(capability, success, reason) {
|
||||||
|
if (success) {
|
||||||
|
capability.resolve();
|
||||||
|
} else {
|
||||||
|
capability.reject(reason);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function finalize(promise) {
|
||||||
|
return Promise.resolve(promise).catch(() => {});
|
||||||
|
}
|
||||||
|
|
||||||
function MessageHandler(sourceName, targetName, comObj) {
|
function MessageHandler(sourceName, targetName, comObj) {
|
||||||
this.sourceName = sourceName;
|
this.sourceName = sourceName;
|
||||||
this.targetName = targetName;
|
this.targetName = targetName;
|
||||||
this.comObj = comObj;
|
this.comObj = comObj;
|
||||||
this.callbackIndex = 1;
|
this.callbackId = 1;
|
||||||
|
this.streamId = 1;
|
||||||
this.postMessageTransfers = true;
|
this.postMessageTransfers = true;
|
||||||
var callbacksCapabilities = this.callbacksCapabilities = Object.create(null);
|
this.streamSinks = Object.create(null);
|
||||||
var ah = this.actionHandler = Object.create(null);
|
this.streamControllers = Object.create(null);
|
||||||
|
let callbacksCapabilities = this.callbacksCapabilities = Object.create(null);
|
||||||
|
let ah = this.actionHandler = Object.create(null);
|
||||||
|
|
||||||
this._onComObjOnMessage = (event) => {
|
this._onComObjOnMessage = (event) => {
|
||||||
var data = event.data;
|
let data = event.data;
|
||||||
if (data.targetName !== this.sourceName) {
|
if (data.targetName !== this.sourceName) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (data.isReply) {
|
if (data.stream) {
|
||||||
var callbackId = data.callbackId;
|
this._processStreamMessage(data);
|
||||||
|
} else if (data.isReply) {
|
||||||
|
let callbackId = data.callbackId;
|
||||||
if (data.callbackId in callbacksCapabilities) {
|
if (data.callbackId in callbacksCapabilities) {
|
||||||
var callback = callbacksCapabilities[callbackId];
|
let callback = callbacksCapabilities[callbackId];
|
||||||
delete callbacksCapabilities[callbackId];
|
delete callbacksCapabilities[callbackId];
|
||||||
if ('error' in data) {
|
if ('error' in data) {
|
||||||
callback.reject(data.error);
|
callback.reject(data.error);
|
||||||
@ -1242,13 +1268,13 @@ function MessageHandler(sourceName, targetName, comObj) {
|
|||||||
error('Cannot resolve callback ' + callbackId);
|
error('Cannot resolve callback ' + callbackId);
|
||||||
}
|
}
|
||||||
} else if (data.action in ah) {
|
} else if (data.action in ah) {
|
||||||
var action = ah[data.action];
|
let action = ah[data.action];
|
||||||
if (data.callbackId) {
|
if (data.callbackId) {
|
||||||
var sourceName = this.sourceName;
|
let sourceName = this.sourceName;
|
||||||
var targetName = data.sourceName;
|
let targetName = data.sourceName;
|
||||||
Promise.resolve().then(function () {
|
Promise.resolve().then(function () {
|
||||||
return action[0].call(action[1], data.data);
|
return action[0].call(action[1], data.data);
|
||||||
}).then(function (result) {
|
}).then((result) => {
|
||||||
comObj.postMessage({
|
comObj.postMessage({
|
||||||
sourceName,
|
sourceName,
|
||||||
targetName,
|
targetName,
|
||||||
@ -1256,7 +1282,7 @@ function MessageHandler(sourceName, targetName, comObj) {
|
|||||||
callbackId: data.callbackId,
|
callbackId: data.callbackId,
|
||||||
data: result,
|
data: result,
|
||||||
});
|
});
|
||||||
}, function (reason) {
|
}, (reason) => {
|
||||||
if (reason instanceof Error) {
|
if (reason instanceof Error) {
|
||||||
// Serialize error to avoid "DataCloneError"
|
// Serialize error to avoid "DataCloneError"
|
||||||
reason = reason + '';
|
reason = reason + '';
|
||||||
@ -1269,6 +1295,8 @@ function MessageHandler(sourceName, targetName, comObj) {
|
|||||||
error: reason,
|
error: reason,
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
} else if (data.streamId) {
|
||||||
|
this._createStreamSink(data);
|
||||||
} else {
|
} else {
|
||||||
action[0].call(action[1], data.data);
|
action[0].call(action[1], data.data);
|
||||||
}
|
}
|
||||||
@ -1289,9 +1317,9 @@ MessageHandler.prototype = {
|
|||||||
},
|
},
|
||||||
/**
|
/**
|
||||||
* Sends a message to the comObj to invoke the action with the supplied data.
|
* Sends a message to the comObj to invoke the action with the supplied data.
|
||||||
* @param {String} actionName Action to call.
|
* @param {String} actionName - Action to call.
|
||||||
* @param {JSON} data JSON data to send.
|
* @param {JSON} data - JSON data to send.
|
||||||
* @param {Array} [transfers] Optional list of transfers/ArrayBuffers
|
* @param {Array} [transfers] - Optional list of transfers/ArrayBuffers
|
||||||
*/
|
*/
|
||||||
send(actionName, data, transfers) {
|
send(actionName, data, transfers) {
|
||||||
var message = {
|
var message = {
|
||||||
@ -1304,14 +1332,14 @@ MessageHandler.prototype = {
|
|||||||
},
|
},
|
||||||
/**
|
/**
|
||||||
* Sends a message to the comObj to invoke the action with the supplied data.
|
* Sends a message to the comObj to invoke the action with the supplied data.
|
||||||
* Expects that other side will callback with the response.
|
* Expects that the other side will callback with the response.
|
||||||
* @param {String} actionName Action to call.
|
* @param {String} actionName - Action to call.
|
||||||
* @param {JSON} data JSON data to send.
|
* @param {JSON} data - JSON data to send.
|
||||||
* @param {Array} [transfers] Optional list of transfers/ArrayBuffers.
|
* @param {Array} [transfers] - Optional list of transfers/ArrayBuffers.
|
||||||
* @returns {Promise} Promise to be resolved with response data.
|
* @returns {Promise} Promise to be resolved with response data.
|
||||||
*/
|
*/
|
||||||
sendWithPromise(actionName, data, transfers) {
|
sendWithPromise(actionName, data, transfers) {
|
||||||
var callbackId = this.callbackIndex++;
|
var callbackId = this.callbackId++;
|
||||||
var message = {
|
var message = {
|
||||||
sourceName: this.sourceName,
|
sourceName: this.sourceName,
|
||||||
targetName: this.targetName,
|
targetName: this.targetName,
|
||||||
@ -1328,10 +1356,215 @@ MessageHandler.prototype = {
|
|||||||
}
|
}
|
||||||
return capability.promise;
|
return capability.promise;
|
||||||
},
|
},
|
||||||
|
/**
|
||||||
|
* Sends a message to the comObj to invoke the action with the supplied data.
|
||||||
|
* Expect that the other side will callback to signal 'start_complete'.
|
||||||
|
* @param {String} actionName - Action to call.
|
||||||
|
* @param {JSON} data - JSON data to send.
|
||||||
|
* @param {Object} queueingStrategy - strategy to signal backpressure based on
|
||||||
|
* internal queue.
|
||||||
|
* @param {Array} [transfers] - Optional list of transfers/ArrayBuffers.
|
||||||
|
* @return {ReadableStream} ReadableStream to read data in chunks.
|
||||||
|
*/
|
||||||
|
sendWithStream(actionName, data, queueingStrategy, transfers) {
|
||||||
|
let streamId = this.streamId++;
|
||||||
|
let sourceName = this.sourceName;
|
||||||
|
let targetName = this.targetName;
|
||||||
|
|
||||||
|
return new ReadableStream({
|
||||||
|
start: (controller) => {
|
||||||
|
let startCapability = createPromiseCapability();
|
||||||
|
this.streamControllers[streamId] = {
|
||||||
|
controller,
|
||||||
|
startCall: startCapability,
|
||||||
|
};
|
||||||
|
this.postMessage({
|
||||||
|
sourceName,
|
||||||
|
targetName,
|
||||||
|
action: actionName,
|
||||||
|
streamId,
|
||||||
|
data,
|
||||||
|
desiredSize: controller.desiredSize,
|
||||||
|
});
|
||||||
|
// Return Promise for Async process, to signal success/failure.
|
||||||
|
return startCapability.promise;
|
||||||
|
},
|
||||||
|
|
||||||
|
pull: (controller) => {
|
||||||
|
let pullCapability = createPromiseCapability();
|
||||||
|
this.streamControllers[streamId].pullCall = pullCapability;
|
||||||
|
this.postMessage({
|
||||||
|
sourceName,
|
||||||
|
targetName,
|
||||||
|
stream: 'pull',
|
||||||
|
streamId,
|
||||||
|
desiredSize: controller.desiredSize,
|
||||||
|
});
|
||||||
|
// Returning Promise will not call "pull"
|
||||||
|
// again until current pull is resolved.
|
||||||
|
return pullCapability.promise;
|
||||||
|
},
|
||||||
|
|
||||||
|
cancel: (reason) => {
|
||||||
|
let cancelCapability = createPromiseCapability();
|
||||||
|
this.streamControllers[streamId].cancelCall = cancelCapability;
|
||||||
|
this.postMessage({
|
||||||
|
sourceName,
|
||||||
|
targetName,
|
||||||
|
stream: 'cancel',
|
||||||
|
reason,
|
||||||
|
streamId,
|
||||||
|
});
|
||||||
|
// Return Promise to signal success or failure.
|
||||||
|
return cancelCapability.promise;
|
||||||
|
},
|
||||||
|
}, queueingStrategy);
|
||||||
|
},
|
||||||
|
|
||||||
|
_createStreamSink(data) {
|
||||||
|
let self = this;
|
||||||
|
let action = this.actionHandler[data.action];
|
||||||
|
let streamId = data.streamId;
|
||||||
|
let desiredSize = data.desiredSize;
|
||||||
|
let sourceName = this.sourceName;
|
||||||
|
let targetName = data.sourceName;
|
||||||
|
let capability = createPromiseCapability();
|
||||||
|
|
||||||
|
let sendStreamRequest = ({ stream, chunk, success, reason, }) => {
|
||||||
|
this.comObj.postMessage({ sourceName, targetName, stream, streamId,
|
||||||
|
chunk, success, reason, });
|
||||||
|
};
|
||||||
|
|
||||||
|
let streamSink = {
|
||||||
|
enqueue(chunk, size = 1) {
|
||||||
|
let lastDesiredSize = this.desiredSize;
|
||||||
|
this.desiredSize -= size;
|
||||||
|
// Enqueue decreases the desiredSize property of sink,
|
||||||
|
// so when it changes from positive to negative,
|
||||||
|
// set ready as unresolved promise.
|
||||||
|
if (lastDesiredSize > 0 && this.desiredSize <= 0) {
|
||||||
|
this.sinkCapability = createPromiseCapability();
|
||||||
|
this.ready = this.sinkCapability.promise;
|
||||||
|
}
|
||||||
|
sendStreamRequest({ stream: 'enqueue', chunk, });
|
||||||
|
},
|
||||||
|
|
||||||
|
close() {
|
||||||
|
sendStreamRequest({ stream: 'close', });
|
||||||
|
delete self.streamSinks[streamId];
|
||||||
|
},
|
||||||
|
|
||||||
|
error(reason) {
|
||||||
|
sendStreamRequest({ stream: 'error', reason, });
|
||||||
|
},
|
||||||
|
|
||||||
|
sinkCapability: capability,
|
||||||
|
onPull: null,
|
||||||
|
onCancel: null,
|
||||||
|
desiredSize,
|
||||||
|
ready: null,
|
||||||
|
};
|
||||||
|
|
||||||
|
streamSink.sinkCapability.resolve();
|
||||||
|
streamSink.ready = streamSink.sinkCapability.promise;
|
||||||
|
this.streamSinks[streamId] = streamSink;
|
||||||
|
resolveCall(action[0], [data.data, streamSink], action[1]).then(() => {
|
||||||
|
sendStreamRequest({ stream: 'start_complete', success: true, });
|
||||||
|
}, (reason) => {
|
||||||
|
sendStreamRequest({ stream: 'start_complete', success: false, reason, });
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
_processStreamMessage(data) {
|
||||||
|
let sourceName = this.sourceName;
|
||||||
|
let targetName = data.sourceName;
|
||||||
|
let streamId = data.streamId;
|
||||||
|
|
||||||
|
let sendStreamResponse = ({ stream, success, reason, }) => {
|
||||||
|
this.comObj.postMessage({ sourceName, targetName, stream,
|
||||||
|
success, streamId, reason, });
|
||||||
|
};
|
||||||
|
|
||||||
|
let deleteStreamController = () => {
|
||||||
|
// Delete streamController only when start, pull and
|
||||||
|
// cancel callbacks are resolved, to avoid "TypeError".
|
||||||
|
Promise.all([
|
||||||
|
this.streamControllers[data.streamId].startCall,
|
||||||
|
this.streamControllers[data.streamId].pullCall,
|
||||||
|
this.streamControllers[data.streamId].cancelCall
|
||||||
|
].map(function(capability) {
|
||||||
|
return capability && finalize(capability.promise);
|
||||||
|
})).then(() => {
|
||||||
|
delete this.streamControllers[data.streamId];
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
switch (data.stream) {
|
||||||
|
case 'start_complete':
|
||||||
|
resolveOrReject(this.streamControllers[data.streamId].startCall,
|
||||||
|
data.success, data.reason);
|
||||||
|
break;
|
||||||
|
case 'pull_complete':
|
||||||
|
resolveOrReject(this.streamControllers[data.streamId].pullCall,
|
||||||
|
data.success, data.reason);
|
||||||
|
break;
|
||||||
|
case 'pull':
|
||||||
|
// Ignore any pull after close is called.
|
||||||
|
if (!this.streamSinks[data.streamId]) {
|
||||||
|
sendStreamResponse({ stream: 'pull_complete', success: true, });
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Pull increases the desiredSize property of sink,
|
||||||
|
// so when it changes from negative to positive,
|
||||||
|
// set ready property as resolved promise.
|
||||||
|
if (this.streamSinks[data.streamId].desiredSize <= 0 &&
|
||||||
|
data.desiredSize > 0) {
|
||||||
|
this.streamSinks[data.streamId].sinkCapability.resolve();
|
||||||
|
}
|
||||||
|
// Reset desiredSize property of sink on every pull.
|
||||||
|
this.streamSinks[data.streamId].desiredSize = data.desiredSize;
|
||||||
|
resolveCall(this.streamSinks[data.streamId].onPull).then(() => {
|
||||||
|
sendStreamResponse({ stream: 'pull_complete', success: true, });
|
||||||
|
}, (reason) => {
|
||||||
|
sendStreamResponse({ stream: 'pull_complete',
|
||||||
|
success: false, reason, });
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
case 'enqueue':
|
||||||
|
this.streamControllers[data.streamId].controller.enqueue(data.chunk);
|
||||||
|
break;
|
||||||
|
case 'close':
|
||||||
|
this.streamControllers[data.streamId].controller.close();
|
||||||
|
deleteStreamController();
|
||||||
|
break;
|
||||||
|
case 'error':
|
||||||
|
this.streamControllers[data.streamId].controller.error(data.reason);
|
||||||
|
deleteStreamController();
|
||||||
|
break;
|
||||||
|
case 'cancel_complete':
|
||||||
|
resolveOrReject(this.streamControllers[data.streamId].cancelCall,
|
||||||
|
data.success, data.reason);
|
||||||
|
deleteStreamController();
|
||||||
|
break;
|
||||||
|
case 'cancel':
|
||||||
|
resolveCall(this.streamSinks[data.streamId].onCancel,
|
||||||
|
[data.reason]).then(() => {
|
||||||
|
sendStreamResponse({ stream: 'cancel_complete', success: true, });
|
||||||
|
}, (reason) => {
|
||||||
|
sendStreamResponse({ stream: 'cancel_complete',
|
||||||
|
success: false, reason, });
|
||||||
|
});
|
||||||
|
delete this.streamSinks[data.streamId];
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error('Unexpected stream case');
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends raw message to the comObj.
|
* Sends raw message to the comObj.
|
||||||
* @private
|
* @private
|
||||||
* @param message {Object} Raw message.
|
* @param {Object} message - Raw message.
|
||||||
* @param transfers List of transfers/ArrayBuffers, or undefined.
|
* @param transfers List of transfers/ArrayBuffers, or undefined.
|
||||||
*/
|
*/
|
||||||
postMessage(message, transfers) {
|
postMessage(message, transfers) {
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
"type1_parser_spec.js",
|
"type1_parser_spec.js",
|
||||||
"ui_utils_spec.js",
|
"ui_utils_spec.js",
|
||||||
"unicode_spec.js",
|
"unicode_spec.js",
|
||||||
"util_spec.js"
|
"util_spec.js",
|
||||||
|
"util_stream_spec.js"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
@ -45,15 +45,15 @@ function initializePDFJS(callback) {
|
|||||||
'pdfjs/display/global', 'pdfjs-test/unit/annotation_spec',
|
'pdfjs/display/global', 'pdfjs-test/unit/annotation_spec',
|
||||||
'pdfjs-test/unit/api_spec', 'pdfjs-test/unit/bidi_spec',
|
'pdfjs-test/unit/api_spec', 'pdfjs-test/unit/bidi_spec',
|
||||||
'pdfjs-test/unit/cff_parser_spec', 'pdfjs-test/unit/cmap_spec',
|
'pdfjs-test/unit/cff_parser_spec', 'pdfjs-test/unit/cmap_spec',
|
||||||
'pdfjs-test/unit/crypto_spec', 'pdfjs-test/unit/document_spec',
|
'pdfjs-test/unit/crypto_spec', 'pdfjs-test/unit/custom_spec',
|
||||||
'pdfjs-test/unit/dom_utils_spec', 'pdfjs-test/unit/evaluator_spec',
|
'pdfjs-test/unit/document_spec', 'pdfjs-test/unit/dom_utils_spec',
|
||||||
'pdfjs-test/unit/fonts_spec', 'pdfjs-test/unit/function_spec',
|
'pdfjs-test/unit/evaluator_spec', 'pdfjs-test/unit/fonts_spec',
|
||||||
'pdfjs-test/unit/metadata_spec', 'pdfjs-test/unit/murmurhash3_spec',
|
'pdfjs-test/unit/function_spec', 'pdfjs-test/unit/metadata_spec',
|
||||||
'pdfjs-test/unit/network_spec', 'pdfjs-test/unit/parser_spec',
|
'pdfjs-test/unit/murmurhash3_spec', 'pdfjs-test/unit/network_spec',
|
||||||
'pdfjs-test/unit/primitives_spec', 'pdfjs-test/unit/stream_spec',
|
'pdfjs-test/unit/parser_spec', 'pdfjs-test/unit/primitives_spec',
|
||||||
'pdfjs-test/unit/type1_parser_spec', 'pdfjs-test/unit/ui_utils_spec',
|
'pdfjs-test/unit/stream_spec', 'pdfjs-test/unit/type1_parser_spec',
|
||||||
'pdfjs-test/unit/unicode_spec', 'pdfjs-test/unit/util_spec',
|
'pdfjs-test/unit/ui_utils_spec', 'pdfjs-test/unit/unicode_spec',
|
||||||
'pdfjs-test/unit/custom_spec'
|
'pdfjs-test/unit/util_spec', 'pdfjs-test/unit/util_stream_spec'
|
||||||
].map(function (moduleName) {
|
].map(function (moduleName) {
|
||||||
return SystemJS.import(moduleName);
|
return SystemJS.import(moduleName);
|
||||||
})).then(function (modules) {
|
})).then(function (modules) {
|
||||||
|
@ -20,46 +20,46 @@ import {
|
|||||||
describe('util', function() {
|
describe('util', function() {
|
||||||
describe('stringToPDFString', function() {
|
describe('stringToPDFString', function() {
|
||||||
it('handles ISO Latin 1 strings', function() {
|
it('handles ISO Latin 1 strings', function() {
|
||||||
var str = '\x8Dstring\x8E';
|
let str = '\x8Dstring\x8E';
|
||||||
expect(stringToPDFString(str)).toEqual('\u201Cstring\u201D');
|
expect(stringToPDFString(str)).toEqual('\u201Cstring\u201D');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('handles UTF-16BE strings', function() {
|
it('handles UTF-16BE strings', function() {
|
||||||
var str = '\xFE\xFF\x00\x73\x00\x74\x00\x72\x00\x69\x00\x6E\x00\x67';
|
let str = '\xFE\xFF\x00\x73\x00\x74\x00\x72\x00\x69\x00\x6E\x00\x67';
|
||||||
expect(stringToPDFString(str)).toEqual('string');
|
expect(stringToPDFString(str)).toEqual('string');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('handles empty strings', function() {
|
it('handles empty strings', function() {
|
||||||
// ISO Latin 1
|
// ISO Latin 1
|
||||||
var str1 = '';
|
let str1 = '';
|
||||||
expect(stringToPDFString(str1)).toEqual('');
|
expect(stringToPDFString(str1)).toEqual('');
|
||||||
|
|
||||||
// UTF-16BE
|
// UTF-16BE
|
||||||
var str2 = '\xFE\xFF';
|
let str2 = '\xFE\xFF';
|
||||||
expect(stringToPDFString(str2)).toEqual('');
|
expect(stringToPDFString(str2)).toEqual('');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('removeNullCharacters', function() {
|
describe('removeNullCharacters', function() {
|
||||||
it('should not modify string without null characters', function() {
|
it('should not modify string without null characters', function() {
|
||||||
var str = 'string without null chars';
|
let str = 'string without null chars';
|
||||||
expect(removeNullCharacters(str)).toEqual('string without null chars');
|
expect(removeNullCharacters(str)).toEqual('string without null chars');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should modify string with null characters', function() {
|
it('should modify string with null characters', function() {
|
||||||
var str = 'string\x00With\x00Null\x00Chars';
|
let str = 'string\x00With\x00Null\x00Chars';
|
||||||
expect(removeNullCharacters(str)).toEqual('stringWithNullChars');
|
expect(removeNullCharacters(str)).toEqual('stringWithNullChars');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('ReadableStream', function() {
|
describe('ReadableStream', function() {
|
||||||
it('should return an Object', function () {
|
it('should return an Object', function () {
|
||||||
var readable = new ReadableStream();
|
let readable = new ReadableStream();
|
||||||
expect(typeof readable).toEqual('object');
|
expect(typeof readable).toEqual('object');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should have property getReader', function () {
|
it('should have property getReader', function () {
|
||||||
var readable = new ReadableStream();
|
let readable = new ReadableStream();
|
||||||
expect(typeof readable.getReader).toEqual('function');
|
expect(typeof readable.getReader).toEqual('function');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
378
test/unit/util_stream_spec.js
Normal file
378
test/unit/util_stream_spec.js
Normal file
@ -0,0 +1,378 @@
|
|||||||
|
/* Copyright 2017 Mozilla Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { createPromiseCapability, MessageHandler } from '../../src/shared/util';
|
||||||
|
|
||||||
|
describe('util_stream', function () {
|
||||||
|
// Temporary fake port for sending messages between main and worker.
|
||||||
|
class FakePort {
|
||||||
|
constructor() {
|
||||||
|
this._listeners = [];
|
||||||
|
this._deferred = Promise.resolve(undefined);
|
||||||
|
}
|
||||||
|
|
||||||
|
postMessage(obj) {
|
||||||
|
let event = { data: obj, };
|
||||||
|
this._deferred.then(() => {
|
||||||
|
this._listeners.forEach(function (listener) {
|
||||||
|
listener.call(this, event);
|
||||||
|
}, this);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
addEventListener(name, listener) {
|
||||||
|
this._listeners.push(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
removeEventListener(name, listener) {
|
||||||
|
let i = this._listeners.indexOf(listener);
|
||||||
|
this._listeners.splice(i, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
terminate() {
|
||||||
|
this._listeners = [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sleep function to wait for sometime, similar to setTimeout but faster.
|
||||||
|
function sleep(ticks) {
|
||||||
|
return Promise.resolve().then(() => {
|
||||||
|
return (ticks && sleep(ticks - 1));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('sendWithStream', function () {
|
||||||
|
it('should return a ReadableStream', function () {
|
||||||
|
let port = new FakePort();
|
||||||
|
let messageHandler1 = new MessageHandler('main', 'worker', port);
|
||||||
|
let readable = messageHandler1.sendWithStream('fakeHandler');
|
||||||
|
// Check if readable is an instance of ReadableStream.
|
||||||
|
expect(typeof readable).toEqual('object');
|
||||||
|
expect(typeof readable.getReader).toEqual('function');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should read using a reader', function (done) {
|
||||||
|
let log = '';
|
||||||
|
let port = new FakePort();
|
||||||
|
let messageHandler1 = new MessageHandler('main', 'worker', port);
|
||||||
|
let messageHandler2 = new MessageHandler('worker', 'main', port);
|
||||||
|
messageHandler2.on('fakeHandler', (data, sink) => {
|
||||||
|
sink.onPull = function () {
|
||||||
|
log += 'p';
|
||||||
|
};
|
||||||
|
sink.onCancel = function (reason) {
|
||||||
|
log += 'c';
|
||||||
|
};
|
||||||
|
sink.ready.then(() => {
|
||||||
|
sink.enqueue('hi');
|
||||||
|
return sink.ready;
|
||||||
|
}).then(() => {
|
||||||
|
sink.close();
|
||||||
|
});
|
||||||
|
return sleep(5);
|
||||||
|
});
|
||||||
|
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
|
||||||
|
highWaterMark: 1,
|
||||||
|
size() {
|
||||||
|
return 1;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
let reader = readable.getReader();
|
||||||
|
sleep(10).then(() => {
|
||||||
|
expect(log).toEqual('');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(log).toEqual('p');
|
||||||
|
expect(result.value).toEqual('hi');
|
||||||
|
expect(result.done).toEqual(false);
|
||||||
|
return sleep(10);
|
||||||
|
}).then(() => {
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual(undefined);
|
||||||
|
expect(result.done).toEqual(true);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not read any data when cancelled', function (done) {
|
||||||
|
let log = '';
|
||||||
|
let port = new FakePort();
|
||||||
|
let messageHandler2 = new MessageHandler('worker', 'main', port);
|
||||||
|
messageHandler2.on('fakeHandler', (data, sink) => {
|
||||||
|
sink.onPull = function () {
|
||||||
|
log += 'p';
|
||||||
|
};
|
||||||
|
sink.onCancel = function (reason) {
|
||||||
|
log += 'c';
|
||||||
|
};
|
||||||
|
log += '0';
|
||||||
|
sink.ready.then(() => {
|
||||||
|
log += '1';
|
||||||
|
sink.enqueue([1, 2, 3, 4], 4);
|
||||||
|
return sink.ready;
|
||||||
|
}).then(() => {
|
||||||
|
log += '2';
|
||||||
|
sink.enqueue([5, 6, 7, 8], 4);
|
||||||
|
return sink.ready;
|
||||||
|
}).then(() => {
|
||||||
|
log += '3';
|
||||||
|
sink.close();
|
||||||
|
}, () => {
|
||||||
|
log += '4';
|
||||||
|
});
|
||||||
|
});
|
||||||
|
let messageHandler1 = new MessageHandler('main', 'worker', port);
|
||||||
|
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
|
||||||
|
highWaterMark: 4,
|
||||||
|
size(arr) {
|
||||||
|
return arr.length;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let reader = readable.getReader();
|
||||||
|
sleep(10).then(() => {
|
||||||
|
expect(log).toEqual('01');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual([1, 2, 3, 4]);
|
||||||
|
expect(result.done).toEqual(false);
|
||||||
|
return sleep(10);
|
||||||
|
}).then(() => {
|
||||||
|
expect(log).toEqual('01p2');
|
||||||
|
return reader.cancel();
|
||||||
|
}).then(() => {
|
||||||
|
expect(log).toEqual('01p2c');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not read when errored', function(done) {
|
||||||
|
let log = '';
|
||||||
|
let port = new FakePort();
|
||||||
|
let messageHandler2 = new MessageHandler('worker', 'main', port);
|
||||||
|
messageHandler2.on('fakeHandler', (data, sink) => {
|
||||||
|
sink.onPull = function () {
|
||||||
|
log += 'p';
|
||||||
|
};
|
||||||
|
sink.onCancel = function (reason) {
|
||||||
|
log += 'c';
|
||||||
|
};
|
||||||
|
sink.ready.then(() => {
|
||||||
|
sink.enqueue([1, 2, 3, 4], 4);
|
||||||
|
return sink.ready;
|
||||||
|
}).then(() => {
|
||||||
|
log += 'error';
|
||||||
|
sink.error('error');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
let messageHandler1 = new MessageHandler('main', 'worker', port);
|
||||||
|
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
|
||||||
|
highWaterMark: 4,
|
||||||
|
size(arr) {
|
||||||
|
return arr.length;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let reader = readable.getReader();
|
||||||
|
|
||||||
|
sleep(10).then(() => {
|
||||||
|
expect(log).toEqual('');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual([1, 2, 3, 4]);
|
||||||
|
expect(result.done).toEqual(false);
|
||||||
|
return reader.read();
|
||||||
|
}).then(() => {
|
||||||
|
}, (reason) => {
|
||||||
|
expect(reason).toEqual('error');
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should read data with blocking promise', function (done) {
|
||||||
|
let log = '';
|
||||||
|
let port = new FakePort();
|
||||||
|
let messageHandler2 = new MessageHandler('worker', 'main', port);
|
||||||
|
messageHandler2.on('fakeHandler', (data, sink) => {
|
||||||
|
sink.onPull = function () {
|
||||||
|
log += 'p';
|
||||||
|
};
|
||||||
|
sink.onCancel = function (reason) {
|
||||||
|
log += 'c';
|
||||||
|
};
|
||||||
|
log += '0';
|
||||||
|
sink.ready.then(() => {
|
||||||
|
log += '1';
|
||||||
|
sink.enqueue([1, 2, 3, 4], 4);
|
||||||
|
return sink.ready;
|
||||||
|
}).then(() => {
|
||||||
|
log += '2';
|
||||||
|
sink.enqueue([5, 6, 7, 8], 4);
|
||||||
|
return sink.ready;
|
||||||
|
}).then(() => {
|
||||||
|
sink.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
let messageHandler1 = new MessageHandler('main', 'worker', port);
|
||||||
|
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
|
||||||
|
highWaterMark: 4,
|
||||||
|
size(arr) {
|
||||||
|
return arr.length;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let reader = readable.getReader();
|
||||||
|
// Sleep for 10ms, so that read() is not unblocking the ready promise.
|
||||||
|
// Chain all read() to stream in sequence.
|
||||||
|
sleep(10).then(() => {
|
||||||
|
expect(log).toEqual('01');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual([1, 2, 3, 4]);
|
||||||
|
expect(result.done).toEqual(false);
|
||||||
|
return sleep(10);
|
||||||
|
}).then(() => {
|
||||||
|
expect(log).toEqual('01p2');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual([5, 6, 7, 8]);
|
||||||
|
expect(result.done).toEqual(false);
|
||||||
|
return sleep(10);
|
||||||
|
}).then(() => {
|
||||||
|
expect(log).toEqual('01p2p');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual(undefined);
|
||||||
|
expect(result.done).toEqual(true);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should read data with blocking promise and buffer whole data' +
|
||||||
|
' into stream', function (done) {
|
||||||
|
let log = '';
|
||||||
|
let port = new FakePort();
|
||||||
|
let messageHandler2 = new MessageHandler('worker', 'main', port);
|
||||||
|
messageHandler2.on('fakeHandler', (data, sink) => {
|
||||||
|
sink.onPull = function () {
|
||||||
|
log += 'p';
|
||||||
|
};
|
||||||
|
sink.onCancel = function (reason) {
|
||||||
|
log += 'c';
|
||||||
|
};
|
||||||
|
log += '0';
|
||||||
|
sink.ready.then(() => {
|
||||||
|
log += '1';
|
||||||
|
sink.enqueue([1, 2, 3, 4], 4);
|
||||||
|
return sink.ready;
|
||||||
|
}).then(() => {
|
||||||
|
log += '2';
|
||||||
|
sink.enqueue([5, 6, 7, 8], 4);
|
||||||
|
return sink.ready;
|
||||||
|
}).then(() => {
|
||||||
|
sink.close();
|
||||||
|
});
|
||||||
|
return sleep(10);
|
||||||
|
});
|
||||||
|
|
||||||
|
let messageHandler1 = new MessageHandler('main', 'worker', port);
|
||||||
|
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
|
||||||
|
highWaterMark: 8,
|
||||||
|
size(arr) {
|
||||||
|
return arr.length;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let reader = readable.getReader();
|
||||||
|
|
||||||
|
sleep(10).then(() => {
|
||||||
|
expect(log).toEqual('012');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual([1, 2, 3, 4]);
|
||||||
|
expect(result.done).toEqual(false);
|
||||||
|
return sleep(10);
|
||||||
|
}).then(() => {
|
||||||
|
expect(log).toEqual('012p');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual([5, 6, 7, 8]);
|
||||||
|
expect(result.done).toEqual(false);
|
||||||
|
return sleep(10);
|
||||||
|
}).then(() => {
|
||||||
|
expect(log).toEqual('012p');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual(undefined);
|
||||||
|
expect(result.done).toEqual(true);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should ignore any pull after close is called', function (done) {
|
||||||
|
let log = '';
|
||||||
|
let port = new FakePort();
|
||||||
|
let capability = createPromiseCapability();
|
||||||
|
let messageHandler2 = new MessageHandler('worker', 'main', port);
|
||||||
|
messageHandler2.on('fakeHandler', (data, sink) => {
|
||||||
|
sink.onPull = function () {
|
||||||
|
log += 'p';
|
||||||
|
};
|
||||||
|
sink.onCancel = function (reason) {
|
||||||
|
log += 'c';
|
||||||
|
};
|
||||||
|
log += '0';
|
||||||
|
sink.ready.then(() => {
|
||||||
|
log += '1';
|
||||||
|
sink.enqueue([1, 2, 3, 4], 4);
|
||||||
|
});
|
||||||
|
return capability.promise.then(() => {
|
||||||
|
sink.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
let messageHandler1 = new MessageHandler('main', 'worker', port);
|
||||||
|
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
|
||||||
|
highWaterMark: 10,
|
||||||
|
size(arr) {
|
||||||
|
return arr.length;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let reader = readable.getReader();
|
||||||
|
|
||||||
|
sleep(10).then(() => {
|
||||||
|
expect(log).toEqual('01');
|
||||||
|
capability.resolve();
|
||||||
|
return capability.promise.then(() => {
|
||||||
|
return reader.read();
|
||||||
|
});
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual([1, 2, 3, 4]);
|
||||||
|
expect(result.done).toEqual(false);
|
||||||
|
return sleep(10);
|
||||||
|
}).then(() => {
|
||||||
|
expect(log).toEqual('01');
|
||||||
|
return reader.read();
|
||||||
|
}).then((result) => {
|
||||||
|
expect(result.value).toEqual(undefined);
|
||||||
|
expect(result.done).toEqual(true);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
Loading…
x
Reference in New Issue
Block a user