From ed78b23ff269898223c257e6389a3de094eb0a9a Mon Sep 17 00:00:00 2001 From: Mukul Mishra Date: Thu, 13 Jul 2017 21:24:10 +0530 Subject: [PATCH 1/3] Adds node.js logic for networking tasks for PDF.js --- src/display/network.js | 3 - src/display/node_stream.js | 213 +++++++++++++++++++++++++++++++++++++ src/pdf.js | 8 +- 3 files changed, 220 insertions(+), 4 deletions(-) create mode 100644 src/display/node_stream.js diff --git a/src/display/network.js b/src/display/network.js index 448abaf9b..5397d161a 100644 --- a/src/display/network.js +++ b/src/display/network.js @@ -18,7 +18,6 @@ import { UnexpectedResponseException } from '../shared/util'; import globalScope from '../shared/global_scope'; -import { setPDFNetworkStreamClass } from './api'; if (typeof PDFJSDev !== 'undefined' && PDFJSDev.test('FIREFOX || MOZCENTRAL')) { throw new Error('Module "./network" shall not ' + @@ -594,8 +593,6 @@ PDFNetworkStreamRangeRequestReader.prototype = { }, }; -setPDFNetworkStreamClass(PDFNetworkStream); - export { PDFNetworkStream, NetworkManager, diff --git a/src/display/node_stream.js b/src/display/node_stream.js new file mode 100644 index 000000000..09846ac23 --- /dev/null +++ b/src/display/node_stream.js @@ -0,0 +1,213 @@ +/* Copyright 2012 Mozilla Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* globals __non_webpack_require__ */ + +let fs = __non_webpack_require__('fs'); +let utils = require('../shared/util'); + +let assert = utils.assert; +let createPromiseCapability = utils.createPromiseCapability; + +class PDFNodeStream { + constructor(args) { + this._path = args.path; + this._fullRequest = null; + this._rangeRequestReaders = []; + } + + getFullReader() { + assert(!this._fullRequest); + this._fullRequest = new PDFNodeStreamFullReader(this._path); + return this._fullRequest; + } + + getRangeReader(begin, end) { + let rangeReader = new PDFNodeStreamRangeReader(this._path, begin, end); + this._rangeRequestReaders.push(rangeReader); + return rangeReader; + } + + cancelAllRequests(reason) { + if (this._fullRequest) { + this._fullRequest.cancel(reason); + } + + let readers = this._rangeRequestReaders.slice(0); + readers.forEach(function(reader) { + reader.cancel(reason); + }); + } +} + +class PDFNodeStreamFullReader { + constructor(path) { + this._path = path; + this._done = false; + this._errored = false; + this._reason = null; + this.onProgress = null; + this._length = null; + this._loaded = 0; + this._readCapability = createPromiseCapability(); + this._headersCapability = createPromiseCapability(); + this._fullRequest = fs.createReadStream(path); + + fs.lstat(this._path, (error, stat) => { + if (error) { + this._errored = true; + this._reason = error; + this._headersCapability.reject(error); + return; + } + this._length = stat.size; + this._headersCapability.resolve(); + }); + + this._fullRequest.on('readable', () => { + this._readCapability.resolve(); + }); + + this._fullRequest.on('end', () => { + this._done = true; + this._readCapability.resolve(); + }); + + this._fullRequest.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + }); + } + + get headersReady() { + return this._headersCapability.promise; + } + + get contentLength() { + return this._length; + } + + get isRangeSupported() { + return true; + } + + get isStreamingSupported() { + return true; + } + + read() { + return this._readCapability.promise.then(() => { + if (this._done) { + return Promise.resolve({ value: undefined, done: true, }); + } + if (this._errored) { + return Promise.reject(this._reason); + } + + let chunk = this._fullRequest.read(); + if (chunk === null) { + this._readCapability = createPromiseCapability(); + return this.read(); + } + this._loaded += chunk.length; + if (this.onProgress) { + this.onProgress({ + loaded: this._loaded, + total: this._length, + }); + } + return Promise.resolve({ value: chunk, done: false, }); + }); + } + + cancel(reason) { + this._fullRequest.close(reason); + this._fullRequest.destroy(reason); + } +} + +class PDFNodeStreamRangeReader { + constructor(path, start, end) { + this._path = path; + this._done = false; + this._errored = false; + this._reason = null; + this.onProgress = null; + this._length = null; + this._loaded = 0; + this._readCapability = createPromiseCapability(); + this._rangeRequest = fs.createReadStream(path, { start, end, }); + + fs.lstat(this._path, (error, stat) => { + if (error) { + this._errored = true; + this._reason = error; + return; + } + this._length = stat.size; + }); + + this._rangeRequest.on('readable', () => { + this._readCapability.resolve(); + }); + + this._rangeRequest.on('end', () => { + this._done = true; + this._readCapability.resolve(); + }); + + this._rangeRequest.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + }); + } + + get isStreamingSupported() { + return true; + } + + read() { + return this._readCapability.promise.then(() => { + if (this._done) { + return Promise.resolve({ value: undefined, done: true, }); + } + if (this._errored) { + return Promise.reject(this._reason); + } + + let chunk = this._rangeRequest.read(); + if (chunk === null) { + this._readCapability = createPromiseCapability(); + return this.read(); + } + this._loaded += chunk.length; + if (this.onProgress) { + this.onProgress({ + loaded: this._loaded, + total: this._length, + }); + } + return Promise.resolve({ value: chunk, done: false, }); + }); + } + + cancel(reason) { + this._fullRequest.cancel(reason); + this._fullRequest.destroy(reason); + } +} + +exports.PDFNodeStream = PDFNodeStream; diff --git a/src/pdf.js b/src/pdf.js index 2a451c272..9c0b1488a 100644 --- a/src/pdf.js +++ b/src/pdf.js @@ -31,7 +31,13 @@ var pdfjsDisplaySVG = require('./display/svg.js'); if (typeof PDFJSDev === 'undefined' || !PDFJSDev.test('FIREFOX || MOZCENTRAL')) { - require('./display/network.js'); + if (pdfjsSharedUtil.isNodeJS()) { + var PDFNodeStream = require('./display/node_stream.js').PDFNodeStream; + pdfjsDisplayAPI.setPDFNetworkStreamClass(PDFNodeStream); + } else { + var PDFNetworkStream = require('./display/network.js').PDFNetworkStream; + pdfjsDisplayAPI.setPDFNetworkStreamClass(PDFNetworkStream); + } } exports.PDFJS = pdfjsDisplayGlobal.PDFJS; From 18ede8c65dec28346980ec55eaf23fdc36211d1e Mon Sep 17 00:00:00 2001 From: Mukul Mishra Date: Sun, 30 Jul 2017 20:28:32 +0530 Subject: [PATCH 2/3] Adds http support to node_stream logic --- src/display/network.js | 55 ++---- src/display/network_utils.js | 56 +++++++ src/display/node_stream.js | 313 ++++++++++++++++++++++++++--------- 3 files changed, 309 insertions(+), 115 deletions(-) create mode 100644 src/display/network_utils.js diff --git a/src/display/network.js b/src/display/network.js index 5397d161a..cdb375398 100644 --- a/src/display/network.js +++ b/src/display/network.js @@ -18,6 +18,7 @@ import { UnexpectedResponseException } from '../shared/util'; import globalScope from '../shared/global_scope'; +import { validateRangeRequestCapabilities } from './network_utils'; if (typeof PDFJSDev !== 'undefined' && PDFJSDev.test('FIREFOX || MOZCENTRAL')) { throw new Error('Module "./network" shall not ' + @@ -351,51 +352,27 @@ function PDFNetworkStreamFullRequestReader(manager, options) { } PDFNetworkStreamFullRequestReader.prototype = { - _validateRangeRequestCapabilities: function - PDFNetworkStreamFullRequestReader_validateRangeRequestCapabilities() { + getResponseHeader(name) { + let fullRequestXhrId = this._fullRequestId; + let fullRequestXhr = this._manager.getRequestXhr(fullRequestXhrId); - if (this._disableRange) { - return false; - } - - var networkManager = this._manager; - if (!networkManager.isHttp) { - return false; - } - var fullRequestXhrId = this._fullRequestId; - var fullRequestXhr = networkManager.getRequestXhr(fullRequestXhrId); - if (fullRequestXhr.getResponseHeader('Accept-Ranges') !== 'bytes') { - return false; - } - - var contentEncoding = - fullRequestXhr.getResponseHeader('Content-Encoding') || 'identity'; - if (contentEncoding !== 'identity') { - return false; - } - - var length = fullRequestXhr.getResponseHeader('Content-Length'); - length = parseInt(length, 10); - if (!isInt(length)) { - return false; - } - - this._contentLength = length; // setting right content length - - if (length <= 2 * this._rangeChunkSize) { - // The file size is smaller than the size of two chunks, so it does - // not make any sense to abort the request and retry with a range - // request. - return false; - } - - return true; + return fullRequestXhr.getResponseHeader(name); }, _onHeadersReceived: function PDFNetworkStreamFullRequestReader_onHeadersReceived() { + let { allowRangeRequests, suggestedLength, } = + validateRangeRequestCapabilities({ + getResponseHeader: this.getResponseHeader.bind(this), + isHttp: this._manager.isHttp, + rangeChunkSize: this._rangeChunkSize, + disableRange: this._disableRange, + }); - if (this._validateRangeRequestCapabilities()) { + // Setting right content length. + this._contentLength = suggestedLength || this._contentLength; + + if (allowRangeRequests) { this._isRangeSupported = true; } diff --git a/src/display/network_utils.js b/src/display/network_utils.js new file mode 100644 index 000000000..9d8ebe1c0 --- /dev/null +++ b/src/display/network_utils.js @@ -0,0 +1,56 @@ +/* Copyright 2012 Mozilla Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { isInt } from '../shared/util'; + +function validateRangeRequestCapabilities({ getResponseHeader, isHttp, + rangeChunkSize, disableRange, }) { + let returnValues = { + allowRangeRequests: false, + suggestedLength: undefined, + }; + if (disableRange || !isHttp) { + return returnValues; + } + if (getResponseHeader('Accept-Ranges') !== 'bytes') { + return returnValues; + } + + let contentEncoding = getResponseHeader('Content-Encoding') || 'identity'; + if (contentEncoding !== 'identity') { + return returnValues; + } + + let length = getResponseHeader('Content-Length'); + length = parseInt(length, 10); + if (!isInt(length)) { + return returnValues; + } + + returnValues.suggestedLength = length; + if (length <= 2 * rangeChunkSize) { + // The file size is smaller than the size of two chunks, so it does + // not make any sense to abort the request and retry with a range + // request. + return returnValues; + } + + returnValues.allowRangeRequests = true; + return returnValues; +} + +export { + validateRangeRequestCapabilities, +}; diff --git a/src/display/node_stream.js b/src/display/node_stream.js index 09846ac23..009157338 100644 --- a/src/display/node_stream.js +++ b/src/display/node_stream.js @@ -15,26 +15,39 @@ /* globals __non_webpack_require__ */ let fs = __non_webpack_require__('fs'); -let utils = require('../shared/util'); +let http = __non_webpack_require__('http'); +let https = __non_webpack_require__('https'); +let url = __non_webpack_require__('url'); -let assert = utils.assert; -let createPromiseCapability = utils.createPromiseCapability; +import { assert, createPromiseCapability } from '../shared/util'; +import { validateRangeRequestCapabilities } from './network_utils'; class PDFNodeStream { - constructor(args) { - this._path = args.path; + constructor(options) { + this.options = options; + this.source = options.source; + this.url = url.parse(this.source.url); + this.isHttp = this.url.protocol === 'http:' || + this.url.protocol === 'https:'; + this.isFsUrl = !this.url.host; + this.httpHeaders = (this.isHttp && this.source.httpHeaders) || {}; + this._fullRequest = null; this._rangeRequestReaders = []; } getFullReader() { assert(!this._fullRequest); - this._fullRequest = new PDFNodeStreamFullReader(this._path); + this._fullRequest = this.isFsUrl ? + new PDFNodeStreamFsFullReader(this) : + new PDFNodeStreamFullReader(this); return this._fullRequest; } - getRangeReader(begin, end) { - let rangeReader = new PDFNodeStreamRangeReader(this._path, begin, end); + getRangeReader(start, end) { + let rangeReader = this.isFsUrl ? + new PDFNodeStreamFsRangeReader(this, start, end) : + new PDFNodeStreamRangeReader(this, start, end); this._rangeRequestReaders.push(rangeReader); return rangeReader; } @@ -51,44 +64,19 @@ class PDFNodeStream { } } -class PDFNodeStreamFullReader { - constructor(path) { - this._path = path; +class BaseFullReader { + constructor(stream) { + this._url = stream.url; this._done = false; this._errored = false; this._reason = null; this.onProgress = null; - this._length = null; + this._length = stream.source.length; this._loaded = 0; + + this._fullRequest = null; this._readCapability = createPromiseCapability(); this._headersCapability = createPromiseCapability(); - this._fullRequest = fs.createReadStream(path); - - fs.lstat(this._path, (error, stat) => { - if (error) { - this._errored = true; - this._reason = error; - this._headersCapability.reject(error); - return; - } - this._length = stat.size; - this._headersCapability.resolve(); - }); - - this._fullRequest.on('readable', () => { - this._readCapability.resolve(); - }); - - this._fullRequest.on('end', () => { - this._done = true; - this._readCapability.resolve(); - }); - - this._fullRequest.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); } get headersReady() { @@ -100,11 +88,11 @@ class PDFNodeStreamFullReader { } get isRangeSupported() { - return true; + return this._isRangeSupported; } get isStreamingSupported() { - return true; + return this._isStreamingSupported; } read() { @@ -138,45 +126,20 @@ class PDFNodeStreamFullReader { } } -class PDFNodeStreamRangeReader { - constructor(path, start, end) { - this._path = path; +class BaseRangeReader { + constructor(stream) { + this._url = stream.url; this._done = false; this._errored = false; this._reason = null; this.onProgress = null; - this._length = null; + this._length = stream.source.length; this._loaded = 0; this._readCapability = createPromiseCapability(); - this._rangeRequest = fs.createReadStream(path, { start, end, }); - - fs.lstat(this._path, (error, stat) => { - if (error) { - this._errored = true; - this._reason = error; - return; - } - this._length = stat.size; - }); - - this._rangeRequest.on('readable', () => { - this._readCapability.resolve(); - }); - - this._rangeRequest.on('end', () => { - this._done = true; - this._readCapability.resolve(); - }); - - this._rangeRequest.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); } get isStreamingSupported() { - return true; + return false; } read() { @@ -188,7 +151,7 @@ class PDFNodeStreamRangeReader { return Promise.reject(this._reason); } - let chunk = this._rangeRequest.read(); + let chunk = this._read(); if (chunk === null) { this._readCapability = createPromiseCapability(); return this.read(); @@ -203,11 +166,209 @@ class PDFNodeStreamRangeReader { return Promise.resolve({ value: chunk, done: false, }); }); } +} - cancel(reason) { - this._fullRequest.cancel(reason); - this._fullRequest.destroy(reason); +class PDFNodeStreamFullReader extends BaseFullReader { + constructor(stream) { + super(stream); + + this._disableRange = stream.options.disableRange || false; + this._rangeChunkSize = stream.source.rangeChunkSize; + if (!this._rangeChunkSize && !this._disableRange) { + this._disableRange = true; + } + + this._isStreamingSupported = !stream.source.disableStream; + this._isRangeSupported = false; + + let options = { + host: this._url.host, + path: this._url.path, + method: 'GET', + headers: stream.httpHeaders, + }; + + let handleResponse = (response) => { + this._headersCapability.resolve(); + this._fullRequest = response; + + response.on('readable', () => { + this._readCapability.resolve(); + }); + + response.on('end', () => { + // Destroy response to minimize resource usage. + response.destroy(); + this._done = true; + this._readCapability.resolve(); + }); + + response.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + }); + }; + + this._request = this._url.protocol === 'http:' ? + http.request(options, handleResponse) : + https.request(options, handleResponse); + + this._request.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._headersCapability.reject(reason); + }); + this._request.end(); + + this._headersCapability.promise.then(() => { + let { allowRangeRequests, suggestedLength, } = + validateRangeRequestCapabilities({ + getResponseHeader: this.getResponseHeader.bind(this), + isHttp: stream.isHttp, + rangeChunkSize: this._rangeChunkSize, + disableRange: this._disableRange, + }); + + if (allowRangeRequests) { + this._isRangeSupported = true; + } + this._length = suggestedLength; + }); + } + + getReasponseHeader(name) { + return this._fullRequest.headers[name]; } } -exports.PDFNodeStream = PDFNodeStream; +class PDFNodeStreamRangeReader extends BaseRangeReader { + constructor(stream, start, end) { + super(stream); + + this._rangeRequest = null; + this._read = null; + let rangeStr = start + '-' + (end - 1); + stream.httpHeaders['Range'] = 'bytes=' + rangeStr; + + let options = { + host: this._url.host, + path: this._url.path, + method: 'GET', + headers: stream.httpHeaders, + }; + let handleResponse = (response) => { + this._rangeRequest = response; + this._read = this._rangeRequest.read; + + response.on('readable', () => { + this._readCapability.resolve(); + }); + + response.on('end', () => { + response.destroy(); + this._done = true; + this._readCapability.resolve(); + }); + + response.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + }); + }; + this._request = this._url.protocol === 'http:' ? + http.request(options, handleResponse) : + https.request(options, handleResponse); + + this._request.on('error', (reason) => { + this._errored = true; + this._reason = reason; + }); + this._request.end(); + } + + cancel(reason) { + this._rangeRequest.close(reason); + this._rangeRequest.destroy(reason); + } +} + +class PDFNodeStreamFsFullReader extends BaseFullReader { + constructor(stream) { + super(stream); + + this._isRangeSupported = true; + this._isStreamingSupported = true; + this._fullRequest = fs.createReadStream(this._url.path); + + fs.lstat(this._url.path, (error, stat) => { + if (error) { + this._errored = true; + this._reason = error; + this._headersCapability.reject(error); + return; + } + this._length = stat.size; + this._headersCapability.resolve(); + }); + + this._fullRequest.on('readable', () => { + this._readCapability.resolve(); + }); + + this._fullRequest.on('end', () => { + this._fullRequest.destroy(); + this._done = true; + this._readCapability.resolve(); + }); + + this._fullRequest.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + }); + } +} + +class PDFNodeStreamFsRangeReader extends BaseRangeReader { + constructor(stream, start, end) { + super(stream); + + this._rangeRequest = fs.createReadStream(this._url.path, { start, end, }); + fs.lstat(this._url.path, (error, stat) => { + if (error) { + this._errored = true; + this._reason = error; + return; + } + this._length = stat.size; + }); + this._read = this._rangeRequest.read; + + this._rangeRequest.on('readable', () => { + this._readCapability.resolve(); + }); + + this._rangeRequest.on('end', () => { + this._rangeRequest.destroy(); + this._done = true; + this._readCapability.resolve(); + }); + + this._rangeRequest.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + }); + } + + cancel(reason) { + this._rangeRequest.close(reason); + this._rangeRequest.destroy(reason); + } +} + +export { + PDFNodeStream, +}; From d16709f5e4c97a93763e49a81dcfc3773d0a1b98 Mon Sep 17 00:00:00 2001 From: Mukul Mishra Date: Sat, 5 Aug 2017 01:00:37 +0530 Subject: [PATCH 3/3] Adds tests for node_stream --- examples/node/getinfo.js | 3 +- src/display/network.js | 17 +- src/display/network_utils.js | 3 +- src/display/node_stream.js | 327 +++++++++++++++++----------------- test/unit/clitests.json | 1 + test/unit/jasmine-boot.js | 6 + test/unit/node_stream_spec.js | 234 ++++++++++++++++++++++++ 7 files changed, 419 insertions(+), 172 deletions(-) create mode 100644 test/unit/node_stream_spec.js diff --git a/examples/node/getinfo.js b/examples/node/getinfo.js index 1e91f586a..3dce2e20a 100644 --- a/examples/node/getinfo.js +++ b/examples/node/getinfo.js @@ -17,11 +17,10 @@ var pdfjsLib = require('pdfjs-dist'); // Loading file from file system into typed array var pdfPath = process.argv[2] || '../../web/compressed.tracemonkey-pldi-09.pdf'; -var data = new Uint8Array(fs.readFileSync(pdfPath)); // Will be using promises to load document, pages and misc data instead of // callback. -pdfjsLib.getDocument(data).then(function (doc) { +pdfjsLib.getDocument(pdfPath).then(function (doc) { var numPages = doc.numPages; console.log('# Document Loaded'); console.log('Number of Pages: ' + numPages); diff --git a/src/display/network.js b/src/display/network.js index cdb375398..5d4f39314 100644 --- a/src/display/network.js +++ b/src/display/network.js @@ -14,7 +14,7 @@ */ import { - assert, createPromiseCapability, isInt, MissingPDFException, + assert, createPromiseCapability, MissingPDFException, UnexpectedResponseException } from '../shared/util'; import globalScope from '../shared/global_scope'; @@ -352,18 +352,16 @@ function PDFNetworkStreamFullRequestReader(manager, options) { } PDFNetworkStreamFullRequestReader.prototype = { - getResponseHeader(name) { - let fullRequestXhrId = this._fullRequestId; - let fullRequestXhr = this._manager.getRequestXhr(fullRequestXhrId); - - return fullRequestXhr.getResponseHeader(name); - }, - _onHeadersReceived: function PDFNetworkStreamFullRequestReader_onHeadersReceived() { + var fullRequestXhrId = this._fullRequestId; + var fullRequestXhr = this._manager.getRequestXhr(fullRequestXhrId); + let { allowRangeRequests, suggestedLength, } = validateRangeRequestCapabilities({ - getResponseHeader: this.getResponseHeader.bind(this), + getResponseHeader: (name) => { + return fullRequestXhr.getResponseHeader(name); + }, isHttp: this._manager.isHttp, rangeChunkSize: this._rangeChunkSize, disableRange: this._disableRange, @@ -377,7 +375,6 @@ PDFNetworkStreamFullRequestReader.prototype = { } var networkManager = this._manager; - var fullRequestXhrId = this._fullRequestId; if (networkManager.isStreamingRequest(fullRequestXhrId)) { // We can continue fetching when progressive loading is enabled, // and we don't need the autoFetch feature. diff --git a/src/display/network_utils.js b/src/display/network_utils.js index 9d8ebe1c0..721afa4cc 100644 --- a/src/display/network_utils.js +++ b/src/display/network_utils.js @@ -13,10 +13,11 @@ * limitations under the License. */ -import { isInt } from '../shared/util'; +import { assert, isInt } from '../shared/util'; function validateRangeRequestCapabilities({ getResponseHeader, isHttp, rangeChunkSize, disableRange, }) { + assert(rangeChunkSize > 0); let returnValues = { allowRangeRequests: false, suggestedLength: undefined, diff --git a/src/display/node_stream.js b/src/display/node_stream.js index 009157338..aca82a902 100644 --- a/src/display/node_stream.js +++ b/src/display/node_stream.js @@ -29,7 +29,8 @@ class PDFNodeStream { this.url = url.parse(this.source.url); this.isHttp = this.url.protocol === 'http:' || this.url.protocol === 'https:'; - this.isFsUrl = !this.url.host; + // Check if url refers to filesystem. + this.isFsUrl = this.url.protocol === 'file:' || !this.url.host; this.httpHeaders = (this.isHttp && this.source.httpHeaders) || {}; this._fullRequest = null; @@ -71,10 +72,19 @@ class BaseFullReader { this._errored = false; this._reason = null; this.onProgress = null; - this._length = stream.source.length; + this._contentLength = stream.source.length; // optional this._loaded = 0; - this._fullRequest = null; + this._disableRange = stream.options.disableRange || false; + this._rangeChunkSize = stream.source.rangeChunkSize; + if (!this._rangeChunkSize && !this._disableRange) { + this._disableRange = true; + } + + this._isStreamingSupported = !stream.source.disableStream; + this._isRangeSupported = !stream.options.disableRange; + + this._readableStream = null; this._readCapability = createPromiseCapability(); this._headersCapability = createPromiseCapability(); } @@ -84,7 +94,7 @@ class BaseFullReader { } get contentLength() { - return this._length; + return this._contentLength; } get isRangeSupported() { @@ -104,7 +114,7 @@ class BaseFullReader { return Promise.reject(this._reason); } - let chunk = this._fullRequest.read(); + let chunk = this._readableStream.read(); if (chunk === null) { this._readCapability = createPromiseCapability(); return this.read(); @@ -113,16 +123,52 @@ class BaseFullReader { if (this.onProgress) { this.onProgress({ loaded: this._loaded, - total: this._length, + total: this._contentLength, }); } - return Promise.resolve({ value: chunk, done: false, }); + // Ensure that `read()` method returns ArrayBuffer. + let buffer = new Uint8Array(chunk).buffer; + return Promise.resolve({ value: buffer, done: false, }); }); } cancel(reason) { - this._fullRequest.close(reason); - this._fullRequest.destroy(reason); + // Call `this._error()` method when cancel is called + // before _readableStream is set. + if (!this._readableStream) { + this._error(reason); + return; + } + this._readableStream.destroy(reason); + } + + _error(reason) { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + } + + _setReadableStream(readableStream) { + this._readableStream = readableStream; + readableStream.on('readable', () => { + this._readCapability.resolve(); + }); + + readableStream.on('end', () => { + // Destroy readable to minimize resource usage. + readableStream.destroy(); + this._done = true; + this._readCapability.resolve(); + }); + + readableStream.on('error', (reason) => { + this._error(reason); + }); + + // Destroy ReadableStream if already in errored state. + if (this._errored) { + this._readableStream.destroy(this._reason); + } } } @@ -133,13 +179,15 @@ class BaseRangeReader { this._errored = false; this._reason = null; this.onProgress = null; - this._length = stream.source.length; this._loaded = 0; + this._readableStream = null; this._readCapability = createPromiseCapability(); + + this._isStreamingSupported = !stream.source.disableStream; } get isStreamingSupported() { - return false; + return this._isStreamingSupported; } read() { @@ -151,80 +199,88 @@ class BaseRangeReader { return Promise.reject(this._reason); } - let chunk = this._read(); + let chunk = this._readableStream.read(); if (chunk === null) { this._readCapability = createPromiseCapability(); return this.read(); } this._loaded += chunk.length; if (this.onProgress) { - this.onProgress({ - loaded: this._loaded, - total: this._length, - }); + this.onProgress({ loaded: this._loaded, }); } - return Promise.resolve({ value: chunk, done: false, }); + // Ensure that `read()` method returns ArrayBuffer. + let buffer = new Uint8Array(chunk).buffer; + return Promise.resolve({ value: buffer, done: false, }); }); } + + cancel(reason) { + // Call `this._error()` method when cancel is called + // before _readableStream is set. + if (!this._readableStream) { + this._error(reason); + return; + } + this._readableStream.destroy(reason); + } + + _error(reason) { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + } + + _setReadableStream(readableStream) { + this._readableStream = readableStream; + readableStream.on('readable', () => { + this._readCapability.resolve(); + }); + + readableStream.on('end', () => { + // Destroy readableStream to minimize resource usage. + readableStream.destroy(); + this._done = true; + this._readCapability.resolve(); + }); + + readableStream.on('error', (reason) => { + this._error(reason); + }); + + // Destroy readableStream if already in errored state. + if (this._errored) { + this._readableStream.destroy(this._reason); + } + } +} + +function createRequestOptions(url, headers) { + return { + protocol: url.protocol, + auth: url.auth, + host: url.hostname, + port: url.port, + path: url.path, + method: 'GET', + headers, + }; } class PDFNodeStreamFullReader extends BaseFullReader { constructor(stream) { super(stream); - this._disableRange = stream.options.disableRange || false; - this._rangeChunkSize = stream.source.rangeChunkSize; - if (!this._rangeChunkSize && !this._disableRange) { - this._disableRange = true; - } - - this._isStreamingSupported = !stream.source.disableStream; - this._isRangeSupported = false; - - let options = { - host: this._url.host, - path: this._url.path, - method: 'GET', - headers: stream.httpHeaders, - }; - let handleResponse = (response) => { this._headersCapability.resolve(); - this._fullRequest = response; + this._setReadableStream(response); - response.on('readable', () => { - this._readCapability.resolve(); - }); - - response.on('end', () => { - // Destroy response to minimize resource usage. - response.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - response.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); - }; - - this._request = this._url.protocol === 'http:' ? - http.request(options, handleResponse) : - https.request(options, handleResponse); - - this._request.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._headersCapability.reject(reason); - }); - this._request.end(); - - this._headersCapability.promise.then(() => { let { allowRangeRequests, suggestedLength, } = validateRangeRequestCapabilities({ - getResponseHeader: this.getResponseHeader.bind(this), + getResponseHeader: (name) => { + // Make sure that headers name are in lower case, as mentioned + // here: https://nodejs.org/api/http.html#http_message_headers. + return this._readableStream.headers[name.toLowerCase()]; + }, isHttp: stream.isHttp, rangeChunkSize: this._rangeChunkSize, disableRange: this._disableRange, @@ -233,12 +289,28 @@ class PDFNodeStreamFullReader extends BaseFullReader { if (allowRangeRequests) { this._isRangeSupported = true; } - this._length = suggestedLength; - }); - } + // Setting right content length. + this._contentLength = suggestedLength; + }; - getReasponseHeader(name) { - return this._fullRequest.headers[name]; + this._request = null; + if (this._url.protocol === 'http:') { + this._request = http.request(createRequestOptions( + this._url, stream.httpHeaders), handleResponse); + } else { + this._request = https.request(createRequestOptions( + this._url, stream.httpHeaders), handleResponse); + } + + this._request.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._headersCapability.reject(reason); + }); + // Note: `request.end(data)` is used to write `data` to request body + // and notify end of request. But one should always call `request.end()` + // even if there is no data to write -- (to notify the end of request). + this._request.end(); } } @@ -246,40 +318,28 @@ class PDFNodeStreamRangeReader extends BaseRangeReader { constructor(stream, start, end) { super(stream); - this._rangeRequest = null; - this._read = null; - let rangeStr = start + '-' + (end - 1); - stream.httpHeaders['Range'] = 'bytes=' + rangeStr; + this._httpHeaders = {}; + for (let property in stream.httpHeaders) { + let value = stream.httpHeaders[property]; + if (typeof value === 'undefined') { + continue; + } + this._httpHeaders[property] = value; + } + this._httpHeaders['Range'] = `bytes=${start}-${end - 1}`; - let options = { - host: this._url.host, - path: this._url.path, - method: 'GET', - headers: stream.httpHeaders, - }; - let handleResponse = (response) => { - this._rangeRequest = response; - this._read = this._rangeRequest.read; - - response.on('readable', () => { - this._readCapability.resolve(); - }); - - response.on('end', () => { - response.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - response.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); - }; - this._request = this._url.protocol === 'http:' ? - http.request(options, handleResponse) : - https.request(options, handleResponse); + this._request = null; + if (this._url.protocol === 'http:') { + this._request = http.request(createRequestOptions( + this._url, this._httpHeaders), (response) => { + this._setReadableStream(response); + }); + } else { + this._request = https.request(createRequestOptions( + this._url, this._httpHeaders), (response) => { + this._setReadableStream(response); + }); + } this._request.on('error', (reason) => { this._errored = true; @@ -287,20 +347,13 @@ class PDFNodeStreamRangeReader extends BaseRangeReader { }); this._request.end(); } - - cancel(reason) { - this._rangeRequest.close(reason); - this._rangeRequest.destroy(reason); - } } class PDFNodeStreamFsFullReader extends BaseFullReader { constructor(stream) { super(stream); - this._isRangeSupported = true; - this._isStreamingSupported = true; - this._fullRequest = fs.createReadStream(this._url.path); + this._setReadableStream(fs.createReadStream(this._url.path)); fs.lstat(this._url.path, (error, stat) => { if (error) { @@ -309,25 +362,10 @@ class PDFNodeStreamFsFullReader extends BaseFullReader { this._headersCapability.reject(error); return; } - this._length = stat.size; + // Setting right content length. + this._contentLength = stat.size; this._headersCapability.resolve(); }); - - this._fullRequest.on('readable', () => { - this._readCapability.resolve(); - }); - - this._fullRequest.on('end', () => { - this._fullRequest.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - this._fullRequest.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); } } @@ -335,37 +373,8 @@ class PDFNodeStreamFsRangeReader extends BaseRangeReader { constructor(stream, start, end) { super(stream); - this._rangeRequest = fs.createReadStream(this._url.path, { start, end, }); - fs.lstat(this._url.path, (error, stat) => { - if (error) { - this._errored = true; - this._reason = error; - return; - } - this._length = stat.size; - }); - this._read = this._rangeRequest.read; - - this._rangeRequest.on('readable', () => { - this._readCapability.resolve(); - }); - - this._rangeRequest.on('end', () => { - this._rangeRequest.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - this._rangeRequest.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); - } - - cancel(reason) { - this._rangeRequest.close(reason); - this._rangeRequest.destroy(reason); + this._setReadableStream( + fs.createReadStream(this._url.path, { start, end: end - 1, })); } } diff --git a/test/unit/clitests.json b/test/unit/clitests.json index bf8d3ff84..3c98a6f0a 100644 --- a/test/unit/clitests.json +++ b/test/unit/clitests.json @@ -15,6 +15,7 @@ "fonts_spec.js", "function_spec.js", "murmurhash3_spec.js", + "node_stream_spec.js", "parser_spec.js", "primitives_spec.js", "stream_spec.js", diff --git a/test/unit/jasmine-boot.js b/test/unit/jasmine-boot.js index 2f5f3f06e..8820e6c7d 100644 --- a/test/unit/jasmine-boot.js +++ b/test/unit/jasmine-boot.js @@ -43,6 +43,8 @@ function initializePDFJS(callback) { Promise.all([ 'pdfjs/display/global', + 'pdfjs/display/api', + 'pdfjs/display/network', 'pdfjs-test/unit/annotation_spec', 'pdfjs-test/unit/api_spec', 'pdfjs-test/unit/bidi_spec', @@ -72,7 +74,11 @@ function initializePDFJS(callback) { return SystemJS.import(moduleName); })).then(function (modules) { var displayGlobal = modules[0]; + var displayApi = modules[1]; + var PDFNetworkStream = modules[2].PDFNetworkStream; + // Set network stream class for unit tests. + displayApi.setPDFNetworkStreamClass(PDFNetworkStream); // Configure the worker. displayGlobal.PDFJS.workerSrc = '../../build/generic/build/pdf.worker.js'; // Opt-in to using the latest API. diff --git a/test/unit/node_stream_spec.js b/test/unit/node_stream_spec.js new file mode 100644 index 000000000..3812ab905 --- /dev/null +++ b/test/unit/node_stream_spec.js @@ -0,0 +1,234 @@ +/* Copyright 2017 Mozilla Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* globals __non_webpack_require__ */ + +import { assert, isNodeJS } from '../../src/shared/util'; +import { PDFNodeStream } from '../../src/display/node_stream'; + +// Make sure that we only running this script is Node.js environments. +assert(isNodeJS()); + +let path = __non_webpack_require__('path'); +let url = __non_webpack_require__('url'); +let http = __non_webpack_require__('http'); +let fs = __non_webpack_require__('fs'); + +describe('node_stream', function() { + let server = null; + let port = null; + let pdf = url.parse(encodeURI('file://' + path.join(process.cwd(), + './test/pdfs/tracemonkey.pdf'))).href; + let pdfLength = 1016315; + + beforeAll((done) => { + // Create http server to serve pdf data for tests. + server = http.createServer((request, response) => { + let filePath = process.cwd() + '/test/pdfs' + request.url; + fs.lstat(filePath, (error, stat) => { + if (error) { + response.writeHead(404); + response.end(`File ${request.url} not found!`); + return; + } + if (!request.headers['range']) { + let contentLength = stat.size; + let stream = fs.createReadStream(filePath); + response.writeHead(200, { + 'Content-Type': 'application/pdf', + 'Content-Length': contentLength, + 'Accept-Ranges': 'bytes', + }); + stream.pipe(response); + } else { + let [start, end] = + request.headers['range'].split('=')[1].split('-').map((x) => { + return Number(x); + }); + let stream = fs.createReadStream(filePath, { start, end, }); + response.writeHead(206, { + 'Content-Type': 'application/pdf', + }); + stream.pipe(response); + } + }); + }).listen(0); /* Listen on a random free port */ + port = server.address().port; + done(); + }); + + afterAll((done) => { + // Close the server from accepting new connections after all test finishes. + server.close(); + done(); + }); + + it('read both http(s) and filesystem pdf files', function(done) { + let stream1 = new PDFNodeStream({ + source: { + url: `http://127.0.0.1:${port}/tracemonkey.pdf`, + rangeChunkSize: 65536, + disableStream: true, + }, + disableRange: true, + }); + + let stream2 = new PDFNodeStream({ + source: { + url: pdf, + rangeChunkSize: 65536, + disableStream: true, + }, + disableRange: true, + }); + + let fullReader1 = stream1.getFullReader(); + let fullReader2 = stream2.getFullReader(); + + let isStreamingSupported1, isRangeSupported1; + let promise1 = fullReader1.headersReady.then(() => { + isStreamingSupported1 = fullReader1.isStreamingSupported; + isRangeSupported1 = fullReader1.isRangeSupported; + }); + + let isStreamingSupported2, isRangeSupported2; + let promise2 = fullReader2.headersReady.then(() => { + isStreamingSupported2 = fullReader2.isStreamingSupported; + isRangeSupported2 = fullReader2.isRangeSupported; + }); + + let len1 = 0, len2 = 0; + let read1 = function () { + return fullReader1.read().then(function (result) { + if (result.done) { + return; + } + len1 += result.value.byteLength; + return read1(); + }); + }; + let read2 = function () { + return fullReader2.read().then(function (result) { + if (result.done) { + return; + } + len2 += result.value.byteLength; + return read2(); + }); + }; + + let readPromise = Promise.all([read1(), read2(), promise1, promise2]); + readPromise.then((result) => { + expect(isStreamingSupported1).toEqual(false); + expect(isRangeSupported1).toEqual(false); + expect(isStreamingSupported2).toEqual(false); + expect(isRangeSupported2).toEqual(false); + expect(len1).toEqual(pdfLength); + expect(len1).toEqual(len2); + done(); + }).catch((reason) => { + done.fail(reason); + }); + }); + + it('read custom ranges for both http(s) and filesystem urls', + function(done) { + let rangeSize = 32768; + let stream1 = new PDFNodeStream({ + source: { + url: `http://127.0.0.1:${port}/tracemonkey.pdf`, + length: pdfLength, + rangeChunkSize: rangeSize, + disableStream: true, + }, + disableRange: false, + }); + let stream2 = new PDFNodeStream({ + source: { + url: pdf, + length: pdfLength, + rangeChunkSize: rangeSize, + disableStream: true, + }, + disableRange: false, + }); + + let fullReader1 = stream1.getFullReader(); + let fullReader2 = stream2.getFullReader(); + + let isStreamingSupported1, isRangeSupported1, fullReaderCancelled1; + let isStreamingSupported2, isRangeSupported2, fullReaderCancelled2; + + let promise1 = fullReader1.headersReady.then(function () { + isStreamingSupported1 = fullReader1.isStreamingSupported; + isRangeSupported1 = fullReader1.isRangeSupported; + // we shall be able to close the full reader without issues + fullReader1.cancel('Don\'t need full reader'); + fullReaderCancelled1 = true; + }); + + let promise2 = fullReader2.headersReady.then(function () { + isStreamingSupported2 = fullReader2.isStreamingSupported; + isRangeSupported2 = fullReader2.isRangeSupported; + fullReader2.cancel('Don\'t need full reader'); + fullReaderCancelled2 = true; + }); + + // Skipping fullReader results, requesting something from the PDF end. + let tailSize = (pdfLength % rangeSize) || rangeSize; + + let range11Reader = stream1.getRangeReader(pdfLength - tailSize - rangeSize, + pdfLength - tailSize); + let range12Reader = stream1.getRangeReader(pdfLength - tailSize, pdfLength); + + let range21Reader = stream2.getRangeReader(pdfLength - tailSize - rangeSize, + pdfLength - tailSize); + let range22Reader = stream2.getRangeReader(pdfLength - tailSize, pdfLength); + + let result11 = { value: 0, }, result12 = { value: 0, }; + let result21 = { value: 0, }, result22 = { value: 0, }; + + let read = function (reader, lenResult) { + return reader.read().then(function (result) { + if (result.done) { + return; + } + lenResult.value += result.value.byteLength; + return read(reader, lenResult); + }); + }; + + let readPromises = Promise.all([read(range11Reader, result11), + read(range12Reader, result12), + read(range21Reader, result21), + read(range22Reader, result22), + promise1, promise2]); + + readPromises.then(function () { + expect(result11.value).toEqual(rangeSize); + expect(result12.value).toEqual(tailSize); + expect(result21.value).toEqual(rangeSize); + expect(result22.value).toEqual(tailSize); + expect(isStreamingSupported1).toEqual(false); + expect(isRangeSupported1).toEqual(true); + expect(fullReaderCancelled1).toEqual(true); + expect(isStreamingSupported2).toEqual(false); + expect(isRangeSupported2).toEqual(true); + expect(fullReaderCancelled2).toEqual(true); + done(); + }).catch(function (reason) { + done.fail(reason); + }); + }); +});