import { START_CHUNK, END_CHUNK, CHANNEL } from "./MultiplexedStream";
import JSON5 from "json5";
import { cloneAny, token } from "@formkit/utils";
import OpenAI from "openai";
import type {
  ChatMessage,
  FormKitSchemaNode,
  FormNode,
  FormStore,
  LLMChatMessageWithError,
} from "~/types";
import { inputsRequiringDetails } from "../prompts/inputs";
import { itemsInOutline } from "./streamProcessors";
import { jsonReader } from "./jsonReader";
import { getInputByName } from "./schema";
/**
 * A list of the supported models and their respective providers.
 * NOTE: These are prices per 1000 tokens.
 */
export const SUPPORTED_MODELS: Record<
  string,
  { input: number; output: number; provider: "openai" | "groq" }
> = {
  "gpt-4o": {
    input: 0.0025,
    output: 0.00125,
    provider: "openai",
  },
  "gpt-4o-2024-08-06": {
    input: 0.0025,
    output: 0.01,
    provider: "openai",
  },
  "gpt-4o-2024-05-13": {
    input: 0.005,
    output: 0.015,
    provider: "openai",
  },
  "gpt-4o-mini": {
    input: 0.00015,
    output: 0.0006,
    provider: "openai",
  },
  "gpt-4o-mini-2024-07-18": {
    input: 0.00015,
    output: 0.0006,
    provider: "openai",
  },
  "llama3-8b-8192": {
    input: 0.00005,
    output: 0.00008,
    provider: "groq",
  },
  "llama3-groq-8b-8192-tool-use-preview": {
    input: 0.00019,
    output: 0.00019,
    provider: "groq",
  },
  "llama-3.1-8b-instant": {
    input: 0.00005,
    output: 0.00008,
    provider: "groq",
  },
};

/**
 * The token cost channel name — used to track the cost of the chat completion.
 * We use a faux channel name here to not make our cost-basis too obvious.
 */
export const TOKEN_COST = "variability";

/**
 * This is the total dollar price we are willing to grant during a free trial.
 */
export const MAX_THRESHOLD = 0.14;

/**
 * The maximum monthly threshold for a user to be able to make edits.
 */
export const MAX_MONTHLY_THRESHOLD = 10.0;

/**
 * The class list for the loading spinner on an input.
 */
export const LOADING_CLASS_LIST = `relative rounded ring-[1px] ring-offset-[5px] ring-blue-100 [&>*]:opacity-50 before:!opacity-100 before:z-50 before:absolute before:top-[calc(50%-12px)] before:left-[calc(50%-12px)] before:border-[3px] before:border-blue-500 before:border-r-transparent before:rounded-full before:w-[24px] before:h-[24px] before:animate-spin after:!opacity-100 after:absolute after:z-40 after:rounded after:-inset-[5px] after:bg-blue-100/20`;

/**
 * Reads a stream completely and returns the full string content.
 * @param stream - The ReadableStream to consume
 * @returns Promise<string> - The complete string content of the stream
 */
export async function streamToString(
  stream: ReadableStream<Uint8Array>
): Promise<string> {
  const decoder = new TextDecoder();
  let result = "";

  for await (const chunk of read(stream.getReader())) {
    result += decoder.decode(chunk);
  }

  return result;
}

/**
 * Read the stream one chunk at a time and reduce it to a single value.
 * @param reader - ReadableStreamDefaultReader<Uint8Array>
 * @param onChunk - The callback to use on each chunk
 */
export async function readStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  onChunk?: (chunk: Uint8Array) => void
) {
  const { value, done } = await reader.read();
  if (value && onChunk) {
    onChunk(value);
  }
  if (!done) {
    await readStream(reader, onChunk);
  }
}

/**
 * An async generator to read the stream one chunk at a time.
 * @param reader - ReadableStreamDefaultReader<Uint8Array>
 */
export async function* read<T>(
  reader: ReadableStreamDefaultReader<T>
): AsyncGenerator<T> {
  let value: T | undefined = undefined;
  let done = false;
  do {
    ({ value, done } = await reader.read());
    if (value !== undefined) yield value;
  } while (!done);
}

export async function readToolCalls(
  reader: ReadableStreamDefaultReader<Uint8Array>
): Promise<[Array<Record<string, any>>, number]> {
  const decoder = new TextDecoder();
  const functionCalls: string[] = [];
  let usage = {
    prompt_tokens: 0,
    completion_tokens: 0,
    total_tokens: 0,
  };
  let model = "";

  for await (const chunk of read(reader)) {
    const data = JSON.parse(
      decoder.decode(chunk)
    ) as OpenAI.ChatCompletionChunk;

    if (data.model) {
      model = data.model;
    }
    // @ts-ignore-next-line
    const useageData = data.usage ?? data?.x_groq?.usage;

    if (
      useageData &&
      typeof useageData === "object" &&
      "total_tokens" in useageData
    ) {
      usage = useageData;
    }
    if (Array.isArray(data?.choices?.[0]?.delta?.tool_calls)) {
      const toolCalls = data.choices[0].delta.tool_calls;
      for (const toolCall of toolCalls) {
        if (toolCall.index !== undefined) {
          if (!functionCalls[toolCall.index]) {
            functionCalls[toolCall.index] = "";
          }
          if (toolCall.function?.arguments) {
            functionCalls[toolCall.index] += toolCall.function.arguments;
          }
        }
      }
    }
  }

  const cost = calculatePrice(usage, model);

  return [functionCalls.map((call) => JSON.parse(call)), cost];
}

/**
 * Returns a new ReadableStream that forwards data from the provider reader to the consumer after processing each chunk.
 * @param reader - ReadableStreamDefaultReader<Uint8Array>
 * @param onChunk - The callback to use on each chunk
 * @returns
 */
export function forwardStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  onChunk: (chunk: string, write: (chunk: string) => void) => void
): ReadableStream {
  const encoder = new TextEncoder();
  const decoder = new TextDecoder();
  return new ReadableStream({
    async start(controller) {
      const write = (chunk: string) => {
        controller.enqueue(encoder.encode(chunk));
      };
      await readStream(reader, (chunk) => {
        const str = decoder.decode(chunk);
        onChunk(str, write);
      });
      controller.close();
    },
  });
}

/**
 * Read a multiplexed stream and call the onChunk callback with the channel name, chunk, and buffer (up to date concatenation of the chunks).
 * @param stream - A ReadableStream<Uint8Array> created via MultiplexedStream class.
 * @param onChunk - The callback to use on each chunk
 */
export async function readMultiplexedStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  onStream: (channelName: string, stream: ReadableStream<Uint8Array>) => void
) {
  let buffer: number[] = [];
  const streams = new Map<
    number,
    ReadableStreamDefaultController<Uint8Array>
  >();
  const decoder = new TextDecoder();

  function processBuffer() {
    const chunk: number[] = [];
    let isFirstByte = false;
    let newChannel = false;
    let channelId: null | number = null;

    for (let i = 0; i < buffer.length; i++) {
      const byte = buffer[i];
      if (byte === START_CHUNK) {
        isFirstByte = true;
        continue;
      } else if (isFirstByte && byte === CHANNEL) {
        newChannel = true;
      } else if ((newChannel || isFirstByte) && channelId === null) {
        channelId = byte;
      } else if (byte === END_CHUNK) {
        const int8Chunk = new Uint8Array(chunk);
        if (newChannel && channelId) {
          const channelName = decoder.decode(int8Chunk);
          const stream = new ReadableStream<Uint8Array>({
            start(controller) {
              streams.set(channelId!, controller);
            },
          });
          onStream(channelName, stream);
        } else if (channelId) {
          const controller = streams.get(channelId);
          if (controller) {
            if (int8Chunk.length === 1 && int8Chunk[0] === CHANNEL) {
              controller.close();
            } else {
              controller.enqueue(int8Chunk);
            }
          }
        }
        channelId = null;
        newChannel = false;
        chunk.length = 0;
        buffer.splice(0, i + 1);
        processBuffer();
      } else {
        chunk.push(byte);
      }
      isFirstByte = false;
    }
  }

  await readStream(reader, (chunk) => {
    buffer.push(...chunk);
    processBuffer();
  });
}

/**
 * Create a new async generator that produces streams of edit functions.
 * Each new stream is a new function call to a given edit function. Options
 * can bef found in editoForm.ts
 */
export async function* editStreams(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  tokenStreamController: ReadableStreamDefaultController<Uint8Array>
): AsyncGenerator<{
  func: string;
  stream: ReadableStream<Uint8Array>;
}> {
  const streams: Record<
    number,
    ReadableStreamDefaultController<Uint8Array>
  > = {};
  const decoder = new TextDecoder();
  const encoder = new TextEncoder();
  for await (const chunk of read(reader)) {
    const data = JSON.parse(
      decoder.decode(chunk)
    ) as OpenAI.ChatCompletionChunk;
    if (data.usage) {
      calculatePrice(data.usage, data.model, tokenStreamController);
    }
    if (!Array.isArray(data?.choices) || data.choices.length < 1) {
      break;
    }
    if (Array.isArray(data?.choices[0].delta?.tool_calls)) {
      const toolCalls = data?.choices[0].delta?.tool_calls;
      for (const toolCall of toolCalls) {
        if (
          toolCall.type === "function" &&
          typeof toolCall.function?.name === "string"
        ) {
          yield {
            func: toolCall.function.name,
            stream: new ReadableStream<Uint8Array>({
              start(controller) {
                streams[toolCall.index] = controller;
              },
            }),
          };
        }

        if (toolCall.function?.arguments) {
          const controller = streams[toolCall.index];
          if (controller) {
            controller.enqueue(encoder.encode(toolCall.function.arguments));
          }
        }
      }
    }
    if (data.choices[0].finish_reason) {
      for (const controller of Object.values(streams)) {
        controller.close();
      }
    }
  }
}

/**
 * Create a function that will process arguments and call a usage callback
 * @param usageCallback - A callback that will be called with the usage object
 * @returns
 */
export function createStream(): [
  ReadableStream<Uint8Array>,
  ReadableStreamDefaultController<Uint8Array>
] {
  let controller: ReadableStreamDefaultController<Uint8Array>;
  const stream = new ReadableStream<Uint8Array>({
    start(c) {
      controller = c;
    },
  });
  return [stream, controller!];
}

/**
 * Create a new stream that will stream a string progressively.
 * @param message - The message or string to stream
 * @returns
 */
export function createStringStream(message: string) {
  let controller: ReadableStreamDefaultController<Uint8Array> | undefined;
  const stream = new ReadableStream<Uint8Array>({
    start(c) {
      controller = c;
    },
  });
  const encoder = new TextEncoder();
  const words = message.split(" ");
  let currentIndex = 0;

  (async () => {
    while (currentIndex < words.length) {
      const wordGroup = words.slice(
        currentIndex,
        currentIndex +
          Math.min(
            2 + Math.floor(Math.random() * 3),
            words.length - currentIndex
          )
      );
      controller!.enqueue(encoder.encode(wordGroup.join(" ") + " "));
      currentIndex += wordGroup.length;
      await new Promise((resolve) => setTimeout(resolve, 10));
    }
    controller!.close();
  })();
  return stream;
}

/**
 * Creates a new stream for tracking tokens (or cost actually) spent.
 * @returns - A tuple containing the stream, controller, and a function to close the stream.
 */
export function createTokenStream(): [
  ReadableStream<Uint8Array>,
  ReadableStreamDefaultController<Uint8Array>,
  () => number
] {
  const [stream, controller] = createStream();
  const enqueue = controller.enqueue;
  let buffer = "";
  const decoder = new TextDecoder();
  controller.enqueue = (chunk) => {
    if (chunk) {
      buffer += decoder.decode(chunk);
    }
    enqueue.call(controller, chunk);
  };
  const encoder = new TextEncoder();
  controller.enqueue(encoder.encode('{ "thresholds": ['));
  return [
    stream,
    controller,
    (): number => {
      controller.enqueue(encoder.encode("]}"));
      controller.close();
      try {
        return (
          JSON5.parse(buffer).thresholds.reduce((acc: number, val: number) => {
            return acc + val;
          }, 0) ?? 0
        );
      } catch (err) {
        console.log(err);
        return 0;
      }
    },
  ];
}

/**
 * The price of a given model per 1000 tokens.
 * @param model - The model to get the price for
 * @returns
 */
function getModelPrice(model: string): [number, number] {
  model = model in SUPPORTED_MODELS ? model : "gpt-4o";
  return [SUPPORTED_MODELS[model].input, SUPPORTED_MODELS[model].output];
}

/**
 * Calculate the price of the tokens spent on the completion.
 * @param usage - The usage object as given from the API
 * @param stream - The stream to enqueue the price to
 * @param model - The model to calculate the price for
 */
export function calculatePrice(
  usage: {
    prompt_tokens: number;
    completion_tokens: number;
    total_tokens: number;
  },
  model: string,
  stream: ReadableStreamDefaultController<Uint8Array>
): void;
export function calculatePrice(
  usage: {
    prompt_tokens: number;
    completion_tokens: number;
    total_tokens: number;
  },
  model: string
): number;
export function calculatePrice(
  usage: {
    prompt_tokens: number;
    completion_tokens: number;
    total_tokens: number;
  },
  model: string,
  stream?: ReadableStreamDefaultController<Uint8Array>
): void | number {
  const [inputPrice, outputPrice] = getModelPrice(model);
  const input = (usage.prompt_tokens / 1_000) * inputPrice;
  const output = (usage.completion_tokens / 1_000) * outputPrice;
  const total = input + output;

  if (stream) {
    stream.enqueue(new TextEncoder().encode(`${total},`));
  } else {
    return total;
  }
}

/**
 * Streams the outline of the form schema.
 * @param store - The store to add the schema to
 * @param stream - The stream to read the schema from
 */
// export async function streamOutline(
//   store: FormStore,
//   stream: ReadableStream<Uint8Array>
// ) {
//   for await (const input of itemsInOutline(stream.getReader())) {
//     if (input.idx in store.earlyInputBuffer) {
//       store.schema.push(
//         Object.assign(
//           { key: token(), name: token() },
//           correctSchema(store.earlyInputBuffer[input.idx])
//         )
//       );
//       delete store.earlyInputBuffer[input.idx]; // eslint-disable-line
//       structureSchema(store.schema);
//     } else {
//       const willLoadMore = inputsRequiringDetails.includes(input.type);
//       const schemaNode = Object.assign(
//         {
//           key: token(),
//           name: token(),
//         },
//         correctSchema(input as Record<string, any>),
//         {
//           outerClass: ` !max-w-none ${
//             input.cols === "1" ? "col-span-1" : "col-span-2"
//           } ${willLoadMore ? LOADING_CLASS_LIST : ""}`,
//         },
//         willLoadMore ? { __loading: true } : {}
//       );
//       store.schema.push(schemaNode);
//       structureSchema(store.schema);
//     }
//   }
// }

/**
 * Reads the id of the form from the mux stream.
 * @param stream - A stream to read the id from.
 * @returns
 */
export async function streamId(
  stream: ReadableStream<Uint8Array>
): Promise<string | undefined> {
  for await (const chunk of jsonReader(stream.getReader(), {
    required: ["id"],
  })) {
    return chunk.id;
  }
}

/**
 * Reads the title from the stream.
 * @param store - The store to add the title to.
 * @param stream - Reads the title from the stream.
 */
export async function streamMetaData(
  store: FormStore,
  stream: ReadableStream<Uint8Array>
) {
  const reader = stream.getReader();
  const readJson = jsonReader(reader, {
    required: ["title", "backgroundImageSearchTerm"],
  });
  for await (const json of readJson) {
    store.title = json.title;
    store.theme.bgImageSearchQueryDefault = json.backgroundImageSearchTerm;
  }
  readJson.return();
}

/**
 * Log the stream to the console.
 * @param stream - The stream to log
 * @param logBuffer - Whether to log the buffer or not
 * @param message - A message to log before the buffer
 */
export async function logStream(
  stream: ReadableStream<Uint8Array>,
  logBuffer = false,
  message = ""
) {
  let buffer = "";
  const reader = stream.getReader();
  for await (const chunk of read(reader)) {
    const str = new TextDecoder().decode(chunk);
    if (logBuffer) {
      buffer += str;
    } else {
      console.log(str);
    }
  }
  if (logBuffer) {
    if (message) {
      console.log(message, JSON.stringify(JSON.parse(buffer), null, 2));
    } else {
      console.log(JSON.stringify(JSON.parse(buffer), null, 2));
    }
  }
}
