import { spawn } from "node:child_process";
import { randomBytes } from "node:crypto";
import { rmSync } from "node:fs";
import { Socket } from "node:net";
import { resolve } from "node:path";
import { runCli } from "./commands";

/**
 * Returns a promise that settles after roughly `ms` milliseconds.
 *
 * @param ms - Time to wait, passed straight to `setTimeout`.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
function delay(ms: number): Promise<void> {
    return new Promise<void>((wake) => {
        setTimeout(wake, ms);
    });
}

/**
 * Throws when a smoke-test expectation does not hold.
 *
 * @param condition - The expectation; nothing happens when it is true.
 * @param message - Text of the `Error` thrown when `condition` is false.
 * @throws Error carrying `message` if `condition` is false.
 */
function assert(condition: boolean, message: string): void {
    if (condition) {
        return;
    }
    throw new Error(message);
}

/**
 * Re-assembles captured output lines into one document and parses it as JSON.
 *
 * @param lines - Output lines, joined with "\n" before parsing.
 * @returns The parsed value, typed as a string-keyed record (the shape is not validated).
 * @throws SyntaxError if the joined text is not valid JSON.
 */
function parseJsonOutput(lines: string[]): Record<string, unknown> {
    const text = lines.join("\n");
    const parsed: unknown = JSON.parse(text);
    return parsed as Record<string, unknown>;
}

/**
 * Encodes a string as a single, final, masked WebSocket text frame.
 *
 * Layout of the returned buffer: header (2, 4 or 10 bytes depending on the
 * payload size), the 4-byte random masking key, then the payload XOR-ed with
 * that key.
 *
 * @param message - Text to send; it is encoded as UTF-8.
 * @returns The complete frame, ready to be written to the socket.
 */
function encodeClientTextFrame(message: string): Buffer {
    const FIN_TEXT = 0x81; // FIN bit set + text opcode
    const MASK_BIT = 0x80; // marks the payload as masked

    const body = Buffer.from(message, "utf8");
    const size = body.length;
    const maskingKey = randomBytes(4);

    let head: Buffer;
    if (size > 65535) {
        // 64-bit extended length; only the low 32 bits are ever populated.
        head = Buffer.alloc(10);
        head[0] = FIN_TEXT;
        head[1] = MASK_BIT | 127;
        head.writeUInt32BE(0, 2);
        head.writeUInt32BE(size, 6);
    } else if (size >= 126) {
        // 16-bit extended length.
        head = Buffer.alloc(4);
        head[0] = FIN_TEXT;
        head[1] = MASK_BIT | 126;
        head.writeUInt16BE(size, 2);
    } else {
        // Length fits in the 7-bit field of the second header byte.
        head = Buffer.from([FIN_TEXT, MASK_BIT | size]);
    }

    const masked = Buffer.alloc(size);
    body.forEach((byte, index) => {
        masked[index] = byte ^ maskingKey[index % 4];
    });

    return Buffer.concat([head, maskingKey, masked]);
}

/**
 * Extracts every complete text message from a buffer of unmasked
 * server-to-client WebSocket frames.
 *
 * Fragmented text messages (a text frame with FIN clear followed by
 * continuation frames) are reassembled before decoding, so a multi-byte UTF-8
 * sequence split across fragments decodes correctly. If the buffer ends in the
 * middle of a frame, or in the middle of a fragmented text message, the
 * unfinished bytes are returned in `remaining` so the caller can append more
 * data and call again.
 *
 * Frames other than text/continuation (binary, ping, pong, ...) are skipped.
 * A close frame stops parsing; it and anything after it stay in `remaining`.
 *
 * @param buffer - Raw bytes received from the server, starting on a frame boundary.
 * @returns The decoded text messages, plus a copy of the bytes not yet consumed.
 * @throws Error if a frame is masked or declares a payload length above 2^32 - 1.
 */
function parseServerFrames(
    buffer: Buffer<ArrayBufferLike>
): { messages: string[]; remaining: Buffer<ArrayBufferLike> } {
    let offset = 0; // scan position: start of the next frame to inspect
    let committed = 0; // everything before this index is fully consumed
    let fragments: Buffer[] | null = null; // payloads of an in-progress text message
    const messages: string[] = [];

    while (offset + 2 <= buffer.length) {
        const byte1 = buffer[offset];
        const byte2 = buffer[offset + 1];
        const fin = (byte1 & 0x80) !== 0;
        const opcode = byte1 & 0x0f;
        const masked = (byte2 & 0x80) !== 0;
        let payloadLength = byte2 & 0x7f;
        let cursor = offset + 2;

        if (masked) {
            throw new Error("Expected unmasked server frame.");
        }

        if (payloadLength === 126) {
            // 16-bit extended length follows the two header bytes.
            if (cursor + 2 > buffer.length) {
                break;
            }
            payloadLength = buffer.readUInt16BE(cursor);
            cursor += 2;
        } else if (payloadLength === 127) {
            // 64-bit extended length; only the low 32 bits are supported.
            if (cursor + 8 > buffer.length) {
                break;
            }
            const high = buffer.readUInt32BE(cursor);
            const low = buffer.readUInt32BE(cursor + 4);
            if (high !== 0) {
                throw new Error("Frame too large for parser.");
            }
            payloadLength = low;
            cursor += 8;
        }

        const frameLength = cursor + payloadLength;
        if (frameLength > buffer.length) {
            // Payload not fully received yet.
            break;
        }

        if (opcode === 0x8) {
            // Close frame: stop here and leave it unconsumed.
            break;
        }

        const payload = buffer.subarray(cursor, frameLength);
        if (opcode === 0x1) {
            if (fin) {
                messages.push(payload.toString("utf8"));
                fragments = null;
            } else {
                // First fragment of a multi-frame text message.
                fragments = [payload];
            }
        } else if (opcode === 0x0 && fragments !== null) {
            fragments.push(payload);
            if (fin) {
                // Decode only after joining, so characters split across
                // fragment boundaries survive.
                messages.push(Buffer.concat(fragments).toString("utf8"));
                fragments = null;
            }
        } else if (opcode === 0x2) {
            // A new binary message; its payload (and continuations) are ignored.
            fragments = null;
        }

        offset = frameLength;
        if (fragments === null) {
            // Only advance the consumed mark between messages, so an
            // unfinished fragmented message is re-parsed on the next call.
            committed = offset;
        }
    }

    return { messages, remaining: Buffer.from(buffer.subarray(committed)) };
}

/**
 * Minimal WebSocket client on top of a socket whose HTTP upgrade handshake
 * has already completed.
 *
 * Incoming bytes are accumulated, parsed with `parseServerFrames`, and the
 * decoded text messages are queued until a caller claims them through
 * `waitForMessage`. Socket errors and frame-parsing errors are recorded
 * instead of escaping from event handlers, and are rethrown by
 * `waitForMessage`.
 */
class RawWsClient {
    /** Bytes received but not yet consumed by the frame parser. */
    private buffer: Buffer<ArrayBufferLike> = Buffer.alloc(0);
    /** Decoded text messages not yet claimed by `waitForMessage`. */
    private readonly messages: string[] = [];
    /** First socket or parsing error observed, if any. */
    private failure: Error | null = null;

    /**
     * @param socket - Connected socket positioned at the first WebSocket frame.
     */
    public constructor(private readonly socket: Socket) {
        socket.on("data", (chunk) => {
            try {
                this.buffer = Buffer.concat([this.buffer, Buffer.from(chunk)]);
                const parsed = parseServerFrames(this.buffer);
                this.buffer = parsed.remaining;
                this.messages.push(...parsed.messages);
            } catch (error: unknown) {
                // An exception thrown from an event listener would otherwise
                // surface as an uncaught exception.
                this.recordFailure(error);
            }
        });
        // Without an 'error' listener, a socket error is thrown by the emitter
        // and terminates the process.
        socket.on("error", (error) => {
            this.recordFailure(error);
        });
    }

    /**
     * Serializes `payload` as JSON and sends it as one masked text frame.
     *
     * @param payload - Object to serialize and send.
     */
    public sendJson(payload: Record<string, unknown>): void {
        this.socket.write(encodeClientTextFrame(JSON.stringify(payload)));
    }

    /**
     * Waits for a queued message accepted by `predicate`, polling every 20 ms.
     * The matching message is removed from the queue; other messages stay queued.
     *
     * @param predicate - Returns true for the message the caller wants.
     * @param timeoutMs - Maximum time to wait, in milliseconds.
     * @returns The first queued message accepted by `predicate`.
     * @throws The recorded socket/parsing error if one occurred and no queued
     *         message matches; otherwise an Error once `timeoutMs` elapses.
     */
    public async waitForMessage(
        predicate: (message: string) => boolean,
        timeoutMs = 2500
    ): Promise<string> {
        const started = Date.now();
        while (Date.now() - started < timeoutMs) {
            const idx = this.messages.findIndex(predicate);
            if (idx >= 0) {
                const [message] = this.messages.splice(idx, 1);
                return message;
            }
            if (this.failure !== null) {
                // Nothing more will arrive reliably; fail fast with the real cause.
                throw this.failure;
            }
            await delay(20);
        }
        throw new Error("Timed out waiting for websocket message.");
    }

    /** Destroys the underlying socket. */
    public close(): void {
        this.socket.destroy();
    }

    /** Remembers the first failure so `waitForMessage` can report it. */
    private recordFailure(error: unknown): void {
        if (this.failure === null) {
            this.failure = error instanceof Error ? error : new Error(String(error));
        }
    }
}

/**
 * Opens a TCP connection and performs a WebSocket upgrade handshake by hand.
 *
 * @param host - Host to connect to; also used in the `Host` header.
 * @param port - TCP port to connect to.
 * @param path - Request target for the upgrade request (path plus query string).
 * @returns The HTTP status code of the response and, when it is 101, a client
 *          wrapping the upgraded socket. For any other status the socket is
 *          destroyed and `client` is null.
 * @throws The socket error if connecting or the handshake fails, or an Error
 *         if no complete response header arrives within 2500 ms. The socket is
 *         destroyed before the error propagates.
 */
async function openWebSocket(
    host: string,
    port: number,
    path: string
): Promise<{ statusCode: number; client: RawWsClient | null }> {
    const socket = new Socket();

    try {
        await new Promise<void>((resolveConnect, rejectConnect) => {
            socket.once("error", rejectConnect);
            socket.connect(port, host, () => {
                socket.off("error", rejectConnect);
                resolveConnect();
            });
        });

        const secKey = randomBytes(16).toString("base64");
        socket.write(
            `GET ${path} HTTP/1.1\r\n` +
                `Host: ${host}:${port}\r\n` +
                "Upgrade: websocket\r\n" +
                "Connection: Upgrade\r\n" +
                `Sec-WebSocket-Key: ${secKey}\r\n` +
                "Sec-WebSocket-Version: 13\r\n\r\n"
        );

        const rawHeader = await new Promise<string>((resolveHeader, rejectHeader) => {
            let headerBuffer = Buffer.alloc(0);

            const cleanup = (): void => {
                clearTimeout(timeout);
                socket.off("data", onData);
                socket.off("error", onError);
            };

            const onData = (chunk: Buffer): void => {
                headerBuffer = Buffer.concat([headerBuffer, Buffer.from(chunk)]);
                const marker = headerBuffer.indexOf("\r\n\r\n");
                if (marker === -1) {
                    // Header terminator not seen yet; keep accumulating.
                    return;
                }

                const headerPart = headerBuffer.subarray(0, marker + 4).toString("utf8");
                const remainder = headerBuffer.subarray(marker + 4);
                cleanup();

                // Removing the last 'data' listener does not pause the stream,
                // and a flowing stream with no consumer can drop data. Pause
                // explicitly so bytes received after the header (including the
                // pushed-back remainder) stay buffered until the next consumer
                // is attached.
                socket.pause();
                if (remainder.length > 0) {
                    socket.unshift(remainder);
                }
                resolveHeader(headerPart);
            };

            const onError = (error: Error): void => {
                cleanup();
                rejectHeader(error);
            };

            const timeout = setTimeout(() => {
                cleanup();
                rejectHeader(new Error("Handshake timeout."));
            }, 2500);

            socket.on("data", onData);
            socket.on("error", onError);
        });

        // Status line looks like "HTTP/1.1 101 Switching Protocols".
        const statusLine = rawHeader.split("\r\n")[0] ?? "";
        const statusCode = Number(statusLine.split(" ")[1] ?? "0");

        if (statusCode !== 101) {
            socket.destroy();
            return { statusCode, client: null };
        }

        const client = new RawWsClient(socket);
        // The client's 'data' listener is attached now; adding a listener does
        // not restart an explicitly paused stream, so resume it here.
        socket.resume();
        return { statusCode, client };
    } catch (error: unknown) {
        // Do not leave a half-open socket behind on connect/handshake failure.
        socket.destroy();
        throw error;
    }
}

/**
 * Polls `GET {baseUrl}/health` until it answers with a successful status.
 *
 * Each request is aborted when the overall deadline passes, so a server that
 * accepts the connection but never responds cannot stall the wait beyond
 * `timeoutMs`. Failed or non-OK attempts are retried after a 120 ms pause.
 *
 * @param baseUrl - Origin of the server, without a trailing slash.
 * @param timeoutMs - Overall time budget in milliseconds.
 * @throws Error if no successful response is received within `timeoutMs`.
 */
async function waitForHealth(baseUrl: string, timeoutMs: number): Promise<void> {
    const deadline = Date.now() + timeoutMs;

    while (Date.now() < deadline) {
        const controller = new AbortController();
        // Bound this attempt by whatever is left of the overall budget.
        const remaining = Math.max(1, deadline - Date.now());
        const timer = setTimeout(() => controller.abort(), remaining);

        try {
            const response = await fetch(`${baseUrl}/health`, { signal: controller.signal });
            if (response.ok) {
                return;
            }
        } catch {
            // Connection refused, aborted, etc. — the server is not ready yet;
            // continue polling.
        } finally {
            clearTimeout(timer);
        }
        await delay(120);
    }

    throw new Error("Timed out waiting for health endpoint.");
}

/**
 * End-to-end smoke test.
 *
 * Steps:
 *  1. Point `DATA_FILE` / `CHAT_LOG_DIR` at throw-away locations under `data/`.
 *  2. Create a stream plus an admin and a viewer token through the CLI.
 *  3. Spawn the server (`src/index.ts` via tsx) on a random local port and
 *     wait for its health endpoint.
 *  4. Connect one WebSocket per token, start a poll and cast a vote as admin,
 *     and check that the viewer receives the matching `poll_update` events.
 *  5. Always: close sockets, stop the server, restore the environment
 *     variables and remove the temporary data.
 *
 * @throws Error when any step fails; the cleanup in step 5 still runs.
 */
async function main(): Promise<void> {
    const smokeDataFile = resolve(process.cwd(), "data", `e2e-${process.pid}-${Date.now()}.json`);
    const smokeChatDir = resolve(process.cwd(), "data", `e2e-chat-${process.pid}-${Date.now()}`);
    const previousDataFile = process.env.DATA_FILE;
    const previousChatLogDir = process.env.CHAT_LOG_DIR;
    const previousHost = process.env.HOST;
    const previousPort = process.env.PORT;
    const host = "127.0.0.1";
    const port = 8911 + Math.floor(Math.random() * 80);
    const baseUrl = `http://${host}:${port}`;

    // CLI stdout is captured for JSON parsing; CLI stderr is discarded.
    const out: string[] = [];
    const io = {
        out: (line: string) => out.push(line),
        err: (_line: string) => {}
    };

    process.env.DATA_FILE = smokeDataFile;
    process.env.CHAT_LOG_DIR = smokeChatDir;

    let child: ReturnType<typeof spawn> | null = null;
    let adminClient: RawWsClient | null = null;
    let viewerClient: RawWsClient | null = null;

    try {
        out.length = 0;
        let code = await runCli(["stream:create", "--name", "e2e-stream", "--json"], io);
        assert(code === 0, "stream:create failed");
        const streamResult = parseJsonOutput(out);
        const streamId = String((streamResult.stream as { id: string }).id);

        out.length = 0;
        code = await runCli(["token:create", "--stream", streamId, "--role", "admin", "--json"], io);
        assert(code === 0, "admin token:create failed");
        const adminToken = String(parseJsonOutput(out).plainToken);

        out.length = 0;
        code = await runCli(["token:create", "--stream", streamId, "--role", "viewer", "--json"], io);
        assert(code === 0, "viewer token:create failed");
        const viewerToken = String(parseJsonOutput(out).plainToken);

        const server = spawn(
            process.execPath,
            [resolve(process.cwd(), "node_modules/tsx/dist/cli.mjs"), "src/index.ts"],
            {
                cwd: process.cwd(),
                env: {
                    ...process.env,
                    HOST: host,
                    PORT: String(port),
                    DATA_FILE: smokeDataFile,
                    CHAT_LOG_DIR: smokeChatDir
                },
                stdio: ["ignore", "pipe", "pipe"]
            }
        );
        child = server;

        // The output is not inspected, but the pipes must be drained:
        // otherwise the server blocks once the pipe buffer fills up.
        server.stdout?.resume();
        server.stderr?.resume();

        // Fail fast if the server cannot be spawned or exits before it is
        // healthy, instead of waiting out the full health timeout.
        const serverDied = new Promise<never>((_resolve, reject) => {
            server.once("error", reject);
            server.once("exit", (exitCode, signal) => {
                reject(
                    new Error(
                        `Server exited before the test finished (code=${String(exitCode)}, signal=${String(signal)}).`
                    )
                );
            });
        });
        // The server is expected to exit during teardown; keep that rejection
        // from being reported as unhandled.
        serverDied.catch(() => {});

        await Promise.race([waitForHealth(baseUrl, 12000), serverDied]);

        const adminConn = await openWebSocket(
            host,
            port,
            `/ws?token=${encodeURIComponent(adminToken)}`
        );
        // Register each client as soon as it exists so `finally` closes it
        // even if a later step throws.
        adminClient = adminConn.client;

        const viewerConn = await openWebSocket(
            host,
            port,
            `/ws?token=${encodeURIComponent(viewerToken)}`
        );
        viewerClient = viewerConn.client;

        if (adminConn.statusCode !== 101 || adminConn.client === null) {
            throw new Error("Admin WS handshake failed.");
        }
        if (viewerConn.statusCode !== 101 || viewerConn.client === null) {
            throw new Error("Viewer WS handshake failed.");
        }

        const adminWs = adminConn.client;
        const viewerWs = viewerConn.client;

        adminWs.sendJson({
            command: "poll",
            args: ["start", "Is", "E2E", "working?"]
        });

        const pollStartRaw = await viewerWs.waitForMessage(
            (msg) => msg.includes("\"event\":\"poll_update\"")
        );
        const pollStart = JSON.parse(pollStartRaw) as {
            event: string;
            data: { isOpen?: boolean; question?: string };
        };
        assert(pollStart.event === "poll_update", "Expected poll_update event.");
        assert(pollStart.data.isOpen === true, "Expected poll to open.");
        assert(pollStart.data.question === "Is E2E working?", "Expected poll question text.");

        adminWs.sendJson({
            command: "poll",
            args: ["vote", "yes"]
        });

        const pollVoteRaw = await viewerWs.waitForMessage(
            (msg) =>
                msg.includes("\"event\":\"poll_update\"") && msg.includes("\"yes\":1")
        );
        const pollVote = JSON.parse(pollVoteRaw) as {
            event: string;
            data: { votes?: { yes?: number } };
        };
        assert(pollVote.event === "poll_update", "Expected vote update event.");
        assert(pollVote.data.votes?.yes === 1, "Expected yes votes to increment.");

        console.log(
            JSON.stringify(
                {
                    ok: true,
                    streamId
                },
                null,
                2
            )
        );
    } finally {
        viewerClient?.close();
        adminClient?.close();

        if (child) {
            const server = child;
            // `killed` only reports that a signal was delivered, not that the
            // process is gone, so check the exit status fields instead.
            const hasExited = (): boolean =>
                server.exitCode !== null || server.signalCode !== null;

            if (!hasExited()) {
                server.kill("SIGTERM");
                await delay(250);
            }
            if (!hasExited()) {
                // Still running after the grace period: force it.
                server.kill("SIGKILL");
            }
        }

        if (previousDataFile === undefined) {
            delete process.env.DATA_FILE;
        } else {
            process.env.DATA_FILE = previousDataFile;
        }
        if (previousChatLogDir === undefined) {
            delete process.env.CHAT_LOG_DIR;
        } else {
            process.env.CHAT_LOG_DIR = previousChatLogDir;
        }
        if (previousHost === undefined) {
            delete process.env.HOST;
        } else {
            process.env.HOST = previousHost;
        }
        if (previousPort === undefined) {
            delete process.env.PORT;
        } else {
            process.env.PORT = previousPort;
        }

        rmSync(smokeDataFile, { force: true });
        rmSync(smokeChatDir, { force: true, recursive: true });
    }
}

// Script entry point: run the smoke test; on failure print the reason to
// stderr and exit with status 1.
main().catch((failure: unknown) => {
    let text: string;
    if (failure instanceof Error) {
        text = failure.message;
    } else {
        text = String(failure);
    }
    console.error(text);
    process.exit(1);
});
