Adds ready capability rejection logic for stream sink.
This commit is contained in:
parent
38d566f1e5
commit
568b0b6a42
@ -1438,6 +1438,9 @@ MessageHandler.prototype = {
|
|||||||
|
|
||||||
let streamSink = {
|
let streamSink = {
|
||||||
enqueue(chunk, size = 1) {
|
enqueue(chunk, size = 1) {
|
||||||
|
if (this.isCancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
let lastDesiredSize = this.desiredSize;
|
let lastDesiredSize = this.desiredSize;
|
||||||
this.desiredSize -= size;
|
this.desiredSize -= size;
|
||||||
// Enqueue decreases the desiredSize property of sink,
|
// Enqueue decreases the desiredSize property of sink,
|
||||||
@ -1451,6 +1454,9 @@ MessageHandler.prototype = {
|
|||||||
},
|
},
|
||||||
|
|
||||||
close() {
|
close() {
|
||||||
|
if (this.isCancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
sendStreamRequest({ stream: 'close', });
|
sendStreamRequest({ stream: 'close', });
|
||||||
delete self.streamSinks[streamId];
|
delete self.streamSinks[streamId];
|
||||||
},
|
},
|
||||||
@ -1462,6 +1468,7 @@ MessageHandler.prototype = {
|
|||||||
sinkCapability: capability,
|
sinkCapability: capability,
|
||||||
onPull: null,
|
onPull: null,
|
||||||
onCancel: null,
|
onCancel: null,
|
||||||
|
isCancelled: false,
|
||||||
desiredSize,
|
desiredSize,
|
||||||
ready: null,
|
ready: null,
|
||||||
};
|
};
|
||||||
@ -1564,6 +1571,8 @@ MessageHandler.prototype = {
|
|||||||
sendStreamResponse({ stream: 'cancel_complete',
|
sendStreamResponse({ stream: 'cancel_complete',
|
||||||
success: false, reason, });
|
success: false, reason, });
|
||||||
});
|
});
|
||||||
|
this.streamSinks[data.streamId].sinkCapability.reject(data.reason);
|
||||||
|
this.streamSinks[data.streamId].isCancelled = true;
|
||||||
delete this.streamSinks[data.streamId];
|
delete this.streamSinks[data.streamId];
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -154,7 +154,7 @@ describe('util_stream', function () {
|
|||||||
expect(log).toEqual('01p2');
|
expect(log).toEqual('01p2');
|
||||||
return reader.cancel();
|
return reader.cancel();
|
||||||
}).then(() => {
|
}).then(() => {
|
||||||
expect(log).toEqual('01p2c');
|
expect(log).toEqual('01p2c4');
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user