import { randomBytes } from "node:crypto";
import { rmSync } from "node:fs";
import { createServer } from "node:http";
import { Socket } from "node:net";
import { resolve } from "node:path";
import { FileDataStore } from "../db/fileStore";
import { loadEnv } from "../env";
import { issueToken, resolveToken } from "../hub/auth";
import { InMemoryEventBus } from "../hub/eventBus";
import { createEvent } from "../hub/events";
import { WsTransport } from "../web/wsTransport";

function delay(ms: number): Promise<void> {
    return new Promise((resolveDelay) => setTimeout(resolveDelay, ms));
}

function encodeClientTextFrame(message: string): Buffer {
    const payload = Buffer.from(message, "utf8");
    const payloadLength = payload.length;
    const mask = randomBytes(4);
    let header: Buffer;

    if (payloadLength < 126) {
        header = Buffer.from([0x81, payloadLength | 0x80]);
    } else if (payloadLength <= 65535) {
        header = Buffer.alloc(4);
        header[0] = 0x81;
        header[1] = 126 | 0x80;
        header.writeUInt16BE(payloadLength, 2);
    } else {
        header = Buffer.alloc(10);
        header[0] = 0x81;
        header[1] = 127 | 0x80;
        header.writeUInt32BE(0, 2);
        header.writeUInt32BE(payloadLength, 6);
    }

    const maskedPayload = Buffer.alloc(payloadLength);
    for (let i = 0; i < payloadLength; i += 1) {
        maskedPayload[i] = payload[i] ^ mask[i % 4];
    }

    return Buffer.concat([header, mask, maskedPayload]);
}

function parseServerFrames(
    buffer: Buffer<ArrayBufferLike>
): { messages: string[]; remaining: Buffer<ArrayBufferLike> } {
    let offset = 0;
    const messages: string[] = [];

    while (offset + 2 <= buffer.length) {
        const byte1 = buffer[offset];
        const byte2 = buffer[offset + 1];
        const opcode = byte1 & 0x0f;
        const masked = (byte2 & 0x80) !== 0;
        let payloadLength = byte2 & 0x7f;
        let cursor = offset + 2;

        if (masked) {
            throw new Error("Expected unmasked server frame.");
        }

        if (payloadLength === 126) {
            if (cursor + 2 > buffer.length) {
                break;
            }
            payloadLength = buffer.readUInt16BE(cursor);
            cursor += 2;
        } else if (payloadLength === 127) {
            if (cursor + 8 > buffer.length) {
                break;
            }
            const high = buffer.readUInt32BE(cursor);
            const low = buffer.readUInt32BE(cursor + 4);
            if (high !== 0) {
                throw new Error("Frame too large for smoke parser.");
            }
            payloadLength = low;
            cursor += 8;
        }

        const frameLength = cursor + payloadLength;
        if (frameLength > buffer.length) {
            break;
        }

        const payload = buffer.subarray(cursor, cursor + payloadLength);
        if (opcode === 0x1) {
            messages.push(payload.toString("utf8"));
        } else if (opcode === 0x8) {
            break;
        }

        offset = frameLength;
    }

    return { messages, remaining: Buffer.from(buffer.subarray(offset)) };
}

class RawWsClient {
    private buffer: Buffer<ArrayBufferLike> = Buffer.alloc(0);
    private readonly messages: string[] = [];

    public constructor(private readonly socket: Socket) {
        socket.on("data", (chunk) => {
            this.buffer = Buffer.concat([this.buffer, Buffer.from(chunk)]);
            const parsed = parseServerFrames(this.buffer);
            this.buffer = parsed.remaining;
            this.messages.push(...parsed.messages);
        });
    }

    public sendJson(payload: Record<string, unknown>): void {
        this.socket.write(encodeClientTextFrame(JSON.stringify(payload)));
    }

    public async waitForMessage(
        predicate: (message: string) => boolean,
        timeoutMs = 2000
    ): Promise<string> {
        const started = Date.now();
        while (Date.now() - started < timeoutMs) {
            const idx = this.messages.findIndex(predicate);
            if (idx >= 0) {
                const [message] = this.messages.splice(idx, 1);
                return message;
            }
            await delay(20);
        }
        throw new Error("Timed out waiting for websocket message.");
    }

    public async expectNoMessage(timeoutMs = 300): Promise<void> {
        const originalCount = this.messages.length;
        await delay(timeoutMs);
        if (this.messages.length > originalCount) {
            throw new Error("Unexpected websocket message received.");
        }
    }

    public close(): void {
        this.socket.destroy();
    }
}

async function openWebSocket(
    host: string,
    port: number,
    path: string
): Promise<{ statusCode: number; client: RawWsClient | null }> {
    const socket = new Socket();

    await new Promise<void>((resolveConnect, rejectConnect) => {
        socket.once("error", rejectConnect);
        socket.connect(port, host, () => {
            socket.off("error", rejectConnect);
            resolveConnect();
        });
    });

    const secKey = randomBytes(16).toString("base64");
    socket.write(
        `GET ${path} HTTP/1.1\r\n` +
            `Host: ${host}:${port}\r\n` +
            "Upgrade: websocket\r\n" +
            "Connection: Upgrade\r\n" +
            `Sec-WebSocket-Key: ${secKey}\r\n` +
            "Sec-WebSocket-Version: 13\r\n\r\n"
    );

    const rawHeader = await new Promise<string>((resolveHeader, rejectHeader) => {
        let headerBuffer = Buffer.alloc(0);
        const timeout = setTimeout(() => {
            cleanup();
            rejectHeader(new Error("Handshake timeout."));
        }, 2000);

        const onData = (chunk: Buffer): void => {
            headerBuffer = Buffer.concat([headerBuffer, Buffer.from(chunk)]);
            const marker = headerBuffer.indexOf("\r\n\r\n");
            if (marker === -1) {
                return;
            }

            const headerPart = headerBuffer.subarray(0, marker + 4).toString("utf8");
            const remainder = headerBuffer.subarray(marker + 4);
            cleanup();
            resolveHeader(headerPart);

            if (remainder.length > 0) {
                socket.unshift(remainder);
            }
        };

        const onError = (error: Error): void => {
            cleanup();
            rejectHeader(error);
        };

        const cleanup = (): void => {
            clearTimeout(timeout);
            socket.off("data", onData);
            socket.off("error", onError);
        };

        socket.on("data", onData);
        socket.on("error", onError);
    });

    const statusLine = rawHeader.split("\r\n")[0] ?? "";
    const statusCode = Number(statusLine.split(" ")[1] ?? "0");

    if (statusCode !== 101) {
        socket.destroy();
        return { statusCode, client: null };
    }

    return { statusCode, client: new RawWsClient(socket) };
}

async function main(): Promise<void> {
    const env = loadEnv();
    const smokeDataFile = resolve(
        process.cwd(),
        "data",
        `phase5-smoke-${process.pid}-${Date.now()}.json`
    );
    const store = new FileDataStore(smokeDataFile);
    const eventBus = new InMemoryEventBus();
    const adminCommands: string[] = [];

    const server = createServer((_req, res) => {
        res.writeHead(404);
        res.end();
    });

    const wsTransport = new WsTransport(
        server,
        eventBus,
        (token) => resolveToken(store, token, env.TOKEN_PEPPER),
        () => {}
    );

    try {
        await store.init();
        wsTransport.attach();

        const stream = await store.createStream({ name: "phase5-smoke" });
        const admin = await issueToken(store, stream.id, "admin", env.TOKEN_PEPPER);
        const viewer = await issueToken(store, stream.id, "viewer", env.TOKEN_PEPPER);

        eventBus.on("admin_command", async (event) => {
            adminCommands.push(`${event.payload.command}:${event.payload.actorRole}`);
        });

        await new Promise<void>((resolveListen, rejectListen) => {
            server.once("error", rejectListen);
            server.listen(0, "127.0.0.1", () => {
                server.off("error", rejectListen);
                resolveListen();
            });
        });

        const address = server.address();
        if (!address || typeof address === "string") {
            throw new Error("Failed to resolve smoke server address.");
        }

        const baseHost = "127.0.0.1";
        const port = address.port;

        const unauthorized = await openWebSocket(baseHost, port, "/ws?token=invalid");
        if (unauthorized.statusCode !== 401) {
            throw new Error(`Expected unauthorized handshake to return 401, got ${unauthorized.statusCode}.`);
        }

        const adminConn = await openWebSocket(
            baseHost,
            port,
            `/ws?token=${encodeURIComponent(admin.plainToken)}`
        );
        const viewerConn = await openWebSocket(
            baseHost,
            port,
            `/ws?token=${encodeURIComponent(viewer.plainToken)}`
        );

        if (!adminConn.client || !viewerConn.client) {
            throw new Error("Expected admin and viewer websocket handshakes to succeed.");
        }

        await eventBus.emit(
            createEvent("ws_out", {
                streamId: stream.id,
                target: "viewer",
                event: "viewer_only",
                data: { ok: true }
            })
        );

        const viewerMessage = await viewerConn.client.waitForMessage(
            (msg) => msg.includes("\"event\":\"viewer_only\"")
        );
        await adminConn.client.expectNoMessage();

        await eventBus.emit(
            createEvent("ws_out", {
                streamId: stream.id,
                target: "admin",
                event: "admin_only",
                data: { ok: true }
            })
        );

        const adminMessage = await adminConn.client.waitForMessage(
            (msg) => msg.includes("\"event\":\"admin_only\"")
        );
        await viewerConn.client.expectNoMessage();

        adminConn.client.sendJson({ command: "reboot", args: ["now"] });
        await delay(100);
        if (adminCommands.length !== 1 || adminCommands[0] !== "reboot:admin") {
            throw new Error("Expected admin inbound message to emit admin_command event.");
        }

        viewerConn.client.sendJson({ command: "blocked", args: [] });
        await delay(100);
        if (adminCommands.length !== 1) {
            throw new Error("Viewer inbound command should not emit admin_command.");
        }

        adminConn.client.close();
        viewerConn.client.close();

        console.log(
            JSON.stringify(
                {
                    ok: true,
                    viewerMessage,
                    adminMessage,
                    adminCommands
                },
                null,
                2
            )
        );
    } finally {
        wsTransport.shutdown();
        await new Promise<void>((resolveClose) => {
            server.close(() => resolveClose());
        });
        rmSync(smokeDataFile, { force: true });
    }
}

main().catch((error: unknown) => {
    const message = error instanceof Error ? error.message : String(error);
    console.error(message);
    process.exit(1);
});
