import { Inject, Injectable } from '@angular/core';
import { isEqual } from 'lodash';
import { RxDatabase, RxDocument } from 'rxdb';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import makeDebug from 'src/makeDebug';

import { ChatChannel, ChatChannelConsumptionStatus, ChatCollections, ChatMember, ChatMessage } from './db-schema';
import { RX_DB_SERVICE_TOKEN } from './rx-db-factory';

// Logger used by every method in this file, created under the 'services:chat:db' namespace.
const debug = makeDebug('services:chat:db');

/**
 * Persistence facade for chat data (channels, consumptions, messages and members)
 * stored in the injected RxDB database.
 *
 * The database is injected as a promise, so every method awaits it before
 * touching a collection.
 */
@Injectable({ providedIn: 'root' })
export class ChatDbService {
  constructor(@Inject(RX_DB_SERVICE_TOKEN) private readonly _db: Promise<RxDatabase<ChatCollections>>) {}

  // ### CHANNELS ###

  /**
   * Inserts the channel, or updates the stored document with the same primary key.
   *
   * @param channel Channel data to persist.
   */
  public async upsertChannel(channel: ChatChannel): Promise<void> {
    debug('upsert channel', channel);
    const db = await this._db;
    await db.channels.atomicUpsert(channel);
  }

  /**
   * Soft-deletes a channel by setting its `removed` flag; the document itself is kept.
   * Does nothing when no channel with the given sid is stored.
   *
   * @param channelSid Sid of the channel to flag as removed.
   */
  public async removeChannel(channelSid: string): Promise<void> {
    debug('remove channel', channelSid);
    const db = await this._db;
    const channelDoc = await db.channels.findOne().where('sid').eq(channelSid).exec();
    if (!channelDoc) {
      // Nothing stored for this sid, so there is no flag to set.
      debug('channel to remove not found', channelSid);
      return;
    }
    await channelDoc.atomicSet('removed', true);
  }

  /**
   * Watches all channels whose `removed` flag is not `true`.
   *
   * @returns An observable that emits the matching channels as plain objects.
   */
  public async getChannels(): Promise<Observable<ChatChannel[]>> {
    debug('return all channels');
    const db = await this._db;
    return db.channels
      .find()
      .where('removed')
      .ne(true)
      .$.pipe(map(channelDocs => channelDocs.map(channelDoc => channelDoc.toJSON() as ChatChannel)));
  }

  /**
   * Loads a single channel that is not flagged as removed.
   *
   * @param channelSid Sid of the channel to load.
   * @returns The channel as a plain object, or `null` when none matches.
   */
  public async getChannel(channelSid: string): Promise<ChatChannel | null> {
    debug('get channel', channelSid);
    const db = await this._db;
    const channelDoc = await db.channels.findOne({ selector: { sid: channelSid, removed: { $ne: true } } }).exec();
    return channelDoc ? (channelDoc.toJSON() as ChatChannel) : null;
  }

  /**
   * Stamps the channel's `lastLocalUpdateAt` field with the current time (ISO 8601 string).
   * Does nothing when no channel with the given sid is stored.
   *
   * @param channelSid Sid of the channel to stamp.
   */
  public async setChannelLastLocalUpdate(channelSid: string): Promise<void> {
    debug('set last local update of channel', { channelSid });
    const db = await this._db;
    const channelDoc = await db.channels.findOne().where('sid').eq(channelSid).exec();
    if (!channelDoc) {
      debug('channel to update not found', channelSid);
      return;
    }
    await channelDoc.atomicSet('lastLocalUpdateAt', new Date().toISOString());
  }

  // ### CONSUMPTION ###

  /**
   * Stores the locally consumed message index of a channel and sets
   * `syncLastConsumedMessageIndex` to `true` on the stored document.
   *
   * @param channelSid Sid of the channel; used as the `_id` of the consumption document.
   * @param index Index of the last consumed message.
   * @param allowLower When `false` (default) the call is a no-op if a higher index is already stored.
   */
  public async setConsumptionIndex(channelSid: string, index: number, allowLower = false): Promise<void> {
    debug('set consumption index', { channelSid, index });
    const db = await this._db;
    if (allowLower === false) {
      const consumption = await db.consumptions.findOne({ selector: { _id: channelSid } }).exec();
      if (consumption && consumption.localLastConsumedMessageIndex > index) {
        return;
      }
    }
    await db.consumptions.atomicUpsert({
      _id: channelSid,
      localLastConsumedMessageIndex: index,
      syncLastConsumedMessageIndex: true,
    });
  }

  /**
   * Watches all stored consumption documents.
   *
   * @returns An observable that emits the consumptions as plain objects.
   */
  public async watchAllLocalConsumptions(): Promise<Observable<ChatChannelConsumptionStatus[]>> {
    debug('get all consumptions');
    const db = await this._db;
    return db.consumptions
      .find()
      .$.pipe(map(docs => docs.map(doc => doc.toJSON() as ChatChannelConsumptionStatus)));
  }

  /**
   * Loads all stored consumption documents once.
   *
   * @returns The consumptions as plain objects.
   */
  public async getAllLocalConsumptions(): Promise<ChatChannelConsumptionStatus[]> {
    debug('get all consumptions');
    const db = await this._db;
    const consumptionDocs = await db.consumptions.find().exec();
    return consumptionDocs.map(doc => doc.toJSON() as ChatChannelConsumptionStatus);
  }

  /**
   * Loads the consumption document of one channel.
   *
   * @param channelSid Sid of the channel; matched against the document `_id`.
   * @returns The consumption as a plain object, or `null` when none is stored.
   */
  public async getLocalConsumptionOfChannel(channelSid: string): Promise<ChatChannelConsumptionStatus | null> {
    debug('get consumption of channel', channelSid);
    const db = await this._db;
    const consumptionDoc = await db.consumptions.findOne({ selector: { _id: channelSid } }).exec();
    return consumptionDoc ? (consumptionDoc.toJSON() as ChatChannelConsumptionStatus) : null;
  }

  /**
   * Deletes the consumption document of one channel, if there is one.
   *
   * @param channelSid Sid of the channel; matched against the document `_id`.
   */
  public async deleteLocalConsumptionOfChannel(channelSid: string): Promise<void> {
    debug('delete consumption of channel', channelSid);
    const db = await this._db;
    // Resolve the document first so that a missing consumption is a plain no-op
    // and does not depend on how RxQuery.remove() treats an empty findOne result.
    const consumptionDoc = await db.consumptions.findOne({ selector: { _id: channelSid } }).exec();
    if (consumptionDoc) {
      await consumptionDoc.remove();
    }
  }

  // ### Messages ###

  /**
   * Inserts a new message document.
   *
   * @param message Message to insert.
   * @returns The document returned by the collection's `insert`.
   */
  public async insertMessage(message: ChatMessage): Promise<RxDocument<ChatMessage>> {
    debug('insert message', message);
    const db = await this._db;
    return db.messages.insert(message);
  }

  /**
   * Removes every message document whose `sid` equals the given value.
   *
   * @param messageSid Sid of the message to remove.
   */
  public async removeMessage(messageSid: string): Promise<void> {
    debug('remove message', messageSid);
    const db = await this._db;
    await db.messages.find().where('sid').eq(messageSid).remove();
  }

  /**
   * Inserts several messages with a single `bulkInsert` call.
   * A rejected bulk insert is logged through `window.logger` and not rethrown.
   *
   * @param messages Messages to insert.
   */
  public async bulkInsertMessages(messages: ChatMessage[]): Promise<void> {
    debug('bulk (up)insert messages', messages);
    const db = await this._db;
    try {
      await db.messages.bulkInsert(messages);
      // Only report success when the insert did not throw.
      debug('bulk inserted messages', { existingMessagesIds: messages });
    } catch (error) {
      window.logger.error('Bulk insert failed.', error);
    }
  }

  /**
   * Watches the messages of one channel.
   *
   * @param channelSid Sid of the channel whose messages are watched.
   * @returns An observable that emits the matching messages as plain objects.
   */
  public async getMessagesOfChannel(channelSid: string): Promise<Observable<ChatMessage[]>> {
    debug('get messages of channel', channelSid);
    const db = await this._db;
    return db.messages
      .find()
      .where('channelSid')
      .eq(channelSid)
      .$.pipe(map(docs => docs.map(doc => doc.toJSON() as ChatMessage)));
  }

  /**
   * Loads the message with the highest `index` of one channel.
   *
   * @param channelSid Sid of the channel.
   * @returns The message as a plain object, or `null` when the channel has no stored messages.
   */
  public async getLastMessageOfChannel(channelSid: string): Promise<ChatMessage | null> {
    debug('get last message of channel', channelSid);
    const db = await this._db;
    const messageDoc = await db.messages.findOne().where('channelSid').eq(channelSid).sort({ index: 'desc' }).exec();
    return messageDoc ? (messageDoc.toJSON() as ChatMessage) : null;
  }

  // ### MEMBERS ###

  /**
   * Synchronises the stored members of a channel with the given list:
   * members missing from the list are removed, members that differ from their
   * stored version are replaced, and unknown members are inserted.
   * Errors are logged to the console and not rethrown.
   *
   * @param channelSid Sid of the channel whose stored members are compared.
   * @param members Desired members; if a sid occurs more than once, the last entry wins.
   */
  public async bulkUpdateChannelMembers(channelSid: string, members: ChatMember[]): Promise<void> {
    debug('bulk update channel members', { channelSid, members });
    const db = await this._db;
    /*
    Normally bulkUpsert would be used here, but it is only available from rxdb 12 on.
    The original procedure was:
    1. bulkInsert all members (updates may fail silently, hence step 2)
    2. sequentially atomicUpsert all inserted members
    3. find and remove unused members
    That procedure was very slow. Computing the difference in memory and applying
    it with one bulkRemove and one bulkInsert is much faster.
    */
    try {
      const storedDocs = await db.members.find({ selector: { channelSid } }).exec();
      const storedMembers = storedDocs.map(doc => doc.toJSON() as ChatMember);

      // Index both sides by sid so each lookup is O(1) instead of a nested scan.
      const storedBySid = new Map<ChatMember['sid'], ChatMember>();
      for (const stored of storedMembers) {
        storedBySid.set(stored.sid, stored);
      }
      const incomingBySid = new Map<ChatMember['sid'], ChatMember>();
      for (const member of members) {
        incomingBySid.set(member.sid, member);
      }
      const incomingMembers = Array.from(incomingBySid.values());

      const removedSids = storedMembers.filter(stored => !incomingBySid.has(stored.sid)).map(stored => stored.sid);
      // Changed members are taken from the INCOMING list so that the new values
      // (not the stale stored ones) are written back.
      const changedMembers = incomingMembers.filter(member => {
        const stored = storedBySid.get(member.sid);
        return stored !== undefined && !isEqual(member, stored);
      });
      const newMembers = incomingMembers.filter(member => !storedBySid.has(member.sid));

      // Changed members are replaced: removed first, then inserted again below.
      const sidsToRemove = [...removedSids, ...changedMembers.map(member => member.sid)];
      if (sidsToRemove.length) {
        await db.members.bulkRemove(sidsToRemove);
      }

      const membersToInsert = [...newMembers, ...changedMembers];
      if (membersToInsert.length) {
        await db.members.bulkInsert(membersToInsert);
      }
      debug('bulk updated channelmembers', { members });
    } catch (err) {
      console.error('member bulk update error', err);
    }
  }

  /**
   * Inserts the member, or updates the stored document with the same primary key.
   *
   * @param member Member data to persist.
   */
  public async upsertChannelMember(member: ChatMember): Promise<void> {
    debug('upsert channel member');
    const db = await this._db;
    await db.members.atomicUpsert(member);
  }

  /**
   * Removes the member documents matching both the channel sid and the member sid.
   *
   * @param channelSid Sid of the channel the member belongs to.
   * @param memberSid Sid of the member to remove.
   */
  public async removeChannelMember(channelSid: string, memberSid: string): Promise<void> {
    debug('remove channel member', { channelSid, memberSid });
    const db = await this._db;
    await db.members.find().where('channelSid').eq(channelSid).where('sid').eq(memberSid).remove();
  }

  /**
   * Watches the members of one channel.
   *
   * @param channelSid Sid of the channel whose members are watched.
   * @returns An observable that emits the matching members as plain objects.
   */
  public async getChannelMembers(channelSid: string): Promise<Observable<ChatMember[]>> {
    debug('get channel members', channelSid);
    const db = await this._db;
    return db.members
      .find()
      .where('channelSid')
      .eq(channelSid)
      .$.pipe(map(docs => docs.map(doc => doc.toJSON() as ChatMember)));
  }

  /**
   * Loads the message documents whose `status` is `'pending'`.
   *
   * @param channelSid Optional channel sid; when given, only that channel's pending messages are returned.
   * @returns The matching RxDB documents (not plain objects).
   */
  public async getCurrentSendQueue(channelSid: string | null = null): Promise<RxDocument<ChatMessage>[]> {
    debug('get current send queue');
    const db = await this._db;
    let query = db.messages.find().where('status').eq('pending');
    if (channelSid) {
      query = query.where('channelSid').eq(channelSid);
    }
    return query.exec();
  }

  /**
   * Inserts the message, or updates the stored document with the same primary key.
   *
   * @param chatMessage Message data to persist.
   */
  public async upsertMessage(chatMessage: ChatMessage): Promise<void> {
    debug('upsert message', chatMessage);
    const db = await this._db;
    await db.messages.atomicUpsert(chatMessage);
  }

  /**
   * Looks up the sid of the member with the given identity in a channel.
   *
   * @param channelSid Sid of the channel to search in.
   * @param identity Identity of the member.
   * @returns The member sid, or `null` when no member matches.
   */
  public async getMemberSidForIdentity(channelSid: string, identity: string): Promise<string | null> {
    const db = await this._db;
    const memberDoc = await db.members.findOne({ selector: { channelSid, identity } }).exec();
    return memberDoc ? memberDoc.sid : null;
  }

  /**
   * Calls `remove()` on the channels, members, consumptions and messages
   * collections, one after another.
   */
  public async clearAll(): Promise<void> {
    const db = await this._db;
    await db.channels.remove();
    await db.members.remove();
    await db.consumptions.remove();
    await db.messages.remove();
  }
}
