Remove unnecessary data.streamId
accesses in MessageHandler._processStreamMessage
, and use a constant object shape in MessageHandler.sendWithStream
The `streamId` short-hand in `MessageHandler._processStreamMessage` was only used partially througout the method, which seemed kind of strange, hence that's fixed in this patch. Furthermore, always giving the `streamController` object a constant shape in `MessageHandler.sendWithStream` cannot hurt either.
This commit is contained in:
parent
d7c7f15551
commit
0617984b59
@ -193,6 +193,8 @@ MessageHandler.prototype = {
|
||||
this.streamControllers[streamId] = {
|
||||
controller,
|
||||
startCall: startCapability,
|
||||
pullCall: null,
|
||||
cancelCall: null,
|
||||
isClosed: false,
|
||||
};
|
||||
this.postMessage({
|
||||
@ -337,33 +339,33 @@ MessageHandler.prototype = {
|
||||
_processStreamMessage(data) {
|
||||
let sourceName = this.sourceName;
|
||||
let targetName = data.sourceName;
|
||||
let streamId = data.streamId;
|
||||
const streamId = data.streamId;
|
||||
const comObj = this.comObj;
|
||||
|
||||
let deleteStreamController = () => {
|
||||
// Delete the `streamController` only when the start, pull, and cancel
|
||||
// capabilities have settled, to prevent `TypeError`s.
|
||||
Promise.all([
|
||||
this.streamControllers[data.streamId].startCall,
|
||||
this.streamControllers[data.streamId].pullCall,
|
||||
this.streamControllers[data.streamId].cancelCall
|
||||
this.streamControllers[streamId].startCall,
|
||||
this.streamControllers[streamId].pullCall,
|
||||
this.streamControllers[streamId].cancelCall
|
||||
].map(function(capability) {
|
||||
return capability && capability.promise.catch(function() { });
|
||||
})).then(() => {
|
||||
delete this.streamControllers[data.streamId];
|
||||
delete this.streamControllers[streamId];
|
||||
});
|
||||
};
|
||||
|
||||
switch (data.stream) {
|
||||
case StreamKind.START_COMPLETE:
|
||||
resolveOrReject(this.streamControllers[data.streamId].startCall, data);
|
||||
resolveOrReject(this.streamControllers[streamId].startCall, data);
|
||||
break;
|
||||
case StreamKind.PULL_COMPLETE:
|
||||
resolveOrReject(this.streamControllers[data.streamId].pullCall, data);
|
||||
resolveOrReject(this.streamControllers[streamId].pullCall, data);
|
||||
break;
|
||||
case StreamKind.PULL:
|
||||
// Ignore any pull after close is called.
|
||||
if (!this.streamSinks[data.streamId]) {
|
||||
if (!this.streamSinks[streamId]) {
|
||||
comObj.postMessage({
|
||||
sourceName,
|
||||
targetName,
|
||||
@ -376,12 +378,12 @@ MessageHandler.prototype = {
|
||||
// Pull increases the desiredSize property of sink,
|
||||
// so when it changes from negative to positive,
|
||||
// set ready property as resolved promise.
|
||||
if (this.streamSinks[data.streamId].desiredSize <= 0 &&
|
||||
if (this.streamSinks[streamId].desiredSize <= 0 &&
|
||||
data.desiredSize > 0) {
|
||||
this.streamSinks[data.streamId].sinkCapability.resolve();
|
||||
this.streamSinks[streamId].sinkCapability.resolve();
|
||||
}
|
||||
// Reset desiredSize property of sink on every pull.
|
||||
this.streamSinks[data.streamId].desiredSize = data.desiredSize;
|
||||
this.streamSinks[streamId].desiredSize = data.desiredSize;
|
||||
const { onPull, } = this.streamSinks[data.streamId];
|
||||
new Promise(function(resolve) {
|
||||
resolve(onPull && onPull());
|
||||
@ -404,35 +406,36 @@ MessageHandler.prototype = {
|
||||
});
|
||||
break;
|
||||
case StreamKind.ENQUEUE:
|
||||
assert(this.streamControllers[data.streamId],
|
||||
assert(this.streamControllers[streamId],
|
||||
'enqueue should have stream controller');
|
||||
if (!this.streamControllers[data.streamId].isClosed) {
|
||||
this.streamControllers[data.streamId].controller.enqueue(data.chunk);
|
||||
}
|
||||
break;
|
||||
case StreamKind.CLOSE:
|
||||
assert(this.streamControllers[data.streamId],
|
||||
'close should have stream controller');
|
||||
if (this.streamControllers[data.streamId].isClosed) {
|
||||
if (this.streamControllers[streamId].isClosed) {
|
||||
break;
|
||||
}
|
||||
this.streamControllers[data.streamId].isClosed = true;
|
||||
this.streamControllers[data.streamId].controller.close();
|
||||
this.streamControllers[streamId].controller.enqueue(data.chunk);
|
||||
break;
|
||||
case StreamKind.CLOSE:
|
||||
assert(this.streamControllers[streamId],
|
||||
'close should have stream controller');
|
||||
if (this.streamControllers[streamId].isClosed) {
|
||||
break;
|
||||
}
|
||||
this.streamControllers[streamId].isClosed = true;
|
||||
this.streamControllers[streamId].controller.close();
|
||||
deleteStreamController();
|
||||
break;
|
||||
case StreamKind.ERROR:
|
||||
assert(this.streamControllers[data.streamId],
|
||||
assert(this.streamControllers[streamId],
|
||||
'error should have stream controller');
|
||||
this.streamControllers[data.streamId].controller.
|
||||
error(wrapReason(data.reason));
|
||||
this.streamControllers[streamId].controller.error(
|
||||
wrapReason(data.reason));
|
||||
deleteStreamController();
|
||||
break;
|
||||
case StreamKind.CANCEL_COMPLETE:
|
||||
resolveOrReject(this.streamControllers[data.streamId].cancelCall, data);
|
||||
resolveOrReject(this.streamControllers[streamId].cancelCall, data);
|
||||
deleteStreamController();
|
||||
break;
|
||||
case StreamKind.CANCEL:
|
||||
if (!this.streamSinks[data.streamId]) {
|
||||
if (!this.streamSinks[streamId]) {
|
||||
break;
|
||||
}
|
||||
const { onCancel, } = this.streamSinks[data.streamId];
|
||||
@ -455,10 +458,10 @@ MessageHandler.prototype = {
|
||||
reason: wrapReason(reason),
|
||||
});
|
||||
});
|
||||
this.streamSinks[data.streamId].sinkCapability.
|
||||
reject(wrapReason(data.reason));
|
||||
this.streamSinks[data.streamId].isCancelled = true;
|
||||
delete this.streamSinks[data.streamId];
|
||||
this.streamSinks[streamId].sinkCapability.reject(
|
||||
wrapReason(data.reason));
|
||||
this.streamSinks[streamId].isCancelled = true;
|
||||
delete this.streamSinks[streamId];
|
||||
break;
|
||||
default:
|
||||
throw new Error('Unexpected stream case');
|
||||
|
Loading…
Reference in New Issue
Block a user