import { Injectable } from "@angular/core";
import { webSocket, WebSocketSubject } from "rxjs/webSocket";
import { Buffer } from 'buffer';


/**
 * Latest known market-data snapshot for one instrument.
 *
 * Every field is optional because a snapshot is filled in incrementally.
 * Within this file, `DhanFeedService` populates `segment`, `scripCode`, `LTP`,
 * `LTQ`, `LTT`, `ATP`, `volume`, `TSQ`, `TBQ`, `open`, `prevClose`, `high`,
 * `low`, `value`, `OI`, `change` and `changePer`; the remaining fields are
 * never written here. Expansions of abbreviated names below are inferred from
 * the names only — TODO confirm against the feed documentation.
 */
export class SocketFeed {
  /** Segment name looked up from the numeric segment id in the packet header (e.g. 'NSE'). */
  segment?: string;
  /** Instrument code read as an unsigned 32-bit integer from the packet header. */
  scripCode?: number;
  /** Not populated in this file; presumably the tradable lot size. */
  marketLot?: number;
  /** Not populated in this file; presumably the minimum price increment. */
  priceTick?: number;
  /** Not populated in this file; presumably best sell price. */
  BSP?: number;
  /** Not populated in this file; presumably best buy price. */
  BBP?: number;
  /** Not populated in this file; presumably best sell quantity. */
  BSQ?: number;
  /** Not populated in this file; presumably best buy quantity. */
  BBQ?: number;
  /** Not populated in this file; presumably the divisor for integer-encoded prices. */
  priceDivisor?: number;
  /** Volume figure from the quote packet. */
  volume?: number;
  /** Not populated in this file; presumably last update time. */
  LUT?: Date;
  /** Built from the quote packet's epoch-seconds timestamp; presumably last trade time. */
  LTT?: Date;
  /** Open price from the quote packet, rounded to 2 decimals. */
  open?: number;
  /** High price from the quote packet, rounded to 2 decimals. */
  high?: number;
  /** Low price from the quote packet, rounded to 2 decimals. */
  low?: number;
  /** Written by both the quote packet and the previous-day packet, rounded to 2 decimals. */
  prevClose?: number;
  /** `LTP - prevClose`, rounded to 2 decimals; computed when a previous-day packet arrives. */
  change?: number;
  /** `change` as a percentage of `prevClose`, rounded to 2 decimals. */
  changePer?: number;
  /** Value from the OI packet; presumably open interest. */
  OI?: number;
  /** From the quote packet, rounded to 2 decimals; presumably average traded price. */
  ATP?: number;
  /** From the quote packet; presumably total buy quantity. */
  TBQ?: number;
  /** From the quote packet; presumably total sell quantity. */
  TSQ?: number;
  /** Not populated in this file. */
  lifeTimeHigh?: number;
  /** Not populated in this file. */
  lifeTimeLow?: number;
  /** Not populated in this file. */
  lowPriceRange?: number;
  /** Not populated in this file. */
  highPriceRange?: number;
  /** Not populated in this file. */
  lowTER?: number;
  /** Not populated in this file. */
  highTER?: number;
  /** From the quote packet (unsigned 16-bit); presumably last traded quantity. */
  LTQ?: number;
  /** From the quote packet, rounded to 2 decimals; presumably last traded price. */
  LTP?: number;
  /** `volume * ATP`, rounded to 2 decimals. */
  value?: number;
  /** Not populated in this file; presumably a formatted daily price range. */
  DPR?: string;
  /** Not populated in this file. */
  TER?: string;
}

/**
 * Streams binary packets from a Dhan WebSocket market feed and keeps the most
 * recent decoded values per instrument in an in-memory map.
 *
 * Lifecycle: `connect()` opens the socket; once it is open the service sends an
 * auth packet followed by quote and ticker subscription requests for a fixed
 * instrument list. `disconnect()` closes the socket and cancels any request
 * that has not been sent yet.
 *
 * Incoming packets are dispatched on the response code in byte 0. Only codes
 * 4 (quote), 5 (OI), 6 (previous day) and 50 (disconnect) are decoded; every
 * other code is ignored.
 */
@Injectable()
export class DhanFeedService {
  /** Delay between the socket opening and the auth packet being sent. */
  private static readonly AUTH_DELAY_MS = 10;
  /** Delay between the socket opening and the subscription requests being sent. */
  private static readonly SUBSCRIBE_DELAY_MS = 5000;
  /** Minimum packet sizes required by the fixed offsets each decoder reads. */
  private static readonly QUOTE_PACKET_SIZE = 50;
  private static readonly OI_PACKET_SIZE = 12;
  private static readonly PREV_CLOSE_PACKET_SIZE = 12;
  private static readonly DISCONNECT_PACKET_SIZE = 10;
  // SENSEX 23AUG PE 81000, NIFTY 22AUG CE 24800, IREDA NSE, OLA ELEC BSE, TCS BSE
  private static readonly DEFAULT_SCRIPS: readonly string[] = [
    '2@36788', '1@20261', '4@544225', '4@532540', '2@48003', '8@867844'
  ];
  /** Log messages for the disconnect reason codes this service recognises. */
  private static readonly DISCONNECT_REASONS: Readonly<Record<number, string>> = {
    805: 'Disconnected: No. of active websocket connections exceeded',
    806: 'Disconnected: Subscribe to Data APIs to continue',
    807: 'Disconnected: Access Token is expired',
    808: 'Disconnected: Invalid Client ID',
    809: 'Disconnected: Authentication Failed - check'
  };

  // The identity serializer/deserializer below make outgoing values Buffers and
  // incoming values MessageEvents, so no single message type fits the subject.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private subject?: WebSocketSubject<any>;

  // SECURITY NOTE(review): credentials are committed to source control. They are
  // kept only so existing callers of `connect(url)` keep working. Rotate this
  // token and supply credentials through `connect(url, credentials)` instead.
  private clientId: string = '1000000836'; //1000000836386788-W-DHN1804
  private accessToken: string = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJpc3MiOiJkaGFuIiwicGFydG5lcklkIjoiIiwiZXhwIjoxNzI2ODIzNzQ4LCJ0b2tlbkNvbnN1bWVyVHlwZSI6IlNFTEYiLCJ3ZWJob29rVXJsIjoiIiwiZGhhbkNsaWVudElkIjoiMTAwMDAwMDgzNiJ9.xb8v62fGj7bUzQ2FZz0ibDsr0F0Ns1RTfJwYuKdxHTFwZsp_XqzWox4nibNJOcHMq8v3fVijpsAmKHKDwNEVhQ';

  private isConnected = false;
  /** Handles of delayed sends that have been scheduled for the current connection. */
  private pendingTimers: ReturnType<typeof setTimeout>[] = [];
  /** Latest snapshot per instrument, keyed by `<segment name>@<scrip code>`. */
  private feedData: Map<string, SocketFeed> = new Map();
  /** Two-way lookup between segment names and their numeric ids. */
  private readonly SEGMENT_MAPPER: Readonly<Record<string | number, string | number>> = {
    NSE: 1, BSE: 4, MCXFO: 5, NSEFO: 2, NSECDS: 3, BSEFO: 8,
    1: 'NSE', 4: 'BSE', 5: 'MCXFO', 2: 'NSEFO', 3: 'NSECDS', 8: 'BSEFO'
  };

  /**
   * Opens the WebSocket and starts listening for packets.
   *
   * Any connection opened by an earlier call is closed first so it cannot leak
   * or keep delivering packets.
   *
   * @param url WebSocket endpoint to connect to.
   * @param credentials Optional client id / access token that replace the
   *   built-in defaults for this and later connections.
   */
  public connect(url: string, credentials?: { clientId: string; accessToken: string }): void {
    this.disconnect();
    if (credentials) {
      this.clientId = credentials.clientId;
      this.accessToken = credentials.accessToken;
    }

    // A replaced socket reports its close asynchronously; ignore lifecycle
    // events that do not belong to the subject currently in use.
    const isCurrent = (): boolean => this.subject === subject;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const subject: WebSocketSubject<any> = webSocket<any>({
      url: url,
      binaryType: 'arraybuffer',
      serializer: (value) => value,
      deserializer: (value) => value,
      closeObserver: {
        next: () => {
          if (isCurrent()) {
            this.closeObserverCallback();
          }
        }
      },
      openObserver: {
        next: () => {
          if (isCurrent()) {
            this.openObserverCallback();
          }
        }
      }
    });
    this.subject = subject;
    subject.subscribe({
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      next: (message: MessageEvent<any>) => this.onMessage(Buffer.from(message.data)),
      error: (err) => this.onError(err)
    });
  }

  /** Closes the socket (if any) and cancels requests that were not sent yet. */
  public disconnect(): void {
    if (this.subject) {
      this.subject.complete();
    }
    this.resetConnectionState();
  }

  /**
   * Sends a binary packet when the socket is open; otherwise does nothing.
   *
   * The payload itself is deliberately not logged because the auth packet
   * contains the access token.
   *
   * @param message Packet to send as-is.
   */
  public send(message: Buffer): void {
    const subject = this.subject;
    if (!subject || subject.closed || !this.isConnected) {
      return;
    }
    console.log(`sending message to Socket`);
    subject.next(message);
  }

  /** Marks the connection as closed once the socket reports that it closed. */
  private closeObserverCallback(): void {
    this.resetConnectionState();
  }

  /** Authenticates and then subscribes to the default instruments after the socket opens. */
  private openObserverCallback(): void {
    this.clearPendingTimers();
    this.isConnected = true;
    const scrips = [...DhanFeedService.DEFAULT_SCRIPS];
    this.schedule(
      () => this.send(DhanRequestBuilder.getAuthRequest(this.clientId, this.accessToken)),
      DhanFeedService.AUTH_DELAY_MS
    );
    this.schedule(
      () => this.send(DhanRequestBuilder.getQuoteRequestPacket(this.clientId, scrips, true)),
      DhanFeedService.SUBSCRIBE_DELAY_MS
    );
    this.schedule(
      () => this.send(DhanRequestBuilder.getTickerRequestPacket(this.clientId, scrips, true)),
      DhanFeedService.SUBSCRIBE_DELAY_MS
    );
  }

  /** Dispatches a packet to its decoder based on the response code in byte 0. */
  private onMessage(message: Buffer): void {
    if (message.length < 1) {
      return; // nothing to dispatch on
    }
    const responseCode = message.readInt8(0);
    switch (responseCode) {
      case 50:
        this.processDisconnection(message);
        break;
      case 4:
        this.processQuotePacket(message);
        break;
      case 5:
        this.processOIQuotePacket(message);
        break;
      case 6:
        this.processPreviousDayPacket(message);
        break;
      default:
        // Other response codes are not decoded by this service.
        break;
    }
  }

  /**
   * Decodes a quote packet and stores the updated snapshot.
   * Packets shorter than 50 bytes are ignored.
   */
  public processQuotePacket(message: Buffer): void {
    if (message.length < DhanFeedService.QUOTE_PACKET_SIZE) {
      return;
    }
    const { key, feed } = this.resolveFeed(message);
    const volume = message.readUInt32LE(22);
    const averagePrice = DhanFeedService.round2(message.readFloatLE(18));
    feed.LTP = DhanFeedService.round2(message.readFloatLE(8));
    feed.LTQ = message.readUInt16LE(12);
    feed.LTT = new Date(message.readUInt32LE(14) * 1000); // packet carries epoch seconds
    feed.ATP = averagePrice;
    feed.volume = volume;
    feed.TSQ = message.readUInt32LE(26);
    feed.TBQ = message.readUInt32LE(30);
    feed.open = DhanFeedService.round2(message.readFloatLE(34));
    feed.prevClose = DhanFeedService.round2(message.readFloatLE(38));
    feed.high = DhanFeedService.round2(message.readFloatLE(42));
    feed.low = DhanFeedService.round2(message.readFloatLE(46));
    feed.value = DhanFeedService.round2(volume * averagePrice);
    this.feedData.set(key, feed);
    console.log(feed);
  }

  /**
   * Decodes a previous-day packet, updates `prevClose` and, when a non-zero
   * `LTP` is already known, the absolute and percentage change.
   * Packets shorter than 12 bytes are ignored.
   */
  public processPreviousDayPacket(message: Buffer): void {
    if (message.length < DhanFeedService.PREV_CLOSE_PACKET_SIZE) {
      return;
    }
    const { key, feed } = this.resolveFeed(message);
    const prevClose = DhanFeedService.round2(message.readFloatLE(8));
    feed.prevClose = prevClose;
    if (feed.LTP) {
      const change = DhanFeedService.round2(feed.LTP - prevClose);
      feed.change = change;
      // A zero previous close would yield Infinity/NaN; leave the percentage unset.
      if (prevClose !== 0) {
        feed.changePer = DhanFeedService.round2((change * 100) / prevClose);
      }
    }
    // Persist the snapshot even when this is the first packet for the instrument.
    this.feedData.set(key, feed);
  }

  /**
   * Decodes an OI packet and stores the updated snapshot.
   * Packets shorter than 12 bytes are ignored.
   */
  public processOIQuotePacket(message: Buffer): void {
    if (message.length < DhanFeedService.OI_PACKET_SIZE) {
      return;
    }
    const { key, feed } = this.resolveFeed(message);
    feed.OI = message.readUInt32LE(8);
    this.feedData.set(key, feed);
    console.log(this.feedData);
  }

  /** Logs a socket error and marks the connection as closed. */
  private onError(err: unknown): void {
    this.resetConnectionState();
    console.error(err);
  }

  /** Logs the reason carried by a disconnect packet and marks the connection as closed. */
  private processDisconnection(message: Buffer): void {
    if (message.length >= DhanFeedService.DISCONNECT_PACKET_SIZE) {
      const errorCode = message.readInt16LE(8);
      const reason = DhanFeedService.DISCONNECT_REASONS[errorCode];
      console.error(reason ?? `Disconnected: unknown error code ${errorCode}`);
    }
    this.resetConnectionState();
  }

  /**
   * Reads the segment id (byte 3) and scrip code (bytes 4-7) from a packet and
   * returns the matching snapshot, creating one when the instrument is new.
   * The caller is responsible for storing the snapshot under `key`.
   */
  private resolveFeed(message: Buffer): { key: string; feed: SocketFeed } {
    const segment = this.segmentName(message.readInt8(3));
    const scripCode = message.readUInt32LE(4);
    const key = `${segment}@${scripCode}`;
    const feed = this.feedData.get(key) ?? new SocketFeed();
    feed.segment = segment;
    feed.scripCode = scripCode;
    return { key, feed };
  }

  /** Maps a numeric segment id to its name; `undefined` for ids not in the mapper. */
  private segmentName(segmentId: number): string | undefined {
    const name = this.SEGMENT_MAPPER[segmentId];
    return typeof name === 'string' ? name : undefined;
  }

  /** Runs `task` after `delayMs`, remembering the timer so it can be cancelled. */
  private schedule(task: () => void, delayMs: number): void {
    this.pendingTimers.push(setTimeout(task, delayMs));
  }

  /** Cancels every delayed send that has been scheduled. */
  private clearPendingTimers(): void {
    this.pendingTimers.forEach((timer) => clearTimeout(timer));
    this.pendingTimers = [];
  }

  /** Marks the connection as closed and cancels delayed sends. */
  private resetConnectionState(): void {
    this.isConnected = false;
    this.clearPendingTimers();
  }

  /** Rounds to two decimal places. */
  private static round2(value: number): number {
    return Math.round(value * 100) / 100;
  }
}

/**
 * Builds the binary request packets sent to the Dhan market-feed WebSocket.
 *
 * Every request starts with an 83-byte header: request code at byte 0,
 * little-endian int16 total message length at bytes 1-2, and the client id
 * starting at byte 3. The rest of the header is left zeroed.
 */
export class DhanRequestBuilder {
  private static readonly HEADER_PACKET_SIZE = 83;
  private static readonly AUTH_PACKET_SIZE = 502;
  private static readonly INSTRUMENT_LENGTH_SIZE = 4;
  private static readonly INSTRUMENT_SIZE = 21;
  private static readonly CONNECT_CODE = 11;
  private static readonly DISCONNECT_CODE = 12;
  private static readonly TICKER_REQ_SUB_CODE = 15;
  private static readonly TICKER_REQ_UNSUB_CODE = 16;
  private static readonly QUOTE_REQ_SUB_CODE = 17;
  private static readonly QUOTE_REQ_UNSUB_CODE = 18;
  private static readonly DEPTH_REQ_SUB_CODE = 19;
  private static readonly DEPTH_REQ_UNSUB_CODE = 20;

  /** Header layout: the length field sits directly after the 1-byte request code. */
  private static readonly MESSAGE_LENGTH_OFFSET = 1;
  private static readonly CLIENT_ID_OFFSET = 3;
  /** Width of the client id field per the Dhan feed documentation. */
  private static readonly CLIENT_ID_SIZE = 30;
  /** Two-byte auth type marker that ends the auth payload. */
  private static readonly AUTH_TYPE = '2P';
  /** Subscription packets always carry this many instrument slots. */
  private static readonly MAX_INSTRUMENTS = 100;
  /** Bytes of each instrument slot that hold the scrip code (slot = 1 segment byte + code). */
  private static readonly SCRIP_CODE_SIZE = 20;

  /**
   * Builds the 83-byte request header.
   *
   * @param requestType Request code written to byte 0.
   * @param clientId Client id; truncated to the width of its field.
   * @param messageLength Payload length; the header size is added before it is written.
   */
  private static getHeaderPacket(requestType: number, clientId: string, messageLength: number): Buffer {
    const header = Buffer.alloc(DhanRequestBuilder.HEADER_PACKET_SIZE);
    header.writeInt8(requestType, 0);
    // The length occupies bytes 1-2. Writing it at offset 2 would let the client
    // id (which starts at byte 3) overwrite its high byte.
    header.writeInt16LE(
      DhanRequestBuilder.HEADER_PACKET_SIZE + messageLength,
      DhanRequestBuilder.MESSAGE_LENGTH_OFFSET
    );
    header.write(clientId, DhanRequestBuilder.CLIENT_ID_OFFSET, DhanRequestBuilder.CLIENT_ID_SIZE, 'utf-8');
    return header;
  }

  /**
   * Splits a `"<segmentId>@<scripCode>"` string into its parts.
   *
   * @throws TypeError when the segment id is not an integer in 0-255 or the
   *   scrip code is missing.
   */
  private static parseScrip(scrip: string): { segment: number; scripCode: string } {
    const [segmentText = '', scripCode = ''] = scrip.split('@');
    const segment = Number(segmentText);
    const segmentIsValid =
      segmentText.trim() !== '' && Number.isInteger(segment) && segment >= 0 && segment <= 255;
    if (!segmentIsValid || scripCode === '') {
      throw new TypeError(`Invalid scrip "${scrip}": expected "<segmentId>@<scripCode>"`);
    }
    return { segment, scripCode };
  }

  /**
   * Builds a subscribe/unsubscribe packet: header, int32 instrument count, then
   * 100 fixed-width instrument slots (unused slots stay zeroed). The length in
   * the header covers only the slots that are actually used.
   *
   * @throws RangeError when more than 100 scrips are supplied.
   * @throws TypeError when a scrip is malformed.
   */
  private static getFeedRequestPacket(code: number, clientId: string, scrips: string[]): Buffer {
    const builder = DhanRequestBuilder;
    if (scrips.length > builder.MAX_INSTRUMENTS) {
      throw new RangeError(
        `A feed request can carry at most ${builder.MAX_INSTRUMENTS} scrips, got ${scrips.length}`
      );
    }
    const header = builder.getHeaderPacket(
      code,
      clientId,
      builder.INSTRUMENT_LENGTH_SIZE + scrips.length * builder.INSTRUMENT_SIZE
    );
    // One zero-filled buffer for the count plus all slots avoids repeated concatenation.
    const body = Buffer.alloc(builder.INSTRUMENT_LENGTH_SIZE + builder.MAX_INSTRUMENTS * builder.INSTRUMENT_SIZE);
    body.writeInt32LE(scrips.length, 0);
    scrips.forEach((scrip, index) => {
      const { segment, scripCode } = builder.parseScrip(scrip);
      const slotOffset = builder.INSTRUMENT_LENGTH_SIZE + index * builder.INSTRUMENT_SIZE;
      body.writeUInt8(segment, slotOffset);
      // Bound the write so an over-long code cannot spill into the next slot.
      body.write(scripCode, slotOffset + 1, builder.SCRIP_CODE_SIZE, 'utf-8');
    });
    return Buffer.concat([header, body]);
  }

  /**
   * Builds the authentication packet: header, the access token in a
   * zero-padded 500-byte field, then the 2-byte auth type `2P`.
   *
   * @param clientId Client id placed in the header.
   * @param accessToken Access token; truncated if longer than its field.
   */
  static getAuthRequest(clientId: string, accessToken: string): Buffer {
    const builder = DhanRequestBuilder;
    const header = builder.getHeaderPacket(builder.CONNECT_CODE, clientId, builder.AUTH_PACKET_SIZE);
    const payload = Buffer.alloc(builder.AUTH_PACKET_SIZE);
    const tokenFieldSize = builder.AUTH_PACKET_SIZE - builder.AUTH_TYPE.length;
    payload.write(accessToken, 0, tokenFieldSize, 'utf-8');
    payload.write(builder.AUTH_TYPE, tokenFieldSize, builder.AUTH_TYPE.length, 'utf-8');
    return Buffer.concat([header, payload]);
  }

  /**
   * Builds a quote subscribe (`isSubscribe` true) or unsubscribe packet.
   *
   * @param scrips Up to 100 instruments as `"<segmentId>@<scripCode>"`.
   */
  static getQuoteRequestPacket(clientId: string, scrips: string[], isSubscribe: boolean): Buffer {
    const code = isSubscribe ? DhanRequestBuilder.QUOTE_REQ_SUB_CODE : DhanRequestBuilder.QUOTE_REQ_UNSUB_CODE;
    return DhanRequestBuilder.getFeedRequestPacket(code, clientId, scrips);
  }

  /**
   * Builds a ticker subscribe (`isSubscribe` true) or unsubscribe packet.
   *
   * @param scrips Up to 100 instruments as `"<segmentId>@<scripCode>"`.
   */
  static getTickerRequestPacket(clientId: string, scrips: string[], isSubscribe: boolean): Buffer {
    const code = isSubscribe ? DhanRequestBuilder.TICKER_REQ_SUB_CODE : DhanRequestBuilder.TICKER_REQ_UNSUB_CODE;
    return DhanRequestBuilder.getFeedRequestPacket(code, clientId, scrips);
  }

  /**
   * Builds a depth subscribe (`isSubscribe` true) or unsubscribe packet.
   *
   * @param scrips Up to 100 instruments as `"<segmentId>@<scripCode>"`.
   */
  static getDepthRequestPacket(clientId: string, scrips: string[], isSubscribe: boolean): Buffer {
    const code = isSubscribe ? DhanRequestBuilder.DEPTH_REQ_SUB_CODE : DhanRequestBuilder.DEPTH_REQ_UNSUB_CODE;
    return DhanRequestBuilder.getFeedRequestPacket(code, clientId, scrips);
  }
}
