import {Injectable} from '@angular/core';
import {BehaviorSubject, Observable, OperatorFunction, Subscription} from 'rxjs';
import {environment} from 'environments/environment';
import {EventSourcePolyfill} from 'event-source-polyfill';
import {UserService} from 'security/services/user/user.service';
import {ApiUserDataInterface} from 'interfaces/api/user-data.interface';
import {filter, map} from 'rxjs/operators';

export interface MercureInterface {
    token: string;
    topics: string[];
}

interface MercureMessageEvent<DataType = any> {
    content: DataType;
    type: MercureMessageType;
}

export enum MercureMessageType {
    Push = 'push',
    Notification = 'notification',
    System = 'system',
}

@Injectable({
    providedIn: 'root',
})
export class MercureService {
    private eventSource?: EventSource;

    private messageSubject = new BehaviorSubject<MessageEvent | undefined>(undefined);

    constructor(userService: UserService) {
        userService.onUserDataChange.subscribe(user => this.handleUserDataChange(user));
    }

    public subscribe<DataType>(type: MercureMessageType, fn: (data: DataType) => void): Subscription {
        return this.messageSubject
            .pipe(
                filter(event => undefined !== event) as OperatorFunction<MessageEvent | undefined, MessageEvent>,
                map((event?: MessageEvent) => JSON.parse(event.data)),
                filter((event: MercureMessageEvent) => type === event.type)
            ).subscribe((event: MercureMessageEvent) => fn(event.content));
    }

    public close(): void {
        const eventSource: EventSource | undefined = this.eventSource;

        if (undefined === eventSource) {
            return;
        }

        eventSource.close();
    }

    private subscribeToEventSource<EventType extends keyof EventSourceEventMap>(token: string, topics: string[], fn: (event: EventSourceEventMap[EventType]) => void): Subscription {
        // const eventSource = this.eventSource = this.getOrCreateDataSource(token, topics);

        return new Observable<EventSourceEventMap[EventType]>(observer => {
            // eventSource.addEventListener('message', (event) => observer.next(event));
            // eventSource.addEventListener('error', (event) => console.error(event));
        }).subscribe(event => fn(event));
    }

    private getOrCreateDataSource<DataType>(token: string, topics: string[]): EventSource {
        const eventSourceUrl = new URL(environment.mercureHub);

        topics.forEach(topic => eventSourceUrl.searchParams.append('topic', topic));

        return new EventSourcePolyfill(eventSourceUrl.toString(), {headers: {'Authorization': `Bearer ${token}`}});
    }

    private handleUserDataChange(user: ApiUserDataInterface): void {
        this.close(); // Ensure closing any open connections
        this.subscribeToEventSource<'message'>(user.mercure.token, user.mercure.topics, (event: MessageEvent) => {
            if (!this.isMercureMessageEvent(event)) {
                return;
            }

            this.messageSubject.next(event);
        });
    }

    private isMercureMessageEvent(event: any | MessageEvent): event is MercureMessageEvent {
        return event.hasOwnProperty('data') && event.hasOwnProperty('type');
    }
}
