Some small readability improvements in the MessageHandler code

In particular the `_processStreamMessage`-method is a bit cumbersome to read, given the way that the current streamController/streamSink is accessed, which we can improve with a couple of local variables.
This commit is contained in:
Jonas Jenwald 2021-09-21 11:21:43 +02:00
parent c3ca78fdf8
commit 890a6c1108

View File

@ -122,6 +122,7 @@ class MessageHandler {
if (data.callbackId) { if (data.callbackId) {
const cbSourceName = this.sourceName; const cbSourceName = this.sourceName;
const cbTargetName = data.sourceName; const cbTargetName = data.sourceName;
new Promise(function (resolve) { new Promise(function (resolve) {
resolve(action(data.data)); resolve(action(data.data));
}).then( }).then(
@ -230,10 +231,10 @@ class MessageHandler {
* @returns {ReadableStream} ReadableStream to read data in chunks. * @returns {ReadableStream} ReadableStream to read data in chunks.
*/ */
sendWithStream(actionName, data, queueingStrategy, transfers) { sendWithStream(actionName, data, queueingStrategy, transfers) {
const streamId = this.streamId++; const streamId = this.streamId++,
const sourceName = this.sourceName; sourceName = this.sourceName,
const targetName = this.targetName; targetName = this.targetName,
const comObj = this.comObj; comObj = this.comObj;
return new ReadableStream( return new ReadableStream(
{ {
@ -300,12 +301,12 @@ class MessageHandler {
* @private * @private
*/ */
_createStreamSink(data) { _createStreamSink(data) {
const self = this; const streamId = data.streamId,
const action = this.actionHandler[data.action]; sourceName = this.sourceName,
const streamId = data.streamId; targetName = data.sourceName,
const sourceName = this.sourceName; comObj = this.comObj;
const targetName = data.sourceName; const self = this,
const comObj = this.comObj; action = this.actionHandler[data.action];
const streamSink = { const streamSink = {
enqueue(chunk, size = 1, transfers) { enqueue(chunk, size = 1, transfers) {
@ -373,6 +374,7 @@ class MessageHandler {
streamSink.sinkCapability.resolve(); streamSink.sinkCapability.resolve();
streamSink.ready = streamSink.sinkCapability.promise; streamSink.ready = streamSink.sinkCapability.promise;
this.streamSinks[streamId] = streamSink; this.streamSinks[streamId] = streamSink;
new Promise(function (resolve) { new Promise(function (resolve) {
resolve(action(data.data, streamSink)); resolve(action(data.data, streamSink));
}).then( }).then(
@ -401,33 +403,31 @@ class MessageHandler {
* @private * @private
*/ */
_processStreamMessage(data) { _processStreamMessage(data) {
const streamId = data.streamId; const streamId = data.streamId,
const sourceName = this.sourceName; sourceName = this.sourceName,
const targetName = data.sourceName; targetName = data.sourceName,
const comObj = this.comObj; comObj = this.comObj;
const streamController = this.streamControllers[streamId],
streamSink = this.streamSinks[streamId];
switch (data.stream) { switch (data.stream) {
case StreamKind.START_COMPLETE: case StreamKind.START_COMPLETE:
if (data.success) { if (data.success) {
this.streamControllers[streamId].startCall.resolve(); streamController.startCall.resolve();
} else { } else {
this.streamControllers[streamId].startCall.reject( streamController.startCall.reject(wrapReason(data.reason));
wrapReason(data.reason)
);
} }
break; break;
case StreamKind.PULL_COMPLETE: case StreamKind.PULL_COMPLETE:
if (data.success) { if (data.success) {
this.streamControllers[streamId].pullCall.resolve(); streamController.pullCall.resolve();
} else { } else {
this.streamControllers[streamId].pullCall.reject( streamController.pullCall.reject(wrapReason(data.reason));
wrapReason(data.reason)
);
} }
break; break;
case StreamKind.PULL: case StreamKind.PULL:
// Ignore any pull after close is called. // Ignore any pull after close is called.
if (!this.streamSinks[streamId]) { if (!streamSink) {
comObj.postMessage({ comObj.postMessage({
sourceName, sourceName,
targetName, targetName,
@ -437,20 +437,16 @@ class MessageHandler {
}); });
break; break;
} }
// Pull increases the desiredSize property of sink, // Pull increases the desiredSize property of sink, so when it changes
// so when it changes from negative to positive, // from negative to positive, set ready property as resolved promise.
// set ready property as resolved promise. if (streamSink.desiredSize <= 0 && data.desiredSize > 0) {
if ( streamSink.sinkCapability.resolve();
this.streamSinks[streamId].desiredSize <= 0 &&
data.desiredSize > 0
) {
this.streamSinks[streamId].sinkCapability.resolve();
} }
// Reset desiredSize property of sink on every pull. // Reset desiredSize property of sink on every pull.
this.streamSinks[streamId].desiredSize = data.desiredSize; streamSink.desiredSize = data.desiredSize;
const { onPull } = this.streamSinks[streamId];
new Promise(function (resolve) { new Promise(function (resolve) {
resolve(onPull && onPull()); resolve(streamSink.onPull && streamSink.onPull());
}).then( }).then(
function () { function () {
comObj.postMessage({ comObj.postMessage({
@ -473,54 +469,43 @@ class MessageHandler {
); );
break; break;
case StreamKind.ENQUEUE: case StreamKind.ENQUEUE:
assert( assert(streamController, "enqueue should have stream controller");
this.streamControllers[streamId], if (streamController.isClosed) {
"enqueue should have stream controller"
);
if (this.streamControllers[streamId].isClosed) {
break; break;
} }
this.streamControllers[streamId].controller.enqueue(data.chunk); streamController.controller.enqueue(data.chunk);
break; break;
case StreamKind.CLOSE: case StreamKind.CLOSE:
assert( assert(streamController, "close should have stream controller");
this.streamControllers[streamId], if (streamController.isClosed) {
"close should have stream controller"
);
if (this.streamControllers[streamId].isClosed) {
break; break;
} }
this.streamControllers[streamId].isClosed = true; streamController.isClosed = true;
this.streamControllers[streamId].controller.close(); streamController.controller.close();
this._deleteStreamController(streamId); this._deleteStreamController(streamController, streamId);
break; break;
case StreamKind.ERROR: case StreamKind.ERROR:
assert( assert(streamController, "error should have stream controller");
this.streamControllers[streamId], streamController.controller.error(wrapReason(data.reason));
"error should have stream controller" this._deleteStreamController(streamController, streamId);
);
this.streamControllers[streamId].controller.error(
wrapReason(data.reason)
);
this._deleteStreamController(streamId);
break; break;
case StreamKind.CANCEL_COMPLETE: case StreamKind.CANCEL_COMPLETE:
if (data.success) { if (data.success) {
this.streamControllers[streamId].cancelCall.resolve(); streamController.cancelCall.resolve();
} else { } else {
this.streamControllers[streamId].cancelCall.reject( streamController.cancelCall.reject(wrapReason(data.reason));
wrapReason(data.reason)
);
} }
this._deleteStreamController(streamId); this._deleteStreamController(streamController, streamId);
break; break;
case StreamKind.CANCEL: case StreamKind.CANCEL:
if (!this.streamSinks[streamId]) { if (!streamSink) {
break; break;
} }
const { onCancel } = this.streamSinks[streamId];
new Promise(function (resolve) { new Promise(function (resolve) {
resolve(onCancel && onCancel(wrapReason(data.reason))); resolve(
streamSink.onCancel && streamSink.onCancel(wrapReason(data.reason))
);
}).then( }).then(
function () { function () {
comObj.postMessage({ comObj.postMessage({
@ -541,10 +526,8 @@ class MessageHandler {
}); });
} }
); );
this.streamSinks[streamId].sinkCapability.reject( streamSink.sinkCapability.reject(wrapReason(data.reason));
wrapReason(data.reason) streamSink.isCancelled = true;
);
this.streamSinks[streamId].isCancelled = true;
delete this.streamSinks[streamId]; delete this.streamSinks[streamId];
break; break;
default: default:
@ -555,18 +538,14 @@ class MessageHandler {
/** /**
* @private * @private
*/ */
async _deleteStreamController(streamId) { async _deleteStreamController(streamController, streamId) {
// Delete the `streamController` only when the start, pull, and cancel // Delete the `streamController` only when the start, pull, and cancel
// capabilities have settled, to prevent `TypeError`s. // capabilities have settled, to prevent `TypeError`s.
await Promise.allSettled( await Promise.allSettled([
[ streamController.startCall && streamController.startCall.promise,
this.streamControllers[streamId].startCall, streamController.pullCall && streamController.pullCall.promise,
this.streamControllers[streamId].pullCall, streamController.cancelCall && streamController.cancelCall.promise,
this.streamControllers[streamId].cancelCall, ]);
].map(function (capability) {
return capability && capability.promise;
})
);
delete this.streamControllers[streamId]; delete this.streamControllers[streamId];
} }