Merge pull request #11290 from Snuffleupagus/MessageHandler-rm-in

[MessageHandler] Re-factor and convert the code to a proper `class`
This commit is contained in:
Tim van der Meij 2019-10-31 23:57:52 +01:00 committed by GitHub
commit 30ef05c161
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 116 additions and 86 deletions

View File

@ -198,6 +198,7 @@
"object-shorthand": ["error", "always", { "object-shorthand": ["error", "always", {
"avoidQuotes": true, "avoidQuotes": true,
}], }],
"prefer-const": "off",
"rest-spread-spacing": ["error", "never"], "rest-spread-spacing": ["error", "never"],
"sort-imports": ["error", { "sort-imports": ["error", {
"ignoreCase": true, "ignoreCase": true,

View File

@ -12,12 +12,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
/* eslint no-var: error, prefer-const: error */
import { import {
AbortException, assert, createPromiseCapability, MissingPDFException, AbortException, assert, createPromiseCapability, MissingPDFException,
ReadableStream, UnexpectedResponseException, UnknownErrorException ReadableStream, UnexpectedResponseException, UnknownErrorException
} from './util'; } from './util';
const CallbackKind = {
UNKNOWN: 0,
DATA: 1,
ERROR: 2,
};
const StreamKind = { const StreamKind = {
UNKNOWN: 0, UNKNOWN: 0,
CANCEL: 1, CANCEL: 1,
@ -54,50 +61,59 @@ function wrapReason(reason) {
} }
} }
function MessageHandler(sourceName, targetName, comObj) { class MessageHandler {
this.sourceName = sourceName; constructor(sourceName, targetName, comObj) {
this.targetName = targetName; this.sourceName = sourceName;
this.comObj = comObj; this.targetName = targetName;
this.callbackId = 1; this.comObj = comObj;
this.streamId = 1; this.callbackId = 1;
this.postMessageTransfers = true; this.streamId = 1;
this.streamSinks = Object.create(null); this.postMessageTransfers = true;
this.streamControllers = Object.create(null); this.streamSinks = Object.create(null);
let callbacksCapabilities = this.callbacksCapabilities = Object.create(null); this.streamControllers = Object.create(null);
let ah = this.actionHandler = Object.create(null); this.callbackCapabilities = Object.create(null);
this.actionHandler = Object.create(null);
this._onComObjOnMessage = (event) => { this._onComObjOnMessage = (event) => {
let data = event.data; const data = event.data;
if (data.targetName !== this.sourceName) { if (data.targetName !== this.sourceName) {
return; return;
} }
if (data.stream) { if (data.stream) {
this._processStreamMessage(data); this._processStreamMessage(data);
} else if (data.isReply) { return;
let callbackId = data.callbackId; }
if (data.callbackId in callbacksCapabilities) { if (data.callback) {
let callback = callbacksCapabilities[callbackId]; const callbackId = data.callbackId;
delete callbacksCapabilities[callbackId]; const capability = this.callbackCapabilities[callbackId];
if ('reason' in data) { if (!capability) {
callback.reject(wrapReason(data.reason)); throw new Error(`Cannot resolve callback ${callbackId}`);
} else { }
callback.resolve(data.data); delete this.callbackCapabilities[callbackId];
}
} else { if (data.callback === CallbackKind.DATA) {
throw new Error(`Cannot resolve callback ${callbackId}`); capability.resolve(data.data);
} else if (data.callback === CallbackKind.ERROR) {
capability.reject(wrapReason(data.reason));
} else {
throw new Error('Unexpected callback case');
}
return;
}
const action = this.actionHandler[data.action];
if (!action) {
throw new Error(`Unknown action from worker: ${data.action}`);
} }
} else if (data.action in ah) {
let action = ah[data.action];
if (data.callbackId) { if (data.callbackId) {
let sourceName = this.sourceName; const sourceName = this.sourceName;
let targetName = data.sourceName; const targetName = data.sourceName;
new Promise(function(resolve) { new Promise(function(resolve) {
resolve(action(data.data)); resolve(action(data.data));
}).then(function(result) { }).then(function(result) {
comObj.postMessage({ comObj.postMessage({
sourceName, sourceName,
targetName, targetName,
isReply: true, callback: CallbackKind.DATA,
callbackId: data.callbackId, callbackId: data.callbackId,
data: result, data: result,
}); });
@ -105,31 +121,35 @@ function MessageHandler(sourceName, targetName, comObj) {
comObj.postMessage({ comObj.postMessage({
sourceName, sourceName,
targetName, targetName,
isReply: true, callback: CallbackKind.ERROR,
callbackId: data.callbackId, callbackId: data.callbackId,
reason: wrapReason(reason), reason: wrapReason(reason),
}); });
}); });
} else if (data.streamId) { return;
this._createStreamSink(data);
} else {
action(data.data);
} }
} else { if (data.streamId) {
throw new Error(`Unknown action from worker: ${data.action}`); this._createStreamSink(data);
} return;
}; }
comObj.addEventListener('message', this._onComObjOnMessage); action(data.data);
} };
comObj.addEventListener('message', this._onComObjOnMessage);
}
MessageHandler.prototype = {
on(actionName, handler) { on(actionName, handler) {
var ah = this.actionHandler; if (typeof PDFJSDev === 'undefined' ||
PDFJSDev.test('!PRODUCTION || TESTING')) {
assert(typeof handler === 'function',
'MessageHandler.on: Expected "handler" to be a function.');
}
const ah = this.actionHandler;
if (ah[actionName]) { if (ah[actionName]) {
throw new Error(`There is already an actionName called "${actionName}"`); throw new Error(`There is already an actionName called "${actionName}"`);
} }
ah[actionName] = handler; ah[actionName] = handler;
}, }
/** /**
* Sends a message to the comObj to invoke the action with the supplied data. * Sends a message to the comObj to invoke the action with the supplied data.
* @param {string} actionName - Action to call. * @param {string} actionName - Action to call.
@ -137,13 +157,14 @@ MessageHandler.prototype = {
* @param {Array} [transfers] - List of transfers/ArrayBuffers. * @param {Array} [transfers] - List of transfers/ArrayBuffers.
*/ */
send(actionName, data, transfers) { send(actionName, data, transfers) {
this.postMessage({ this._postMessage({
sourceName: this.sourceName, sourceName: this.sourceName,
targetName: this.targetName, targetName: this.targetName,
action: actionName, action: actionName,
data, data,
}, transfers); }, transfers);
}, }
/** /**
* Sends a message to the comObj to invoke the action with the supplied data. * Sends a message to the comObj to invoke the action with the supplied data.
* Expects that the other side will callback with the response. * Expects that the other side will callback with the response.
@ -153,11 +174,11 @@ MessageHandler.prototype = {
* @returns {Promise} Promise to be resolved with response data. * @returns {Promise} Promise to be resolved with response data.
*/ */
sendWithPromise(actionName, data, transfers) { sendWithPromise(actionName, data, transfers) {
var callbackId = this.callbackId++; const callbackId = this.callbackId++;
var capability = createPromiseCapability(); const capability = createPromiseCapability();
this.callbacksCapabilities[callbackId] = capability; this.callbackCapabilities[callbackId] = capability;
try { try {
this.postMessage({ this._postMessage({
sourceName: this.sourceName, sourceName: this.sourceName,
targetName: this.targetName, targetName: this.targetName,
action: actionName, action: actionName,
@ -168,7 +189,8 @@ MessageHandler.prototype = {
capability.reject(ex); capability.reject(ex);
} }
return capability.promise; return capability.promise;
}, }
/** /**
* Sends a message to the comObj to invoke the action with the supplied data. * Sends a message to the comObj to invoke the action with the supplied data.
* Expect that the other side will callback to signal 'start_complete'. * Expect that the other side will callback to signal 'start_complete'.
@ -180,14 +202,14 @@ MessageHandler.prototype = {
* @returns {ReadableStream} ReadableStream to read data in chunks. * @returns {ReadableStream} ReadableStream to read data in chunks.
*/ */
sendWithStream(actionName, data, queueingStrategy, transfers) { sendWithStream(actionName, data, queueingStrategy, transfers) {
let streamId = this.streamId++; const streamId = this.streamId++;
let sourceName = this.sourceName; const sourceName = this.sourceName;
let targetName = this.targetName; const targetName = this.targetName;
const comObj = this.comObj; const comObj = this.comObj;
return new ReadableStream({ return new ReadableStream({
start: (controller) => { start: (controller) => {
let startCapability = createPromiseCapability(); const startCapability = createPromiseCapability();
this.streamControllers[streamId] = { this.streamControllers[streamId] = {
controller, controller,
startCall: startCapability, startCall: startCapability,
@ -195,7 +217,7 @@ MessageHandler.prototype = {
cancelCall: null, cancelCall: null,
isClosed: false, isClosed: false,
}; };
this.postMessage({ this._postMessage({
sourceName, sourceName,
targetName, targetName,
action: actionName, action: actionName,
@ -208,7 +230,7 @@ MessageHandler.prototype = {
}, },
pull: (controller) => { pull: (controller) => {
let pullCapability = createPromiseCapability(); const pullCapability = createPromiseCapability();
this.streamControllers[streamId].pullCall = pullCapability; this.streamControllers[streamId].pullCall = pullCapability;
comObj.postMessage({ comObj.postMessage({
sourceName, sourceName,
@ -224,7 +246,7 @@ MessageHandler.prototype = {
cancel: (reason) => { cancel: (reason) => {
assert(reason instanceof Error, 'cancel must have a valid reason'); assert(reason instanceof Error, 'cancel must have a valid reason');
let cancelCapability = createPromiseCapability(); const cancelCapability = createPromiseCapability();
this.streamControllers[streamId].cancelCall = cancelCapability; this.streamControllers[streamId].cancelCall = cancelCapability;
this.streamControllers[streamId].isClosed = true; this.streamControllers[streamId].isClosed = true;
comObj.postMessage({ comObj.postMessage({
@ -238,24 +260,25 @@ MessageHandler.prototype = {
return cancelCapability.promise; return cancelCapability.promise;
}, },
}, queueingStrategy); }, queueingStrategy);
}, }
/**
* @private
*/
_createStreamSink(data) { _createStreamSink(data) {
let self = this; const self = this;
let action = this.actionHandler[data.action]; const action = this.actionHandler[data.action];
let streamId = data.streamId; const streamId = data.streamId;
let desiredSize = data.desiredSize; const sourceName = this.sourceName;
let sourceName = this.sourceName; const targetName = data.sourceName;
let targetName = data.sourceName;
let capability = createPromiseCapability();
const comObj = this.comObj; const comObj = this.comObj;
let streamSink = { const streamSink = {
enqueue(chunk, size = 1, transfers) { enqueue(chunk, size = 1, transfers) {
if (this.isCancelled) { if (this.isCancelled) {
return; return;
} }
let lastDesiredSize = this.desiredSize; const lastDesiredSize = this.desiredSize;
this.desiredSize -= size; this.desiredSize -= size;
// Enqueue decreases the desiredSize property of sink, // Enqueue decreases the desiredSize property of sink,
// so when it changes from positive to negative, // so when it changes from positive to negative,
@ -264,7 +287,7 @@ MessageHandler.prototype = {
this.sinkCapability = createPromiseCapability(); this.sinkCapability = createPromiseCapability();
this.ready = this.sinkCapability.promise; this.ready = this.sinkCapability.promise;
} }
self.postMessage({ self._postMessage({
sourceName, sourceName,
targetName, targetName,
stream: StreamKind.ENQUEUE, stream: StreamKind.ENQUEUE,
@ -302,11 +325,11 @@ MessageHandler.prototype = {
}); });
}, },
sinkCapability: capability, sinkCapability: createPromiseCapability(),
onPull: null, onPull: null,
onCancel: null, onCancel: null,
isCancelled: false, isCancelled: false,
desiredSize, desiredSize: data.desiredSize,
ready: null, ready: null,
}; };
@ -332,12 +355,15 @@ MessageHandler.prototype = {
reason: wrapReason(reason), reason: wrapReason(reason),
}); });
}); });
}, }
/**
* @private
*/
_processStreamMessage(data) { _processStreamMessage(data) {
let sourceName = this.sourceName;
let targetName = data.sourceName;
const streamId = data.streamId; const streamId = data.streamId;
const sourceName = this.sourceName;
const targetName = data.sourceName;
const comObj = this.comObj; const comObj = this.comObj;
switch (data.stream) { switch (data.stream) {
@ -465,8 +491,11 @@ MessageHandler.prototype = {
default: default:
throw new Error('Unexpected stream case'); throw new Error('Unexpected stream case');
} }
}, }
/**
* @private
*/
async _deleteStreamController(streamId) { async _deleteStreamController(streamId) {
// Delete the `streamController` only when the start, pull, and cancel // Delete the `streamController` only when the start, pull, and cancel
// capabilities have settled, to prevent `TypeError`s. // capabilities have settled, to prevent `TypeError`s.
@ -478,26 +507,26 @@ MessageHandler.prototype = {
return capability && capability.promise.catch(function() { }); return capability && capability.promise.catch(function() { });
})); }));
delete this.streamControllers[streamId]; delete this.streamControllers[streamId];
}, }
/** /**
* Sends raw message to the comObj. * Sends raw message to the comObj.
* @private
* @param {Object} message - Raw message. * @param {Object} message - Raw message.
* @param transfers List of transfers/ArrayBuffers, or undefined. * @param transfers List of transfers/ArrayBuffers, or undefined.
* @private
*/ */
postMessage(message, transfers) { _postMessage(message, transfers) {
if (transfers && this.postMessageTransfers) { if (transfers && this.postMessageTransfers) {
this.comObj.postMessage(message, transfers); this.comObj.postMessage(message, transfers);
} else { } else {
this.comObj.postMessage(message); this.comObj.postMessage(message);
} }
}, }
destroy() { destroy() {
this.comObj.removeEventListener('message', this._onComObjOnMessage); this.comObj.removeEventListener('message', this._onComObjOnMessage);
}, }
}; }
export { export {
MessageHandler, MessageHandler,