diff --git a/examples/node/getinfo.js b/examples/node/getinfo.js index 1e91f586a..3dce2e20a 100644 --- a/examples/node/getinfo.js +++ b/examples/node/getinfo.js @@ -17,11 +17,10 @@ var pdfjsLib = require('pdfjs-dist'); // Loading file from file system into typed array var pdfPath = process.argv[2] || '../../web/compressed.tracemonkey-pldi-09.pdf'; -var data = new Uint8Array(fs.readFileSync(pdfPath)); // Will be using promises to load document, pages and misc data instead of // callback. -pdfjsLib.getDocument(data).then(function (doc) { +pdfjsLib.getDocument(pdfPath).then(function (doc) { var numPages = doc.numPages; console.log('# Document Loaded'); console.log('Number of Pages: ' + numPages); diff --git a/src/display/network.js b/src/display/network.js index cdb375398..5d4f39314 100644 --- a/src/display/network.js +++ b/src/display/network.js @@ -14,7 +14,7 @@ */ import { - assert, createPromiseCapability, isInt, MissingPDFException, + assert, createPromiseCapability, MissingPDFException, UnexpectedResponseException } from '../shared/util'; import globalScope from '../shared/global_scope'; @@ -352,18 +352,16 @@ function PDFNetworkStreamFullRequestReader(manager, options) { } PDFNetworkStreamFullRequestReader.prototype = { - getResponseHeader(name) { - let fullRequestXhrId = this._fullRequestId; - let fullRequestXhr = this._manager.getRequestXhr(fullRequestXhrId); - - return fullRequestXhr.getResponseHeader(name); - }, - _onHeadersReceived: function PDFNetworkStreamFullRequestReader_onHeadersReceived() { + var fullRequestXhrId = this._fullRequestId; + var fullRequestXhr = this._manager.getRequestXhr(fullRequestXhrId); + let { allowRangeRequests, suggestedLength, } = validateRangeRequestCapabilities({ - getResponseHeader: this.getResponseHeader.bind(this), + getResponseHeader: (name) => { + return fullRequestXhr.getResponseHeader(name); + }, isHttp: this._manager.isHttp, rangeChunkSize: this._rangeChunkSize, disableRange: this._disableRange, @@ -377,7 +375,6 @@ PDFNetworkStreamFullRequestReader.prototype = { } var networkManager = this._manager; - var fullRequestXhrId = this._fullRequestId; if (networkManager.isStreamingRequest(fullRequestXhrId)) { // We can continue fetching when progressive loading is enabled, // and we don't need the autoFetch feature. diff --git a/src/display/network_utils.js b/src/display/network_utils.js index 9d8ebe1c0..721afa4cc 100644 --- a/src/display/network_utils.js +++ b/src/display/network_utils.js @@ -13,10 +13,11 @@ * limitations under the License. */ -import { isInt } from '../shared/util'; +import { assert, isInt } from '../shared/util'; function validateRangeRequestCapabilities({ getResponseHeader, isHttp, rangeChunkSize, disableRange, }) { + assert(rangeChunkSize > 0); let returnValues = { allowRangeRequests: false, suggestedLength: undefined, diff --git a/src/display/node_stream.js b/src/display/node_stream.js index 009157338..aca82a902 100644 --- a/src/display/node_stream.js +++ b/src/display/node_stream.js @@ -29,7 +29,8 @@ class PDFNodeStream { this.url = url.parse(this.source.url); this.isHttp = this.url.protocol === 'http:' || this.url.protocol === 'https:'; - this.isFsUrl = !this.url.host; + // Check if url refers to filesystem. + this.isFsUrl = this.url.protocol === 'file:' || !this.url.host; this.httpHeaders = (this.isHttp && this.source.httpHeaders) || {}; this._fullRequest = null; @@ -71,10 +72,19 @@ class BaseFullReader { this._errored = false; this._reason = null; this.onProgress = null; - this._length = stream.source.length; + this._contentLength = stream.source.length; // optional this._loaded = 0; - this._fullRequest = null; + this._disableRange = stream.options.disableRange || false; + this._rangeChunkSize = stream.source.rangeChunkSize; + if (!this._rangeChunkSize && !this._disableRange) { + this._disableRange = true; + } + + this._isStreamingSupported = !stream.source.disableStream; + this._isRangeSupported = !stream.options.disableRange; + + this._readableStream = null; this._readCapability = createPromiseCapability(); this._headersCapability = createPromiseCapability(); } @@ -84,7 +94,7 @@ class BaseFullReader { } get contentLength() { - return this._length; + return this._contentLength; } get isRangeSupported() { @@ -104,7 +114,7 @@ class BaseFullReader { return Promise.reject(this._reason); } - let chunk = this._fullRequest.read(); + let chunk = this._readableStream.read(); if (chunk === null) { this._readCapability = createPromiseCapability(); return this.read(); @@ -113,16 +123,52 @@ class BaseFullReader { if (this.onProgress) { this.onProgress({ loaded: this._loaded, - total: this._length, + total: this._contentLength, }); } - return Promise.resolve({ value: chunk, done: false, }); + // Ensure that `read()` method returns ArrayBuffer. + let buffer = new Uint8Array(chunk).buffer; + return Promise.resolve({ value: buffer, done: false, }); }); } cancel(reason) { - this._fullRequest.close(reason); - this._fullRequest.destroy(reason); + // Call `this._error()` method when cancel is called + // before _readableStream is set. + if (!this._readableStream) { + this._error(reason); + return; + } + this._readableStream.destroy(reason); + } + + _error(reason) { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + } + + _setReadableStream(readableStream) { + this._readableStream = readableStream; + readableStream.on('readable', () => { + this._readCapability.resolve(); + }); + + readableStream.on('end', () => { + // Destroy readable to minimize resource usage. + readableStream.destroy(); + this._done = true; + this._readCapability.resolve(); + }); + + readableStream.on('error', (reason) => { + this._error(reason); + }); + + // Destroy ReadableStream if already in errored state. + if (this._errored) { + this._readableStream.destroy(this._reason); + } } } @@ -133,13 +179,15 @@ class BaseRangeReader { this._errored = false; this._reason = null; this.onProgress = null; - this._length = stream.source.length; this._loaded = 0; + this._readableStream = null; this._readCapability = createPromiseCapability(); + + this._isStreamingSupported = !stream.source.disableStream; } get isStreamingSupported() { - return false; + return this._isStreamingSupported; } read() { @@ -151,80 +199,88 @@ class BaseRangeReader { return Promise.reject(this._reason); } - let chunk = this._read(); + let chunk = this._readableStream.read(); if (chunk === null) { this._readCapability = createPromiseCapability(); return this.read(); } this._loaded += chunk.length; if (this.onProgress) { - this.onProgress({ - loaded: this._loaded, - total: this._length, - }); + this.onProgress({ loaded: this._loaded, }); } - return Promise.resolve({ value: chunk, done: false, }); + // Ensure that `read()` method returns ArrayBuffer. + let buffer = new Uint8Array(chunk).buffer; + return Promise.resolve({ value: buffer, done: false, }); }); } + + cancel(reason) { + // Call `this._error()` method when cancel is called + // before _readableStream is set. + if (!this._readableStream) { + this._error(reason); + return; + } + this._readableStream.destroy(reason); + } + + _error(reason) { + this._errored = true; + this._reason = reason; + this._readCapability.resolve(); + } + + _setReadableStream(readableStream) { + this._readableStream = readableStream; + readableStream.on('readable', () => { + this._readCapability.resolve(); + }); + + readableStream.on('end', () => { + // Destroy readableStream to minimize resource usage. + readableStream.destroy(); + this._done = true; + this._readCapability.resolve(); + }); + + readableStream.on('error', (reason) => { + this._error(reason); + }); + + // Destroy readableStream if already in errored state. + if (this._errored) { + this._readableStream.destroy(this._reason); + } + } +} + +function createRequestOptions(url, headers) { + return { + protocol: url.protocol, + auth: url.auth, + host: url.hostname, + port: url.port, + path: url.path, + method: 'GET', + headers, + }; } class PDFNodeStreamFullReader extends BaseFullReader { constructor(stream) { super(stream); - this._disableRange = stream.options.disableRange || false; - this._rangeChunkSize = stream.source.rangeChunkSize; - if (!this._rangeChunkSize && !this._disableRange) { - this._disableRange = true; - } - - this._isStreamingSupported = !stream.source.disableStream; - this._isRangeSupported = false; - - let options = { - host: this._url.host, - path: this._url.path, - method: 'GET', - headers: stream.httpHeaders, - }; - let handleResponse = (response) => { this._headersCapability.resolve(); - this._fullRequest = response; + this._setReadableStream(response); - response.on('readable', () => { - this._readCapability.resolve(); - }); - - response.on('end', () => { - // Destroy response to minimize resource usage. - response.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - response.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); - }; - - this._request = this._url.protocol === 'http:' ? - http.request(options, handleResponse) : - https.request(options, handleResponse); - - this._request.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._headersCapability.reject(reason); - }); - this._request.end(); - - this._headersCapability.promise.then(() => { let { allowRangeRequests, suggestedLength, } = validateRangeRequestCapabilities({ - getResponseHeader: this.getResponseHeader.bind(this), + getResponseHeader: (name) => { + // Make sure that headers name are in lower case, as mentioned + // here: https://nodejs.org/api/http.html#http_message_headers. + return this._readableStream.headers[name.toLowerCase()]; + }, isHttp: stream.isHttp, rangeChunkSize: this._rangeChunkSize, disableRange: this._disableRange, @@ -233,12 +289,28 @@ class PDFNodeStreamFullReader extends BaseFullReader { if (allowRangeRequests) { this._isRangeSupported = true; } - this._length = suggestedLength; - }); - } + // Setting right content length. + this._contentLength = suggestedLength; + }; - getReasponseHeader(name) { - return this._fullRequest.headers[name]; + this._request = null; + if (this._url.protocol === 'http:') { + this._request = http.request(createRequestOptions( + this._url, stream.httpHeaders), handleResponse); + } else { + this._request = https.request(createRequestOptions( + this._url, stream.httpHeaders), handleResponse); + } + + this._request.on('error', (reason) => { + this._errored = true; + this._reason = reason; + this._headersCapability.reject(reason); + }); + // Note: `request.end(data)` is used to write `data` to request body + // and notify end of request. But one should always call `request.end()` + // even if there is no data to write -- (to notify the end of request). + this._request.end(); } } @@ -246,40 +318,28 @@ class PDFNodeStreamRangeReader extends BaseRangeReader { constructor(stream, start, end) { super(stream); - this._rangeRequest = null; - this._read = null; - let rangeStr = start + '-' + (end - 1); - stream.httpHeaders['Range'] = 'bytes=' + rangeStr; + this._httpHeaders = {}; + for (let property in stream.httpHeaders) { + let value = stream.httpHeaders[property]; + if (typeof value === 'undefined') { + continue; + } + this._httpHeaders[property] = value; + } + this._httpHeaders['Range'] = `bytes=${start}-${end - 1}`; - let options = { - host: this._url.host, - path: this._url.path, - method: 'GET', - headers: stream.httpHeaders, - }; - let handleResponse = (response) => { - this._rangeRequest = response; - this._read = this._rangeRequest.read; - - response.on('readable', () => { - this._readCapability.resolve(); - }); - - response.on('end', () => { - response.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - response.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); - }; - this._request = this._url.protocol === 'http:' ? - http.request(options, handleResponse) : - https.request(options, handleResponse); + this._request = null; + if (this._url.protocol === 'http:') { + this._request = http.request(createRequestOptions( + this._url, this._httpHeaders), (response) => { + this._setReadableStream(response); + }); + } else { + this._request = https.request(createRequestOptions( + this._url, this._httpHeaders), (response) => { + this._setReadableStream(response); + }); + } this._request.on('error', (reason) => { this._errored = true; @@ -287,20 +347,13 @@ class PDFNodeStreamRangeReader extends BaseRangeReader { }); this._request.end(); } - - cancel(reason) { - this._rangeRequest.close(reason); - this._rangeRequest.destroy(reason); - } } class PDFNodeStreamFsFullReader extends BaseFullReader { constructor(stream) { super(stream); - this._isRangeSupported = true; - this._isStreamingSupported = true; - this._fullRequest = fs.createReadStream(this._url.path); + this._setReadableStream(fs.createReadStream(this._url.path)); fs.lstat(this._url.path, (error, stat) => { if (error) { @@ -309,25 +362,10 @@ class PDFNodeStreamFsFullReader extends BaseFullReader { this._headersCapability.reject(error); return; } - this._length = stat.size; + // Setting right content length. + this._contentLength = stat.size; this._headersCapability.resolve(); }); - - this._fullRequest.on('readable', () => { - this._readCapability.resolve(); - }); - - this._fullRequest.on('end', () => { - this._fullRequest.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - this._fullRequest.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); } } @@ -335,37 +373,8 @@ class PDFNodeStreamFsRangeReader extends BaseRangeReader { constructor(stream, start, end) { super(stream); - this._rangeRequest = fs.createReadStream(this._url.path, { start, end, }); - fs.lstat(this._url.path, (error, stat) => { - if (error) { - this._errored = true; - this._reason = error; - return; - } - this._length = stat.size; - }); - this._read = this._rangeRequest.read; - - this._rangeRequest.on('readable', () => { - this._readCapability.resolve(); - }); - - this._rangeRequest.on('end', () => { - this._rangeRequest.destroy(); - this._done = true; - this._readCapability.resolve(); - }); - - this._rangeRequest.on('error', (reason) => { - this._errored = true; - this._reason = reason; - this._readCapability.resolve(); - }); - } - - cancel(reason) { - this._rangeRequest.close(reason); - this._rangeRequest.destroy(reason); + this._setReadableStream( + fs.createReadStream(this._url.path, { start, end: end - 1, })); } } diff --git a/test/unit/clitests.json b/test/unit/clitests.json index bf8d3ff84..3c98a6f0a 100644 --- a/test/unit/clitests.json +++ b/test/unit/clitests.json @@ -15,6 +15,7 @@ "fonts_spec.js", "function_spec.js", "murmurhash3_spec.js", + "node_stream_spec.js", "parser_spec.js", "primitives_spec.js", "stream_spec.js", diff --git a/test/unit/jasmine-boot.js b/test/unit/jasmine-boot.js index 2f5f3f06e..8820e6c7d 100644 --- a/test/unit/jasmine-boot.js +++ b/test/unit/jasmine-boot.js @@ -43,6 +43,8 @@ function initializePDFJS(callback) { Promise.all([ 'pdfjs/display/global', + 'pdfjs/display/api', + 'pdfjs/display/network', 'pdfjs-test/unit/annotation_spec', 'pdfjs-test/unit/api_spec', 'pdfjs-test/unit/bidi_spec', @@ -72,7 +74,11 @@ function initializePDFJS(callback) { return SystemJS.import(moduleName); })).then(function (modules) { var displayGlobal = modules[0]; + var displayApi = modules[1]; + var PDFNetworkStream = modules[2].PDFNetworkStream; + // Set network stream class for unit tests. + displayApi.setPDFNetworkStreamClass(PDFNetworkStream); // Configure the worker. displayGlobal.PDFJS.workerSrc = '../../build/generic/build/pdf.worker.js'; // Opt-in to using the latest API. diff --git a/test/unit/node_stream_spec.js b/test/unit/node_stream_spec.js new file mode 100644 index 000000000..3812ab905 --- /dev/null +++ b/test/unit/node_stream_spec.js @@ -0,0 +1,234 @@ +/* Copyright 2017 Mozilla Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* globals __non_webpack_require__ */ + +import { assert, isNodeJS } from '../../src/shared/util'; +import { PDFNodeStream } from '../../src/display/node_stream'; + +// Make sure that we only running this script is Node.js environments. +assert(isNodeJS()); + +let path = __non_webpack_require__('path'); +let url = __non_webpack_require__('url'); +let http = __non_webpack_require__('http'); +let fs = __non_webpack_require__('fs'); + +describe('node_stream', function() { + let server = null; + let port = null; + let pdf = url.parse(encodeURI('file://' + path.join(process.cwd(), + './test/pdfs/tracemonkey.pdf'))).href; + let pdfLength = 1016315; + + beforeAll((done) => { + // Create http server to serve pdf data for tests. + server = http.createServer((request, response) => { + let filePath = process.cwd() + '/test/pdfs' + request.url; + fs.lstat(filePath, (error, stat) => { + if (error) { + response.writeHead(404); + response.end(`File ${request.url} not found!`); + return; + } + if (!request.headers['range']) { + let contentLength = stat.size; + let stream = fs.createReadStream(filePath); + response.writeHead(200, { + 'Content-Type': 'application/pdf', + 'Content-Length': contentLength, + 'Accept-Ranges': 'bytes', + }); + stream.pipe(response); + } else { + let [start, end] = + request.headers['range'].split('=')[1].split('-').map((x) => { + return Number(x); + }); + let stream = fs.createReadStream(filePath, { start, end, }); + response.writeHead(206, { + 'Content-Type': 'application/pdf', + }); + stream.pipe(response); + } + }); + }).listen(0); /* Listen on a random free port */ + port = server.address().port; + done(); + }); + + afterAll((done) => { + // Close the server from accepting new connections after all test finishes. + server.close(); + done(); + }); + + it('read both http(s) and filesystem pdf files', function(done) { + let stream1 = new PDFNodeStream({ + source: { + url: `http://127.0.0.1:${port}/tracemonkey.pdf`, + rangeChunkSize: 65536, + disableStream: true, + }, + disableRange: true, + }); + + let stream2 = new PDFNodeStream({ + source: { + url: pdf, + rangeChunkSize: 65536, + disableStream: true, + }, + disableRange: true, + }); + + let fullReader1 = stream1.getFullReader(); + let fullReader2 = stream2.getFullReader(); + + let isStreamingSupported1, isRangeSupported1; + let promise1 = fullReader1.headersReady.then(() => { + isStreamingSupported1 = fullReader1.isStreamingSupported; + isRangeSupported1 = fullReader1.isRangeSupported; + }); + + let isStreamingSupported2, isRangeSupported2; + let promise2 = fullReader2.headersReady.then(() => { + isStreamingSupported2 = fullReader2.isStreamingSupported; + isRangeSupported2 = fullReader2.isRangeSupported; + }); + + let len1 = 0, len2 = 0; + let read1 = function () { + return fullReader1.read().then(function (result) { + if (result.done) { + return; + } + len1 += result.value.byteLength; + return read1(); + }); + }; + let read2 = function () { + return fullReader2.read().then(function (result) { + if (result.done) { + return; + } + len2 += result.value.byteLength; + return read2(); + }); + }; + + let readPromise = Promise.all([read1(), read2(), promise1, promise2]); + readPromise.then((result) => { + expect(isStreamingSupported1).toEqual(false); + expect(isRangeSupported1).toEqual(false); + expect(isStreamingSupported2).toEqual(false); + expect(isRangeSupported2).toEqual(false); + expect(len1).toEqual(pdfLength); + expect(len1).toEqual(len2); + done(); + }).catch((reason) => { + done.fail(reason); + }); + }); + + it('read custom ranges for both http(s) and filesystem urls', + function(done) { + let rangeSize = 32768; + let stream1 = new PDFNodeStream({ + source: { + url: `http://127.0.0.1:${port}/tracemonkey.pdf`, + length: pdfLength, + rangeChunkSize: rangeSize, + disableStream: true, + }, + disableRange: false, + }); + let stream2 = new PDFNodeStream({ + source: { + url: pdf, + length: pdfLength, + rangeChunkSize: rangeSize, + disableStream: true, + }, + disableRange: false, + }); + + let fullReader1 = stream1.getFullReader(); + let fullReader2 = stream2.getFullReader(); + + let isStreamingSupported1, isRangeSupported1, fullReaderCancelled1; + let isStreamingSupported2, isRangeSupported2, fullReaderCancelled2; + + let promise1 = fullReader1.headersReady.then(function () { + isStreamingSupported1 = fullReader1.isStreamingSupported; + isRangeSupported1 = fullReader1.isRangeSupported; + // we shall be able to close the full reader without issues + fullReader1.cancel('Don\'t need full reader'); + fullReaderCancelled1 = true; + }); + + let promise2 = fullReader2.headersReady.then(function () { + isStreamingSupported2 = fullReader2.isStreamingSupported; + isRangeSupported2 = fullReader2.isRangeSupported; + fullReader2.cancel('Don\'t need full reader'); + fullReaderCancelled2 = true; + }); + + // Skipping fullReader results, requesting something from the PDF end. + let tailSize = (pdfLength % rangeSize) || rangeSize; + + let range11Reader = stream1.getRangeReader(pdfLength - tailSize - rangeSize, + pdfLength - tailSize); + let range12Reader = stream1.getRangeReader(pdfLength - tailSize, pdfLength); + + let range21Reader = stream2.getRangeReader(pdfLength - tailSize - rangeSize, + pdfLength - tailSize); + let range22Reader = stream2.getRangeReader(pdfLength - tailSize, pdfLength); + + let result11 = { value: 0, }, result12 = { value: 0, }; + let result21 = { value: 0, }, result22 = { value: 0, }; + + let read = function (reader, lenResult) { + return reader.read().then(function (result) { + if (result.done) { + return; + } + lenResult.value += result.value.byteLength; + return read(reader, lenResult); + }); + }; + + let readPromises = Promise.all([read(range11Reader, result11), + read(range12Reader, result12), + read(range21Reader, result21), + read(range22Reader, result22), + promise1, promise2]); + + readPromises.then(function () { + expect(result11.value).toEqual(rangeSize); + expect(result12.value).toEqual(tailSize); + expect(result21.value).toEqual(rangeSize); + expect(result22.value).toEqual(tailSize); + expect(isStreamingSupported1).toEqual(false); + expect(isRangeSupported1).toEqual(true); + expect(fullReaderCancelled1).toEqual(true); + expect(isStreamingSupported2).toEqual(false); + expect(isRangeSupported2).toEqual(true); + expect(fullReaderCancelled2).toEqual(true); + done(); + }).catch(function (reason) { + done.fail(reason); + }); + }); +});