import { createHash, randomUUID } from "node:crypto";
import type { IncomingMessage, Server as HttpServer } from "node:http";
import type { Duplex } from "node:stream";
import type { Role } from "../db/types";
import type { InMemoryEventBus } from "../hub/eventBus";
import { createEvent } from "../hub/events";
import { LIMITS } from "../hub/limits";
import type { OpsMetrics } from "../hub/opsMetrics";
import type { RuntimeManager } from "../hub/runtimeManager";

/**
 * Identity attached to a connection once its token has been resolved.
 */
type AuthContext = {
    /** Identifier of the token that authenticated the connection. */
    tokenId: string;
    /** Stream the connection is scoped to; outbound events are matched against it. */
    streamId: string;
    /** Role granted to the connection; only `"admin"` connections may issue commands. */
    role: Role;
};

/**
 * Resolves the `token` query parameter of an upgrade request (`null` when the
 * parameter is absent) to an auth context. A `null` result makes the transport
 * reject the upgrade with 401.
 */
type ResolveAuth = (token: string | null) => Promise<AuthContext | null>;

/**
 * Structured log sink. Callers in this file always pass a `message` key naming
 * the event, plus contextual fields.
 */
type LogFn = (fields: Record<string, unknown>) => void;

/**
 * Per-socket state tracked for every accepted WebSocket connection.
 */
type ClientConnection = {
    /** Raw upgraded connection that frames are read from and written to. */
    socket: Duplex;
    /** Identifier of the token that authenticated this connection. */
    tokenId: string;
    /** Random UUID assigned to admin connections; `null` for every other role. */
    sessionId: string | null;
    /** Stream this connection belongs to. */
    streamId: string;
    /** Role resolved during authentication. */
    role: Role;
    /** Bytes received but not yet consumed as complete frames. */
    buffer: Buffer;
    /** Start (epoch ms) of the current admin command rate-limit window. */
    adminWindowStartMs: number;
    /** Number of admin commands accepted within the current window. */
    adminWindowCount: number;
};

/**
 * Snapshot of a connected admin client, keyed by its session id.
 */
export type AdminSession = {
    /** Random UUID generated when the admin connection was accepted. */
    sessionId: string;
    /** Stream the admin is connected to. */
    streamId: string;
    /** Identifier of the token that authenticated the admin. */
    tokenId: string;
    /** ISO-8601 timestamp taken when the session was created. */
    connectedAt: string;
    /** ISO-8601 timestamp of the last inbound data or counter update. */
    lastSeenAt: string;
    /** Commands that passed the rate limiter for this session. */
    commandCount: number;
    /** Commands rejected by the per-connection rate limiter. */
    rateLimitDrops: number;
    /** Commands rejected because the shared command queue was full. */
    backpressureDrops: number;
    /** `remoteAddress` read from the socket when it exposes one, otherwise `null`. */
    remoteAddress?: string | null;
};

/**
 * Connection counts across all streams.
 */
export type ConnectedClientStats = {
    /** Every connected client, regardless of role. */
    total: number;
    /** Clients whose role is `"admin"`. */
    admin: number;
    /** Clients whose role is `"viewer"`. */
    viewer: number;
};

/**
 * An accepted admin command waiting to be emitted on the event bus as an
 * `admin_command` event.
 */
type AdminCommandQueueItem = {
    /** Stream of the connection that issued the command. */
    streamId: string;
    /** Token id of the issuing connection. */
    tokenId: string;
    /** Session id of the issuing connection (`null` when it has none). */
    sessionId: string | null;
    /** Trimmed, lower-cased command name. */
    command: string;
    /** String arguments supplied with the command (empty when none were valid). */
    args: string[];
    /** Role of the issuing connection at the time the command was queued. */
    actorRole: Role;
};

/**
 * Point-in-time view of the admin command queue.
 */
export type QueueStats = {
    /** Commands currently waiting in the queue. */
    depth: number;
    /** Configured capacity (`LIMITS.ADMIN_COMMAND_QUEUE_MAX`). */
    maxDepth: number;
    /** Total commands dropped because the queue was at capacity. */
    dropped: number;
};

/**
 * Fixed GUID from RFC 6455 that is appended to the client's
 * `Sec-WebSocket-Key` before hashing to produce `Sec-WebSocket-Accept`.
 */
const WS_ACCEPT_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/**
 * Rejects an upgrade attempt with a minimal, body-less HTTP response and then
 * tears the connection down.
 *
 * The response is flushed with `end()` and the socket is destroyed only after
 * the stream reports completion. Destroying immediately after `write()` would
 * discard the response whenever the write cannot complete synchronously.
 *
 * @param socket - Raw connection received from the HTTP server's `upgrade` event.
 * @param statusCode - HTTP status code to report (for example 400, 401, 404).
 * @param reason - Reason phrase placed after the status code.
 */
function writeHttpError(socket: Duplex, statusCode: number, reason: string): void {
    // Nothing can be delivered on a connection that is already gone.
    if (socket.destroyed) {
        return;
    }

    const response =
        `HTTP/1.1 ${statusCode} ${reason}\r\n` +
        "Connection: close\r\n" +
        "Content-Type: text/plain; charset=utf-8\r\n" +
        "Content-Length: 0\r\n\r\n";

    // The callback runs once the stream has finished (or failed); either way
    // the connection is no longer needed.
    socket.end(response, () => {
        socket.destroy();
    });
}

/**
 * Derives the `Sec-WebSocket-Accept` header value for a handshake.
 *
 * @param secWebSocketKey - Value of the client's `Sec-WebSocket-Key` header.
 * @returns Base64-encoded SHA-1 digest of the key followed by the protocol GUID.
 */
function createAcceptValue(secWebSocketKey: string): string {
    const hasher = createHash("sha1");
    hasher.update(`${secWebSocketKey}${WS_ACCEPT_GUID}`);
    return hasher.digest("base64");
}

/**
 * Wraps a string in a single unmasked, final (FIN) WebSocket text frame.
 *
 * The header uses the shortest length encoding that fits: a 7-bit length for
 * payloads under 126 bytes, a 16-bit length up to 65535 bytes, and a 64-bit
 * length (upper 32 bits zero) beyond that.
 *
 * @param message - Text to send; encoded as UTF-8.
 * @returns Frame header followed by the payload bytes.
 */
function encodeTextFrame(message: string): Buffer {
    const finTextOpcode = 0x81;
    const body = Buffer.from(message, "utf8");
    const size = body.length;

    let head: Buffer;
    if (size < 126) {
        head = Buffer.from([finTextOpcode, size]);
    } else if (size <= 0xffff) {
        head = Buffer.alloc(4);
        head.writeUInt8(finTextOpcode, 0);
        head.writeUInt8(126, 1); // marker: 16-bit extended length follows
        head.writeUInt16BE(size, 2);
    } else {
        head = Buffer.alloc(10);
        head.writeUInt8(finTextOpcode, 0);
        head.writeUInt8(127, 1); // marker: 64-bit extended length follows
        head.writeUInt32BE(0, 2);
        head.writeUInt32BE(size, 6);
    }

    return Buffer.concat([head, body]);
}

/**
 * Builds an unmasked, final (FIN) pong frame.
 *
 * Control frames may carry at most 125 payload bytes (RFC 6455 §5.5) and the
 * header written here only has room for a 7-bit length. A longer payload is
 * therefore truncated to its first 125 bytes; writing the raw length would
 * emit a corrupt frame (126/127 are extended-length markers and values of 128
 * or more set the mask bit).
 *
 * @param payload - Application data to echo back, normally the ping's payload.
 * @returns Two-byte header followed by at most 125 payload bytes.
 */
function encodePongFrame(payload: Buffer): Buffer {
    const maxControlPayloadBytes = 125;
    const body =
        payload.length > maxControlPayloadBytes
            ? payload.subarray(0, maxControlPayloadBytes)
            : payload;
    return Buffer.concat([Buffer.from([0x8a, body.length]), body]);
}

/**
 * Consumes as many complete client-to-server WebSocket frames as `buffer`
 * contains.
 *
 * Text frames (opcode 0x1) are returned as UTF-8 strings. Ping frames (0x9)
 * are returned as `__PING__` followed by the base64 of their payload so the
 * caller can answer with a pong. Frames with any other opcode are consumed and
 * ignored. The FIN bit is not inspected and continuation frames (opcode 0x0)
 * are skipped, so fragmented messages are not reassembled.
 *
 * Parsing stops and `close` is set when:
 * - a frame is not masked (clients must mask every frame),
 * - a control frame declares more than 125 payload bytes,
 * - the declared payload exceeds `LIMITS.MAX_WS_FRAME_PAYLOAD_BYTES` or does
 *   not fit in 32 bits, or
 * - a close frame (0x8) is received.
 *
 * The size limit is enforced as soon as the length is known, before waiting
 * for the payload to arrive, so a peer cannot make the caller buffer an
 * oversized frame just by announcing one.
 *
 * @param buffer - Unconsumed bytes received from the client so far.
 * @returns Decoded messages, the bytes left over for the next call, and
 *   whether the connection should be closed.
 */
function parseFrames(buffer: Buffer): { messages: string[]; remaining: Buffer; close: boolean } {
    // RFC 6455 §5.5: control frames carry at most 125 payload bytes.
    const maxControlPayloadBytes = 125;
    let offset = 0;
    const messages: string[] = [];
    let close = false;

    while (offset + 2 <= buffer.length) {
        const byte1 = buffer[offset];
        const byte2 = buffer[offset + 1];
        const opcode = byte1 & 0x0f;
        const masked = (byte2 & 0x80) !== 0;
        let payloadLength = byte2 & 0x7f;
        let cursor = offset + 2;

        if (!masked) {
            close = true;
            break;
        }

        // Opcodes 0x8-0xF are control frames. A 7-bit length of 126 or 127
        // would announce an extended length, which control frames cannot use.
        if ((opcode & 0x08) !== 0 && payloadLength > maxControlPayloadBytes) {
            close = true;
            break;
        }

        if (payloadLength === 126) {
            if (cursor + 2 > buffer.length) {
                break;
            }
            payloadLength = buffer.readUInt16BE(cursor);
            cursor += 2;
        } else if (payloadLength === 127) {
            if (cursor + 8 > buffer.length) {
                break;
            }
            const high = buffer.readUInt32BE(cursor);
            const low = buffer.readUInt32BE(cursor + 4);
            if (high !== 0) {
                // Lengths beyond 32 bits are never accepted.
                close = true;
                break;
            }
            payloadLength = low;
            cursor += 8;
        }

        // Reject oversized frames before buffering their payload.
        if (payloadLength > LIMITS.MAX_WS_FRAME_PAYLOAD_BYTES) {
            close = true;
            break;
        }

        const frameLength = cursor + 4 + payloadLength;
        if (frameLength > buffer.length) {
            // Incomplete frame: leave it in `remaining` until more data arrives.
            break;
        }

        const mask = buffer.subarray(cursor, cursor + 4);
        cursor += 4;
        const maskedPayload = buffer.subarray(cursor, cursor + payloadLength);
        const payload = Buffer.alloc(payloadLength);

        for (let i = 0; i < payloadLength; i += 1) {
            payload[i] = maskedPayload[i] ^ mask[i % 4];
        }

        if (opcode === 0x8) {
            close = true;
            offset = frameLength;
            break;
        }

        if (opcode === 0x9) {
            messages.push(`__PING__${payload.toString("base64")}`);
        } else if (opcode === 0x1) {
            messages.push(payload.toString("utf8"));
        }

        offset = frameLength;
    }

    return { messages, remaining: buffer.subarray(offset), close };
}

/**
 * Parses JSON without throwing.
 *
 * @param input - Text that may or may not be valid JSON.
 * @returns The parsed value, or `null` when `input` is not valid JSON. Note
 *   that the JSON literal `null` also yields `null`.
 */
function safeJsonParse(input: string): unknown {
    let parsed: unknown = null;
    try {
        parsed = JSON.parse(input);
    } catch {
        // Invalid JSON: fall through with the `null` default.
    }
    return parsed;
}

/**
 * WebSocket transport built directly on a Node HTTP server's `upgrade` event.
 *
 * - Accepts connections on `/ws`, authenticated through the `token` query
 *   parameter via the injected `resolveAuth` callback.
 * - Forwards `ws_out` events from the event bus to every client whose stream
 *   (and, unless the target is `"all"`, role) matches.
 * - Accepts JSON `{ command, args }` messages from admin clients, applies a
 *   per-connection rate limit and a bounded queue, and emits the accepted
 *   commands one at a time as `admin_command` events.
 */
export class WsTransport {
    private readonly clients = new Set<ClientConnection>();
    private readonly adminSessions = new Map<string, AdminSession>();
    private readonly unsubscribers: Array<() => void> = [];
    private readonly adminCommandQueue: AdminCommandQueueItem[] = [];
    private queueDroppedCount = 0;
    private queueDrainRunning = false;

    /**
     * @param server - HTTP server whose `upgrade` events are handled.
     * @param eventBus - Bus that `ws_out` events are read from and `admin_command` events are emitted on.
     * @param resolveAuth - Maps the `token` query parameter to an auth context, or `null` to reject.
     * @param log - Structured log sink.
     * @param runtimeManager - Optional sink for component status, stream state and client counts.
     * @param opsMetrics - Optional counters for connections and dropped commands.
     * @param onAdminPresenceChanged - Optional callback invoked with a stream's admin session count whenever it changes.
     */
    public constructor(
        private readonly server: HttpServer,
        private readonly eventBus: InMemoryEventBus,
        private readonly resolveAuth: ResolveAuth,
        private readonly log: LogFn,
        private readonly runtimeManager?: RuntimeManager,
        private readonly opsMetrics?: OpsMetrics,
        private readonly onAdminPresenceChanged?: (streamId: string, count: number) => void
    ) {}

    /**
     * Starts handling upgrade requests and subscribes to `ws_out` events.
     * Both registrations are undone by {@link WsTransport.shutdown}.
     */
    public attach(): void {
        const onUpgrade = (req: IncomingMessage, socket: Duplex): void => {
            // Last line of defence: a failed handshake must never surface as an
            // unhandled rejection.
            this.handleUpgrade(req, socket).catch((error: unknown) => {
                this.log({
                    message: "ws_upgrade_failed",
                    error: WsTransport.describeError(error)
                });
                socket.destroy();
            });
        };
        this.server.on("upgrade", onUpgrade);
        this.unsubscribers.push(() => {
            this.server.off("upgrade", onUpgrade);
        });

        const unsubscribeWsOut = this.eventBus.on(
            "ws_out",
            async (event) => {
                // The frame is identical for every recipient, so encode it once.
                const frame = encodeTextFrame(
                    JSON.stringify({
                        event: event.payload.event,
                        data: event.payload.data
                    })
                );

                for (const client of this.clients) {
                    if (client.streamId !== event.payload.streamId) {
                        continue;
                    }
                    if (event.payload.target !== "all" && client.role !== event.payload.target) {
                        continue;
                    }
                    client.socket.write(frame);
                }
            },
            {
                componentId: "ws-transport",
                componentKind: "transport"
            }
        );

        this.unsubscribers.push(unsubscribeWsOut);
    }

    /**
     * Stops accepting upgrades, unsubscribes from the event bus, destroys every
     * client socket and discards all session and queue state.
     */
    public shutdown(): void {
        for (const unsubscribe of this.unsubscribers) {
            unsubscribe();
        }
        // Cleared so a repeated shutdown does not unsubscribe twice.
        this.unsubscribers.length = 0;

        for (const client of this.clients) {
            client.socket.destroy();
        }
        this.clients.clear();
        this.adminSessions.clear();
        this.adminCommandQueue.length = 0;
        this.queueDrainRunning = false;
        this.updateRuntimeMetrics();
    }

    /** @returns Current connection counts across all streams. */
    public getConnectedClientStats(): ConnectedClientStats {
        return this.getClientStats();
    }

    /** @returns Admin sessions ordered by connection time, oldest first. */
    public listAdminSessions(): AdminSession[] {
        return Array.from(this.adminSessions.values()).sort((a, b) =>
            a.connectedAt.localeCompare(b.connectedAt)
        );
    }

    /** @returns Depth, capacity and drop count of the admin command queue. */
    public getQueueStats(): QueueStats {
        return {
            depth: this.adminCommandQueue.length,
            maxDepth: LIMITS.ADMIN_COMMAND_QUEUE_MAX,
            dropped: this.queueDroppedCount
        };
    }

    /** Renders a caught value as a short string for logging. */
    private static describeError(error: unknown): string {
        return error instanceof Error ? error.message : String(error);
    }

    /**
     * Validates an upgrade request, authenticates it, completes the WebSocket
     * handshake and registers the resulting client.
     *
     * Rejections: 404 for paths other than `/ws`, 400 for malformed requests,
     * 401 when the token does not resolve, 500 when the resolver throws.
     */
    private async handleUpgrade(req: IncomingMessage, socket: Duplex): Promise<void> {
        // An `error` event with no listener is thrown as an uncaught exception,
        // so a listener must exist for the whole handshake, including the time
        // spent awaiting authentication.
        socket.on("error", () => {
            /* handled by the client listener below once registered */
        });

        let url: URL;
        try {
            const host = req.headers.host ?? "localhost";
            // Both the Host header and the request target are client-controlled
            // and can make the URL constructor throw.
            url = new URL(req.url ?? "/", `http://${host}`);
        } catch {
            writeHttpError(socket, 400, "Bad Request");
            return;
        }

        if (url.pathname !== "/ws") {
            writeHttpError(socket, 404, "Not Found");
            return;
        }

        const secKey = req.headers["sec-websocket-key"];
        const upgrade = req.headers.upgrade;

        if (!secKey || upgrade?.toLowerCase() !== "websocket") {
            writeHttpError(socket, 400, "Bad Request");
            return;
        }

        let auth: AuthContext | null;
        try {
            auth = await this.resolveAuth(url.searchParams.get("token"));
        } catch (error: unknown) {
            this.log({
                message: "ws_auth_error",
                error: WsTransport.describeError(error)
            });
            writeHttpError(socket, 500, "Internal Server Error");
            return;
        }

        // The peer may have gone away while authentication was pending. Its
        // `close` event has already fired, so registering it now would leave a
        // client that is never removed.
        if (socket.destroyed) {
            return;
        }

        if (!auth) {
            writeHttpError(socket, 401, "Unauthorized");
            return;
        }

        const accept = createAcceptValue(secKey);
        socket.write(
            "HTTP/1.1 101 Switching Protocols\r\n" +
                "Upgrade: websocket\r\n" +
                "Connection: Upgrade\r\n" +
                `Sec-WebSocket-Accept: ${accept}\r\n\r\n`
        );

        const client: ClientConnection = {
            socket,
            tokenId: auth.tokenId,
            sessionId: auth.role === "admin" ? randomUUID() : null,
            streamId: auth.streamId,
            role: auth.role,
            buffer: Buffer.alloc(0),
            adminWindowStartMs: Date.now(),
            adminWindowCount: 0
        };

        this.clients.add(client);
        this.opsMetrics?.recordWsConnected();
        this.ensureAdminSession(client);
        this.updateRuntimeMetrics();
        this.runtimeManager?.setComponentStatus(client.streamId, {
            id: "ws-transport",
            kind: "transport",
            level: "ok",
            message: "websocket client connected",
            updatedAt: new Date().toISOString(),
            details: {
                connectedClientsForStream: this.getStreamConnectionCount(client.streamId)
            }
        });
        this.runtimeManager?.setStreamState(client.streamId, "ready", "websocket client connected");
        this.log({
            message: "ws_connected",
            streamId: client.streamId,
            role: client.role,
            tokenId: client.tokenId,
            sessionId: client.sessionId,
            remoteAddress: (socket as { remoteAddress?: string }).remoteAddress ?? null
        });

        socket.on("data", (chunk) => {
            // Data can still arrive after the client was removed (for example
            // after `end()` following a close frame); it must not be processed.
            if (!this.clients.has(client)) {
                return;
            }

            client.buffer = Buffer.concat([client.buffer, Buffer.from(chunk)]);
            this.markSessionSeen(client);
            const parsed = parseFrames(client.buffer);
            client.buffer = parsed.remaining;

            if (parsed.close) {
                socket.end();
                this.removeClient(client);
                return;
            }

            for (const message of parsed.messages) {
                if (message.startsWith("__PING__")) {
                    const payload = Buffer.from(message.slice("__PING__".length), "base64");
                    socket.write(encodePongFrame(payload));
                    continue;
                }
                this.handleClientMessage(client, message).catch((error: unknown) => {
                    this.log({
                        message: "ws_client_message_failed",
                        streamId: client.streamId,
                        role: client.role,
                        error: WsTransport.describeError(error)
                    });
                });
            }
        });

        socket.on("close", () => {
            this.removeClient(client);
        });

        socket.on("error", () => {
            this.removeClient(client);
        });
    }

    /**
     * Validates one inbound text message and, for admin clients within their
     * rate limit, queues it as a command. Anything invalid is ignored silently.
     */
    private async handleClientMessage(client: ClientConnection, rawMessage: string): Promise<void> {
        // Only admins may issue commands; skip parsing for everyone else.
        if (client.role !== "admin") {
            return;
        }

        const parsed = safeJsonParse(rawMessage);
        if (!parsed || typeof parsed !== "object") {
            return;
        }

        const command = (parsed as { command?: unknown }).command;
        const argsValue = (parsed as { args?: unknown }).args;
        // Arguments that are not an array of strings are treated as "no arguments".
        const args: string[] =
            Array.isArray(argsValue) && argsValue.every((item) => typeof item === "string")
                ? argsValue
                : [];

        if (typeof command !== "string" || !command.trim()) {
            return;
        }
        const normalizedCommand = command.trim().toLowerCase();
        if (normalizedCommand.length > LIMITS.MAX_COMMAND_LENGTH) {
            return;
        }
        if (args.length > LIMITS.MAX_ARGS_COUNT) {
            return;
        }
        if (args.some((arg) => arg.length > LIMITS.MAX_ARG_LENGTH)) {
            return;
        }

        if (!this.allowAdminCommand(client)) {
            this.opsMetrics?.recordAdminRateLimitDrop();
            this.bumpSessionRateLimitDrop(client);
            this.log({
                message: "admin_command_rate_limited",
                streamId: client.streamId,
                role: client.role,
                command: normalizedCommand,
                windowMs: LIMITS.ADMIN_COMMAND_RATE_LIMIT_WINDOW_MS,
                limit: LIMITS.ADMIN_COMMAND_RATE_LIMIT_COUNT
            });
            return;
        }
        this.bumpSessionCommandCount(client);
        this.enqueueAdminCommand(client, normalizedCommand, args);
    }

    /**
     * Fixed-window rate limiter: permits up to
     * `LIMITS.ADMIN_COMMAND_RATE_LIMIT_COUNT` commands per
     * `LIMITS.ADMIN_COMMAND_RATE_LIMIT_WINDOW_MS` for each connection.
     *
     * @returns `true` when the command may proceed (and has been counted).
     */
    private allowAdminCommand(client: ClientConnection): boolean {
        const now = Date.now();
        const windowAge = now - client.adminWindowStartMs;
        if (windowAge >= LIMITS.ADMIN_COMMAND_RATE_LIMIT_WINDOW_MS) {
            client.adminWindowStartMs = now;
            client.adminWindowCount = 0;
        }

        if (client.adminWindowCount >= LIMITS.ADMIN_COMMAND_RATE_LIMIT_COUNT) {
            return false;
        }

        client.adminWindowCount += 1;
        return true;
    }

    /**
     * Unregisters a client and updates metrics, presence and runtime status.
     * Safe to call repeatedly; only the first call for a client has any effect.
     */
    private removeClient(client: ClientConnection): void {
        if (!this.clients.delete(client)) {
            return;
        }
        this.opsMetrics?.recordWsDisconnected();
        if (client.sessionId) {
            this.adminSessions.delete(client.sessionId);
            this.emitAdminPresence(client.streamId);
        }
        if (this.runtimeManager) {
            const remaining = this.getStreamConnectionCount(client.streamId);
            this.runtimeManager.setComponentStatus(client.streamId, {
                id: "ws-transport",
                kind: "transport",
                level: "ok",
                message: remaining > 0 ? "websocket clients active" : "no websocket clients",
                updatedAt: new Date().toISOString(),
                details: {
                    connectedClientsForStream: remaining
                }
            });
            if (remaining === 0) {
                this.runtimeManager.setStreamState(client.streamId, "stopped", "no active websocket clients");
            }
        }
        this.updateRuntimeMetrics();
    }

    /** Creates the session record for an admin client and announces the new presence count. */
    private ensureAdminSession(client: ClientConnection): void {
        if (!client.sessionId) {
            return;
        }

        const now = new Date().toISOString();
        this.adminSessions.set(client.sessionId, {
            sessionId: client.sessionId,
            streamId: client.streamId,
            tokenId: client.tokenId,
            connectedAt: now,
            lastSeenAt: now,
            commandCount: 0,
            rateLimitDrops: 0,
            backpressureDrops: 0,
            remoteAddress: (client.socket as { remoteAddress?: string }).remoteAddress ?? null
        });
        this.emitAdminPresence(client.streamId);
    }

    /**
     * Applies `mutate` (when given) to the client's admin session and refreshes
     * its `lastSeenAt`. Does nothing for clients without a live session.
     */
    private updateSession(client: ClientConnection, mutate?: (session: AdminSession) => void): void {
        if (!client.sessionId) {
            return;
        }
        const session = this.adminSessions.get(client.sessionId);
        if (!session) {
            return;
        }
        mutate?.(session);
        session.lastSeenAt = new Date().toISOString();
    }

    /** Records that data was just received from the client. */
    private markSessionSeen(client: ClientConnection): void {
        this.updateSession(client);
    }

    /** Counts a command that passed the rate limiter. */
    private bumpSessionCommandCount(client: ClientConnection): void {
        this.updateSession(client, (session) => {
            session.commandCount += 1;
        });
    }

    /** Counts a command rejected by the rate limiter. */
    private bumpSessionRateLimitDrop(client: ClientConnection): void {
        this.updateSession(client, (session) => {
            session.rateLimitDrops += 1;
        });
    }

    /** Counts a command rejected because the queue was full. */
    private bumpSessionBackpressureDrop(client: ClientConnection): void {
        this.updateSession(client, (session) => {
            session.backpressureDrops += 1;
        });
    }

    /**
     * Adds a command to the queue and starts draining it, or drops the command
     * (with metrics and a log entry) when the queue is at capacity.
     */
    private enqueueAdminCommand(
        client: ClientConnection,
        command: string,
        args: string[]
    ): void {
        if (this.adminCommandQueue.length >= LIMITS.ADMIN_COMMAND_QUEUE_MAX) {
            this.queueDroppedCount += 1;
            this.opsMetrics?.recordAdminBackpressureDrop();
            this.bumpSessionBackpressureDrop(client);
            this.log({
                message: "admin_command_backpressure_dropped",
                streamId: client.streamId,
                role: client.role,
                command,
                queueDepth: this.adminCommandQueue.length,
                queueMax: LIMITS.ADMIN_COMMAND_QUEUE_MAX,
                droppedCount: this.queueDroppedCount
            });
            return;
        }

        this.adminCommandQueue.push({
            streamId: client.streamId,
            tokenId: client.tokenId,
            sessionId: client.sessionId,
            command,
            args,
            actorRole: client.role
        });
        this.drainAdminCommandQueue();
    }

    /** Starts the drain loop unless one is already running. */
    private drainAdminCommandQueue(): void {
        if (this.queueDrainRunning) {
            return;
        }
        this.queueDrainRunning = true;
        void this.runDrainLoop();
    }

    /**
     * Emits queued commands one at a time, awaiting each emit before taking the
     * next. A failing emit is logged and skipped so it neither stalls the queue
     * nor escapes as an unhandled rejection.
     */
    private async runDrainLoop(): Promise<void> {
        try {
            while (this.adminCommandQueue.length > 0) {
                const item = this.adminCommandQueue.shift();
                if (!item) {
                    continue;
                }
                try {
                    await this.eventBus.emit(
                        createEvent("admin_command", {
                            streamId: item.streamId,
                            tokenId: item.tokenId,
                            sessionId: item.sessionId,
                            command: item.command,
                            args: item.args,
                            actorRole: item.actorRole
                        })
                    );
                } catch (error: unknown) {
                    this.log({
                        message: "admin_command_dispatch_failed",
                        streamId: item.streamId,
                        sessionId: item.sessionId,
                        command: item.command,
                        error: WsTransport.describeError(error)
                    });
                }
            }
        } finally {
            this.queueDrainRunning = false;
            // Restart if anything is still queued after an unexpected exit.
            if (this.adminCommandQueue.length > 0) {
                this.drainAdminCommandQueue();
            }
        }
    }

    /** Counts connected clients in total and per known role. */
    private getClientStats(): ConnectedClientStats {
        let admin = 0;
        let viewer = 0;
        for (const client of this.clients) {
            if (client.role === "admin") {
                admin += 1;
            } else if (client.role === "viewer") {
                viewer += 1;
            }
        }
        return {
            total: this.clients.size,
            admin,
            viewer
        };
    }

    /** Counts connected clients (any role) on the given stream. */
    private getStreamConnectionCount(streamId: string): number {
        let count = 0;
        for (const client of this.clients) {
            if (client.streamId === streamId) {
                count += 1;
            }
        }
        return count;
    }

    /** Pushes the current client counts to the runtime manager, when one is configured. */
    private updateRuntimeMetrics(): void {
        if (!this.runtimeManager) {
            return;
        }
        this.runtimeManager.setConnectedClients(this.getClientStats());
    }

    /** Reports the stream's current admin session count to the presence callback, when one is configured. */
    private emitAdminPresence(streamId: string): void {
        if (!this.onAdminPresenceChanged) {
            return;
        }
        this.onAdminPresenceChanged(streamId, this.getAdminSessionCountByStream(streamId));
    }

    /** Counts admin sessions on the given stream. */
    private getAdminSessionCountByStream(streamId: string): number {
        let count = 0;
        for (const session of this.adminSessions.values()) {
            if (session.streamId === streamId) {
                count += 1;
            }
        }
        return count;
    }
}
