import {Observable, Subject, ReplaySubject, firstValueFrom} from "rxjs"
import {WebSocket, createWebSocket, MessageEvent} from "@runtime-env-api/websocket"
import {Blob} from "@runtime-env-api/buffer"

function extractBlobs(data: any): [message: any, blobs: any[]] {
    const blobs: any[] = []
    const check_key = (k: any) => {
        if (typeof k !== "string") throw new Error(`Dict keys not a string: ${k}`)
        return k
    }
    const traverse = (x: any): any => {
        if (typeof x === "object") {
            if (Array.isArray(x)) {
                return x.map(traverse)
            } else if (ArrayBuffer.isView(x) || x instanceof ArrayBuffer || x instanceof Blob) {
                for (let idx = 0; idx < blobs.length; idx++) {
                    if (blobs[idx] === x) {
                        return {$blob: idx}
                    }
                }
                const idx = blobs.length
                blobs.push(x)
                return {$blob: idx}
            } else if (x === null) {
                return null
            } else {
                const ret: any = {}
                for (const k in x) {
                    ret[check_key(k)] = traverse(x[k])
                }
                return ret
            }
        } else {
            return x
        }
    }
    return [traverse(data), blobs]
}

function resolveBlobs(msg: any, blobs: ArrayBufferView[]) {
    const traverse = (x: any): any => {
        if (typeof x === "object") {
            if (Array.isArray(x)) {
                return x.map(traverse)
            } else if (x === null) {
                return x
            } else if ("$blob" in x) {
                return blobs[x["$blob"]]
            } else {
                const ret: any = {}
                for (const k in x) {
                    ret[k] = traverse(x[k])
                }
                return ret
            }
        } else {
            return x
        }
    }
    return traverse(msg)
}

export class WebSocketMessaging {
    static connect(url: string, maxPayload?: number): Observable<WebSocketMessaging> {
        const socket = createWebSocket(url, maxPayload ? {maxPayload: maxPayload} : undefined)
        const connect$ = new Subject<WebSocketMessaging>()
        socket.onopen = (event: any) => {
            connect$.next(new WebSocketMessaging(socket))
            connect$.complete()
        }
        socket.onerror = (event: any) => {
            connect$.error(event)
            connect$.complete()
        }
        return connect$
    }

    private transactions: Subject<any>[] = []

    message$ = new Subject<any>()
    close$ = new Subject<void>()

    destroy() {
        this.socket.close()
        this.message$.complete()
    }

    get connected() {
        return this.socket.readyState === this.socket.OPEN
    }

    send(data: any): void {
        const [msg, blobs] = extractBlobs(data)
        for (let idx = 0; idx < blobs.length; idx++) {
            this.socket.send(JSON.stringify({$send_blob: idx}))
            this.socket.send(blobs[idx])
        }
        this.socket.send(JSON.stringify(msg))
    }

    doTransaction(msg: any) {
        const response$ = new ReplaySubject<any>()
        this.transactions.push(response$)
        this.send(msg)
        return response$
    }

    private recvAwaitingBlob = false
    private recvBlobs: ArrayBufferView[] = []

    constructor(private socket: WebSocket) {
        socket.binaryType = "arraybuffer"
        socket.onerror = this.onError.bind(this)
        socket.onclose = this.onClose.bind(this)
        socket.onmessage = this.onMessage.bind(this)
    }

    private dispatchMsg(msg: any) {
        const transaction = this.transactions.shift()
        if (transaction) {
            transaction.next(msg)
            transaction.complete()
        } else {
            this.message$.next(msg)
        }
    }

    private dispatchError(err: any) {
        const transaction = this.transactions.shift()
        if (transaction) {
            transaction.error(err)
            transaction.complete()
        } else {
            throw new Error(err)
        }
    }

    private onMessage(event: MessageEvent) {
        let msg = event.data
        if (msg instanceof ArrayBuffer) {
            if (!this.recvAwaitingBlob) {
                this.dispatchError("Expecting JSON, got binary blob!")
                return
            }
            this.recvBlobs.push(new Uint8Array(msg))
            this.recvAwaitingBlob = false
        } else if (ArrayBuffer.isView(msg)) {
            if (!this.recvAwaitingBlob) {
                this.dispatchError("Expecting JSON, got binary blob!")
                return
            }
            this.recvBlobs.push(new Uint8Array(msg.buffer, msg.byteOffset, msg.byteLength))
            this.recvAwaitingBlob = false
        } else if (msg instanceof Blob) {
            this.dispatchError("Got unexpected binary blob type!")
            return
        } else {
            if (this.recvAwaitingBlob) {
                this.dispatchError("Expecting binary blob, got JSON!")
                return
            }
            msg = JSON.parse(msg as string)
            if (typeof msg === "object" && "$send_blob" in msg) {
                const idx = msg["$send_blob"] as number
                if (idx !== this.recvBlobs.length) {
                    this.dispatchError("Got binary blob out of sequence!")
                    return
                }
                this.recvAwaitingBlob = true
            } else {
                msg = resolveBlobs(msg, this.recvBlobs)
                this.recvBlobs = []
                this.dispatchMsg(msg)
            }
        }
    }

    private onError(event: any) {
        this.dispatchError(event)
    }

    private onClose(event: any) {
        while (true) {
            const transaction = this.transactions.shift()
            if (!transaction) break
            transaction.error("Connection closed")
            transaction.complete()
        }
        this.message$.complete()
        this.close$.next()
        this.close$.complete()
    }
}

export class RemoteMessaging {
    close$: Promise<void>
    responsePending = false

    constructor(
        private messaging: WebSocketMessaging,
        private remoteName: string,
    ) {
        this.close$ = firstValueFrom(messaging.close$)
        messaging.message$.subscribe((msg) => console.log("Message outside transaction:", msg))
    }

    disconnect() {
        this.messaging.destroy()
    }

    doTransaction(msg: any): Promise<any> {
        this.responsePending = true
        return firstValueFrom(this.messaging.doTransaction(msg)).then((resp: any) => {
            this.responsePending = false
            if (resp?.response === "ok") {
                return resp
            } else {
                const msg = `${this.remoteName} error:` + resp?.message
                console.error(msg)
                throw new Error(msg)
            }
        })
    }
}
