import _ from 'lodash'
import { StreamerMessageType } from '../../tastyworks/account-streamer'
import type Logger from '../../tastyworks/logger'
import { EndlessExponentialRetry } from '../../tastyworks/util/exponential-retry'
import type { JsonMap, JsonValue } from '../../tastyworks/util/json'
import { JsonBuilder, JsonHelper } from '../../tastyworks/util/json'
import type { Disposer, RunLoop } from '../../tastyworks/util/runloop'
import type { WebsocketCreator } from '../../tastyworks/websocket'

export enum STREAMER_STATE {
  Open = 0,
  Closed = 1,
  Error = 2
}

enum MessageAction {
  ACCOUNT_SUBSCRIBE = 'account-subscribe',
  CONNECT = 'connect', // Send this instead of `account-subscribe` for CAWS V2
  HEARTBEAT = 'heartbeat',
  PUBLIC_WATCHLISTS_SUBSCRIBE = 'public-watchlists-subscribe',
  QUOTE_ALERTS_SUBSCRIBE = 'quote-alerts-subscribe',
  USER_MESSAGE_SUBSCRIBE = 'user-message-subscribe'
}

const HEARTBEAT_INTERVAL = 20000 // 20 seconds
// Reconnect if we don't receive something we can understand within 30 seconds.
const RECONNECT_TIMEOUT = 30000 // 30 seconds

const RECONNECT_RETRY_INITIAL_INTERVAL = 500
const RECONNECT_RETRY_MAX_INTERVAL = 5000

const MAX_ERROR_LENGTH = 50

export type StreamerStateObserver = (streamerState: STREAMER_STATE) => void

export type ReconnectObserver = () => void

export type StreamerMessageObserver = (
  messageType: StreamerMessageType,
  dataHelper: JsonHelper,
  timestamp: number,
  accountNumber: string
) => void

function removeElement<T>(array: T[], element: T): void {
  const index = array.indexOf(element)
  if (index < 0) {
    return
  }

  array.splice(index, 1)
}

export interface SessionDetailsProvider {
  getSessionToken(): string

  getSessionSource(): string

  getAccountNumbers(): string[]

  getUserExternalId(): string
}

type EventEmitter = (tag: string, data?: object) => void

const REQUEST_ID = 'request-id'

export class AccountStreamer {
  private websocket: WebSocket | null = null
  private startResolve: ((result: boolean) => void) | null = null
  private startReject: ((reason?: any) => void) | null = null
  private requestCounter = 0
  private queued: string[] = []

  private heartbeatTimerId = 0
  private reconnectTimerId = 0

  private readonly reconnectLooper: EndlessExponentialRetry

  lastCloseEvent: any = null
  lastErrorEvent: any = null
  private _streamerState: STREAMER_STATE = STREAMER_STATE.Closed

  private readonly streamerStateObservers: StreamerStateObserver[] = []

  private readonly reconnectObservers: ReconnectObserver[] = []

  private readonly messageObservers: Map<
    StreamerMessageType,
    StreamerMessageObserver[]
  > = new Map()

  private startPromise: Promise<boolean> | null = null

  private readonly requestPromises: Map<
    number,
    [(status: string) => void, (error: string) => void]
  > = new Map()

  constructor(
    readonly logger: Logger,
    private readonly runLoop: RunLoop,
    private readonly websocketCreator: WebsocketCreator,
    private readonly url: string,
    readonly sessionDetailsProvider: SessionDetailsProvider,
    private readonly emitRemoteEvent?: EventEmitter
  ) {
    /* eslint-disable @typescript-eslint/no-misused-promises */
    this.reconnectLooper = new EndlessExponentialRetry(
      this.runLoop,
      async () => {
        logger.info('AcctStream - reconnect')
        if (!window.navigator.onLine) {
          this.logger.warn('Browser is offline. No connection detected.')
          return
        }

        try {
          await this.start()
          this.reconnectObservers.forEach(observer => {
            try {
              observer()
            } catch (error) {
              this.logger.warn('AcctStream - Reconnect observer failed:', error)
            }
          })
        } catch (e) {
          logger.warn('AcctStream - Failed to reconnect', e)
          const error = String(e).substring(0, MAX_ERROR_LENGTH)
          // eslint-disable-next-line
          this.emitEvent('acct_stream_reconnect_failed', { error })
        }
      },
      RECONNECT_RETRY_INITIAL_INTERVAL,
      RECONNECT_RETRY_MAX_INTERVAL
    )
    /* eslint-enable @typescript-eslint/no-misused-promises */
  }

  emitEvent(tag: string, data: object = {}) {
    this.emitRemoteEvent?.(tag, data)
  }

  addReconnectObserver(observer: ReconnectObserver): Disposer {
    this.reconnectObservers.push(observer)

    return () => {
      removeElement(this.reconnectObservers, observer)
    }
  }

  get streamerState(): STREAMER_STATE {
    return this._streamerState
  }

  set streamerState(streamerState: STREAMER_STATE) {
    this._streamerState = streamerState

    this.streamerStateObservers.forEach(observer => {
      observer(streamerState)
    })
  }

  addStreamerStateObserver(observer: StreamerStateObserver): Disposer {
    this.streamerStateObservers.push(observer)

    return () => {
      removeElement(this.streamerStateObservers, observer)
    }
  }

  private lookupMessageObservers(
    type: StreamerMessageType
  ): StreamerMessageObserver[] {
    let observers = this.messageObservers.get(type)
    if (!observers) {
      observers = []
      this.messageObservers.set(type, observers)
    }

    return observers
  }

  addMessageObserver(
    type: StreamerMessageType,
    observer: StreamerMessageObserver
  ): Disposer {
    const observers = this.lookupMessageObservers(type)
    observers.push(observer)

    return () => {
      removeElement(observers, observer)
    }
  }

  get isOpen(): boolean {
    return this.streamerState === STREAMER_STATE.Open
  }

  get isClosed(): boolean {
    return this.streamerState === STREAMER_STATE.Closed
  }

  get isError(): boolean {
    return this.streamerState === STREAMER_STATE.Error
  }

  async start(): Promise<boolean> {
    if (this.startPromise !== null) {
      return this.startPromise
    }

    const websocket = (this.websocket = this.websocketCreator(this.url))
    this.lastCloseEvent = null
    this.lastErrorEvent = null
    websocket.addEventListener('open', this.handleOpen)
    websocket.addEventListener('close', this.handleClose)
    websocket.addEventListener('error', this.handleError)
    websocket.addEventListener('message', this.handleMessage)

    this.logger.info('AcctStream - starting')
    this.startPromise = new Promise<boolean>((resolve, reject) => {
      this.startResolve = resolve
      this.startReject = reject
    })

    return this.startPromise
  }

  stop() {
    this.streamerState = STREAMER_STATE.Closed
    this.teardown()
  }

  reconnect(): boolean {
    this.logger.info('AcctStream - schedule reconnect')
    this.reconnectLooper.schedule()
    return true
  }

  // Force disconnect and reconnect
  reload() {
    if (this.websocket === null) {
      this.reconnect()
    } else {
      this.streamerState = STREAMER_STATE.Closed
      this.teardown()
    }
  }

  private cancelHeartbeatTimer() {
    if (this.heartbeatTimerId > 0) {
      window.clearTimeout(this.heartbeatTimerId)
      this.heartbeatTimerId = 0
    }
  }

  private teardown() {
    if (this.websocket) {
      const websocket = this.websocket
      websocket.close()
      websocket.removeEventListener('open', this.handleOpen)
      websocket.removeEventListener('close', this.handleClose)
      websocket.removeEventListener('message', this.handleMessage)
      websocket.removeEventListener('error', this.handleError)

      this.websocket = null
    }

    this.startPromise = null
    this.cancelHeartbeatTimer()

    this.logger.info('AcctStream - teardown')

    this.reconnect()
  }

  readonly sendHeartbeat = () => {
    this.send(new JsonBuilder({ action: MessageAction.HEARTBEAT }))
  }

  private scheduleHeartbeatTimer() {
    if (this.heartbeatTimerId > 0) {
      // Heartbeat already scheduled
      return
    }

    this.heartbeatTimerId = window.setTimeout(
      this.sendHeartbeat,
      HEARTBEAT_INTERVAL
    )
  }

  private rescheduleReconnectTimer() {
    if (this.reconnectTimerId > 0) {
      // Reconnect already scheduled
      window.clearTimeout(this.reconnectTimerId)
      this.reconnectTimerId = 0
    }

    this.reconnectTimerId = window.setTimeout(() => {
      this.reconnectTimerId = 0
      this.logger.warn('AcctStream - missed too many heartbeats')
      this.emitEvent('acct_stream_heartbeat_miss_max')
      this.streamerState = STREAMER_STATE.Closed
      this.teardown()
    }, RECONNECT_TIMEOUT)
  }

  send(json: JsonBuilder, includeSessionToken = true): number {
    this.requestCounter += 1
    json.add(REQUEST_ID, this.requestCounter)
    json.add('source', this.sessionDetailsProvider.getSessionSource())

    if (includeSessionToken) {
      const sessionToken = this.sessionDetailsProvider.getSessionToken()
      if (!sessionToken) {
        throw new Error('sessionToken not set')
      }

      json.add('auth-token', sessionToken)
    }

    const message = JSON.stringify(json.json)
    const websocket = this.websocket
    if (websocket?.readyState !== WebSocket.OPEN) {
      // Queue up and send on open
      this.queued.push(message)
    } else {
      // Opting to check queue each time we send a message just in case.
      this.sendQueuedMessages()
      websocket.send(message)
    }

    return this.requestCounter
  }

  public subscribeTo(action: string, value?: JsonValue): number {
    const json = new JsonBuilder()
    json.add('action', action)
    if (!_.isUndefined(value)) {
      json.add('value', value)
    }
    return this.send(json)
  }

  public subscribeToUser() {
    const userExternalId = this.sessionDetailsProvider.getUserExternalId()

    if (!userExternalId) {
      return
    }

    this.subscribeTo(MessageAction.USER_MESSAGE_SUBSCRIBE, userExternalId)
  }

  public async subscribeToAccounts(): Promise<string> {
    const accountNumbers = this.sessionDetailsProvider.getAccountNumbers()
    if (accountNumbers.length === 0) {
      return Promise.reject('no account numbers')
    }

    const value: JsonValue =
      accountNumbers.length > 1 ? accountNumbers : accountNumbers[0]
    const requestId = this.subscribeTo(MessageAction.CONNECT, value)

    return new Promise<string>((resolve, reject) => {
      this.requestPromises.set(requestId, [resolve, reject])
    })
  }

  private sendQueuedMessages() {
    const queued = this.queued.slice(0, this.queued.length)
    if (queued.length === 0 || this.websocket?.readyState !== WebSocket.OPEN) {
      return
    }

    try {
      // websocket.send only fails if .readyState changes to
      // !WebSocket.OPEN in which case we don't want to continue
      // calling .send.
      const websocket = this.websocket
      queued.forEach(msg => websocket.send(msg))
      // Clean up the things we sent.
      this.queued.splice(0, queued.length)
    } catch (error) {
      // Only way we end up here is if the websocket went down.
      // Trust streamer-state observers to re-send messages when
      // the connection comes back up.
      this.logger.error('AcctStream sendQueuedMessages error', error)
      this.queued = []
    }
  }

  private readonly handleOpen = (event: Event) => {
    this.logger.info('AcctStream handleOpen', event)
    if (this.startResolve !== null) {
      this.startResolve(true)
      this.startResolve = this.startReject = null
    }

    this.reconnectLooper.cancel()

    this.sendQueuedMessages()
    this.scheduleHeartbeatTimer()
    this.rescheduleReconnectTimer()
    this.streamerState = STREAMER_STATE.Open
  }

  private readonly handleClose = (event: CloseEvent) => {
    this.logger.info('AcctStream handleClose', event)

    this.lastCloseEvent = event
    this.streamerState = STREAMER_STATE.Closed
    this.teardown()
  }

  private readonly handleError = (event: Event) => {
    if (this.websocket === null) {
      return
    }

    this.logger.warn('AcctStream error', event)
    // eslint-disable-next-line
    this.emitEvent('acct_stream_error')

    this.lastErrorEvent = event
    this.streamerState = STREAMER_STATE.Error

    if (this.startReject !== null) {
      this.startReject(new Error('Failed to connect'))
      this.startReject = this.startResolve = null
    }

    this.teardown()
  }

  private readonly handleMessage = (event: MessageEvent) => {
    let json: JsonMap | null = null

    try {
      json = JSON.parse(event.data as string) as JsonMap
      // Bump reconnect timer along if we receive _any_
      // intelligible message.
      this.rescheduleReconnectTimer()
    } catch (error) {
      this.logger.warn('AcctStream parse error', error)
      return
    }

    if (json?.results !== undefined) {
      const results: JsonMap[] = json.results as JsonMap[]
      for (const result of results) {
        if (result.action !== undefined) {
          this.handleActionMessage(result)
        } else if (result.type !== undefined) {
          this.handleTypeMessage(result)
        }
      }
    } else if (json?.action !== undefined) {
      this.handleActionMessage(json)
    } else if (json?.type !== undefined) {
      this.handleTypeMessage(json)
    } else {
      this.logger.warn('Account streamer unexpected message', json)
    }
  }

  private readonly handleActionMessage = (json: JsonMap) => {
    const action = json.action as string
    const status = json.status as string

    if (MessageAction.HEARTBEAT === action) {
      this.heartbeatTimerId = 0
      this.scheduleHeartbeatTimer()
    }

    const promiseCallbacks = this.requestPromises.get(
      json[REQUEST_ID] as number
    )
    if (promiseCallbacks) {
      const [resolve, reject] = promiseCallbacks
      if (status === 'ok') {
        resolve(json.action as string)
      } else {
        reject(json.message as string)
      }
    }
  }

  private readonly handleTypeMessage = (json: JsonMap) => {
    const type = json.type as string
    const timestampRaw = json.timestamp as string
    const timestamp = timestampRaw ? parseInt(timestampRaw, 10) : 0

    const messageObservers = this.lookupMessageObservers(
      type as StreamerMessageType
    )
    const anyMessageObservers = this.lookupMessageObservers(
      StreamerMessageType.Any
    )
    if (messageObservers.length === 0 && anyMessageObservers.length === 0) {
      this.logger.warn('Unknown message type: ', type)
      return
    }

    const dataHelper = new JsonHelper(json.data as JsonMap)
    const accountNumber =
      dataHelper.getString('account-number') ||
      dataHelper.getString('destination-account-number')

    messageObservers.forEach(observer => {
      try {
        observer(
          type as StreamerMessageType,
          dataHelper,
          timestamp,
          accountNumber
        )
      } catch (e) {
        this.logger.warn('AcctStream observer error: ', e)
        this.emitEvent('acct_stream_observer_error')
      }
    })
    anyMessageObservers.forEach(observer => {
      try {
        observer(
          type as StreamerMessageType,
          dataHelper,
          timestamp,
          accountNumber
        )
      } catch (e) {
        this.logger.warn('AcctStream any observer error: ', e)
        this.emitEvent('acct_stream_observer_error')
      }
    })
  }
}
