import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { VkAuthenticationService } from '@vk/authentication';
import { mergeMap } from 'rxjs/operators';

/**
 * Discriminator carried in the `type` field of every notification emitted by
 * `EventStreamService.openEventStream`.
 */
export enum EventType {
  /** The underlying request has ended; emitted right before the observable completes. */
  Closed = 0,
  /** A progress event arrived while the response status was 204. */
  NoContent = 1,
  /** A parsed event payload from a 2xx response. */
  Data = 2,
  /** The raw response text received so far (only when raw data was requested). */
  RawData = 3,
}

/**
 * Any notification an event stream observable can emit. Narrow on the `type`
 * field (an `EventType`) to find out which variant was received.
 */
export type EventStreamResponse<TResponse> = DataEvent<TResponse> | RawDataEvent | NoContentEvent | ClosedEvent;

/**
 * A single event payload parsed from the stream. Emitted once per parsed
 * event while the response status is 2xx (other than 204).
 */
export interface DataEvent<TResponse> {
  readonly type: EventType.Data;
  /** HTTP status of the request at the time the progress event was handled. */
  readonly status: number;
  /** Result of `JSON.parse` on the event payload; not validated against `TResponse`. */
  readonly data: TResponse;
}

/**
 * The unparsed response text. Emitted on every progress event, regardless of
 * status, when the stream was opened with `includeRawData` set to `true`.
 */
export interface RawDataEvent {
  readonly type: EventType.RawData;
  /** HTTP status of the request at the time the progress event was handled. */
  readonly status: number;
  /** The request's full `responseText` as received so far (cumulative, not just the latest chunk). */
  readonly data: string;
}

/**
 * Emitted when a progress event is handled while the response status is 204.
 */
export interface NoContentEvent {
  readonly type: EventType.NoContent;
  /** HTTP status of the request; the service only emits this event for 204. */
  readonly status: number;
}

/**
 * Emitted when the underlying request ends (`loadend`), immediately before
 * the observable completes.
 */
export interface ClosedEvent {
  readonly type: EventType.Closed;
}

/**
 * Opens authenticated `text/event-stream` requests over `XMLHttpRequest` and
 * exposes them as observables of `EventStreamResponse` notifications.
 *
 * All notifications (data, error, close/complete) are delivered inside the
 * Angular zone.
 */
@Injectable({
  providedIn: 'root'
})
export class EventStreamService {

  constructor(private readonly zone: NgZone,
              private readonly authenticationService: VkAuthenticationService) {
  }

  /**
   * Opens an event stream for `url`, authorized with the token of the user
   * returned by `authenticationService.loadUser()`.
   *
   * @param url endpoint to request with `GET` and `Accept: text/event-stream`.
   * @param includeRawData when `true`, every progress event additionally emits
   *   a `RawData` notification with the response text received so far.
   * @returns an observable that emits `Data` / `NoContent` / `RawData`
   *   notifications as the response progresses, then a `Closed` notification
   *   followed by completion when the request ends. It errors when the request
   *   raises an `error` event. Unsubscribing aborts the request.
   */
  public openEventStream<TResponse>(url: string, includeRawData: boolean = false): Observable<EventStreamResponse<TResponse>> {
    return this.authenticationService.loadUser().pipe(
      mergeMap(user => this.createEventStream<TResponse>(url, user.token, includeRawData))
    );
  }

  /**
   * Creates a cold observable wrapping one `XMLHttpRequest`; the request is
   * sent on subscription and aborted on teardown.
   *
   * @param url endpoint to request.
   * @param authorizationToken value sent as `Authorization: Bearer <token>`.
   * @param includeRawData whether to also emit `RawData` notifications.
   */
  private createEventStream<TResponse>(url: string, authorizationToken: string, includeRawData: boolean): Observable<EventStreamResponse<TResponse>> {
    return new Observable<EventStreamResponse<TResponse>>(subscriber => {
      // The reader keeps track of how much of the cumulative responseText was
      // already emitted, so each subscription needs its own instance.
      const eventReader = makeEventReader<TResponse>();
      const xhr = new XMLHttpRequest();

      xhr.open('GET', url, true);
      xhr.setRequestHeader('Authorization', `Bearer ${authorizationToken}`);
      xhr.setRequestHeader('Accept', 'text/event-stream');

      if ('responseType' in xhr) {
        xhr.responseType = 'text';
      }

      xhr.onprogress = () => {
        // Snapshot both values before entering the zone so every notification
        // of this progress event reports the same state.
        const { status, responseText } = xhr;

        this.zone.run(() => {
          if (status === 204) {
            subscriber.next({ type: EventType.NoContent, status });

          } else if (status >= 200 && status < 300) {
            for (const data of eventReader(responseText)) {
              subscriber.next({ type: EventType.Data, status, data });
            }
          }

          // Raw data is emitted for every status, including non-2xx responses.
          if (includeRawData) {
            subscriber.next({ type: EventType.RawData, status, data: responseText });
          }
        });
      };

      // Error and close notifications re-enter the Angular zone exactly like
      // progress notifications do, so subscribers observe all of them in the
      // same zone.
      xhr.onerror = e => {
        this.zone.run(() => subscriber.error(e));
      };

      // NOTE(review): per the XHR spec `loadend` also follows `error` and
      // `abort`; the subscriber should already be closed in those cases, making
      // the calls below no-ops - confirm if this ordering is ever relied upon.
      xhr.onloadend = () => {
        this.zone.run(() => {
          subscriber.next({ type: EventType.Closed });
          subscriber.complete();
        });
      };

      xhr.send();

      return () => {
        xhr.abort();
      };
    });
  }

}

/**
 * Creates a stateful, incremental reader for an LF-delimited
 * `text/event-stream` body.
 *
 * The returned function is meant to be called repeatedly with the cumulative
 * response text received so far. It remembers how much of that text it has
 * already consumed and yields only the events that were completed (terminated
 * by a blank line) since the previous call.
 *
 * Within each event block only `data:` lines contribute to the payload;
 * several `data:` lines are joined with `\n`. Comment lines (starting with
 * `:`) and other fields such as `event:`, `id:` or `retry:` are ignored, and
 * blocks without any `data:` line are skipped. The payload is parsed with
 * `JSON.parse`; payloads that fail to parse are logged and skipped.
 *
 * @returns a generator function yielding the parsed payloads. The result of
 *   `JSON.parse` is not validated against `TResponse`. The generator is lazy:
 *   the read position only advances once iteration starts.
 */
function makeEventReader<TResponse>(): (responseText: string) => IterableIterator<TResponse> {
  const eventDelimiter = '\n\n';
  const dataField = 'data:';
  // Offset into the cumulative response text up to which events were consumed.
  let position = 0;

  return function*(responseText) {
    const pending = responseText.slice(position);
    const lastDelimiterPos = pending.lastIndexOf(eventDelimiter);

    if (lastDelimiterPos === -1) {
      // No complete event has arrived since the last call.
      return;
    }

    // Advance past the delimiter too, so it is not re-scanned on the next call.
    position += lastDelimiterPos + eventDelimiter.length;

    const rawEvents = pending.slice(0, lastDelimiterPos).split(eventDelimiter);

    for (const rawEvent of rawEvents) {
      const dataLines = rawEvent
        .split('\n')
        .filter(line => line.startsWith(dataField))
        .map(line => line.slice(dataField.length));

      if (dataLines.length === 0) {
        // Comment / keep-alive block or an event without data: nothing to emit.
        continue;
      }

      const json = dataLines.join('\n');

      try {
        const data: TResponse = JSON.parse(json);
        yield data;
      } catch (e) {
        // TODO: implement error handling
        console.error('unable to parse event', json);
      }
    }
  };
}
