import {Observable, Subject, ReplaySubject, Subscription, firstValueFrom} from "rxjs"

/** Item that may appear in the `transfer` list handed to a worker's `postMessage`; currently only `ArrayBuffer`. */
export type TransferListItem = ArrayBuffer
/** Minimal shape of a worker event as consumed in this file: only the `data` payload is read. */
export type MsgEvent = {data: any}

/**
 * Abstraction over the underlying thread/worker handle that the classes in
 * this file drive. Implementations are supplied by the caller through the
 * factory passed to `WebAssemblyWorkerManager`.
 */
export interface IThreadWorker {
    /** Sends `message` to the worker, optionally together with a list of transfer items. */
    postMessage(message: any, transfer?: TransferListItem[]): void
    /** Shuts the worker down; invoked when the wrapping `MonitoredWorker` is terminated. */
    terminate(): void
    /** Returns an observable of the worker's events named `event`; this file only subscribes to "message". */
    observableFromEvent(event: string): Observable<MsgEvent>
}

/** Timer handle as returned by `setTimeout`: either a plain number or a Node.js `Timeout` object. */
type TimeoutId = number | NodeJS.Timeout

/** Thin wrappers around the global timer functions that hide the differing handle types. */
namespace TimeoutUtils {
    /**
     * Schedules `handler` to run once after `delay` milliseconds.
     * @returns the handle to pass to `clear` in order to cancel the timer.
     */
    export const set = (handler: () => void, delay: number): TimeoutId => setTimeout(handler, delay)

    /** Cancels a timer previously created with `set`. */
    export const clear = (id: TimeoutId): void => {
        clearTimeout(id as NodeJS.Timeout)
    }
}

/** Numeric key under which a worker is stored in the manager's worker map. */
type WorkerId = number

/**
 * A unit of work for a worker: the payload to post plus the subject on which
 * the outcome is reported back to whoever enqueued the job.
 */
interface WorkerJob<MessageT, ResponseT> {
    /** Payload passed as the first argument to the worker's `postMessage`. */
    message: MessageT
    /** Optional transfer list forwarded to `postMessage` alongside the message. */
    transferList?: TransferListItem[]
    /**
     * Receives the worker's reply and then completes, or errors if the worker
     * reports an error, the job is cancelled, or its worker is terminated.
     */
    response$: Subject<ResponseT>
}

/**
 * Wraps a single thread worker, tracks whether it is busy with a job, and
 * signals when it becomes idle (`idle$`) or has stayed idle for
 * `maxIdleTime` milliseconds (`timeout$`).
 *
 * The next "message" event received after `postJob` is treated as the reply
 * to that job, so only one job is expected to be in flight at a time.
 *
 * Note: the idle timer is only armed on a busy -> idle transition, so a
 * freshly constructed worker that never ran a job does not emit `timeout$`.
 */
class MonitoredWorker {
    private _idle = true
    private idleTimer: number | NodeJS.Timeout | null = null
    /** Emits each time the worker goes from busy to idle. */
    readonly idle$ = new Subject<void>()
    /** Emits once the worker has remained idle for `maxIdleTime` ms after finishing a job. */
    readonly timeout$ = new Subject<void>()
    private currentJob: WorkerJob<unknown, unknown> | null = null
    /** Subscription to the worker's "message" events for the in-flight job, if any. */
    private jobSubscription: Subscription | null = null

    /**
     * @param id identifier of this worker (used as key by the owning manager)
     * @param worker the underlying thread worker to drive
     * @param name human-readable worker name
     * @param maxIdleTime idle time in milliseconds after which `timeout$` emits
     */
    constructor(
        readonly id: WorkerId,
        private worker: IThreadWorker,
        readonly name: string,
        public maxIdleTime = 5000,
    ) {}

    /**
     * Posts `job` to the worker and routes the next "message" event (or a
     * stream error) to `job.response$`.
     *
     * A reply whose `data` is an `Error` instance errors `job.response$`;
     * any other reply is emitted and the subject is completed.
     */
    postJob(job: WorkerJob<unknown, unknown>) {
        // Shared epilogue of both handlers. The worker is marked idle *before*
        // the response is delivered, so a listener on `idle$` can already hand
        // this worker its next job; `currentJob` is therefore cleared first so
        // that a re-entrant `postJob` is not overwritten afterwards.
        const finishJob = () => {
            subscription.unsubscribe()
            if (this.jobSubscription === subscription) {
                this.jobSubscription = null
            }
            this.currentJob = null
            this.setIdle(true)
        }
        const subscription: Subscription = this.worker.observableFromEvent("message").subscribe({
            next: (event: MsgEvent) => {
                finishJob()
                if (event.data instanceof Error) {
                    console.error("Exception in worker: ", event.data)
                    job.response$.error(event.data)
                } else {
                    job.response$.next(event.data)
                    job.response$.complete()
                }
            },
            error: (error: unknown) => {
                finishJob()
                console.error("WebWorker error:", error)
                job.response$.error(error)
            },
        })
        // Remember the subscription so that terminate() can release it.
        this.jobSubscription = subscription
        this.setIdle(false)
        this.currentJob = job
        this.worker.postMessage(job.message, job.transferList)
    }

    /**
     * Updates the idle flag. Busy -> idle arms the idle timer and emits
     * `idle$`; idle -> busy cancels a pending idle timer. Calls that do not
     * change the state are ignored.
     */
    private setIdle(idle: boolean) {
        if (idle && !this._idle) {
            this._idle = idle
            if (this.idleTimer == null) {
                this.idleTimer = TimeoutUtils.set(() => {
                    this.idleTimer = null
                    this.timeout$.next()
                }, this.maxIdleTime)
            }
            // Emitted after arming the timer: a listener that posts a new job
            // right away flips the state back to busy, which clears the timer.
            this.idle$.next()
        } else if (!idle && this._idle) {
            this._idle = idle
            if (this.idleTimer != null) {
                TimeoutUtils.clear(this.idleTimer)
                this.idleTimer = null
            }
        }
    }

    /** `true` while no job is in flight. */
    get idle(): boolean {
        return this._idle
    }

    /**
     * Cancels the idle timer, stops listening for the in-flight job's reply,
     * fails that job with "Worker terminated", completes `idle$` / `timeout$`
     * and terminates the underlying worker.
     */
    terminate() {
        if (this.idleTimer != null) {
            TimeoutUtils.clear(this.idleTimer)
            this.idleTimer = null
        }
        // Release the message listener first: otherwise it would leak, and a
        // late event could still run the handler and re-arm the idle timer on
        // a worker that is already gone.
        if (this.jobSubscription) {
            this.jobSubscription.unsubscribe()
            this.jobSubscription = null
        }
        const abortedJob = this.currentJob
        this.currentJob = null
        if (abortedJob) {
            abortedJob.response$.error("Worker terminated")
        }
        this.idle$.complete()
        this.timeout$.complete()
        this.worker.terminate()
    }
}

/**
 * Distributes jobs over a bounded pool of `MonitoredWorker`s.
 *
 * Jobs are kept in a FIFO queue. A job is handed to an idle worker if there
 * is one; otherwise a new worker is spawned while the pool is below its
 * maximum size; otherwise the job waits until a worker reports idle.
 * Workers that emit `timeout$` are terminated and removed from the pool.
 */
export class WebAssemblyWorkerManager {
    private jobQueue: WorkerJob<any, any>[] = []
    private workers = new Map<WorkerId, MonitoredWorker>()
    private initialWorkers: number
    private maxWorkers: number

    /**
     * @param spawnNewThreadWorker factory creating the underlying worker for a given worker name
     * @param isMobileDevice when `true`, the pool is limited to a single worker
     */
    constructor(
        private spawnNewThreadWorker: (name: string) => IThreadWorker,
        isMobileDevice = false,
    ) {
        if (isMobileDevice) {
            this.initialWorkers = 1
            this.maxWorkers = 1
        } else {
            this.initialWorkers = 2
            this.maxWorkers = 6
        }
    }

    /** Creates a worker under the lowest unused ID, wires up its signals and adds it to the pool. */
    private spawnNewWorker(): MonitoredWorker {
        let freeId = 0
        while (this.workers.has(freeId)) {
            freeId += 1
        }
        const workerName = `wasm-worker-${freeId}`
        const monitored = new MonitoredWorker(freeId, this.spawnNewThreadWorker(workerName), workerName)
        // An idle timeout removes the worker from the pool again.
        monitored.timeout$.subscribe(() => this.terminateWorker(monitored))
        // A worker that has just become idle immediately picks up the next queued job.
        monitored.idle$.subscribe(() => this.processJobQueue(monitored))
        this.workers.set(freeId, monitored)
        return monitored
    }

    /** Returns the first worker in the pool that is currently idle, or `null` if all are busy. */
    private getIdleWorker(): MonitoredWorker | null {
        const idleWorker = Array.from(this.workers.values()).find((candidate) => candidate.idle)
        return idleWorker ?? null
    }

    /** Removes `worker` from the pool and then terminates it. */
    private terminateWorker(worker: MonitoredWorker) {
        this.workers.delete(worker.id)
        worker.terminate()
    }

    /**
     * Tries to dispatch the job at the head of the queue.
     *
     * @param worker worker to use; when omitted an idle worker is looked up,
     *               and failing that a new one is spawned if the pool may grow
     */
    private processJobQueue(worker: MonitoredWorker | null = null) {
        const nextJob = this.jobQueue.shift()
        if (!nextJob) {
            return
        }
        let target = worker ?? this.getIdleWorker()
        if (target === null && this.workers.size < this.maxWorkers) {
            target = this.spawnNewWorker()
        }
        if (target === null) {
            // Every worker is busy and the pool is full: restore the job at
            // the head of the queue until some worker reports idle.
            this.jobQueue.unshift(nextJob)
            return
        }
        target.postJob(nextJob)
    }

    /** Appends `job` to the queue and attempts to dispatch the head of the queue right away. */
    enqueueJob(job: WorkerJob<any, any>): void {
        this.jobQueue.push(job)
        this.processJobQueue()
    }

    /**
     * Enqueues a job whose message is `{operation, functionName, args}`.
     *
     * @returns an observable backed by a `ReplaySubject(1)`, so the outcome
     *          is also delivered to subscribers that attach after it arrived
     */
    invokeFunction<ResponseT>(operation: string, functionName: string, args: any[], transfers?: TransferListItem[]): Observable<ResponseT> {
        const reply$ = new ReplaySubject<ResponseT>(1)
        const job: WorkerJob<any, ResponseT> = {
            message: {operation, functionName, args},
            transferList: transfers,
            response$: reply$,
        }
        this.enqueueJob(job)
        return reply$
    }

    /** Spawns workers until the pool holds the initial number, offering each one the head of the queue. */
    startInitialWorkers(): void {
        while (this.workers.size < this.initialWorkers) {
            this.processJobQueue(this.spawnNewWorker())
        }
    }

    /**
     * Terminates every worker in the pool (which fails their in-flight jobs)
     * and then fails all queued jobs with "Cancelled".
     */
    terminateJobsAndWorkers(): void {
        // Iterate over a snapshot of the IDs because terminating mutates the map.
        const workerIds = Array.from(this.workers.keys())
        workerIds.forEach((workerId) => {
            const worker = this.workers.get(workerId)
            if (worker) {
                this.terminateWorker(worker)
            }
        })
        // Swap in a fresh queue before notifying, so the jobs being cancelled
        // are no longer reachable through the manager.
        const cancelledJobs = this.jobQueue
        this.jobQueue = []
        cancelledJobs.forEach((job) => job.response$.error("Cancelled"))
    }

    /** Number of workers in the pool that are currently busy. */
    get activeWorkers() {
        return Array.from(this.workers.values()).filter((worker) => !worker.idle).length
    }

    /**
     * Promise flavour of `invokeFunction`: resolves with the first value of
     * the response stream and rejects if that stream errors.
     */
    invokeFunctionAsPromise<ResponseT>(operation: string, functionName: string, args: unknown[], transfers?: TransferListItem[]): Promise<ResponseT> {
        return firstValueFrom(this.invokeFunction<ResponseT>(operation, functionName, args, transfers))
    }
}
