Merge pull request #11155 from Snuffleupagus/MessageHandler-misc-cleanup
Miscellaneous (small) clean-up of the `MessageHandler` code
This commit is contained in:
commit
7af66c8c12
@ -48,14 +48,6 @@ function wrapReason(reason) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function resolveOrReject(capability, data) {
|
|
||||||
if (data.success) {
|
|
||||||
capability.resolve();
|
|
||||||
} else {
|
|
||||||
capability.reject(wrapReason(data.reason));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function MessageHandler(sourceName, targetName, comObj) {
|
function MessageHandler(sourceName, targetName, comObj) {
|
||||||
this.sourceName = sourceName;
|
this.sourceName = sourceName;
|
||||||
this.targetName = targetName;
|
this.targetName = targetName;
|
||||||
@ -80,8 +72,8 @@ function MessageHandler(sourceName, targetName, comObj) {
|
|||||||
if (data.callbackId in callbacksCapabilities) {
|
if (data.callbackId in callbacksCapabilities) {
|
||||||
let callback = callbacksCapabilities[callbackId];
|
let callback = callbacksCapabilities[callbackId];
|
||||||
delete callbacksCapabilities[callbackId];
|
delete callbacksCapabilities[callbackId];
|
||||||
if ('error' in data) {
|
if ('reason' in data) {
|
||||||
callback.reject(wrapReason(data.error));
|
callback.reject(wrapReason(data.reason));
|
||||||
} else {
|
} else {
|
||||||
callback.resolve(data.data);
|
callback.resolve(data.data);
|
||||||
}
|
}
|
||||||
@ -109,7 +101,7 @@ function MessageHandler(sourceName, targetName, comObj) {
|
|||||||
targetName,
|
targetName,
|
||||||
isReply: true,
|
isReply: true,
|
||||||
callbackId: data.callbackId,
|
callbackId: data.callbackId,
|
||||||
error: wrapReason(reason),
|
reason: wrapReason(reason),
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
} else if (data.streamId) {
|
} else if (data.streamId) {
|
||||||
@ -193,6 +185,8 @@ MessageHandler.prototype = {
|
|||||||
this.streamControllers[streamId] = {
|
this.streamControllers[streamId] = {
|
||||||
controller,
|
controller,
|
||||||
startCall: startCapability,
|
startCall: startCapability,
|
||||||
|
pullCall: null,
|
||||||
|
cancelCall: null,
|
||||||
isClosed: false,
|
isClosed: false,
|
||||||
};
|
};
|
||||||
this.postMessage({
|
this.postMessage({
|
||||||
@ -337,33 +331,43 @@ MessageHandler.prototype = {
|
|||||||
_processStreamMessage(data) {
|
_processStreamMessage(data) {
|
||||||
let sourceName = this.sourceName;
|
let sourceName = this.sourceName;
|
||||||
let targetName = data.sourceName;
|
let targetName = data.sourceName;
|
||||||
let streamId = data.streamId;
|
const streamId = data.streamId;
|
||||||
const comObj = this.comObj;
|
const comObj = this.comObj;
|
||||||
|
|
||||||
let deleteStreamController = () => {
|
let deleteStreamController = () => {
|
||||||
// Delete the `streamController` only when the start, pull, and cancel
|
// Delete the `streamController` only when the start, pull, and cancel
|
||||||
// capabilities have settled, to prevent `TypeError`s.
|
// capabilities have settled, to prevent `TypeError`s.
|
||||||
Promise.all([
|
Promise.all([
|
||||||
this.streamControllers[data.streamId].startCall,
|
this.streamControllers[streamId].startCall,
|
||||||
this.streamControllers[data.streamId].pullCall,
|
this.streamControllers[streamId].pullCall,
|
||||||
this.streamControllers[data.streamId].cancelCall
|
this.streamControllers[streamId].cancelCall
|
||||||
].map(function(capability) {
|
].map(function(capability) {
|
||||||
return capability && capability.promise.catch(function() { });
|
return capability && capability.promise.catch(function() { });
|
||||||
})).then(() => {
|
})).then(() => {
|
||||||
delete this.streamControllers[data.streamId];
|
delete this.streamControllers[streamId];
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
switch (data.stream) {
|
switch (data.stream) {
|
||||||
case StreamKind.START_COMPLETE:
|
case StreamKind.START_COMPLETE:
|
||||||
resolveOrReject(this.streamControllers[data.streamId].startCall, data);
|
if (data.success) {
|
||||||
|
this.streamControllers[streamId].startCall.resolve();
|
||||||
|
} else {
|
||||||
|
this.streamControllers[streamId].startCall.reject(
|
||||||
|
wrapReason(data.reason));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case StreamKind.PULL_COMPLETE:
|
case StreamKind.PULL_COMPLETE:
|
||||||
resolveOrReject(this.streamControllers[data.streamId].pullCall, data);
|
if (data.success) {
|
||||||
|
this.streamControllers[streamId].pullCall.resolve();
|
||||||
|
} else {
|
||||||
|
this.streamControllers[streamId].pullCall.reject(
|
||||||
|
wrapReason(data.reason));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case StreamKind.PULL:
|
case StreamKind.PULL:
|
||||||
// Ignore any pull after close is called.
|
// Ignore any pull after close is called.
|
||||||
if (!this.streamSinks[data.streamId]) {
|
if (!this.streamSinks[streamId]) {
|
||||||
comObj.postMessage({
|
comObj.postMessage({
|
||||||
sourceName,
|
sourceName,
|
||||||
targetName,
|
targetName,
|
||||||
@ -376,12 +380,12 @@ MessageHandler.prototype = {
|
|||||||
// Pull increases the desiredSize property of sink,
|
// Pull increases the desiredSize property of sink,
|
||||||
// so when it changes from negative to positive,
|
// so when it changes from negative to positive,
|
||||||
// set ready property as resolved promise.
|
// set ready property as resolved promise.
|
||||||
if (this.streamSinks[data.streamId].desiredSize <= 0 &&
|
if (this.streamSinks[streamId].desiredSize <= 0 &&
|
||||||
data.desiredSize > 0) {
|
data.desiredSize > 0) {
|
||||||
this.streamSinks[data.streamId].sinkCapability.resolve();
|
this.streamSinks[streamId].sinkCapability.resolve();
|
||||||
}
|
}
|
||||||
// Reset desiredSize property of sink on every pull.
|
// Reset desiredSize property of sink on every pull.
|
||||||
this.streamSinks[data.streamId].desiredSize = data.desiredSize;
|
this.streamSinks[streamId].desiredSize = data.desiredSize;
|
||||||
const { onPull, } = this.streamSinks[data.streamId];
|
const { onPull, } = this.streamSinks[data.streamId];
|
||||||
new Promise(function(resolve) {
|
new Promise(function(resolve) {
|
||||||
resolve(onPull && onPull());
|
resolve(onPull && onPull());
|
||||||
@ -404,35 +408,41 @@ MessageHandler.prototype = {
|
|||||||
});
|
});
|
||||||
break;
|
break;
|
||||||
case StreamKind.ENQUEUE:
|
case StreamKind.ENQUEUE:
|
||||||
assert(this.streamControllers[data.streamId],
|
assert(this.streamControllers[streamId],
|
||||||
'enqueue should have stream controller');
|
'enqueue should have stream controller');
|
||||||
if (!this.streamControllers[data.streamId].isClosed) {
|
if (this.streamControllers[streamId].isClosed) {
|
||||||
this.streamControllers[data.streamId].controller.enqueue(data.chunk);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case StreamKind.CLOSE:
|
|
||||||
assert(this.streamControllers[data.streamId],
|
|
||||||
'close should have stream controller');
|
|
||||||
if (this.streamControllers[data.streamId].isClosed) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
this.streamControllers[data.streamId].isClosed = true;
|
this.streamControllers[streamId].controller.enqueue(data.chunk);
|
||||||
this.streamControllers[data.streamId].controller.close();
|
break;
|
||||||
|
case StreamKind.CLOSE:
|
||||||
|
assert(this.streamControllers[streamId],
|
||||||
|
'close should have stream controller');
|
||||||
|
if (this.streamControllers[streamId].isClosed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
this.streamControllers[streamId].isClosed = true;
|
||||||
|
this.streamControllers[streamId].controller.close();
|
||||||
deleteStreamController();
|
deleteStreamController();
|
||||||
break;
|
break;
|
||||||
case StreamKind.ERROR:
|
case StreamKind.ERROR:
|
||||||
assert(this.streamControllers[data.streamId],
|
assert(this.streamControllers[streamId],
|
||||||
'error should have stream controller');
|
'error should have stream controller');
|
||||||
this.streamControllers[data.streamId].controller.
|
this.streamControllers[streamId].controller.error(
|
||||||
error(wrapReason(data.reason));
|
wrapReason(data.reason));
|
||||||
deleteStreamController();
|
deleteStreamController();
|
||||||
break;
|
break;
|
||||||
case StreamKind.CANCEL_COMPLETE:
|
case StreamKind.CANCEL_COMPLETE:
|
||||||
resolveOrReject(this.streamControllers[data.streamId].cancelCall, data);
|
if (data.success) {
|
||||||
|
this.streamControllers[streamId].cancelCall.resolve();
|
||||||
|
} else {
|
||||||
|
this.streamControllers[streamId].cancelCall.reject(
|
||||||
|
wrapReason(data.reason));
|
||||||
|
}
|
||||||
deleteStreamController();
|
deleteStreamController();
|
||||||
break;
|
break;
|
||||||
case StreamKind.CANCEL:
|
case StreamKind.CANCEL:
|
||||||
if (!this.streamSinks[data.streamId]) {
|
if (!this.streamSinks[streamId]) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
const { onCancel, } = this.streamSinks[data.streamId];
|
const { onCancel, } = this.streamSinks[data.streamId];
|
||||||
@ -455,10 +465,10 @@ MessageHandler.prototype = {
|
|||||||
reason: wrapReason(reason),
|
reason: wrapReason(reason),
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
this.streamSinks[data.streamId].sinkCapability.
|
this.streamSinks[streamId].sinkCapability.reject(
|
||||||
reject(wrapReason(data.reason));
|
wrapReason(data.reason));
|
||||||
this.streamSinks[data.streamId].isCancelled = true;
|
this.streamSinks[streamId].isCancelled = true;
|
||||||
delete this.streamSinks[data.streamId];
|
delete this.streamSinks[streamId];
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new Error('Unexpected stream case');
|
throw new Error('Unexpected stream case');
|
||||||
|
Loading…
Reference in New Issue
Block a user