/**
 * A hook that runs just before the multiplexed stream closes.
 * It may finish synchronously or return a promise that is awaited.
 */
export type BeforeClose = () => Promise<void> | void;

/**
 * Signals the start of a chunk.
 */
export const START_CHUNK = 0x02;
/**
 * Signals the end of a chunk.
 */
export const END_CHUNK = 0x03;
/**
 * The channel control byte. As the second byte of a chunk it marks a
 * channel-label chunk; as the only payload byte it marks a channel as closed.
 */
export const CHANNEL = 0x00;

/**
 * Creates a new stream that allows for multiplexing multiple streams into a single stream.
 *
 * Chunk layouts written to the underlying stream:
 * - label: `START_CHUNK, CHANNEL, id, ...label bytes, END_CHUNK`
 * - data:  `START_CHUNK, id, ...payload bytes, END_CHUNK`
 * - close: `START_CHUNK, id, CHANNEL, END_CHUNK`
 *
 * Payload bytes are written verbatim; marker bytes inside a payload are not escaped.
 */
export class MultiplexedStream extends ReadableStream<Uint8Array> {
  /**
   * The first id handed out to a channel; lower values are the protocol bytes.
   */
  private static readonly FIRST_CHANNEL = 0x04;

  /**
   * The highest channel id that fits in the single id byte of a chunk.
   */
  private static readonly LAST_CHANNEL = 0xff;

  /**
   * The controller for the mux stream.
   */
  private controller!: ReadableStreamDefaultController<Uint8Array>;

  /**
   * The id that will be given to the next channel.
   */
  private channel = MultiplexedStream.FIRST_CHANNEL;

  /**
   * The number of active channels, when this reaches zero the stream will close.
   */
  public activeChannels = 0;

  /**
   * Don’t terminate the stream until the cool down (in milliseconds) has passed
   * and there are no active channels.
   */
  private coolDown = 200;

  /**
   * Prevent the stream from closing on its own.
   */
  public preventClose = false;

  /**
   * The callbacks to run before the stream is closed.
   */
  private beforeClose: BeforeClose[];

  /**
   * The encoder for the stream.
   */
  private encode: TextEncoder["encode"];
  /**
   * The decoder for the stream.
   */
  private decode: TextDecoder["decode"];

  /**
   * Handle of the pending cool-down timer, if one is scheduled.
   */
  private closeTimer: ReturnType<typeof setTimeout> | undefined;

  /**
   * Set once the close sequence has started so it can never run twice.
   */
  private finalized = false;

  /**
   * @param beforeClose - One hook, or a list of hooks, to run before the stream closes
   */
  constructor(beforeClose: BeforeClose | BeforeClose[] = []) {
    let ctrl: ReadableStreamDefaultController<Uint8Array> | undefined;
    super({
      start: (controller) => {
        ctrl = controller;
      },
    });
    // `start` is invoked synchronously by the ReadableStream constructor,
    // so the controller has been captured by the time `super` returns.
    this.controller = ctrl!;
    const encoder = new TextEncoder();
    this.encode = encoder.encode.bind(encoder);
    const decoder = new TextDecoder();
    this.decode = decoder.decode.bind(decoder);
    this.beforeClose = Array.isArray(beforeClose) ? beforeClose : [beforeClose];
  }

  /**
   * Send a message to a channel.
   * @param id - The id of the channel to send the message to
   * @param data - The data to send to the channel
   */
  send(id: number, data: Uint8Array): void {
    this.enqueueFrame([id], data);
  }

  /**
   * Creates a new channel and immediately sends data to it then closes it.
   * @param newChannelLabel - The label of the channel
   * @param data - The text to send to the channel
   */
  sendChannel(newChannelLabel: string, data: string): void {
    const id = this.nextChannelId();
    this.activeChannels++;
    this.sendChannelLabel(id, newChannelLabel);
    this.send(id, this.encode(data));
    this.closeChannel(id);
  }

  /**
   * Sends a label for a channel.
   * @param id - The id of the channel
   * @param label - The label to send
   */
  sendChannelLabel(id: number, label: string): void {
    this.enqueueFrame([CHANNEL, id], this.encode(label));
  }

  /**
   * Adds a new stream to the multiplex.
   * @param label - The label for the stream; objects are sent as JSON
   * @param reader - A reader for the source stream to add to the multiplex
   * @returns A promise that settles once the source has been fully forwarded
   */
  addStream(
    label: string | object,
    reader: ReadableStreamDefaultReader<Uint8Array>
  ): Promise<void> {
    const id = this.nextChannelId();
    this.activeChannels++;
    const text =
      label && typeof label === "object" ? JSON.stringify(label) : label;
    this.sendChannelLabel(id, text);
    return this.stream(id, reader);
  }

  /**
   * Forwards every chunk from a reader into the multiplex, then closes the channel.
   * The reader lock is released and the channel is closed even when reading
   * fails, so a broken source cannot keep the multiplex open forever.
   * @param id - The id of the channel to write to
   * @param reader - A reader for the source stream
   */
  async stream(
    id: number,
    reader: ReadableStreamDefaultReader<Uint8Array>
  ): Promise<void> {
    try {
      // A loop rather than recursion: the promise chain stays flat no matter
      // how many chunks the source produces.
      for (;;) {
        const { done, value: chunk } = await reader.read();
        if (chunk) this.send(id, chunk);
        if (done) return;
      }
    } finally {
      reader.releaseLock();
      this.closeChannel(id);
    }
  }

  /**
   * Waits for a callback to complete before closing the multiplex.
   * The callback counts as an active channel while it runs, including when it throws.
   * @param callback - The callback to wait for
   */
  async waitFor(callback: CallableFunction): Promise<void> {
    this.activeChannels++;
    try {
      await callback();
    } finally {
      this.closeChannel();
    }
  }

  /**
   * Add additional before close events.
   * @param beforeClose - One hook, or a list of hooks, to append
   */
  addBeforeClose(beforeClose: BeforeClose | BeforeClose[]): void {
    this.beforeClose.push(
      ...(Array.isArray(beforeClose) ? beforeClose : [beforeClose])
    );
  }

  /**
   * Closes a channel in the multiplex.
   * @param id - The id of the channel to close; omit it to only release a
   *   slot taken without a channel id (as `waitFor` does)
   */
  closeChannel(id?: number): void {
    // `undefined` and the reserved id 0 never get a close chunk.
    if (id) this.enqueueFrame([id], Uint8Array.of(CHANNEL));
    this.activeChannels--;
    if (this.activeChannels <= 0 && !this.preventClose) this.scheduleClose();
  }

  /**
   * Returns the next channel id, wrapping back to the first usable id so it
   * always fits in one byte and never lands on a protocol byte.
   * After a wrap an id may be reused while an older channel with the same id
   * is still open.
   */
  private nextChannelId(): number {
    const id = this.channel;
    this.channel =
      id >= MultiplexedStream.LAST_CHANNEL
        ? MultiplexedStream.FIRST_CHANNEL
        : id + 1;
    return id;
  }

  /**
   * Builds one chunk (`START_CHUNK`, header bytes, payload, `END_CHUNK`) and enqueues it.
   */
  private enqueueFrame(header: readonly number[], payload: Uint8Array): void {
    const frame = new Uint8Array(header.length + payload.length + 2);
    frame[0] = START_CHUNK;
    frame.set(header, 1);
    frame.set(payload, 1 + header.length);
    frame[frame.length - 1] = END_CHUNK;
    this.controller.enqueue(frame);
  }

  /**
   * Starts the cool-down timer unless one is already pending or the stream
   * has already begun closing.
   */
  private scheduleClose(): void {
    if (this.finalized || this.closeTimer !== undefined) return;
    this.closeTimer = setTimeout(() => {
      this.closeTimer = undefined;
      void this.finalize();
    }, this.coolDown);
  }

  /**
   * Runs the before-close hooks and closes the stream, at most once, provided
   * the multiplex is still idle and closing has not been disabled meanwhile.
   */
  private async finalize(): Promise<void> {
    if (this.finalized || this.preventClose || this.activeChannels > 0) return;
    this.finalized = true;
    await Promise.all(this.beforeClose.map((cb) => cb()));
    this.controller.close();
  }
}
