From 44d8afd46b451101d79cad2e06810ddaa5a717e0 Mon Sep 17 00:00:00 2001 From: Jonas Jenwald Date: Mon, 4 Jun 2018 12:37:54 +0200 Subject: [PATCH] Move `MessageHandler` into a separate `src/shared/message_handler.js` file The `MessageHandler` itself, and its assorted helper functions, are currently the single largest[1] piece of code in the `src/shared/util.js` file. By moving this code into its own file, `src/shared/util.js` thus becomes smaller and more manageable. --- src/core/worker.js | 3 +- src/display/api.js | 8 +- src/shared/message_handler.js | 446 ++++++++++++++++++ src/shared/util.js | 424 ----------------- test/unit/clitests.json | 4 +- test/unit/jasmine-boot.js | 2 +- ...stream_spec.js => message_handler_spec.js} | 5 +- 7 files changed, 458 insertions(+), 434 deletions(-) create mode 100644 src/shared/message_handler.js rename test/unit/{util_stream_spec.js => message_handler_spec.js} (98%) diff --git a/src/core/worker.js b/src/core/worker.js index 72c381a60..45c8e33cc 100644 --- a/src/core/worker.js +++ b/src/core/worker.js @@ -15,12 +15,13 @@ import { arrayByteLength, arraysToBytes, assert, createPromiseCapability, info, - InvalidPDFException, MessageHandler, MissingPDFException, PasswordException, + InvalidPDFException, MissingPDFException, PasswordException, setVerbosityLevel, UnexpectedResponseException, UnknownErrorException, UNSUPPORTED_FEATURES, warn, XRefParseException } from '../shared/util'; import { LocalPdfManager, NetworkPdfManager } from './pdf_manager'; import isNodeJS from '../shared/is_node'; +import { MessageHandler } from '../shared/message_handler'; import { Ref } from './primitives'; var WorkerTask = (function WorkerTaskClosure() { diff --git a/src/display/api.js b/src/display/api.js index 24dee8c77..4c23c7741 100644 --- a/src/display/api.js +++ b/src/display/api.js @@ -16,10 +16,9 @@ import { assert, createPromiseCapability, getVerbosityLevel, info, InvalidPDFException, - isArrayBuffer, isNum, isSameOrigin, MessageHandler, MissingPDFException, - NativeImageDecoding, PasswordException, setVerbosityLevel, shadow, - stringToBytes, UnexpectedResponseException, UnknownErrorException, - unreachable, Util, warn + isArrayBuffer, isNum, isSameOrigin, MissingPDFException, NativeImageDecoding, + PasswordException, setVerbosityLevel, shadow, stringToBytes, + UnexpectedResponseException, UnknownErrorException, unreachable, Util, warn } from '../shared/util'; import { DOMCanvasFactory, DOMCMapReaderFactory, DummyStatTimer, PageViewport, @@ -30,6 +29,7 @@ import { apiCompatibilityParams } from './api_compatibility'; import { CanvasGraphics } from './canvas'; import globalScope from '../shared/global_scope'; import { GlobalWorkerOptions } from './worker_options'; +import { MessageHandler } from '../shared/message_handler'; import { Metadata } from './metadata'; import { PDFDataTransportStream } from './transport_stream'; import { WebGLContext } from './webgl'; diff --git a/src/shared/message_handler.js b/src/shared/message_handler.js new file mode 100644 index 000000000..e24cacae4 --- /dev/null +++ b/src/shared/message_handler.js @@ -0,0 +1,446 @@ +/* Copyright 2018 Mozilla Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + AbortException, assert, createPromiseCapability, MissingPDFException, + ReadableStream, UnexpectedResponseException, UnknownErrorException +} from './util'; + +function resolveCall(fn, args, thisArg = null) { + if (!fn) { + return Promise.resolve(undefined); + } + return new Promise((resolve, reject) => { + resolve(fn.apply(thisArg, args)); + }); +} + +function wrapReason(reason) { + if (typeof reason !== 'object') { + return reason; + } + switch (reason.name) { + case 'AbortException': + return new AbortException(reason.message); + case 'MissingPDFException': + return new MissingPDFException(reason.message); + case 'UnexpectedResponseException': + return new UnexpectedResponseException(reason.message, reason.status); + default: + return new UnknownErrorException(reason.message, reason.details); + } +} + +function makeReasonSerializable(reason) { + if (!(reason instanceof Error) || + reason instanceof AbortException || + reason instanceof MissingPDFException || + reason instanceof UnexpectedResponseException || + reason instanceof UnknownErrorException) { + return reason; + } + return new UnknownErrorException(reason.message, reason.toString()); +} + +function resolveOrReject(capability, success, reason) { + if (success) { + capability.resolve(); + } else { + capability.reject(reason); + } +} + +function finalize(promise) { + return Promise.resolve(promise).catch(() => {}); +} + +function MessageHandler(sourceName, targetName, comObj) { + this.sourceName = sourceName; + this.targetName = targetName; + this.comObj = comObj; + this.callbackId = 1; + this.streamId = 1; + this.postMessageTransfers = true; + this.streamSinks = Object.create(null); + this.streamControllers = Object.create(null); + let callbacksCapabilities = this.callbacksCapabilities = Object.create(null); + let ah = this.actionHandler = Object.create(null); + + this._onComObjOnMessage = (event) => { + let data = event.data; + if (data.targetName !== this.sourceName) { + return; + } + if (data.stream) { + this._processStreamMessage(data); + } else if (data.isReply) { + let callbackId = data.callbackId; + if (data.callbackId in callbacksCapabilities) { + let callback = callbacksCapabilities[callbackId]; + delete callbacksCapabilities[callbackId]; + if ('error' in data) { + callback.reject(wrapReason(data.error)); + } else { + callback.resolve(data.data); + } + } else { + throw new Error(`Cannot resolve callback ${callbackId}`); + } + } else if (data.action in ah) { + let action = ah[data.action]; + if (data.callbackId) { + let sourceName = this.sourceName; + let targetName = data.sourceName; + Promise.resolve().then(function () { + return action[0].call(action[1], data.data); + }).then((result) => { + comObj.postMessage({ + sourceName, + targetName, + isReply: true, + callbackId: data.callbackId, + data: result, + }); + }, (reason) => { + comObj.postMessage({ + sourceName, + targetName, + isReply: true, + callbackId: data.callbackId, + error: makeReasonSerializable(reason), + }); + }); + } else if (data.streamId) { + this._createStreamSink(data); + } else { + action[0].call(action[1], data.data); + } + } else { + throw new Error(`Unknown action from worker: ${data.action}`); + } + }; + comObj.addEventListener('message', this._onComObjOnMessage); +} + +MessageHandler.prototype = { + on(actionName, handler, scope) { + var ah = this.actionHandler; + if (ah[actionName]) { + throw new Error(`There is already an actionName called "${actionName}"`); + } + ah[actionName] = [handler, scope]; + }, + /** + * Sends a message to the comObj to invoke the action with the supplied data. + * @param {String} actionName - Action to call. + * @param {JSON} data - JSON data to send. + * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers + */ + send(actionName, data, transfers) { + var message = { + sourceName: this.sourceName, + targetName: this.targetName, + action: actionName, + data, + }; + this.postMessage(message, transfers); + }, + /** + * Sends a message to the comObj to invoke the action with the supplied data. + * Expects that the other side will callback with the response. + * @param {String} actionName - Action to call. + * @param {JSON} data - JSON data to send. + * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers. + * @returns {Promise} Promise to be resolved with response data. + */ + sendWithPromise(actionName, data, transfers) { + var callbackId = this.callbackId++; + var message = { + sourceName: this.sourceName, + targetName: this.targetName, + action: actionName, + data, + callbackId, + }; + var capability = createPromiseCapability(); + this.callbacksCapabilities[callbackId] = capability; + try { + this.postMessage(message, transfers); + } catch (e) { + capability.reject(e); + } + return capability.promise; + }, + /** + * Sends a message to the comObj to invoke the action with the supplied data. + * Expect that the other side will callback to signal 'start_complete'. + * @param {String} actionName - Action to call. + * @param {JSON} data - JSON data to send. + * @param {Object} queueingStrategy - strategy to signal backpressure based on + * internal queue. + * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers. + * @return {ReadableStream} ReadableStream to read data in chunks. + */ + sendWithStream(actionName, data, queueingStrategy, transfers) { + let streamId = this.streamId++; + let sourceName = this.sourceName; + let targetName = this.targetName; + + return new ReadableStream({ + start: (controller) => { + let startCapability = createPromiseCapability(); + this.streamControllers[streamId] = { + controller, + startCall: startCapability, + isClosed: false, + }; + this.postMessage({ + sourceName, + targetName, + action: actionName, + streamId, + data, + desiredSize: controller.desiredSize, + }); + // Return Promise for Async process, to signal success/failure. + return startCapability.promise; + }, + + pull: (controller) => { + let pullCapability = createPromiseCapability(); + this.streamControllers[streamId].pullCall = pullCapability; + this.postMessage({ + sourceName, + targetName, + stream: 'pull', + streamId, + desiredSize: controller.desiredSize, + }); + // Returning Promise will not call "pull" + // again until current pull is resolved. + return pullCapability.promise; + }, + + cancel: (reason) => { + let cancelCapability = createPromiseCapability(); + this.streamControllers[streamId].cancelCall = cancelCapability; + this.streamControllers[streamId].isClosed = true; + this.postMessage({ + sourceName, + targetName, + stream: 'cancel', + reason, + streamId, + }); + // Return Promise to signal success or failure. + return cancelCapability.promise; + }, + }, queueingStrategy); + }, + + _createStreamSink(data) { + let self = this; + let action = this.actionHandler[data.action]; + let streamId = data.streamId; + let desiredSize = data.desiredSize; + let sourceName = this.sourceName; + let targetName = data.sourceName; + let capability = createPromiseCapability(); + + let sendStreamRequest = ({ stream, chunk, transfers, + success, reason, }) => { + this.postMessage({ sourceName, targetName, stream, streamId, + chunk, success, reason, }, transfers); + }; + + let streamSink = { + enqueue(chunk, size = 1, transfers) { + if (this.isCancelled) { + return; + } + let lastDesiredSize = this.desiredSize; + this.desiredSize -= size; + // Enqueue decreases the desiredSize property of sink, + // so when it changes from positive to negative, + // set ready as unresolved promise. + if (lastDesiredSize > 0 && this.desiredSize <= 0) { + this.sinkCapability = createPromiseCapability(); + this.ready = this.sinkCapability.promise; + } + sendStreamRequest({ stream: 'enqueue', chunk, transfers, }); + }, + + close() { + if (this.isCancelled) { + return; + } + this.isCancelled = true; + sendStreamRequest({ stream: 'close', }); + delete self.streamSinks[streamId]; + }, + + error(reason) { + if (this.isCancelled) { + return; + } + this.isCancelled = true; + sendStreamRequest({ stream: 'error', reason, }); + }, + + sinkCapability: capability, + onPull: null, + onCancel: null, + isCancelled: false, + desiredSize, + ready: null, + }; + + streamSink.sinkCapability.resolve(); + streamSink.ready = streamSink.sinkCapability.promise; + this.streamSinks[streamId] = streamSink; + resolveCall(action[0], [data.data, streamSink], action[1]).then(() => { + sendStreamRequest({ stream: 'start_complete', success: true, }); + }, (reason) => { + sendStreamRequest({ stream: 'start_complete', success: false, reason, }); + }); + }, + + _processStreamMessage(data) { + let sourceName = this.sourceName; + let targetName = data.sourceName; + let streamId = data.streamId; + + let sendStreamResponse = ({ stream, success, reason, }) => { + this.comObj.postMessage({ sourceName, targetName, stream, + success, streamId, reason, }); + }; + + let deleteStreamController = () => { + // Delete streamController only when start, pull and + // cancel callbacks are resolved, to avoid "TypeError". + Promise.all([ + this.streamControllers[data.streamId].startCall, + this.streamControllers[data.streamId].pullCall, + this.streamControllers[data.streamId].cancelCall + ].map(function(capability) { + return capability && finalize(capability.promise); + })).then(() => { + delete this.streamControllers[data.streamId]; + }); + }; + + switch (data.stream) { + case 'start_complete': + resolveOrReject(this.streamControllers[data.streamId].startCall, + data.success, wrapReason(data.reason)); + break; + case 'pull_complete': + resolveOrReject(this.streamControllers[data.streamId].pullCall, + data.success, wrapReason(data.reason)); + break; + case 'pull': + // Ignore any pull after close is called. + if (!this.streamSinks[data.streamId]) { + sendStreamResponse({ stream: 'pull_complete', success: true, }); + break; + } + // Pull increases the desiredSize property of sink, + // so when it changes from negative to positive, + // set ready property as resolved promise. + if (this.streamSinks[data.streamId].desiredSize <= 0 && + data.desiredSize > 0) { + this.streamSinks[data.streamId].sinkCapability.resolve(); + } + // Reset desiredSize property of sink on every pull. + this.streamSinks[data.streamId].desiredSize = data.desiredSize; + resolveCall(this.streamSinks[data.streamId].onPull).then(() => { + sendStreamResponse({ stream: 'pull_complete', success: true, }); + }, (reason) => { + sendStreamResponse({ stream: 'pull_complete', + success: false, reason, }); + }); + break; + case 'enqueue': + assert(this.streamControllers[data.streamId], + 'enqueue should have stream controller'); + if (!this.streamControllers[data.streamId].isClosed) { + this.streamControllers[data.streamId].controller.enqueue(data.chunk); + } + break; + case 'close': + assert(this.streamControllers[data.streamId], + 'close should have stream controller'); + if (this.streamControllers[data.streamId].isClosed) { + break; + } + this.streamControllers[data.streamId].isClosed = true; + this.streamControllers[data.streamId].controller.close(); + deleteStreamController(); + break; + case 'error': + assert(this.streamControllers[data.streamId], + 'error should have stream controller'); + this.streamControllers[data.streamId].controller. + error(wrapReason(data.reason)); + deleteStreamController(); + break; + case 'cancel_complete': + resolveOrReject(this.streamControllers[data.streamId].cancelCall, + data.success, wrapReason(data.reason)); + deleteStreamController(); + break; + case 'cancel': + if (!this.streamSinks[data.streamId]) { + break; + } + resolveCall(this.streamSinks[data.streamId].onCancel, + [wrapReason(data.reason)]).then(() => { + sendStreamResponse({ stream: 'cancel_complete', success: true, }); + }, (reason) => { + sendStreamResponse({ stream: 'cancel_complete', + success: false, reason, }); + }); + this.streamSinks[data.streamId].sinkCapability. + reject(wrapReason(data.reason)); + this.streamSinks[data.streamId].isCancelled = true; + delete this.streamSinks[data.streamId]; + break; + default: + throw new Error('Unexpected stream case'); + } + }, + + /** + * Sends raw message to the comObj. + * @private + * @param {Object} message - Raw message. + * @param transfers List of transfers/ArrayBuffers, or undefined. + */ + postMessage(message, transfers) { + if (transfers && this.postMessageTransfers) { + this.comObj.postMessage(message, transfers); + } else { + this.comObj.postMessage(message); + } + }, + + destroy() { + this.comObj.removeEventListener('message', this._onComObjOnMessage); + }, +}; + +export { + MessageHandler, +}; diff --git a/src/shared/util.js b/src/shared/util.js index 5330cd61d..6ffbedf60 100644 --- a/src/shared/util.js +++ b/src/shared/util.js @@ -1048,429 +1048,6 @@ var createObjectURL = (function createObjectURLClosure() { }; })(); -function resolveCall(fn, args, thisArg = null) { - if (!fn) { - return Promise.resolve(undefined); - } - return new Promise((resolve, reject) => { - resolve(fn.apply(thisArg, args)); - }); -} - -function wrapReason(reason) { - if (typeof reason !== 'object') { - return reason; - } - switch (reason.name) { - case 'AbortException': - return new AbortException(reason.message); - case 'MissingPDFException': - return new MissingPDFException(reason.message); - case 'UnexpectedResponseException': - return new UnexpectedResponseException(reason.message, reason.status); - default: - return new UnknownErrorException(reason.message, reason.details); - } -} - -function makeReasonSerializable(reason) { - if (!(reason instanceof Error) || - reason instanceof AbortException || - reason instanceof MissingPDFException || - reason instanceof UnexpectedResponseException || - reason instanceof UnknownErrorException) { - return reason; - } - return new UnknownErrorException(reason.message, reason.toString()); -} - -function resolveOrReject(capability, success, reason) { - if (success) { - capability.resolve(); - } else { - capability.reject(reason); - } -} - -function finalize(promise) { - return Promise.resolve(promise).catch(() => {}); -} - -function MessageHandler(sourceName, targetName, comObj) { - this.sourceName = sourceName; - this.targetName = targetName; - this.comObj = comObj; - this.callbackId = 1; - this.streamId = 1; - this.postMessageTransfers = true; - this.streamSinks = Object.create(null); - this.streamControllers = Object.create(null); - let callbacksCapabilities = this.callbacksCapabilities = Object.create(null); - let ah = this.actionHandler = Object.create(null); - - this._onComObjOnMessage = (event) => { - let data = event.data; - if (data.targetName !== this.sourceName) { - return; - } - if (data.stream) { - this._processStreamMessage(data); - } else if (data.isReply) { - let callbackId = data.callbackId; - if (data.callbackId in callbacksCapabilities) { - let callback = callbacksCapabilities[callbackId]; - delete callbacksCapabilities[callbackId]; - if ('error' in data) { - callback.reject(wrapReason(data.error)); - } else { - callback.resolve(data.data); - } - } else { - throw new Error(`Cannot resolve callback ${callbackId}`); - } - } else if (data.action in ah) { - let action = ah[data.action]; - if (data.callbackId) { - let sourceName = this.sourceName; - let targetName = data.sourceName; - Promise.resolve().then(function () { - return action[0].call(action[1], data.data); - }).then((result) => { - comObj.postMessage({ - sourceName, - targetName, - isReply: true, - callbackId: data.callbackId, - data: result, - }); - }, (reason) => { - comObj.postMessage({ - sourceName, - targetName, - isReply: true, - callbackId: data.callbackId, - error: makeReasonSerializable(reason), - }); - }); - } else if (data.streamId) { - this._createStreamSink(data); - } else { - action[0].call(action[1], data.data); - } - } else { - throw new Error(`Unknown action from worker: ${data.action}`); - } - }; - comObj.addEventListener('message', this._onComObjOnMessage); -} - -MessageHandler.prototype = { - on(actionName, handler, scope) { - var ah = this.actionHandler; - if (ah[actionName]) { - throw new Error(`There is already an actionName called "${actionName}"`); - } - ah[actionName] = [handler, scope]; - }, - /** - * Sends a message to the comObj to invoke the action with the supplied data. - * @param {String} actionName - Action to call. - * @param {JSON} data - JSON data to send. - * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers - */ - send(actionName, data, transfers) { - var message = { - sourceName: this.sourceName, - targetName: this.targetName, - action: actionName, - data, - }; - this.postMessage(message, transfers); - }, - /** - * Sends a message to the comObj to invoke the action with the supplied data. - * Expects that the other side will callback with the response. - * @param {String} actionName - Action to call. - * @param {JSON} data - JSON data to send. - * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers. - * @returns {Promise} Promise to be resolved with response data. - */ - sendWithPromise(actionName, data, transfers) { - var callbackId = this.callbackId++; - var message = { - sourceName: this.sourceName, - targetName: this.targetName, - action: actionName, - data, - callbackId, - }; - var capability = createPromiseCapability(); - this.callbacksCapabilities[callbackId] = capability; - try { - this.postMessage(message, transfers); - } catch (e) { - capability.reject(e); - } - return capability.promise; - }, - /** - * Sends a message to the comObj to invoke the action with the supplied data. - * Expect that the other side will callback to signal 'start_complete'. - * @param {String} actionName - Action to call. - * @param {JSON} data - JSON data to send. - * @param {Object} queueingStrategy - strategy to signal backpressure based on - * internal queue. - * @param {Array} [transfers] - Optional list of transfers/ArrayBuffers. - * @return {ReadableStream} ReadableStream to read data in chunks. - */ - sendWithStream(actionName, data, queueingStrategy, transfers) { - let streamId = this.streamId++; - let sourceName = this.sourceName; - let targetName = this.targetName; - - return new ReadableStream({ - start: (controller) => { - let startCapability = createPromiseCapability(); - this.streamControllers[streamId] = { - controller, - startCall: startCapability, - isClosed: false, - }; - this.postMessage({ - sourceName, - targetName, - action: actionName, - streamId, - data, - desiredSize: controller.desiredSize, - }); - // Return Promise for Async process, to signal success/failure. - return startCapability.promise; - }, - - pull: (controller) => { - let pullCapability = createPromiseCapability(); - this.streamControllers[streamId].pullCall = pullCapability; - this.postMessage({ - sourceName, - targetName, - stream: 'pull', - streamId, - desiredSize: controller.desiredSize, - }); - // Returning Promise will not call "pull" - // again until current pull is resolved. - return pullCapability.promise; - }, - - cancel: (reason) => { - let cancelCapability = createPromiseCapability(); - this.streamControllers[streamId].cancelCall = cancelCapability; - this.streamControllers[streamId].isClosed = true; - this.postMessage({ - sourceName, - targetName, - stream: 'cancel', - reason, - streamId, - }); - // Return Promise to signal success or failure. - return cancelCapability.promise; - }, - }, queueingStrategy); - }, - - _createStreamSink(data) { - let self = this; - let action = this.actionHandler[data.action]; - let streamId = data.streamId; - let desiredSize = data.desiredSize; - let sourceName = this.sourceName; - let targetName = data.sourceName; - let capability = createPromiseCapability(); - - let sendStreamRequest = ({ stream, chunk, transfers, - success, reason, }) => { - this.postMessage({ sourceName, targetName, stream, streamId, - chunk, success, reason, }, transfers); - }; - - let streamSink = { - enqueue(chunk, size = 1, transfers) { - if (this.isCancelled) { - return; - } - let lastDesiredSize = this.desiredSize; - this.desiredSize -= size; - // Enqueue decreases the desiredSize property of sink, - // so when it changes from positive to negative, - // set ready as unresolved promise. - if (lastDesiredSize > 0 && this.desiredSize <= 0) { - this.sinkCapability = createPromiseCapability(); - this.ready = this.sinkCapability.promise; - } - sendStreamRequest({ stream: 'enqueue', chunk, transfers, }); - }, - - close() { - if (this.isCancelled) { - return; - } - this.isCancelled = true; - sendStreamRequest({ stream: 'close', }); - delete self.streamSinks[streamId]; - }, - - error(reason) { - if (this.isCancelled) { - return; - } - this.isCancelled = true; - sendStreamRequest({ stream: 'error', reason, }); - }, - - sinkCapability: capability, - onPull: null, - onCancel: null, - isCancelled: false, - desiredSize, - ready: null, - }; - - streamSink.sinkCapability.resolve(); - streamSink.ready = streamSink.sinkCapability.promise; - this.streamSinks[streamId] = streamSink; - resolveCall(action[0], [data.data, streamSink], action[1]).then(() => { - sendStreamRequest({ stream: 'start_complete', success: true, }); - }, (reason) => { - sendStreamRequest({ stream: 'start_complete', success: false, reason, }); - }); - }, - - _processStreamMessage(data) { - let sourceName = this.sourceName; - let targetName = data.sourceName; - let streamId = data.streamId; - - let sendStreamResponse = ({ stream, success, reason, }) => { - this.comObj.postMessage({ sourceName, targetName, stream, - success, streamId, reason, }); - }; - - let deleteStreamController = () => { - // Delete streamController only when start, pull and - // cancel callbacks are resolved, to avoid "TypeError". - Promise.all([ - this.streamControllers[data.streamId].startCall, - this.streamControllers[data.streamId].pullCall, - this.streamControllers[data.streamId].cancelCall - ].map(function(capability) { - return capability && finalize(capability.promise); - })).then(() => { - delete this.streamControllers[data.streamId]; - }); - }; - - switch (data.stream) { - case 'start_complete': - resolveOrReject(this.streamControllers[data.streamId].startCall, - data.success, wrapReason(data.reason)); - break; - case 'pull_complete': - resolveOrReject(this.streamControllers[data.streamId].pullCall, - data.success, wrapReason(data.reason)); - break; - case 'pull': - // Ignore any pull after close is called. - if (!this.streamSinks[data.streamId]) { - sendStreamResponse({ stream: 'pull_complete', success: true, }); - break; - } - // Pull increases the desiredSize property of sink, - // so when it changes from negative to positive, - // set ready property as resolved promise. - if (this.streamSinks[data.streamId].desiredSize <= 0 && - data.desiredSize > 0) { - this.streamSinks[data.streamId].sinkCapability.resolve(); - } - // Reset desiredSize property of sink on every pull. - this.streamSinks[data.streamId].desiredSize = data.desiredSize; - resolveCall(this.streamSinks[data.streamId].onPull).then(() => { - sendStreamResponse({ stream: 'pull_complete', success: true, }); - }, (reason) => { - sendStreamResponse({ stream: 'pull_complete', - success: false, reason, }); - }); - break; - case 'enqueue': - assert(this.streamControllers[data.streamId], - 'enqueue should have stream controller'); - if (!this.streamControllers[data.streamId].isClosed) { - this.streamControllers[data.streamId].controller.enqueue(data.chunk); - } - break; - case 'close': - assert(this.streamControllers[data.streamId], - 'close should have stream controller'); - if (this.streamControllers[data.streamId].isClosed) { - break; - } - this.streamControllers[data.streamId].isClosed = true; - this.streamControllers[data.streamId].controller.close(); - deleteStreamController(); - break; - case 'error': - assert(this.streamControllers[data.streamId], - 'error should have stream controller'); - this.streamControllers[data.streamId].controller. - error(wrapReason(data.reason)); - deleteStreamController(); - break; - case 'cancel_complete': - resolveOrReject(this.streamControllers[data.streamId].cancelCall, - data.success, wrapReason(data.reason)); - deleteStreamController(); - break; - case 'cancel': - if (!this.streamSinks[data.streamId]) { - break; - } - resolveCall(this.streamSinks[data.streamId].onCancel, - [wrapReason(data.reason)]).then(() => { - sendStreamResponse({ stream: 'cancel_complete', success: true, }); - }, (reason) => { - sendStreamResponse({ stream: 'cancel_complete', - success: false, reason, }); - }); - this.streamSinks[data.streamId].sinkCapability. - reject(wrapReason(data.reason)); - this.streamSinks[data.streamId].isCancelled = true; - delete this.streamSinks[data.streamId]; - break; - default: - throw new Error('Unexpected stream case'); - } - }, - - /** - * Sends raw message to the comObj. - * @private - * @param {Object} message - Raw message. - * @param transfers List of transfers/ArrayBuffers, or undefined. - */ - postMessage(message, transfers) { - if (transfers && this.postMessageTransfers) { - this.comObj.postMessage(message, transfers); - } else { - this.comObj.postMessage(message); - } - }, - - destroy() { - this.comObj.removeEventListener('message', this._onComObjOnMessage); - }, -}; - export { FONT_IDENTITY_MATRIX, IDENTITY_MATRIX, @@ -1486,7 +1063,6 @@ export { CMapCompressionType, AbortException, InvalidPDFException, - MessageHandler, MissingDataException, MissingPDFException, NativeImageDecoding, diff --git a/test/unit/clitests.json b/test/unit/clitests.json index 01510af67..a993d796d 100644 --- a/test/unit/clitests.json +++ b/test/unit/clitests.json @@ -20,6 +20,7 @@ "evaluator_spec.js", "fonts_spec.js", "function_spec.js", + "message_handler_spec.js", "metadata_spec.js", "murmurhash3_spec.js", "network_utils_spec.js", @@ -31,7 +32,6 @@ "type1_parser_spec.js", "ui_utils_spec.js", "unicode_spec.js", - "util_spec.js", - "util_stream_spec.js" + "util_spec.js" ] } diff --git a/test/unit/jasmine-boot.js b/test/unit/jasmine-boot.js index f239f0ec7..b95a92f6e 100644 --- a/test/unit/jasmine-boot.js +++ b/test/unit/jasmine-boot.js @@ -62,6 +62,7 @@ function initializePDFJS(callback) { 'pdfjs-test/unit/evaluator_spec', 'pdfjs-test/unit/fonts_spec', 'pdfjs-test/unit/function_spec', + 'pdfjs-test/unit/message_handler_spec', 'pdfjs-test/unit/metadata_spec', 'pdfjs-test/unit/murmurhash3_spec', 'pdfjs-test/unit/network_spec', @@ -74,7 +75,6 @@ function initializePDFJS(callback) { 'pdfjs-test/unit/ui_utils_spec', 'pdfjs-test/unit/unicode_spec', 'pdfjs-test/unit/util_spec', - 'pdfjs-test/unit/util_stream_spec', ].map(function (moduleName) { return SystemJS.import(moduleName); })).then(function(modules) { diff --git a/test/unit/util_stream_spec.js b/test/unit/message_handler_spec.js similarity index 98% rename from test/unit/util_stream_spec.js rename to test/unit/message_handler_spec.js index d0b2b92e3..9db31ac1f 100644 --- a/test/unit/util_stream_spec.js +++ b/test/unit/message_handler_spec.js @@ -13,9 +13,10 @@ * limitations under the License. */ -import { createPromiseCapability, MessageHandler } from '../../src/shared/util'; +import { createPromiseCapability } from '../../src/shared/util'; +import { MessageHandler } from '../../src/shared/message_handler'; -describe('util_stream', function () { +describe('message_handler', function () { // Temporary fake port for sending messages between main and worker. class FakePort { constructor() {