diff --git a/src/shared/message_handler.js b/src/shared/message_handler.js index 88c565775..c0adeb460 100644 --- a/src/shared/message_handler.js +++ b/src/shared/message_handler.js @@ -48,14 +48,6 @@ function wrapReason(reason) { } } -function resolveOrReject(capability, data) { - if (data.success) { - capability.resolve(); - } else { - capability.reject(wrapReason(data.reason)); - } -} - function MessageHandler(sourceName, targetName, comObj) { this.sourceName = sourceName; this.targetName = targetName; @@ -80,8 +72,8 @@ function MessageHandler(sourceName, targetName, comObj) { if (data.callbackId in callbacksCapabilities) { let callback = callbacksCapabilities[callbackId]; delete callbacksCapabilities[callbackId]; - if ('error' in data) { - callback.reject(wrapReason(data.error)); + if ('reason' in data) { + callback.reject(wrapReason(data.reason)); } else { callback.resolve(data.data); } @@ -109,7 +101,7 @@ function MessageHandler(sourceName, targetName, comObj) { targetName, isReply: true, callbackId: data.callbackId, - error: wrapReason(reason), + reason: wrapReason(reason), }); }); } else if (data.streamId) { @@ -193,6 +185,8 @@ MessageHandler.prototype = { this.streamControllers[streamId] = { controller, startCall: startCapability, + pullCall: null, + cancelCall: null, isClosed: false, }; this.postMessage({ @@ -337,33 +331,43 @@ MessageHandler.prototype = { _processStreamMessage(data) { let sourceName = this.sourceName; let targetName = data.sourceName; - let streamId = data.streamId; + const streamId = data.streamId; const comObj = this.comObj; let deleteStreamController = () => { // Delete the `streamController` only when the start, pull, and cancel // capabilities have settled, to prevent `TypeError`s. Promise.all([ - this.streamControllers[data.streamId].startCall, - this.streamControllers[data.streamId].pullCall, - this.streamControllers[data.streamId].cancelCall + this.streamControllers[streamId].startCall, + this.streamControllers[streamId].pullCall, + this.streamControllers[streamId].cancelCall ].map(function(capability) { return capability && capability.promise.catch(function() { }); })).then(() => { - delete this.streamControllers[data.streamId]; + delete this.streamControllers[streamId]; }); }; switch (data.stream) { case StreamKind.START_COMPLETE: - resolveOrReject(this.streamControllers[data.streamId].startCall, data); + if (data.success) { + this.streamControllers[streamId].startCall.resolve(); + } else { + this.streamControllers[streamId].startCall.reject( + wrapReason(data.reason)); + } break; case StreamKind.PULL_COMPLETE: - resolveOrReject(this.streamControllers[data.streamId].pullCall, data); + if (data.success) { + this.streamControllers[streamId].pullCall.resolve(); + } else { + this.streamControllers[streamId].pullCall.reject( + wrapReason(data.reason)); + } break; case StreamKind.PULL: // Ignore any pull after close is called. - if (!this.streamSinks[data.streamId]) { + if (!this.streamSinks[streamId]) { comObj.postMessage({ sourceName, targetName, @@ -376,12 +380,12 @@ MessageHandler.prototype = { // Pull increases the desiredSize property of sink, // so when it changes from negative to positive, // set ready property as resolved promise. - if (this.streamSinks[data.streamId].desiredSize <= 0 && + if (this.streamSinks[streamId].desiredSize <= 0 && data.desiredSize > 0) { - this.streamSinks[data.streamId].sinkCapability.resolve(); + this.streamSinks[streamId].sinkCapability.resolve(); } // Reset desiredSize property of sink on every pull. - this.streamSinks[data.streamId].desiredSize = data.desiredSize; + this.streamSinks[streamId].desiredSize = data.desiredSize; const { onPull, } = this.streamSinks[data.streamId]; new Promise(function(resolve) { resolve(onPull && onPull()); @@ -404,35 +408,41 @@ MessageHandler.prototype = { }); break; case StreamKind.ENQUEUE: - assert(this.streamControllers[data.streamId], + assert(this.streamControllers[streamId], 'enqueue should have stream controller'); - if (!this.streamControllers[data.streamId].isClosed) { - this.streamControllers[data.streamId].controller.enqueue(data.chunk); - } - break; - case StreamKind.CLOSE: - assert(this.streamControllers[data.streamId], - 'close should have stream controller'); - if (this.streamControllers[data.streamId].isClosed) { + if (this.streamControllers[streamId].isClosed) { break; } - this.streamControllers[data.streamId].isClosed = true; - this.streamControllers[data.streamId].controller.close(); + this.streamControllers[streamId].controller.enqueue(data.chunk); + break; + case StreamKind.CLOSE: + assert(this.streamControllers[streamId], + 'close should have stream controller'); + if (this.streamControllers[streamId].isClosed) { + break; + } + this.streamControllers[streamId].isClosed = true; + this.streamControllers[streamId].controller.close(); deleteStreamController(); break; case StreamKind.ERROR: - assert(this.streamControllers[data.streamId], + assert(this.streamControllers[streamId], 'error should have stream controller'); - this.streamControllers[data.streamId].controller. - error(wrapReason(data.reason)); + this.streamControllers[streamId].controller.error( + wrapReason(data.reason)); deleteStreamController(); break; case StreamKind.CANCEL_COMPLETE: - resolveOrReject(this.streamControllers[data.streamId].cancelCall, data); + if (data.success) { + this.streamControllers[streamId].cancelCall.resolve(); + } else { + this.streamControllers[streamId].cancelCall.reject( + wrapReason(data.reason)); + } deleteStreamController(); break; case StreamKind.CANCEL: - if (!this.streamSinks[data.streamId]) { + if (!this.streamSinks[streamId]) { break; } const { onCancel, } = this.streamSinks[data.streamId]; @@ -455,10 +465,10 @@ MessageHandler.prototype = { reason: wrapReason(reason), }); }); - this.streamSinks[data.streamId].sinkCapability. - reject(wrapReason(data.reason)); - this.streamSinks[data.streamId].isCancelled = true; - delete this.streamSinks[data.streamId]; + this.streamSinks[streamId].sinkCapability.reject( + wrapReason(data.reason)); + this.streamSinks[streamId].isCancelled = true; + delete this.streamSinks[streamId]; break; default: throw new Error('Unexpected stream case');