import {asyncScheduler, defer, exhaustMap, finalize, Observable, ObservableInput, OperatorFunction, scheduled, Subject, throttle} from "rxjs"

export function exhaustMapWithTrailing<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
    return (source): Observable<R> =>
        defer(() => {
            const release = new Subject<void>()
            return source.pipe(
                throttle(() => release, {
                    leading: true,
                    trailing: true,
                }),
                exhaustMap((value, index) =>
                    scheduled(project(value, index), asyncScheduler).pipe(
                        finalize(() => {
                            release.next()
                        }),
                    ),
                ),
            )
        })
}
