import { Injectable, NgZone } from '@angular/core';
import { clamp, find, isNumber } from 'lodash';
import { combineLatest, Observable, ReplaySubject } from 'rxjs';

import makeDebug from '../../../../makeDebug';
import { ChatConnectionStateService } from './chat-connection-state.service';
import { ChatSendService } from './chat-send.service';
import { ChatDbService } from './data/chat-db.service';
import { ChatChannel, ChatChannelConsumptionStatus } from './data/db-schema';

// Debug logger for this file, created by makeDebug with the
// 'services:chat:chat-messages-status' namespace string.
const debug = makeDebug('services:chat:chat-messages-status');

/**
 * Tracks how many chat messages are unread and keeps the locally stored
 * consumption ("last read") indices in sync with the remote side.
 *
 * On construction the service
 *  - subscribes to channel and local-consumption changes from the chat DB and
 *    publishes the summed unread count, and
 *  - starts a periodic job that pushes local consumption indices that are
 *    ahead of the channel's known index through the chat send service.
 */
@Injectable({
  providedIn: 'root',
})
export class ChatMessagesStatusService {
  /** Delay in milliseconds between two periodic consumption syncs. */
  private static readonly _SYNC_INTERVAL_MS = 10000;

  // Buffer size 1: late subscribers only need the most recent total. An
  // unbounded ReplaySubject would retain and replay every count ever emitted.
  private readonly _unreadMessages$: ReplaySubject<number> = new ReplaySubject<number>(1);

  constructor(
    private readonly _chatDb: ChatDbService,
    private readonly _chatConnectionState: ChatConnectionStateService,
    private readonly _chatSendService: ChatSendService,
    private readonly _ngZone: NgZone
  ) {
    // The watcher setup is async; handle a rejection here because nobody can
    // await a constructor.
    this._initUnreadWatcher().catch(error => window.logger.error('could not init unread watcher', error));
    this._initSyncConsumptionStatus();
  }

  /**
   * @returns an observable that emits the summed unread message count of all
   * channels; new subscribers immediately receive the latest known value.
   */
  public getTotalUnreadMessagesCount(): Observable<number> {
    return this._unreadMessages$.asObservable();
  }

  /**
   * Computes the unread message count of a single channel.
   *
   * @param channelSid sid of the channel to inspect
   * @returns the number of unread messages (never negative); 0 when the
   * channel is not present in the chat DB
   */
  public async getUnreadMessagesOfChannel(channelSid: string): Promise<number> {
    const channel = await this._chatDb.getChannel(channelSid);
    const localChannelConsumption = await this._chatDb.getLocalConsumptionOfChannel(channelSid);
    debug('get unread messages of channel', { channelSid, channel, localChannelConsumption });
    if (!channel) {
      // The periodic sync already copes with a missing channel; do the same
      // here instead of failing on a property access.
      return 0;
    }
    return this._countUnreadMessages(channel, localChannelConsumption);
  }

  /**
   * Stores `index` as the local consumption index of the channel, but only
   * when it is higher than the index stored so far (or none is stored yet).
   *
   * @param channelSid sid of the channel whose consumption index is updated
   * @param index index of the last consumed message
   */
  public async updateChannelConsumptionStatus(channelSid: string, index: number) {
    const localChannelConsumption = await this._chatDb.getLocalConsumptionOfChannel(channelSid);
    if (!localChannelConsumption || localChannelConsumption.localLastConsumedMessageIndex < index) {
      debug('update channel consumption', channelSid, index);
      await this._chatDb.setConsumptionIndex(channelSid, index);
      await this._chatDb.setChannelLastLocalUpdate(channelSid);
    } else {
      debug('no consumption update needed', { channelSid, index, localChannelConsumption });
    }
  }

  /**
   * Starts the periodic consumption sync. The timer is created outside the
   * Angular zone via NgZone.runOutsideAngular.
   */
  private _initSyncConsumptionStatus() {
    this._ngZone.runOutsideAngular(() =>
      setInterval(() => {
        // A failing DB read must not surface as an unhandled rejection on
        // every tick.
        this._syncConsumptionStatus().catch(error =>
          window.logger.error('could not sync consumption status', error)
        );
      }, ChatMessagesStatusService._SYNC_INTERVAL_MS)
    );
  }

  /**
   * Runs one sync pass: for every stored local consumption whose index is
   * higher than the channel's index, sends the local index through the chat
   * send service. Skipped while the connection state reports offline.
   */
  private async _syncConsumptionStatus(): Promise<void> {
    debug('periodic sync of consumption status');
    if (!this._chatConnectionState.isOnline) {
      debug('was offline. sync skipped');
      return;
    }

    const allConsumptions = await this._chatDb.getAllLocalConsumptions();
    debug('all consumptions', allConsumptions);
    const promises = allConsumptions.map(async consumption => {
      // Errors are caught per consumption so that each failure is logged
      // individually.
      try {
        const channel = await this._chatDb.getChannel(consumption._id);
        if (!channel) {
          return;
        }
        const isLocalConsumptionIndexHigher = this.isLocalConsumptionIndexHigher(channel, consumption);
        debug('check for higher local index', {
          channel,
          consumption,
          isLocalConsumptionIndexHigher,
        });
        if (isLocalConsumptionIndexHigher) {
          try {
            await this._chatSendService.setConsumptionIndex(
              consumption._id,
              consumption.localLastConsumedMessageIndex
            );
          } catch (err) {
            if (err.status === 404) {
              // On a 404 the local entry is deleted, so it is not sent again
              // on the next pass.
              console.error(
                'tried to update remote consumption index without access for channel',
                channel.sid,
                err
              );
              await this._chatDb.deleteLocalConsumptionOfChannel(channel.sid);
            } else {
              throw err;
            }
          }
        }
      } catch (error) {
        window.logger.error('could not sync consumption status', error);
      }
    });
    await Promise.all(promises);
  }

  /**
   * @returns true when the local consumption index is greater than the
   * channel's consumption index; a missing or zero channel index counts as 0.
   */
  private isLocalConsumptionIndexHigher(
    channel: ChatChannel,
    consumption: ChatChannelConsumptionStatus
  ): boolean {
    if (channel && channel.lastConsumedMessageIndex) {
      return channel.lastConsumedMessageIndex < consumption.localLastConsumedMessageIndex;
    }

    return 0 < consumption.localLastConsumedMessageIndex;
  }

  /**
   * Unread count of one channel: last message index minus the higher of the
   * channel's and the local consumption index, clamped so it is never negative.
   *
   * @param channel channel whose message indices are inspected
   * @param localConsumption locally stored consumption of that channel, if any
   */
  private _countUnreadMessages(
    channel: ChatChannel,
    localConsumption: Pick<ChatChannelConsumptionStatus, 'localLastConsumedMessageIndex'> | null | undefined
  ): number {
    // `|| 0` additionally maps NaN to 0, which isNumber alone lets through.
    const channelIndex = isNumber(channel.lastConsumedMessageIndex) ? channel.lastConsumedMessageIndex || 0 : 0;
    const consumptionIndex =
      localConsumption && localConsumption.localLastConsumedMessageIndex > channelIndex
        ? localConsumption.localLastConsumedMessageIndex
        : channelIndex;
    return clamp((channel.lastMessageIndex || 0) - consumptionIndex, 0, Number.MAX_SAFE_INTEGER);
  }

  /**
   * Subscribes to channel and local-consumption updates and publishes the
   * total unread count whenever either of them changes.
   */
  private async _initUnreadWatcher(): Promise<void> {
    debug('init unread watcher');
    // subscribe to channel updates
    const channels$ = await this._chatDb.getChannels();
    const localConsumptions$ = await this._chatDb.watchAllLocalConsumptions();
    combineLatest([channels$, localConsumptions$]).subscribe(([channels, consumptions]) => {
      debug('calculate unread messages', channels, consumptions);
      const unreadMessages = channels.reduce(
        (acc, channel) => acc + this._countUnreadMessages(channel, find(consumptions, { _id: channel.sid })),
        0
      );
      debug('calculated unread message', unreadMessages);
      this._unreadMessages$.next(unreadMessages);
    });
  }
}
