import { Injectable } from '@angular/core';
import { IMessage, RxStomp } from '@stomp/rx-stomp';
import { Observable, filter, map, shareReplay } from 'rxjs';
import { STREAM_URL } from 'src/environments/url';

/**
 * STOMP client service dedicated to the parameter topic.
 *
 * The service watches `STREAM_URL.PARAMETER_TOPIC` once and shares that watch
 * between all consumers; {@link connect} exposes the JSON-decoded payloads,
 * optionally restricted to a single message type.
 */
@Injectable({
  providedIn: 'root'
})
export class ParameterRxStompService extends RxStomp {

  // TODO: merge the XxxxRxStompService classes into a single service that
  // receives its topic through an injection token.

  /**
   * Shared watch on the parameter topic.
   *
   * `shareReplay` with `bufferSize: 1` hands the latest frame to late
   * subscribers, and `refCount: true` drops the underlying watch once the
   * last subscriber unsubscribes.
   */
  readonly #stream: Observable<IMessage>;

  constructor() {
    super();
    this.#stream = this.watch(STREAM_URL.PARAMETER_TOPIC).pipe(
      shareReplay({ bufferSize: 1, refCount: true }),
    );
  }

  /**
   * Streams the JSON-decoded payloads received on the parameter topic.
   *
   * Frames whose body is not valid JSON are logged and skipped instead of
   * erroring the returned observable, so one malformed frame cannot
   * terminate every consumer of the topic.
   *
   * @param messageType When provided (and non-empty), only payloads that are
   *   objects whose `type` property strictly equals this value are emitted.
   *   When omitted or empty, every decodable payload is emitted.
   * @returns An observable of payloads typed as `T`. The cast is unchecked:
   *   the caller is responsible for `T` matching what the server sends.
   */
  public connect<T>(messageType?: string): Observable<T> {
    return this.#stream.pipe(
      map((message: IMessage) => this.#parseBody(message)),
      // `undefined` can never come out of JSON.parse, so it unambiguously
      // marks a frame that failed to decode.
      filter((payload: unknown) => payload !== undefined),
      filter((payload: unknown) => !messageType || this.#hasType(payload, messageType)),
      map((payload: unknown) => payload as T),
    );
  }

  /**
   * Decodes the body of a frame as JSON.
   *
   * @returns The decoded value, or `undefined` when the body is not valid JSON.
   */
  #parseBody(message: IMessage): unknown {
    try {
      return JSON.parse(message.body);
    } catch (error) {
      console.warn('ParameterRxStompService: ignoring message whose body is not valid JSON', error);
      return undefined;
    }
  }

  /**
   * Tells whether a decoded payload is an object carrying the expected `type`.
   * Non-object payloads (including `null`) never match.
   */
  #hasType(payload: unknown, messageType: string): boolean {
    return typeof payload === 'object'
      && payload !== null
      && (payload as { type?: unknown }).type === messageType;
  }
}
