Merge pull request #8696 from mukulmishra18/sink-ready-rejection
Adds ready capability rejection logic for stream sink.
This commit is contained in:
commit
bd8c12119a
@ -1438,6 +1438,9 @@ MessageHandler.prototype = {
|
||||
|
||||
let streamSink = {
|
||||
enqueue(chunk, size = 1) {
|
||||
if (this.isCancelled) {
|
||||
return;
|
||||
}
|
||||
let lastDesiredSize = this.desiredSize;
|
||||
this.desiredSize -= size;
|
||||
// Enqueue decreases the desiredSize property of sink,
|
||||
@ -1451,6 +1454,9 @@ MessageHandler.prototype = {
|
||||
},
|
||||
|
||||
close() {
|
||||
if (this.isCancelled) {
|
||||
return;
|
||||
}
|
||||
sendStreamRequest({ stream: 'close', });
|
||||
delete self.streamSinks[streamId];
|
||||
},
|
||||
@ -1462,6 +1468,7 @@ MessageHandler.prototype = {
|
||||
sinkCapability: capability,
|
||||
onPull: null,
|
||||
onCancel: null,
|
||||
isCancelled: false,
|
||||
desiredSize,
|
||||
ready: null,
|
||||
};
|
||||
@ -1564,6 +1571,8 @@ MessageHandler.prototype = {
|
||||
sendStreamResponse({ stream: 'cancel_complete',
|
||||
success: false, reason, });
|
||||
});
|
||||
this.streamSinks[data.streamId].sinkCapability.reject(data.reason);
|
||||
this.streamSinks[data.streamId].isCancelled = true;
|
||||
delete this.streamSinks[data.streamId];
|
||||
break;
|
||||
default:
|
||||
|
@ -154,7 +154,7 @@ describe('util_stream', function () {
|
||||
expect(log).toEqual('01p2');
|
||||
return reader.cancel();
|
||||
}).then(() => {
|
||||
expect(log).toEqual('01p2c');
|
||||
expect(log).toEqual('01p2c4');
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user