import { Injectable } from '@angular/core';
import { merge, Observable, Subject, throwError } from 'rxjs';
import { catchError, delay, filter, takeUntil } from 'rxjs/operators';
import { UserDataModel } from 'src/app/shared/models/user-data.model';
import { UserService } from '../user/user.service';
import { ConsolidatorSocket } from './consolidator-socket';
import { UserSocket } from './user-socket';

/**
 * Application-wide service that exposes the messages of the consolidator and
 * agency (user) sockets as a single merged stream.
 *
 * The merged stream is created lazily by `getSocket()` and cached, so every
 * caller shares the same socket instances.
 */
@Injectable({
  providedIn: 'root',
})
export class WebsocketService {
  /** Time to wait before re-subscribing to a socket stream that errored. */
  private static readonly reconnectDelayMs = 5000;

  /** Last value emitted by the user service that passed the filter in `initUserCurrentValue`. */
  user: UserDataModel;

  /** Emits whenever `closedWS()` is called; tears down active socket subscriptions. */
  private destroy$: Subject<void> = new Subject();
  /** Cached merged stream built by `getSocket()`. */
  private sockets: Observable<any>;
  agencySocket: UserSocket;
  consolidatorSocket: ConsolidatorSocket;

  constructor(private userService: UserService) {
    this.initUserCurrentValue();
  }

  /**
   * Keeps `this.user` in sync with the user service, ignoring emissions
   * without a truthy `user.wp`.
   *
   * NOTE(review): this reads `user.wp` while `authUser()` reads `user.ws` —
   * confirm both property names are intended.
   */
  initUserCurrentValue(): void {
    this.userService
      .getCurrentUserObservable()
      .pipe(filter((candidate: any) => !!candidate?.user?.wp))
      .subscribe((current: UserDataModel) => {
        this.user = current;
      });
  }

  /** Builds the `[5, <current user's ws value>]` payload and hands it to the send hook. */
  authUser(): void {
    const payload = [5, this.userService.currentUserValue.user.ws];
    this.sendDataByInitConectionWebsocket(payload);
  }

  /** Builds the fixed `[5, 'consolidator10']` payload and hands it to the send hook. */
  authConsolidator(): void {
    const payload = [5, 'consolidator10'];
    this.sendDataByInitConectionWebsocket(payload);
  }

  /**
   * Returns the merged `'message'` stream of both sockets, creating the
   * sockets on first use and reusing the cached stream afterwards.
   *
   * @returns A stream emitting every message from either socket until
   *          `closedWS()` is called.
   */
  getSocket(): Observable<any> {
    if (this.sockets) {
      return this.sockets;
    }

    this.consolidatorSocket = new ConsolidatorSocket(this.userService);
    this.agencySocket = new UserSocket(this.userService);

    this.sockets = merge(
      this.withReconnect(this.consolidatorSocket.fromEvent('message')),
      this.withReconnect(this.agencySocket.fromEvent('message'))
    );

    return this.sockets;
  }

  /**
   * Hook for sending the initial payload over the sockets.
   * Currently a no-op: both emit calls are commented out.
   *
   * @param connector Payload that would be emitted on each socket.
   */
  sendDataByInitConectionWebsocket(connector: unknown): void {
    //this.consolidatorSocket.emit(connector);
    //this.agencySocket.emit(connector);
  }

  /**
   * Stops all active subscriptions to the merged socket stream.
   *
   * The notifier is deliberately NOT completed: this service is a singleton
   * and `getSocket()` keeps returning the cached stream, so a completed
   * notifier would make later subscriptions impossible to stop.
   */
  closedWS(): void {
    if (this.sockets) {
      this.destroy$.next();
    }
  }

  /**
   * Adds the shared error-recovery and teardown behaviour to a socket stream.
   *
   * On error, the stream is re-subscribed after `reconnectDelayMs` while a
   * user with a truthy `user.wp` is known; otherwise the error is rethrown.
   */
  private withReconnect<T>(messages$: Observable<T>): Observable<T> {
    return messages$.pipe(
      catchError((error, caught) =>
        this.user?.user?.wp
          ? WebsocketService.resubscribeAfter(
              caught,
              WebsocketService.reconnectDelayMs
            )
          : throwError(error)
      ),
      takeUntil(this.destroy$)
    );
  }

  /**
   * Subscribes to `source` only after `delayMs` has elapsed.
   *
   * `delay()` is not suitable here: it postpones emitted values but subscribes
   * immediately and lets errors through undelayed, so a failing source would
   * be retried in a tight loop.
   */
  private static resubscribeAfter<T>(
    source: Observable<T>,
    delayMs: number
  ): Observable<T> {
    return new Observable<T>((subscriber) => {
      let inner: { unsubscribe(): void } | undefined;
      const handle = setTimeout(() => {
        inner = source.subscribe(subscriber);
      }, delayMs);

      // Teardown: cancel a pending retry and release an active one.
      return () => {
        clearTimeout(handle);
        inner?.unsubscribe();
      };
    });
  }
}
