Use streams for OperatorList chunking (issue 10023)
*Please note:* The majority of this patch was written by Yury, and it's simply been rebased and slightly extended to prevent issues when dealing with `RenderingCancelledException`. By leveraging streams this (finally) provides a simple way in which parsing can be aborted on the worker-thread, which will ultimately help save resources. With this patch worker-thread parsing will *only* be aborted when the document is destroyed, and not when rendering is cancelled. There's a couple of reasons for this: - The API currently expects the *entire* OperatorList to be extracted, or an Error to occur, once it's been started. Hence additional re-factoring/re-writing of the API code will be necessary to properly support cancelling and re-starting of OperatorList parsing in cases where the `lastChunk` hasn't yet been seen. - Even with the above addressed, immediately cancelling when encountering a `RenderingCancelledException` will lead to worse performance in e.g. the default viewer. When zooming and/or rotation of the document occurs it's very likely that `cancel` will be (almost) immediately followed by a new `render` call. In that case you'd obviously *not* want to abort parsing on the worker-thread, since then you'd risk throwing away a partially parsed Page and thus be forced to re-parse it again which will regress perceived performance. - This patch is already *somewhat* risky, given that it touches fundamentally important/critical code, and trying to keep it somewhat small should hopefully reduce the risk of regressions (and simplify reviewing as well). Time permitting, once this has landed and been in Nightly for awhile, I'll try to work on the remaining points outlined above. Co-Authored-By: Yury Delendik <ydelendik@mozilla.com> Co-Authored-By: Jonas Jenwald <jonas.jenwald@gmail.com>
This commit is contained in:
parent
ee75fc1298
commit
66e0dd1b06
@ -195,7 +195,7 @@ class Page {
|
||||
});
|
||||
}
|
||||
|
||||
getOperatorList({ handler, task, intent, renderInteractiveForms, }) {
|
||||
getOperatorList({ handler, sink, task, intent, renderInteractiveForms, }) {
|
||||
const contentStreamPromise = this.pdfManager.ensure(this,
|
||||
'getContentStream');
|
||||
const resourcesPromise = this.loadResources([
|
||||
@ -220,7 +220,7 @@ class Page {
|
||||
|
||||
const dataPromises = Promise.all([contentStreamPromise, resourcesPromise]);
|
||||
const pageListPromise = dataPromises.then(([contentStream]) => {
|
||||
const opList = new OperatorList(intent, handler, this.pageIndex);
|
||||
const opList = new OperatorList(intent, sink, this.pageIndex);
|
||||
|
||||
handler.send('StartRenderPage', {
|
||||
transparency: partialEvaluator.hasBlendModes(this.resources),
|
||||
@ -244,7 +244,7 @@ class Page {
|
||||
function([pageOpList, annotations]) {
|
||||
if (annotations.length === 0) {
|
||||
pageOpList.flush(true);
|
||||
return pageOpList;
|
||||
return { length: pageOpList.totalLength, };
|
||||
}
|
||||
|
||||
// Collect the operator list promises for the annotations. Each promise
|
||||
@ -264,7 +264,7 @@ class Page {
|
||||
}
|
||||
pageOpList.addOp(OPS.endAnnotations, []);
|
||||
pageOpList.flush(true);
|
||||
return pageOpList;
|
||||
return { length: pageOpList.totalLength, };
|
||||
});
|
||||
});
|
||||
}
|
||||
|
@ -541,6 +541,9 @@ var PartialEvaluator = (function PartialEvaluatorClosure() {
|
||||
operatorList.addDependencies(tilingOpList.dependencies);
|
||||
operatorList.addOp(fn, tilingPatternIR);
|
||||
}, (reason) => {
|
||||
if (reason instanceof AbortException) {
|
||||
return;
|
||||
}
|
||||
if (this.options.ignoreErrors) {
|
||||
// Error(s) in the TilingPattern -- sending unsupported feature
|
||||
// notification and allow rendering to continue.
|
||||
@ -918,8 +921,8 @@ var PartialEvaluator = (function PartialEvaluatorClosure() {
|
||||
}
|
||||
|
||||
return new Promise(function promiseBody(resolve, reject) {
|
||||
var next = function (promise) {
|
||||
promise.then(function () {
|
||||
let next = function(promise) {
|
||||
Promise.all([promise, operatorList.ready]).then(function () {
|
||||
try {
|
||||
promiseBody(resolve, reject);
|
||||
} catch (ex) {
|
||||
@ -1000,6 +1003,9 @@ var PartialEvaluator = (function PartialEvaluatorClosure() {
|
||||
}
|
||||
resolveXObject();
|
||||
}).catch(function(reason) {
|
||||
if (reason instanceof AbortException) {
|
||||
return;
|
||||
}
|
||||
if (self.options.ignoreErrors) {
|
||||
// Error(s) in the XObject -- sending unsupported feature
|
||||
// notification and allow rendering to continue.
|
||||
@ -1230,6 +1236,9 @@ var PartialEvaluator = (function PartialEvaluatorClosure() {
|
||||
closePendingRestoreOPS();
|
||||
resolve();
|
||||
}).catch((reason) => {
|
||||
if (reason instanceof AbortException) {
|
||||
return;
|
||||
}
|
||||
if (this.options.ignoreErrors) {
|
||||
// Error(s) in the OperatorList -- sending unsupported feature
|
||||
// notification and allow rendering to continue.
|
||||
|
@ -541,11 +541,11 @@ var OperatorList = (function OperatorListClosure() {
|
||||
var CHUNK_SIZE = 1000;
|
||||
var CHUNK_SIZE_ABOUT = CHUNK_SIZE - 5; // close to chunk size
|
||||
|
||||
function OperatorList(intent, messageHandler, pageIndex) {
|
||||
this.messageHandler = messageHandler;
|
||||
function OperatorList(intent, streamSink, pageIndex) {
|
||||
this._streamSink = streamSink;
|
||||
this.fnArray = [];
|
||||
this.argsArray = [];
|
||||
if (messageHandler && intent !== 'oplist') {
|
||||
if (streamSink && intent !== 'oplist') {
|
||||
this.optimizer = new QueueOptimizer(this);
|
||||
} else {
|
||||
this.optimizer = new NullOptimizer(this);
|
||||
@ -555,6 +555,7 @@ var OperatorList = (function OperatorListClosure() {
|
||||
this.pageIndex = pageIndex;
|
||||
this.intent = intent;
|
||||
this.weight = 0;
|
||||
this._resolved = streamSink ? null : Promise.resolve();
|
||||
}
|
||||
|
||||
OperatorList.prototype = {
|
||||
@ -562,6 +563,10 @@ var OperatorList = (function OperatorListClosure() {
|
||||
return this.argsArray.length;
|
||||
},
|
||||
|
||||
get ready() {
|
||||
return this._resolved || this._streamSink.ready;
|
||||
},
|
||||
|
||||
/**
|
||||
* @returns {number} The total length of the entire operator list,
|
||||
* since `this.length === 0` after flushing.
|
||||
@ -573,7 +578,7 @@ var OperatorList = (function OperatorListClosure() {
|
||||
addOp(fn, args) {
|
||||
this.optimizer.push(fn, args);
|
||||
this.weight++;
|
||||
if (this.messageHandler) {
|
||||
if (this._streamSink) {
|
||||
if (this.weight >= CHUNK_SIZE) {
|
||||
this.flush();
|
||||
} else if (this.weight >= CHUNK_SIZE_ABOUT &&
|
||||
@ -642,7 +647,7 @@ var OperatorList = (function OperatorListClosure() {
|
||||
const length = this.length;
|
||||
this._totalLength += length;
|
||||
|
||||
this.messageHandler.send('RenderPageChunk', {
|
||||
this._streamSink.enqueue({
|
||||
operatorList: {
|
||||
fnArray: this.fnArray,
|
||||
argsArray: this.argsArray,
|
||||
@ -651,7 +656,7 @@ var OperatorList = (function OperatorListClosure() {
|
||||
},
|
||||
pageIndex: this.pageIndex,
|
||||
intent: this.intent,
|
||||
}, this._transfers);
|
||||
}, 1, this._transfers);
|
||||
|
||||
this.dependencies = Object.create(null);
|
||||
this.fnArray.length = 0;
|
||||
|
@ -466,10 +466,10 @@ var WorkerMessageHandler = {
|
||||
});
|
||||
});
|
||||
|
||||
handler.on('RenderPageRequest', function wphSetupRenderPage(data) {
|
||||
handler.on('GetOperatorList', function wphSetupRenderPage(data, sink) {
|
||||
var pageIndex = data.pageIndex;
|
||||
pdfManager.getPage(pageIndex).then(function(page) {
|
||||
var task = new WorkerTask('RenderPageRequest: page ' + pageIndex);
|
||||
var task = new WorkerTask(`GetOperatorList: page ${pageIndex}`);
|
||||
startWorkerTask(task);
|
||||
|
||||
// NOTE: Keep this condition in sync with the `info` helper function.
|
||||
@ -478,55 +478,30 @@ var WorkerMessageHandler = {
|
||||
// Pre compile the pdf page and fetch the fonts/images.
|
||||
page.getOperatorList({
|
||||
handler,
|
||||
sink,
|
||||
task,
|
||||
intent: data.intent,
|
||||
renderInteractiveForms: data.renderInteractiveForms,
|
||||
}).then(function(operatorList) {
|
||||
}).then(function(operatorListInfo) {
|
||||
finishWorkerTask(task);
|
||||
|
||||
if (start) {
|
||||
info(`page=${pageIndex + 1} - getOperatorList: time=` +
|
||||
`${Date.now() - start}ms, len=${operatorList.totalLength}`);
|
||||
`${Date.now() - start}ms, len=${operatorListInfo.length}`);
|
||||
}
|
||||
}, function(e) {
|
||||
sink.close();
|
||||
}, function(reason) {
|
||||
finishWorkerTask(task);
|
||||
if (task.terminated) {
|
||||
return; // ignoring errors from the terminated thread
|
||||
}
|
||||
|
||||
// For compatibility with older behavior, generating unknown
|
||||
// unsupported feature notification on errors.
|
||||
handler.send('UnsupportedFeature',
|
||||
{ featureId: UNSUPPORTED_FEATURES.unknown, });
|
||||
|
||||
var minimumStackMessage =
|
||||
'worker.js: while trying to getPage() and getOperatorList()';
|
||||
|
||||
var wrappedException;
|
||||
|
||||
// Turn the error into an obj that can be serialized
|
||||
if (typeof e === 'string') {
|
||||
wrappedException = {
|
||||
message: e,
|
||||
stack: minimumStackMessage,
|
||||
};
|
||||
} else if (typeof e === 'object') {
|
||||
wrappedException = {
|
||||
message: e.message || e.toString(),
|
||||
stack: e.stack || minimumStackMessage,
|
||||
};
|
||||
} else {
|
||||
wrappedException = {
|
||||
message: 'Unknown exception type: ' + (typeof e),
|
||||
stack: minimumStackMessage,
|
||||
};
|
||||
}
|
||||
|
||||
handler.send('PageError', {
|
||||
pageIndex,
|
||||
error: wrappedException,
|
||||
intent: data.intent,
|
||||
});
|
||||
sink.error(reason);
|
||||
throw reason;
|
||||
});
|
||||
});
|
||||
}, this);
|
||||
|
@ -1032,7 +1032,7 @@ class PDFPageProxy {
|
||||
};
|
||||
|
||||
stats.time('Page Request');
|
||||
this._transport.messageHandler.send('RenderPageRequest', {
|
||||
this._pumpOperatorList({
|
||||
pageIndex: this.pageNumber - 1,
|
||||
intent: renderingIntent,
|
||||
renderInteractiveForms: renderInteractiveForms === true,
|
||||
@ -1054,6 +1054,11 @@ class PDFPageProxy {
|
||||
|
||||
if (error) {
|
||||
internalRenderTask.capability.reject(error);
|
||||
|
||||
this._abortOperatorList({
|
||||
intentState,
|
||||
reason: error,
|
||||
});
|
||||
} else {
|
||||
internalRenderTask.capability.resolve();
|
||||
}
|
||||
@ -1135,7 +1140,7 @@ class PDFPageProxy {
|
||||
};
|
||||
|
||||
this._stats.time('Page Request');
|
||||
this._transport.messageHandler.send('RenderPageRequest', {
|
||||
this._pumpOperatorList({
|
||||
pageIndex: this.pageIndex,
|
||||
intent: renderingIntent,
|
||||
});
|
||||
@ -1201,19 +1206,25 @@ class PDFPageProxy {
|
||||
this._transport.pageCache[this.pageIndex] = null;
|
||||
|
||||
const waitOn = [];
|
||||
Object.keys(this.intentStates).forEach(function(intent) {
|
||||
Object.keys(this.intentStates).forEach((intent) => {
|
||||
const intentState = this.intentStates[intent];
|
||||
this._abortOperatorList({
|
||||
intentState,
|
||||
reason: new Error('Page was destroyed.'),
|
||||
force: true,
|
||||
});
|
||||
|
||||
if (intent === 'oplist') {
|
||||
// Avoid errors below, since the renderTasks are just stubs.
|
||||
return;
|
||||
}
|
||||
const intentState = this.intentStates[intent];
|
||||
intentState.renderTasks.forEach(function(renderTask) {
|
||||
const renderCompleted = renderTask.capability.promise.
|
||||
catch(function() {}); // ignoring failures
|
||||
waitOn.push(renderCompleted);
|
||||
renderTask.cancel();
|
||||
});
|
||||
}, this);
|
||||
});
|
||||
this.objs.clear();
|
||||
this.annotationsPromise = null;
|
||||
this.pendingCleanup = false;
|
||||
@ -1273,8 +1284,7 @@ class PDFPageProxy {
|
||||
* For internal use only.
|
||||
* @ignore
|
||||
*/
|
||||
_renderPageChunk(operatorListChunk, intent) {
|
||||
const intentState = this.intentStates[intent];
|
||||
_renderPageChunk(operatorListChunk, intentState) {
|
||||
// Add the new chunk to the current operator list.
|
||||
for (let i = 0, ii = operatorListChunk.length; i < ii; i++) {
|
||||
intentState.operatorList.fnArray.push(operatorListChunk.fnArray[i]);
|
||||
@ -1293,6 +1303,86 @@ class PDFPageProxy {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
* @ignore
|
||||
*/
|
||||
_pumpOperatorList(args) {
|
||||
assert(args.intent,
|
||||
'PDFPageProxy._pumpOperatorList: Expected "intent" argument.');
|
||||
|
||||
const readableStream =
|
||||
this._transport.messageHandler.sendWithStream('GetOperatorList', args);
|
||||
const reader = readableStream.getReader();
|
||||
|
||||
const intentState = this.intentStates[args.intent];
|
||||
intentState.streamReader = reader;
|
||||
|
||||
const pump = () => {
|
||||
reader.read().then(({ value, done, }) => {
|
||||
if (done) {
|
||||
intentState.streamReader = null;
|
||||
return;
|
||||
}
|
||||
if (this._transport.destroyed) {
|
||||
return; // Ignore any pending requests if the worker was terminated.
|
||||
}
|
||||
this._renderPageChunk(value.operatorList, intentState);
|
||||
pump();
|
||||
}, (reason) => {
|
||||
intentState.streamReader = null;
|
||||
|
||||
if (this._transport.destroyed) {
|
||||
return; // Ignore any pending requests if the worker was terminated.
|
||||
}
|
||||
if (intentState.operatorList) {
|
||||
// Mark operator list as complete.
|
||||
intentState.operatorList.lastChunk = true;
|
||||
|
||||
for (let i = 0; i < intentState.renderTasks.length; i++) {
|
||||
intentState.renderTasks[i].operatorListChanged();
|
||||
}
|
||||
this._tryCleanup();
|
||||
}
|
||||
|
||||
if (intentState.displayReadyCapability) {
|
||||
intentState.displayReadyCapability.reject(reason);
|
||||
} else if (intentState.opListReadCapability) {
|
||||
intentState.opListReadCapability.reject(reason);
|
||||
} else {
|
||||
throw reason;
|
||||
}
|
||||
});
|
||||
};
|
||||
pump();
|
||||
}
|
||||
|
||||
/**
|
||||
* For internal use only.
|
||||
* @ignore
|
||||
*/
|
||||
_abortOperatorList({ intentState, reason, force = false, }) {
|
||||
assert(reason instanceof Error,
|
||||
'PDFPageProxy._abortOperatorList: Expected "reason" argument.');
|
||||
|
||||
if (!intentState.streamReader) {
|
||||
return;
|
||||
}
|
||||
if (!force && intentState.renderTasks.length !== 0) {
|
||||
// Ensure that an Error occuring in *only* one `InternalRenderTask`, e.g.
|
||||
// multiple render() calls on the same canvas, won't break all rendering.
|
||||
return;
|
||||
}
|
||||
if (reason instanceof RenderingCancelledException) {
|
||||
// Aborting parsing on the worker-thread when rendering is cancelled will
|
||||
// break subsequent rendering operations. TODO: Remove this restriction.
|
||||
return;
|
||||
}
|
||||
intentState.streamReader.cancel(
|
||||
new AbortException(reason && reason.message));
|
||||
intentState.streamReader = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {Object} Returns page stats, if enabled.
|
||||
*/
|
||||
@ -1955,15 +2045,6 @@ class WorkerTransport {
|
||||
page._startRenderPage(data.transparency, data.intent);
|
||||
}, this);
|
||||
|
||||
messageHandler.on('RenderPageChunk', function(data) {
|
||||
if (this.destroyed) {
|
||||
return; // Ignore any pending requests if the worker was terminated.
|
||||
}
|
||||
|
||||
const page = this.pageCache[data.pageIndex];
|
||||
page._renderPageChunk(data.operatorList, data.intent);
|
||||
}, this);
|
||||
|
||||
messageHandler.on('commonobj', function(data) {
|
||||
if (this.destroyed) {
|
||||
return; // Ignore any pending requests if the worker was terminated.
|
||||
@ -2083,33 +2164,6 @@ class WorkerTransport {
|
||||
}
|
||||
}, this);
|
||||
|
||||
messageHandler.on('PageError', function(data) {
|
||||
if (this.destroyed) {
|
||||
return; // Ignore any pending requests if the worker was terminated.
|
||||
}
|
||||
|
||||
const page = this.pageCache[data.pageIndex];
|
||||
const intentState = page.intentStates[data.intent];
|
||||
|
||||
if (intentState.operatorList) {
|
||||
// Mark operator list as complete.
|
||||
intentState.operatorList.lastChunk = true;
|
||||
|
||||
for (let i = 0; i < intentState.renderTasks.length; i++) {
|
||||
intentState.renderTasks[i].operatorListChanged();
|
||||
}
|
||||
page._tryCleanup();
|
||||
}
|
||||
|
||||
if (intentState.displayReadyCapability) {
|
||||
intentState.displayReadyCapability.reject(new Error(data.error));
|
||||
} else if (intentState.opListReadCapability) {
|
||||
intentState.opListReadCapability.reject(new Error(data.error));
|
||||
} else {
|
||||
throw new Error(data.error);
|
||||
}
|
||||
}, this);
|
||||
|
||||
messageHandler.on('UnsupportedFeature', this._onUnsupportedFeature, this);
|
||||
|
||||
messageHandler.on('JpegDecode', function(data) {
|
||||
|
@ -1281,6 +1281,11 @@ describe('api', function() {
|
||||
|
||||
it('gets operatorList, from corrupt PDF file (issue 8702), ' +
|
||||
'with/without `stopAtErrors` set', function(done) {
|
||||
if (isNodeJS()) {
|
||||
pending(
|
||||
'Fails with "Unhandled promise rejection: ..." errors in Node.js.');
|
||||
}
|
||||
|
||||
const loadingTask1 = getDocument(buildGetDocumentParams('issue8702.pdf', {
|
||||
stopAtErrors: false, // The default value.
|
||||
}));
|
||||
|
@ -320,13 +320,12 @@ describe('evaluator', function() {
|
||||
});
|
||||
|
||||
describe('operator list', function () {
|
||||
function MessageHandlerMock() { }
|
||||
MessageHandlerMock.prototype = {
|
||||
send() { },
|
||||
};
|
||||
class StreamSinkMock {
|
||||
enqueue() { }
|
||||
}
|
||||
|
||||
it('should get correct total length after flushing', function () {
|
||||
var operatorList = new OperatorList(null, new MessageHandlerMock());
|
||||
var operatorList = new OperatorList(null, new StreamSinkMock());
|
||||
operatorList.addOp(OPS.save, null);
|
||||
operatorList.addOp(OPS.restore, null);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user