Merge pull request #8712 from mukulmishra18/node_stream

Adds node.js logic for networking tasks for PDF.js
This commit is contained in:
Yury Delendik 2017-08-24 11:35:29 -05:00 committed by GitHub
commit e82811adb4
8 changed files with 707 additions and 50 deletions

View File

@ -17,11 +17,10 @@ var pdfjsLib = require('pdfjs-dist');
// Loading file from file system into typed array
var pdfPath = process.argv[2] || '../../web/compressed.tracemonkey-pldi-09.pdf';
var data = new Uint8Array(fs.readFileSync(pdfPath));
// Will be using promises to load document, pages and misc data instead of
// callback.
pdfjsLib.getDocument(data).then(function (doc) {
pdfjsLib.getDocument(pdfPath).then(function (doc) {
var numPages = doc.numPages;
console.log('# Document Loaded');
console.log('Number of Pages: ' + numPages);

View File

@ -14,11 +14,11 @@
*/
import {
assert, createPromiseCapability, isInt, MissingPDFException,
assert, createPromiseCapability, MissingPDFException,
UnexpectedResponseException
} from '../shared/util';
import globalScope from '../shared/global_scope';
import { setPDFNetworkStreamClass } from './api';
import { validateRangeRequestCapabilities } from './network_utils';
if (typeof PDFJSDev !== 'undefined' && PDFJSDev.test('FIREFOX || MOZCENTRAL')) {
throw new Error('Module "./network" shall not ' +
@ -352,56 +352,29 @@ function PDFNetworkStreamFullRequestReader(manager, options) {
}
PDFNetworkStreamFullRequestReader.prototype = {
_validateRangeRequestCapabilities: function
PDFNetworkStreamFullRequestReader_validateRangeRequestCapabilities() {
if (this._disableRange) {
return false;
}
var networkManager = this._manager;
if (!networkManager.isHttp) {
return false;
}
var fullRequestXhrId = this._fullRequestId;
var fullRequestXhr = networkManager.getRequestXhr(fullRequestXhrId);
if (fullRequestXhr.getResponseHeader('Accept-Ranges') !== 'bytes') {
return false;
}
var contentEncoding =
fullRequestXhr.getResponseHeader('Content-Encoding') || 'identity';
if (contentEncoding !== 'identity') {
return false;
}
var length = fullRequestXhr.getResponseHeader('Content-Length');
length = parseInt(length, 10);
if (!isInt(length)) {
return false;
}
this._contentLength = length; // setting right content length
if (length <= 2 * this._rangeChunkSize) {
// The file size is smaller than the size of two chunks, so it does
// not make any sense to abort the request and retry with a range
// request.
return false;
}
return true;
},
_onHeadersReceived:
function PDFNetworkStreamFullRequestReader_onHeadersReceived() {
var fullRequestXhrId = this._fullRequestId;
var fullRequestXhr = this._manager.getRequestXhr(fullRequestXhrId);
if (this._validateRangeRequestCapabilities()) {
let { allowRangeRequests, suggestedLength, } =
validateRangeRequestCapabilities({
getResponseHeader: (name) => {
return fullRequestXhr.getResponseHeader(name);
},
isHttp: this._manager.isHttp,
rangeChunkSize: this._rangeChunkSize,
disableRange: this._disableRange,
});
// Setting right content length.
this._contentLength = suggestedLength || this._contentLength;
if (allowRangeRequests) {
this._isRangeSupported = true;
}
var networkManager = this._manager;
var fullRequestXhrId = this._fullRequestId;
if (networkManager.isStreamingRequest(fullRequestXhrId)) {
// We can continue fetching when progressive loading is enabled,
// and we don't need the autoFetch feature.
@ -594,8 +567,6 @@ PDFNetworkStreamRangeRequestReader.prototype = {
},
};
setPDFNetworkStreamClass(PDFNetworkStream);
export {
PDFNetworkStream,
NetworkManager,

View File

@ -0,0 +1,57 @@
/* Copyright 2012 Mozilla Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { assert, isInt } from '../shared/util';
/**
 * Inspects the response headers of the initial (full) request and decides
 * whether the remaining data may be fetched with range requests.
 *
 * @param {Object} params
 * @param {function} params.getResponseHeader - Called with a header name,
 *   returns that header's value (or a falsy value when absent).
 * @param {boolean} params.isHttp - Whether the source is an HTTP(S) resource.
 * @param {number} params.rangeChunkSize - Size of one range chunk; asserted
 *   to be greater than zero.
 * @param {boolean} params.disableRange - Set when range requests are
 *   switched off.
 * @returns {{allowRangeRequests: boolean, suggestedLength: (number|undefined)}}
 *   `suggestedLength` is only defined once a valid `Content-Length` header
 *   has been parsed.
 */
function validateRangeRequestCapabilities({ getResponseHeader, isHttp,
                                            rangeChunkSize, disableRange, }) {
  assert(rangeChunkSize > 0);

  let buildResult = (allowRangeRequests, suggestedLength) => {
    return { allowRangeRequests, suggestedLength, };
  };

  // Range requests were switched off, or the resource is not served via HTTP.
  let rangesPossible = !disableRange && isHttp;
  if (!rangesPossible) {
    return buildResult(false, undefined);
  }

  // The server has to advertise byte ranges explicitly.
  let acceptsByteRanges = getResponseHeader('Accept-Ranges') === 'bytes';
  if (!acceptsByteRanges) {
    return buildResult(false, undefined);
  }

  // Any content encoding other than 'identity' rules out range requests.
  let encoding = getResponseHeader('Content-Encoding') || 'identity';
  if (encoding !== 'identity') {
    return buildResult(false, undefined);
  }

  let contentLength = parseInt(getResponseHeader('Content-Length'), 10);
  if (!isInt(contentLength)) {
    return buildResult(false, undefined);
  }

  // When the file is not larger than two chunks it does not make any sense
  // to abort the request and retry with a range request; the length is
  // still reported.
  let worthSplitting = contentLength > 2 * rangeChunkSize;
  return buildResult(worthSplitting, contentLength);
}
export {
validateRangeRequestCapabilities,
};

383
src/display/node_stream.js Normal file
View File

@ -0,0 +1,383 @@
/* Copyright 2012 Mozilla Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* globals __non_webpack_require__ */
let fs = __non_webpack_require__('fs');
let http = __non_webpack_require__('http');
let https = __non_webpack_require__('https');
let url = __non_webpack_require__('url');
import { assert, createPromiseCapability } from '../shared/util';
import { validateRangeRequestCapabilities } from './network_utils';
/**
 * Network stream implementation for Node.js. Depending on the source URL it
 * hands out readers backed either by the file system or by http(s) requests.
 */
class PDFNodeStream {
  /**
   * @param {Object} options - Stream options; `options.source` carries the
   *   `url` and, optionally, `httpHeaders`.
   */
  constructor(options) {
    this.options = options;
    this.source = options.source;
    this.url = url.parse(this.source.url);

    let protocol = this.url.protocol;
    this.isHttp = protocol === 'http:' || protocol === 'https:';
    // A `file:` URL, or a URL without any host part, refers to the filesystem.
    this.isFsUrl = protocol === 'file:' || !this.url.host;
    // Custom headers are only meaningful for http(s) sources.
    this.httpHeaders = this.isHttp ? (this.source.httpHeaders || {}) : {};

    this._fullRequest = null;
    this._rangeRequestReaders = [];
  }

  /**
   * Creates the reader for the whole document; may only be called once.
   * @returns {Object} The full reader.
   */
  getFullReader() {
    assert(!this._fullRequest);
    if (this.isFsUrl) {
      this._fullRequest = new PDFNodeStreamFsFullReader(this);
    } else {
      this._fullRequest = new PDFNodeStreamFullReader(this);
    }
    return this._fullRequest;
  }

  /**
   * Creates a reader for the given byte range and remembers it so that it
   * can be cancelled later.
   * @param {number} start - Passed through to the range reader.
   * @param {number} end - Passed through to the range reader.
   * @returns {Object} The range reader.
   */
  getRangeReader(start, end) {
    let ReaderClass = this.isFsUrl ?
      PDFNodeStreamFsRangeReader : PDFNodeStreamRangeReader;
    let rangeReader = new ReaderClass(this, start, end);
    this._rangeRequestReaders.push(rangeReader);
    return rangeReader;
  }

  /**
   * Cancels the full reader (if one was created) and every range reader.
   * @param {*} reason - Forwarded to each reader's `cancel` method.
   */
  cancelAllRequests(reason) {
    if (this._fullRequest) {
      this._fullRequest.cancel(reason);
    }
    // Iterate over a snapshot of the reader list.
    for (let reader of this._rangeRequestReaders.slice(0)) {
      reader.cancel(reason);
    }
  }
}
/**
 * Shared logic of the readers that deliver a whole document: bookkeeping of
 * progress and capabilities, plus pull-based reading from a Node.js readable
 * stream that a subclass attaches through `_setReadableStream`.
 */
class BaseFullReader {
  /**
   * @param {Object} stream - The owning stream; its `url`, `source` and
   *   `options` properties are read here.
   */
  constructor(stream) {
    let { source, options, } = stream;

    this._url = stream.url;
    this._done = false;
    this._errored = false;
    this._reason = null;
    this.onProgress = null;
    this._contentLength = source.length; // optional
    this._loaded = 0;

    // Without a chunk size, range requests are treated as disabled.
    let rangeDisabled = options.disableRange || false;
    if (!rangeDisabled && !source.rangeChunkSize) {
      rangeDisabled = true;
    }
    this._disableRange = rangeDisabled;
    this._rangeChunkSize = source.rangeChunkSize;

    this._isStreamingSupported = !source.disableStream;
    this._isRangeSupported = !options.disableRange;

    this._readableStream = null;
    this._readCapability = createPromiseCapability();
    this._headersCapability = createPromiseCapability();
  }

  /** @returns {Promise} The promise of the internal headers capability. */
  get headersReady() {
    return this._headersCapability.promise;
  }

  /** @returns {number|undefined} The currently known content length. */
  get contentLength() {
    return this._contentLength;
  }

  /** @returns {boolean} */
  get isRangeSupported() {
    return this._isRangeSupported;
  }

  /** @returns {boolean} */
  get isStreamingSupported() {
    return this._isStreamingSupported;
  }

  /**
   * Waits until the reader has been signalled (data readable, end of data,
   * or an error) and then produces the next result.
   * @returns {Promise<{value: (ArrayBuffer|undefined), done: boolean}>}
   *   Rejected with the stored reason when the reader is in the errored
   *   state.
   */
  read() {
    return this._readCapability.promise.then(() => {
      return this._takeNextChunk();
    });
  }

  /**
   * Produces the result for one `read()` call once the reader was signalled.
   * @returns {Promise}
   */
  _takeNextChunk() {
    if (this._done) {
      return Promise.resolve({ value: undefined, done: true, });
    }
    if (this._errored) {
      return Promise.reject(this._reason);
    }

    let data = this._readableStream.read();
    if (data === null) {
      // Nothing buffered at the moment: arm a fresh capability and wait for
      // the next signal.
      this._readCapability = createPromiseCapability();
      return this.read();
    }

    this._loaded += data.length;
    if (this.onProgress) {
      this.onProgress({
        loaded: this._loaded,
        total: this._contentLength,
      });
    }
    // Ensure that `read()` method returns ArrayBuffer.
    let arrayBuffer = new Uint8Array(data).buffer;
    return Promise.resolve({ value: arrayBuffer, done: false, });
  }

  /**
   * Cancels reading.
   * @param {*} reason - Stored as error reason, or handed to the stream's
   *   `destroy` method when a stream is already attached.
   */
  cancel(reason) {
    if (this._readableStream) {
      this._readableStream.destroy(reason);
      return;
    }
    // No stream was attached so far; only record the error state.
    this._error(reason);
  }

  /**
   * Switches the reader into the errored state and signals waiting reads.
   * @param {*} reason
   */
  _error(reason) {
    this._errored = true;
    this._reason = reason;
    this._readCapability.resolve();
  }

  /**
   * Attaches the readable stream and wires up its events.
   * @param {Object} readableStream - Must offer `on`, `read` and `destroy`.
   */
  _setReadableStream(readableStream) {
    this._readableStream = readableStream;

    readableStream.on('readable', () => {
      this._readCapability.resolve();
    });

    readableStream.on('end', () => {
      // Destroy the stream to minimize resource usage.
      readableStream.destroy();
      this._done = true;
      this._readCapability.resolve();
    });

    readableStream.on('error', (reason) => {
      this._error(reason);
    });

    // The reader was cancelled before the stream became available.
    if (this._errored) {
      this._readableStream.destroy(this._reason);
    }
  }
}
/**
 * Shared logic of the readers that deliver a byte range: progress
 * bookkeeping plus pull-based reading from a Node.js readable stream that a
 * subclass attaches through `_setReadableStream`.
 */
class BaseRangeReader {
  /**
   * @param {Object} stream - The owning stream; its `url` and `source`
   *   properties are read here.
   */
  constructor(stream) {
    this._url = stream.url;
    this._done = false;
    this._errored = false;
    this._reason = null;
    this.onProgress = null;
    this._loaded = 0;
    this._readableStream = null;
    this._readCapability = createPromiseCapability();
    this._isStreamingSupported = !stream.source.disableStream;
  }

  /** @returns {boolean} */
  get isStreamingSupported() {
    return this._isStreamingSupported;
  }

  /**
   * Waits until the reader has been signalled (data readable, end of data,
   * or an error) and then produces the next result.
   * @returns {Promise<{value: (ArrayBuffer|undefined), done: boolean}>}
   *   Rejected with the stored reason when the reader is in the errored
   *   state.
   */
  read() {
    return this._readCapability.promise.then(() => {
      return this._takeNextChunk();
    });
  }

  /**
   * Produces the result for one `read()` call once the reader was signalled.
   * @returns {Promise}
   */
  _takeNextChunk() {
    if (this._done) {
      return Promise.resolve({ value: undefined, done: true, });
    }
    if (this._errored) {
      return Promise.reject(this._reason);
    }

    let data = this._readableStream.read();
    if (data === null) {
      // Nothing buffered at the moment: arm a fresh capability and wait for
      // the next signal.
      this._readCapability = createPromiseCapability();
      return this.read();
    }

    this._loaded += data.length;
    if (this.onProgress) {
      this.onProgress({ loaded: this._loaded, });
    }
    // Ensure that `read()` method returns ArrayBuffer.
    let arrayBuffer = new Uint8Array(data).buffer;
    return Promise.resolve({ value: arrayBuffer, done: false, });
  }

  /**
   * Cancels reading.
   * @param {*} reason - Stored as error reason, or handed to the stream's
   *   `destroy` method when a stream is already attached.
   */
  cancel(reason) {
    if (this._readableStream) {
      this._readableStream.destroy(reason);
      return;
    }
    // No stream was attached so far; only record the error state.
    this._error(reason);
  }

  /**
   * Switches the reader into the errored state and signals waiting reads.
   * @param {*} reason
   */
  _error(reason) {
    this._errored = true;
    this._reason = reason;
    this._readCapability.resolve();
  }

  /**
   * Attaches the readable stream and wires up its events.
   * @param {Object} readableStream - Must offer `on`, `read` and `destroy`.
   */
  _setReadableStream(readableStream) {
    this._readableStream = readableStream;

    readableStream.on('readable', () => {
      this._readCapability.resolve();
    });

    readableStream.on('end', () => {
      // Destroy the stream to minimize resource usage.
      readableStream.destroy();
      this._done = true;
      this._readCapability.resolve();
    });

    readableStream.on('error', (reason) => {
      this._error(reason);
    });

    // The reader was cancelled before the stream became available.
    if (this._errored) {
      this._readableStream.destroy(this._reason);
    }
  }
}
/**
 * Builds the options object for an `http.request`/`https.request` GET call.
 *
 * @param {Object} url - Parsed URL; its `protocol`, `auth`, `hostname`,
 *   `port` and `path` properties are used.
 * @param {Object} headers - Request headers, passed through as they are.
 * @returns {Object} The request options.
 */
function createRequestOptions(url, headers) {
  let { protocol, auth, hostname, port, path, } = url;
  return {
    protocol,
    auth,
    // Only the host name is passed here; the port goes into `port`.
    host: hostname,
    port,
    path,
    method: 'GET',
    headers,
  };
}
/**
 * Full-document reader for `http:`/`https:` sources. Issues a GET request
 * and, once the response arrives, attaches it as readable stream and derives
 * the range-request capabilities from the response headers.
 */
class PDFNodeStreamFullReader extends BaseFullReader {
  /**
   * @param {PDFNodeStream} stream - The owning stream; provides the parsed
   *   URL (through the base class), `httpHeaders` and `isHttp`.
   */
  constructor(stream) {
    super(stream);

    let handleResponse = (response) => {
      this._setReadableStream(response);

      let { allowRangeRequests, suggestedLength, } =
        validateRangeRequestCapabilities({
          getResponseHeader: (name) => {
            // Make sure that headers name are in lower case, as mentioned
            // here: https://nodejs.org/api/http.html#http_message_headers.
            return this._readableStream.headers[name.toLowerCase()];
          },
          isHttp: stream.isHttp,
          rangeChunkSize: this._rangeChunkSize,
          disableRange: this._disableRange,
        });

      // The base class initializes `_isRangeSupported` from the
      // `disableRange` option alone. For HTTP the response headers have the
      // final say, hence the flag must also be cleared when the server does
      // not allow range requests.
      this._isRangeSupported = allowRangeRequests;
      // Setting right content length, but keep a length supplied by the
      // caller when the headers do not provide a usable one.
      this._contentLength = suggestedLength || this._contentLength;

      // Signal `headersReady` only after all header-derived state is set.
      this._headersCapability.resolve();
    };

    let transport = this._url.protocol === 'http:' ? http : https;
    this._request = transport.request(
      createRequestOptions(this._url, stream.httpHeaders), handleResponse);

    this._request.on('error', (reason) => {
      // `_error` records the failure and wakes up a `read()` call that is
      // already waiting, so that it rejects instead of staying pending.
      this._error(reason);
      this._headersCapability.reject(reason);
    });
    // Note: `request.end(data)` is used to write `data` to request body
    // and notify end of request. But one should always call `request.end()`
    // even if there is no data to write -- (to notify the end of request).
    this._request.end();
  }
}
/**
 * Range reader for `http:`/`https:` sources. Issues a GET request carrying a
 * `Range` header and attaches the response as readable stream.
 */
class PDFNodeStreamRangeReader extends BaseRangeReader {
  /**
   * @param {PDFNodeStream} stream - The owning stream; provides the parsed
   *   URL (through the base class) and `httpHeaders`.
   * @param {number} start - First byte of the range.
   * @param {number} end - End of the range, exclusive: the `Range` header is
   *   built with `end - 1` as last byte.
   */
  constructor(stream, start, end) {
    super(stream);

    // Copy the custom headers, leaving out entries without a value.
    this._httpHeaders = {};
    for (let property in stream.httpHeaders) {
      let value = stream.httpHeaders[property];
      if (typeof value === 'undefined') {
        continue;
      }
      this._httpHeaders[property] = value;
    }
    this._httpHeaders['Range'] = `bytes=${start}-${end - 1}`;

    let transport = this._url.protocol === 'http:' ? http : https;
    this._request = transport.request(
      createRequestOptions(this._url, this._httpHeaders), (response) => {
        this._setReadableStream(response);
      });

    this._request.on('error', (reason) => {
      // Go through `_error` rather than only setting the flags: it also
      // resolves the read capability, so a `read()` call that is waiting
      // for the response rejects with `reason` instead of never settling.
      this._error(reason);
    });
    // Always call `end()`, even without a request body, to actually send
    // the request.
    this._request.end();
  }
}
/**
 * Full-document reader for filesystem sources. Streams the file and uses
 * its size as content length.
 */
class PDFNodeStreamFsFullReader extends BaseFullReader {
  /**
   * @param {PDFNodeStream} stream - The owning stream; the file location is
   *   taken from the `path` of its parsed URL (through the base class).
   */
  constructor(stream) {
    super(stream);

    this._setReadableStream(fs.createReadStream(this._url.path));

    // Use `fs.stat` rather than `fs.lstat`: for a symbolic link `lstat`
    // describes the link itself, whereas the read stream above delivers the
    // data of the file the link points to.
    fs.stat(this._url.path, (error, stat) => {
      if (error) {
        this._errored = true;
        this._reason = error;
        this._headersCapability.reject(error);
        return;
      }
      // Setting right content length.
      this._contentLength = stat.size;
      this._headersCapability.resolve();
    });
  }
}
/**
 * Range reader for filesystem sources: streams one slice of the file.
 */
class PDFNodeStreamFsRangeReader extends BaseRangeReader {
  /**
   * @param {PDFNodeStream} stream - The owning stream; the file location is
   *   taken from the `path` of its parsed URL (through the base class).
   * @param {number} start - Offset of the first byte to read.
   * @param {number} end - Offset after the last byte to read.
   */
  constructor(stream, start, end) {
    super(stream);

    // The `end` option of `fs.createReadStream` is inclusive, therefore the
    // last byte to read is `end - 1`.
    let byteRange = { start, end: end - 1, };
    let fileStream = fs.createReadStream(this._url.path, byteRange);
    this._setReadableStream(fileStream);
  }
}
export {
PDFNodeStream,
};

View File

@ -31,7 +31,13 @@ var pdfjsDisplaySVG = require('./display/svg.js');
if (typeof PDFJSDev === 'undefined' ||
!PDFJSDev.test('FIREFOX || MOZCENTRAL')) {
require('./display/network.js');
if (pdfjsSharedUtil.isNodeJS()) {
var PDFNodeStream = require('./display/node_stream.js').PDFNodeStream;
pdfjsDisplayAPI.setPDFNetworkStreamClass(PDFNodeStream);
} else {
var PDFNetworkStream = require('./display/network.js').PDFNetworkStream;
pdfjsDisplayAPI.setPDFNetworkStreamClass(PDFNetworkStream);
}
}
exports.PDFJS = pdfjsDisplayGlobal.PDFJS;

View File

@ -15,6 +15,7 @@
"fonts_spec.js",
"function_spec.js",
"murmurhash3_spec.js",
"node_stream_spec.js",
"parser_spec.js",
"primitives_spec.js",
"stream_spec.js",

View File

@ -43,6 +43,8 @@
function initializePDFJS(callback) {
Promise.all([
'pdfjs/display/global',
'pdfjs/display/api',
'pdfjs/display/network',
'pdfjs-test/unit/annotation_spec',
'pdfjs-test/unit/api_spec',
'pdfjs-test/unit/bidi_spec',
@ -72,7 +74,11 @@ function initializePDFJS(callback) {
return SystemJS.import(moduleName);
})).then(function (modules) {
var displayGlobal = modules[0];
var displayApi = modules[1];
var PDFNetworkStream = modules[2].PDFNetworkStream;
// Set network stream class for unit tests.
displayApi.setPDFNetworkStreamClass(PDFNetworkStream);
// Configure the worker.
displayGlobal.PDFJS.workerSrc = '../../build/generic/build/pdf.worker.js';
// Opt-in to using the latest API.

View File

@ -0,0 +1,234 @@
/* Copyright 2017 Mozilla Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* globals __non_webpack_require__ */
import { assert, isNodeJS } from '../../src/shared/util';
import { PDFNodeStream } from '../../src/display/node_stream';
// Make sure that we are only running this script in Node.js environments.
assert(isNodeJS());
let path = __non_webpack_require__('path');
let url = __non_webpack_require__('url');
let http = __non_webpack_require__('http');
let fs = __non_webpack_require__('fs');
describe('node_stream', function() {
  let pdfLength = 1016315;
  let pdf = url.parse(encodeURI('file://' + path.join(process.cwd(),
    './test/pdfs/tracemonkey.pdf'))).href;
  let server = null;
  let port = null;

  // Request handler of the test server: serves files below `./test/pdfs`,
  // either completely or, when a `Range` header is present, partially.
  function servePdf(request, response) {
    let filePath = process.cwd() + '/test/pdfs' + request.url;
    fs.lstat(filePath, (error, stat) => {
      if (error) {
        response.writeHead(404);
        response.end(`File ${request.url} not found!`);
        return;
      }

      let rangeHeader = request.headers['range'];
      if (rangeHeader) {
        // The header is expected to look like `bytes=<start>-<end>`.
        let [start, end] = rangeHeader.split('=')[1].split('-').map(Number);
        let partStream = fs.createReadStream(filePath, { start, end, });
        response.writeHead(206, {
          'Content-Type': 'application/pdf',
        });
        partStream.pipe(response);
        return;
      }

      let contentLength = stat.size;
      let fileStream = fs.createReadStream(filePath);
      response.writeHead(200, {
        'Content-Type': 'application/pdf',
        'Content-Length': contentLength,
        'Accept-Ranges': 'bytes',
      });
      fileStream.pipe(response);
    });
  }

  // URL of the test PDF file on the local test server.
  function httpPdfUrl() {
    return `http://127.0.0.1:${port}/tracemonkey.pdf`;
  }

  // Reads `reader` until it reports `done`, adding the size of every chunk
  // to `total.value`.
  function readAll(reader, total) {
    return reader.read().then(function (result) {
      if (result.done) {
        return undefined;
      }
      total.value += result.value.byteLength;
      return readAll(reader, total);
    });
  }

  // Once the headers are ready, stores the capabilities of `fullReader` in
  // `state`; optionally cancels the full reader afterwards.
  function captureCapabilities(fullReader, state, cancelFullReader) {
    return fullReader.headersReady.then(function () {
      state.isStreamingSupported = fullReader.isStreamingSupported;
      state.isRangeSupported = fullReader.isRangeSupported;
      if (cancelFullReader) {
        // We shall be able to close the full reader without issues.
        fullReader.cancel('Don\'t need full reader');
        state.fullReaderCancelled = true;
      }
    });
  }

  beforeAll((done) => {
    // Create http server to serve pdf data for tests.
    server = http.createServer(servePdf).listen(0); /* random free port */
    port = server.address().port;
    done();
  });

  afterAll((done) => {
    // Stop the server from accepting new connections once all tests have
    // finished.
    server.close();
    done();
  });

  it('read both http(s) and filesystem pdf files', function(done) {
    let httpStream = new PDFNodeStream({
      source: {
        url: httpPdfUrl(),
        rangeChunkSize: 65536,
        disableStream: true,
      },
      disableRange: true,
    });
    let fsStream = new PDFNodeStream({
      source: {
        url: pdf,
        rangeChunkSize: 65536,
        disableStream: true,
      },
      disableRange: true,
    });

    let httpFullReader = httpStream.getFullReader();
    let fsFullReader = fsStream.getFullReader();

    let httpState = {}, fsState = {};
    let httpHeaders = captureCapabilities(httpFullReader, httpState, false);
    let fsHeaders = captureCapabilities(fsFullReader, fsState, false);

    let httpBytes = { value: 0, }, fsBytes = { value: 0, };
    let allDone = Promise.all([
      readAll(httpFullReader, httpBytes),
      readAll(fsFullReader, fsBytes),
      httpHeaders,
      fsHeaders,
    ]);

    allDone.then(() => {
      expect(httpState.isStreamingSupported).toEqual(false);
      expect(httpState.isRangeSupported).toEqual(false);
      expect(fsState.isStreamingSupported).toEqual(false);
      expect(fsState.isRangeSupported).toEqual(false);
      expect(httpBytes.value).toEqual(pdfLength);
      expect(httpBytes.value).toEqual(fsBytes.value);
      done();
    }).catch((reason) => {
      done.fail(reason);
    });
  });

  it('read custom ranges for both http(s) and filesystem urls',
      function(done) {
    let rangeSize = 32768;
    let httpStream = new PDFNodeStream({
      source: {
        url: httpPdfUrl(),
        length: pdfLength,
        rangeChunkSize: rangeSize,
        disableStream: true,
      },
      disableRange: false,
    });
    let fsStream = new PDFNodeStream({
      source: {
        url: pdf,
        length: pdfLength,
        rangeChunkSize: rangeSize,
        disableStream: true,
      },
      disableRange: false,
    });

    let httpFullReader = httpStream.getFullReader();
    let fsFullReader = fsStream.getFullReader();

    let httpState = {}, fsState = {};
    let httpHeaders = captureCapabilities(httpFullReader, httpState, true);
    let fsHeaders = captureCapabilities(fsFullReader, fsState, true);

    // Skipping fullReader results, requesting something from the PDF end.
    let tailSize = (pdfLength % rangeSize) || rangeSize;
    let tailStart = pdfLength - tailSize;
    let httpBodyReader = httpStream.getRangeReader(tailStart - rangeSize,
                                                   tailStart);
    let httpTailReader = httpStream.getRangeReader(tailStart, pdfLength);
    let fsBodyReader = fsStream.getRangeReader(tailStart - rangeSize,
                                               tailStart);
    let fsTailReader = fsStream.getRangeReader(tailStart, pdfLength);

    let httpBody = { value: 0, }, httpTail = { value: 0, };
    let fsBody = { value: 0, }, fsTail = { value: 0, };

    let allDone = Promise.all([
      readAll(httpBodyReader, httpBody),
      readAll(httpTailReader, httpTail),
      readAll(fsBodyReader, fsBody),
      readAll(fsTailReader, fsTail),
      httpHeaders,
      fsHeaders,
    ]);

    allDone.then(function () {
      expect(httpBody.value).toEqual(rangeSize);
      expect(httpTail.value).toEqual(tailSize);
      expect(fsBody.value).toEqual(rangeSize);
      expect(fsTail.value).toEqual(tailSize);
      expect(httpState.isStreamingSupported).toEqual(false);
      expect(httpState.isRangeSupported).toEqual(true);
      expect(httpState.fullReaderCancelled).toEqual(true);
      expect(fsState.isStreamingSupported).toEqual(false);
      expect(fsState.isRangeSupported).toEqual(true);
      expect(fsState.fullReaderCancelled).toEqual(true);
      done();
    }).catch(function (reason) {
      done.fail(reason);
    });
  });
});