import { Inject, Injectable } from '@angular/core';
import { Store } from '@ngrx/store';
import { ApiCoreService, BEResponse } from '@qtek/core/api-core';
import { AuthUserFeature } from '@qtek/libs/auth-user';
import { MetaCoreFeature } from '@qtek/libs/meta-core';
import * as moment from 'moment';
import {
  BehaviorSubject,
  catchError,
  distinctUntilChanged,
  filter,
  interval,
  map,
  mergeMap,
  Observable,
  repeat,
  retry,
  share,
  shareReplay,
  Subscription,
  take,
  tap,
  timer,
} from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
  BaseWebsocketMessage,
  ErrorMessageResponse,
  InitWebsocketMessageRequest,
  isInitWS,
  isInitWSResponse,
  isPingWSResponse,
  isWSErrorResponse,
  isWSUnauthorized,
  passWhenAlive,
  PingWebsocketMessageRequest,
  WEBSOCKET_BASE_URL,
} from '../models/web-socket.model';

@Injectable({
  providedIn: 'root',
})
export class WebSocketService {
  private pingCounter = 0;
  private pongCounter = 0;
  private retryCounter = 0;
  private repeatCounter = 0;
  private readonly isInitiatedWS = new BehaviorSubject(false);
  private readonly isAliveSubject = new BehaviorSubject(false);
  private readonly isAlive$ = this.isAliveSubject
    .asObservable()
    .pipe(distinctUntilChanged()); //.pipe(share()); //.pipe(shareReplay({ bufferSize: 0, refCount: true }));
  private readonly paramsSubject = new BehaviorSubject<{
    res?: {
      prs?: { prsId: string };
      cmp?: { _id: string };
      wsTkn?: string;
    };
  }>({});
  private readonly params$ = this.paramsSubject.asObservable();
  private readonly retryOnError = (error: Error, retryCount: number) => {
    console.log(
      'retry',
      `Retry number ${this.retryCounter} time ${moment().format(
        'HH:mm:ss:SSS'
      )}`
    );
    this.pingSubscription?.unsubscribe();
    this.pingCounter = 0;
    this.pongCounter = 0;
    this.isAliveSubject.next(false);
    this.isInitiatedWS.next(false);

    return timer(Math.min(20, Math.pow(this.retryCounter++ + 1, 2)) * 1000);
  };
  private readonly repeatOnClose = (retryCount: number) => {
    console.log(
      'repeat',
      `Repeat number ${this.repeatCounter} time ${moment().format(
        'HH:mm:ss:SSS'
      )}`
    );
    this.pingSubscription?.unsubscribe();
    this.pingCounter = 0;
    this.pongCounter = 0;
    this.isAliveSubject.next(false);
    this.isInitiatedWS.next(false);
    return timer(Math.min(20, Math.pow(this.repeatCounter++ + 1, 2)) * 1000);
  };
  private socketSubject: WebSocketSubject<BaseWebsocketMessage> | null = null;
  private reconnect$: Observable<BaseWebsocketMessage> | null = null;
  private reconnectSubscription: Subscription | null = null;
  private pingSubscription: Subscription | null = null;
  private initiatedWSSubscription: Subscription | null = null;
  private readonly wsSocketSubject =
    new BehaviorSubject<WebSocketSubject<BaseWebsocketMessage> | null>(null);
  private readonly wsSocket$ = this.wsSocketSubject.asObservable();

  public init$ = this.isAlive$.pipe(filter(isAlive => isAlive));
  private refreshWSToken$: Observable<BEResponse<{ wsTkn: string }>> = null;

  constructor(
    @Inject(WEBSOCKET_BASE_URL) private webSocketBaseUrl: string,
    private store: Store,
    private apiCoreService: ApiCoreService
  ) {}

  initWebsocketConnection(
    pingPongDelay$: Observable<number>,
    errorHandler: (message: ErrorMessageResponse) => any,
    options?: {
      prsdId?: string;
      cmpId?: string;
      wsTkn?: string;
    }
  ): void {
    const option: {
      res?: {
        prs?: { prsId: string };
        cmp?: { _id: string };
        wsTkn?: string;
      };
    } = {
      ...(!!options?.prsdId || !!options?.wsTkn || !!options?.cmpId
        ? {
            res: {
              ...(options.prsdId ? { prs: { prsId: options?.prsdId } } : {}),
              ...(options.cmpId ? { cmp: { _id: options?.cmpId } } : {}),
              ...(options.wsTkn ? { wsTkn: options?.wsTkn } : {}),
            },
          }
        : {}),
    };

    if (
      this.isAliveSubject.getValue() &&
      option.res?.wsTkn === this.paramsSubject.getValue().res?.wsTkn &&
      option.res?.cmp === this.paramsSubject.getValue().res?.cmp &&
      option.res?.prs === this.paramsSubject.getValue().res?.prs
    ) {
      return;
    }

    this.socketSubject?.complete();
    this.socketSubject?.unsubscribe();

    this.initiatedWSSubscription?.unsubscribe();

    this.paramsSubject.next(option);

    this.isAliveSubject.next(false);

    this.socketSubject = webSocket<BaseWebsocketMessage>({
      url: `${this.webSocketBaseUrl}/e`,
      openObserver: {
        next: () => {
          this.refreshWSToken$ = null;
          this.isInitiatedWS.next(true);
          this.isAliveSubject.next(false);
          this.pingSubscription?.unsubscribe();
          this.pingSubscription = pingPongDelay$
            .pipe(
              filter(delay => delay > 0),
              take(1),
              mergeMap(time => interval(time * 1000)),
              tap(() => {
                if (this.pingCounter !== this.pongCounter) {
                  this.socketSubject.complete();
                  this.socketSubject.unsubscribe();
                  return;
                }
                const pingMessage: PingWebsocketMessageRequest = {
                  op: 'ping',
                  id: this.pingCounter,
                };
                this.pingCounter++;
                this.socketSubject.next(pingMessage);

                this.repeatCounter = 0;
                this.retryCounter = 0;
              })
            )
            .subscribe();
        },
      },

      closeObserver: {
        next: closeEvent => {
          console.log('next closeObserver', closeEvent);
          this.isAliveSubject.next(false);
          this.isInitiatedWS.next(false);
        },
        error: () => {
          console.log('error closeObserver');
          this.isAliveSubject.next(false);
          this.isInitiatedWS.next(false);
        },
        complete: () => {
          console.log('complete closeObserver');
          this.isAliveSubject.next(false);
          this.isInitiatedWS.next(false);
        },
      },
    });

    this.reconnectSubscription?.unsubscribe();
    this.reconnect$ = this.socketSubject.asObservable().pipe(
      map(message => {
        if (isWSErrorResponse(message)) {
          if (isWSUnauthorized(message) && message.ent !== 'i18n') {
            if (this.refreshWSToken$ === null) {
              this.refreshWSToken$ = this.apiCoreService
                .getWsSessionInitToken()
                .pipe(shareReplay(1));
            }
            this.refreshWSToken$
              .pipe(
                tap(({ res }) =>
                  this.initWebsocketConnection(
                    this.store.select(MetaCoreFeature.selectWsPingPong),
                    errorHandler,
                    { wsTkn: res.wsTkn }
                  )
                ),

                catchError(() => {
                  return this.store.select(AuthUserFeature.isAuthorized).pipe(
                    filter(isAuthorized => isAuthorized),
                    tap(isAuthorized => {
                      this.initWebsocketConnection(
                        this.store.select(MetaCoreFeature.selectWsPingPong),
                        errorHandler,
                        {}
                      );
                    })
                  );
                }),
                map(() => message)
              )
              .pipe(take(1))
              .subscribe();
          } else {
            errorHandler(message);
          }
        }

        if (isInitWS(message) && isInitWSResponse(message)) {
          this.isAliveSubject.next(true);
        }

        if (isPingWSResponse(message)) {
          this.pongCounter++;
        }

        return message;
      }),
      repeat({ delay: this.repeatOnClose }),
      retry({ delay: this.retryOnError }),
      share()
    );

    this.initiatedWSSubscription = this.isInitiatedWS
      .asObservable()
      .pipe(
        filter(isInitiated => isInitiated),
        tap(() => {
          const initMessage: InitWebsocketMessageRequest = {
            op: 'init',
            v: 'v1.22.7',
            ...(option ? { ...option } : {}),
          };

          this.socketSubject?.next(initMessage);
        })
      )
      .subscribe();

    this.wsSocketSubject.next(this.socketSubject);

    this.reconnectSubscription = this.reconnect$.subscribe();
  }

  getWebSocket$(): Observable<BaseWebsocketMessage> {
    return this.wsSocket$.pipe(
      passWhenAlive(this.getWebSocketIsAlive$()),
      mergeMap(subject => subject)
    );
  }

  getWebSocketSubject(): Observable<WebSocketSubject<BaseWebsocketMessage>> {
    return this.wsSocket$;
  }

  getWebSocketIsAlive$(): Observable<boolean> {
    return this.isAlive$;
  }
}
