Merge pull request #11107 from Snuffleupagus/MessageHandler-postMessage

Various `MessageHandler` improvements when using Streams
This commit is contained in:
Tim van der Meij 2019-08-31 00:06:17 +02:00 committed by GitHub
commit d1e6d427cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 121 additions and 71 deletions

View File

@ -1248,17 +1248,17 @@ class PDFPageProxy {
*/
_tryCleanup(resetStats = false) {
if (!this.pendingCleanup ||
Object.keys(this.intentStates).some(function(intent) {
Object.keys(this.intentStates).some((intent) => {
const intentState = this.intentStates[intent];
return (intentState.renderTasks.length !== 0 ||
!intentState.operatorList.lastChunk);
}, this)) {
})) {
return;
}
Object.keys(this.intentStates).forEach(function(intent) {
Object.keys(this.intentStates).forEach((intent) => {
delete this.intentStates[intent];
}, this);
});
this.objs.clear();
this.annotationsPromise = null;
if (resetStats && this._stats instanceof StatTimer) {
@ -1443,18 +1443,18 @@ class LoopbackPort {
}
if (!this._defer) {
this._listeners.forEach(function(listener) {
this._listeners.forEach((listener) => {
listener.call(this, { data: obj, });
}, this);
});
return;
}
const cloned = new WeakMap();
const e = { data: cloneValue(obj), };
this._deferred.then(() => {
this._listeners.forEach(function(listener) {
this._listeners.forEach((listener) => {
listener.call(this, e);
}, this);
});
});
}

View File

@ -18,6 +18,18 @@ import {
ReadableStream, UnexpectedResponseException, UnknownErrorException
} from './util';
const StreamKind = {
UNKNOWN: 0,
CANCEL: 1,
CANCEL_COMPLETE: 2,
CLOSE: 3,
ENQUEUE: 4,
ERROR: 5,
PULL: 6,
PULL_COMPLETE: 7,
START_COMPLETE: 8,
};
async function resolveCall(fn, args, thisArg = null) {
if (!fn) {
return undefined;
@ -36,20 +48,11 @@ function wrapReason(reason) {
return new MissingPDFException(reason.message);
case 'UnexpectedResponseException':
return new UnexpectedResponseException(reason.message, reason.status);
default:
case 'UnknownErrorException':
return new UnknownErrorException(reason.message, reason.details);
}
}
function makeReasonSerializable(reason) {
if (!(reason instanceof Error) ||
reason instanceof AbortException ||
reason instanceof MissingPDFException ||
reason instanceof UnexpectedResponseException ||
reason instanceof UnknownErrorException) {
return reason;
}
default:
return new UnknownErrorException(reason.message, reason.toString());
}
}
function resolveOrReject(capability, data) {
@ -113,7 +116,7 @@ function MessageHandler(sourceName, targetName, comObj) {
targetName,
isReply: true,
callbackId: data.callbackId,
error: makeReasonSerializable(reason),
error: wrapReason(reason),
});
});
} else if (data.streamId) {
@ -143,13 +146,12 @@ MessageHandler.prototype = {
* @param {Array} [transfers] - Optional list of transfers/ArrayBuffers
*/
send(actionName, data, transfers) {
var message = {
this.postMessage({
sourceName: this.sourceName,
targetName: this.targetName,
action: actionName,
data,
};
this.postMessage(message, transfers);
}, transfers);
},
/**
* Sends a message to the comObj to invoke the action with the supplied data.
@ -161,19 +163,18 @@ MessageHandler.prototype = {
*/
sendWithPromise(actionName, data, transfers) {
var callbackId = this.callbackId++;
var message = {
sourceName: this.sourceName,
targetName: this.targetName,
action: actionName,
data,
callbackId,
};
var capability = createPromiseCapability();
this.callbacksCapabilities[callbackId] = capability;
try {
this.postMessage(message, transfers);
} catch (e) {
capability.reject(e);
this.postMessage({
sourceName: this.sourceName,
targetName: this.targetName,
action: actionName,
callbackId,
data,
}, transfers);
} catch (ex) {
capability.reject(ex);
}
return capability.promise;
},
@ -191,6 +192,7 @@ MessageHandler.prototype = {
let streamId = this.streamId++;
let sourceName = this.sourceName;
let targetName = this.targetName;
const comObj = this.comObj;
return new ReadableStream({
start: (controller) => {
@ -207,7 +209,7 @@ MessageHandler.prototype = {
streamId,
data,
desiredSize: controller.desiredSize,
});
}, transfers);
// Return Promise for Async process, to signal success/failure.
return startCapability.promise;
},
@ -215,10 +217,10 @@ MessageHandler.prototype = {
pull: (controller) => {
let pullCapability = createPromiseCapability();
this.streamControllers[streamId].pullCall = pullCapability;
this.postMessage({
comObj.postMessage({
sourceName,
targetName,
stream: 'pull',
stream: StreamKind.PULL,
streamId,
desiredSize: controller.desiredSize,
});
@ -231,12 +233,12 @@ MessageHandler.prototype = {
let cancelCapability = createPromiseCapability();
this.streamControllers[streamId].cancelCall = cancelCapability;
this.streamControllers[streamId].isClosed = true;
this.postMessage({
comObj.postMessage({
sourceName,
targetName,
stream: 'cancel',
reason,
stream: StreamKind.CANCEL,
streamId,
reason,
});
// Return Promise to signal success or failure.
return cancelCapability.promise;
@ -252,12 +254,7 @@ MessageHandler.prototype = {
let sourceName = this.sourceName;
let targetName = data.sourceName;
let capability = createPromiseCapability();
let sendStreamRequest = ({ stream, chunk, transfers,
success, reason, }) => {
this.postMessage({ sourceName, targetName, stream, streamId,
chunk, success, reason, }, transfers);
};
const comObj = this.comObj;
let streamSink = {
enqueue(chunk, size = 1, transfers) {
@ -273,7 +270,13 @@ MessageHandler.prototype = {
this.sinkCapability = createPromiseCapability();
this.ready = this.sinkCapability.promise;
}
sendStreamRequest({ stream: 'enqueue', chunk, transfers, });
self.postMessage({
sourceName,
targetName,
stream: StreamKind.ENQUEUE,
streamId,
chunk,
}, transfers);
},
close() {
@ -281,7 +284,12 @@ MessageHandler.prototype = {
return;
}
this.isCancelled = true;
sendStreamRequest({ stream: 'close', });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.CLOSE,
streamId,
});
delete self.streamSinks[streamId];
},
@ -290,7 +298,13 @@ MessageHandler.prototype = {
return;
}
this.isCancelled = true;
sendStreamRequest({ stream: 'error', reason, });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.ERROR,
streamId,
reason,
});
},
sinkCapability: capability,
@ -305,9 +319,21 @@ MessageHandler.prototype = {
streamSink.ready = streamSink.sinkCapability.promise;
this.streamSinks[streamId] = streamSink;
resolveCall(action[0], [data.data, streamSink], action[1]).then(() => {
sendStreamRequest({ stream: 'start_complete', success: true, });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.START_COMPLETE,
streamId,
success: true,
});
}, (reason) => {
sendStreamRequest({ stream: 'start_complete', success: false, reason, });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.START_COMPLETE,
streamId,
reason,
});
});
},
@ -315,11 +341,7 @@ MessageHandler.prototype = {
let sourceName = this.sourceName;
let targetName = data.sourceName;
let streamId = data.streamId;
let sendStreamResponse = ({ stream, success, reason, }) => {
this.comObj.postMessage({ sourceName, targetName, stream,
success, streamId, reason, });
};
const comObj = this.comObj;
let deleteStreamController = () => {
// Delete the `streamController` only when the start, pull, and cancel
@ -336,16 +358,22 @@ MessageHandler.prototype = {
};
switch (data.stream) {
case 'start_complete':
case StreamKind.START_COMPLETE:
resolveOrReject(this.streamControllers[data.streamId].startCall, data);
break;
case 'pull_complete':
case StreamKind.PULL_COMPLETE:
resolveOrReject(this.streamControllers[data.streamId].pullCall, data);
break;
case 'pull':
case StreamKind.PULL:
// Ignore any pull after close is called.
if (!this.streamSinks[data.streamId]) {
sendStreamResponse({ stream: 'pull_complete', success: true, });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.PULL_COMPLETE,
streamId,
success: true,
});
break;
}
// Pull increases the desiredSize property of sink,
@ -358,20 +386,31 @@ MessageHandler.prototype = {
// Reset desiredSize property of sink on every pull.
this.streamSinks[data.streamId].desiredSize = data.desiredSize;
resolveCall(this.streamSinks[data.streamId].onPull).then(() => {
sendStreamResponse({ stream: 'pull_complete', success: true, });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.PULL_COMPLETE,
streamId,
success: true,
});
}, (reason) => {
sendStreamResponse({ stream: 'pull_complete',
success: false, reason, });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.PULL_COMPLETE,
streamId,
reason,
});
});
break;
case 'enqueue':
case StreamKind.ENQUEUE:
assert(this.streamControllers[data.streamId],
'enqueue should have stream controller');
if (!this.streamControllers[data.streamId].isClosed) {
this.streamControllers[data.streamId].controller.enqueue(data.chunk);
}
break;
case 'close':
case StreamKind.CLOSE:
assert(this.streamControllers[data.streamId],
'close should have stream controller');
if (this.streamControllers[data.streamId].isClosed) {
@ -381,27 +420,38 @@ MessageHandler.prototype = {
this.streamControllers[data.streamId].controller.close();
deleteStreamController();
break;
case 'error':
case StreamKind.ERROR:
assert(this.streamControllers[data.streamId],
'error should have stream controller');
this.streamControllers[data.streamId].controller.
error(wrapReason(data.reason));
deleteStreamController();
break;
case 'cancel_complete':
case StreamKind.CANCEL_COMPLETE:
resolveOrReject(this.streamControllers[data.streamId].cancelCall, data);
deleteStreamController();
break;
case 'cancel':
case StreamKind.CANCEL:
if (!this.streamSinks[data.streamId]) {
break;
}
resolveCall(this.streamSinks[data.streamId].onCancel,
[wrapReason(data.reason)]).then(() => {
sendStreamResponse({ stream: 'cancel_complete', success: true, });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.CANCEL_COMPLETE,
streamId,
success: true,
});
}, (reason) => {
sendStreamResponse({ stream: 'cancel_complete',
success: false, reason, });
comObj.postMessage({
sourceName,
targetName,
stream: StreamKind.CANCEL_COMPLETE,
streamId,
reason,
});
});
this.streamSinks[data.streamId].sinkCapability.
reject(wrapReason(data.reason));