/* Copyright 2017 Mozilla Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import {
  AbortException,
  createPromiseCapability,
  UnknownErrorException,
} from "../../src/shared/util.js";
import { LoopbackPort } from "../../src/display/api.js";
import { MessageHandler } from "../../src/shared/message_handler.js";

describe("message_handler", function () {
  // Wait for a number of promise ticks instead of using a timer; this plays
  // the role of `setTimeout` in these tests but finishes much sooner.
  // The returned promise fulfils with the final falsy `ticks` value.
  function sleep(ticks) {
    const firstTick = Promise.resolve();
    return firstTick.then(function () {
      if (!ticks) {
        // Counter exhausted (or never supplied): stop recursing.
        return ticks;
      }
      return sleep(ticks - 1);
    });
  }

  describe("sendWithStream", function () {
    it("should return a ReadableStream", () => {
      const mainHandler = new MessageHandler(
        "main",
        "worker",
        new LoopbackPort()
      );
      const stream = mainHandler.sendWithStream("fakeHandler");
      // Duck-type the result rather than using `instanceof`: it has to be an
      // object that exposes a `getReader` method.
      expect(typeof stream).toEqual("object");
      expect(typeof stream.getReader).toEqual("function");
    });

    it("should read using a reader", async () => {
      // `trace` gains one character per sink callback: "p" when `onPull`
      // fires and "c" when `onCancel` fires.
      let trace = "";
      const port = new LoopbackPort();
      const mainHandler = new MessageHandler("main", "worker", port);
      const workerHandler = new MessageHandler("worker", "main", port);

      workerHandler.on("fakeHandler", function (data, sink) {
        sink.onPull = () => {
          trace += "p";
        };
        sink.onCancel = (reason) => {
          trace += "c";
        };
        // Send a single chunk once the sink reports ready, then close the
        // stream the next time it reports ready.
        const chunkSent = sink.ready.then(() => {
          sink.enqueue("hi");
          return sink.ready;
        });
        chunkSent.then(() => {
          sink.close();
        });
        return sleep(5);
      });

      const queueingStrategy = {
        highWaterMark: 1,
        size: () => 1,
      };
      const stream = mainHandler.sendWithStream(
        "fakeHandler",
        {},
        queueingStrategy
      );
      const reader = stream.getReader();

      // Before anything is read, no pull is expected to have been logged.
      await sleep(10);
      expect(trace).toEqual("");

      const first = await reader.read();
      expect(trace).toEqual("p");
      expect(first.value).toEqual("hi");
      expect(first.done).toEqual(false);

      // The second read is expected to observe the closed stream.
      await sleep(10);
      const last = await reader.read();
      expect(last.value).toEqual(undefined);
      expect(last.done).toEqual(true);
    });

    it("should not read any data when cancelled", async () => {
      // Event trace: "0" handler entered, "1"/"2" first/second chunk
      // enqueued, "3" stream closed, "4" ready promise rejected,
      // "p" `onPull` fired, "c" `onCancel` fired.
      let trace = "";
      const port = new LoopbackPort();
      const workerHandler = new MessageHandler("worker", "main", port);

      workerHandler.on("fakeHandler", function (data, sink) {
        sink.onPull = () => {
          trace += "p";
        };
        sink.onCancel = (reason) => {
          trace += "c";
        };
        trace += "0";

        const firstChunkSent = sink.ready.then(() => {
          trace += "1";
          sink.enqueue([1, 2, 3, 4], 4);
          return sink.ready;
        });
        const secondChunkSent = firstChunkSent.then(() => {
          trace += "2";
          sink.enqueue([5, 6, 7, 8], 4);
          return sink.ready;
        });
        secondChunkSent.then(
          () => {
            trace += "3";
            sink.close();
          },
          () => {
            trace += "4";
          }
        );
      });

      const mainHandler = new MessageHandler("main", "worker", port);
      const queueingStrategy = {
        highWaterMark: 4,
        size: (chunk) => chunk.length,
      };
      const stream = mainHandler.sendWithStream(
        "fakeHandler",
        {},
        queueingStrategy
      );
      const reader = stream.getReader();

      // Without any read, only the first chunk is expected to be enqueued.
      await sleep(10);
      expect(trace).toEqual("01");

      const { value, done } = await reader.read();
      expect(value).toEqual([1, 2, 3, 4]);
      expect(done).toEqual(false);

      // The read is expected to trigger a pull and let the second chunk in.
      await sleep(10);
      expect(trace).toEqual("01p2");

      // Cancelling must notify the sink ("c") and take the rejection branch
      // ("4") instead of closing the stream ("3").
      await reader.cancel(new AbortException("reader cancelled."));
      expect(trace).toEqual("01p2c4");
    });

    it("should not read when errored", async () => {
      // Event trace: "0" handler entered, "1" chunk enqueued, "e" sink
      // errored, "p" `onPull` fired, "c" `onCancel` fired.
      let trace = "";
      const port = new LoopbackPort();
      const workerHandler = new MessageHandler("worker", "main", port);

      workerHandler.on("fakeHandler", function (data, sink) {
        sink.onPull = () => {
          trace += "p";
        };
        sink.onCancel = (reason) => {
          trace += "c";
        };
        trace += "0";

        const chunkSent = sink.ready.then(() => {
          trace += "1";
          sink.enqueue([1, 2, 3, 4], 4);
          return sink.ready;
        });
        // Once the sink is ready again, fail the stream instead of sending
        // more data.
        chunkSent.then(() => {
          trace += "e";
          sink.error(new Error("should not read when errored"));
        });
      });

      const mainHandler = new MessageHandler("main", "worker", port);
      const queueingStrategy = {
        highWaterMark: 4,
        size: (chunk) => chunk.length,
      };
      const stream = mainHandler.sendWithStream(
        "fakeHandler",
        {},
        queueingStrategy
      );
      const reader = stream.getReader();

      await sleep(10);
      expect(trace).toEqual("01");

      const { value, done } = await reader.read();
      expect(value).toEqual([1, 2, 3, 4]);
      expect(done).toEqual(false);

      try {
        await reader.read();

        // A successful second read means the error was not propagated.
        expect(false).toEqual(true);
      } catch (failure) {
        expect(trace).toEqual("01pe");
        expect(failure instanceof UnknownErrorException).toEqual(true);
        expect(failure.message).toEqual("should not read when errored");
      }
    });

    it("should read data with blocking promise", async () => {
      // Event trace: "0" handler entered, "1"/"2" first/second chunk
      // enqueued, "p" `onPull` fired, "c" `onCancel` fired.
      let trace = "";
      const port = new LoopbackPort();
      const workerHandler = new MessageHandler("worker", "main", port);

      workerHandler.on("fakeHandler", function (data, sink) {
        sink.onPull = () => {
          trace += "p";
        };
        sink.onCancel = (reason) => {
          trace += "c";
        };
        trace += "0";

        const firstChunkSent = sink.ready.then(() => {
          trace += "1";
          sink.enqueue([1, 2, 3, 4], 4);
          return sink.ready;
        });
        const secondChunkSent = firstChunkSent.then(() => {
          trace += "2";
          sink.enqueue([5, 6, 7, 8], 4);
          return sink.ready;
        });
        secondChunkSent.then(() => {
          sink.close();
        });
      });

      const mainHandler = new MessageHandler("main", "worker", port);
      const queueingStrategy = {
        highWaterMark: 4,
        size: (chunk) => chunk.length,
      };
      const stream = mainHandler.sendWithStream(
        "fakeHandler",
        {},
        queueingStrategy
      );
      const reader = stream.getReader();

      // Let ten promise ticks pass (see `sleep`) before each read, so the
      // worker-side trace is inspected before that read() can unblock the
      // sink's ready promise. The reads are issued strictly in sequence.
      await sleep(10);
      expect(trace).toEqual("01");

      const first = await reader.read();
      expect(first.value).toEqual([1, 2, 3, 4]);
      expect(first.done).toEqual(false);

      await sleep(10);
      expect(trace).toEqual("01p2");

      const second = await reader.read();
      expect(second.value).toEqual([5, 6, 7, 8]);
      expect(second.done).toEqual(false);

      await sleep(10);
      expect(trace).toEqual("01p2p");

      const last = await reader.read();
      expect(last.value).toEqual(undefined);
      expect(last.done).toEqual(true);
    });

    it(
      "should read data with blocking promise and buffer whole data" +
        " into stream",
      async () => {
        // Event trace: "0" handler entered, "1"/"2" first/second chunk
        // enqueued, "p" `onPull` fired, "c" `onCancel` fired.
        let trace = "";
        const port = new LoopbackPort();
        const workerHandler = new MessageHandler("worker", "main", port);

        workerHandler.on("fakeHandler", function (data, sink) {
          sink.onPull = () => {
            trace += "p";
          };
          sink.onCancel = (reason) => {
            trace += "c";
          };
          trace += "0";

          const firstChunkSent = sink.ready.then(() => {
            trace += "1";
            sink.enqueue([1, 2, 3, 4], 4);
            return sink.ready;
          });
          const secondChunkSent = firstChunkSent.then(() => {
            trace += "2";
            sink.enqueue([5, 6, 7, 8], 4);
            return sink.ready;
          });
          secondChunkSent.then(() => {
            sink.close();
          });
          return sleep(10);
        });

        const mainHandler = new MessageHandler("main", "worker", port);
        // The high-water mark of 8 matches the combined size of both
        // chunks; the test expects both to be enqueued before any read.
        const queueingStrategy = {
          highWaterMark: 8,
          size: (chunk) => chunk.length,
        };
        const stream = mainHandler.sendWithStream(
          "fakeHandler",
          {},
          queueingStrategy
        );
        const reader = stream.getReader();

        await sleep(10);
        expect(trace).toEqual("012");

        const first = await reader.read();
        expect(first.value).toEqual([1, 2, 3, 4]);
        expect(first.done).toEqual(false);

        await sleep(10);
        expect(trace).toEqual("012p");

        const second = await reader.read();
        expect(second.value).toEqual([5, 6, 7, 8]);
        expect(second.done).toEqual(false);

        // No further pull is expected to be logged after the second read.
        await sleep(10);
        expect(trace).toEqual("012p");

        const last = await reader.read();
        expect(last.value).toEqual(undefined);
        expect(last.done).toEqual(true);
      }
    );

    it("should ignore any pull after close is called", async () => {
      // Event trace: "0" handler entered, "1" chunk enqueued,
      // "p" `onPull` fired, "c" `onCancel` fired.
      let trace = "";
      const port = new LoopbackPort();
      // Resolved from the test body to tell the worker side to close its
      // sink.
      const closeSignal = createPromiseCapability();
      const workerHandler = new MessageHandler("worker", "main", port);

      workerHandler.on("fakeHandler", function (data, sink) {
        sink.onPull = () => {
          trace += "p";
        };
        sink.onCancel = (reason) => {
          trace += "c";
        };
        trace += "0";

        sink.ready.then(() => {
          trace += "1";
          sink.enqueue([1, 2, 3, 4], 4);
        });
        const closed = closeSignal.promise.then(() => {
          sink.close();
        });
        return closed;
      });

      const mainHandler = new MessageHandler("main", "worker", port);
      const queueingStrategy = {
        highWaterMark: 10,
        size: (chunk) => chunk.length,
      };
      const stream = mainHandler.sendWithStream(
        "fakeHandler",
        {},
        queueingStrategy
      );
      const reader = stream.getReader();

      await sleep(10);
      expect(trace).toEqual("01");

      // Trigger the close on the worker side before reading anything.
      closeSignal.resolve();
      await closeSignal.promise;

      const first = await reader.read();
      expect(first.value).toEqual([1, 2, 3, 4]);
      expect(first.done).toEqual(false);

      // The read must not have produced a "p": pulls after close are
      // ignored.
      await sleep(10);
      expect(trace).toEqual("01");

      const last = await reader.read();
      expect(last.value).toEqual(undefined);
      expect(last.done).toEqual(true);
    });
  });
});