import { S3 } from "aws-sdk";
import { PassThrough } from "stream";
import mergeStream from "merge-stream";
import DataAdapter, { SupportedAdapters } from "./adapter/DataAdapter";
import { ComplianceResult, Scan } from "./Scan";
import { AWS_COMPLIANCES, Compliance } from "./compliance/ComplianceType";
import AggregatedReportRequestProps from "./AggregatedReportRequestProps";
import {
  BatchWriteCommand,
  DeleteCommand,
  DynamoDBDocumentClient,
  GetCommand,
  PutCommand,
} from "@aws-sdk/lib-dynamodb";
import { DynamoDBClient, QueryCommand } from "@aws-sdk/client-dynamodb";
import { ProwlerComplianceAdapter } from "./adapter/ProwlerComplianceAdapter";
import { stringify } from "jsonstream";



/**
 * Reads scan results from S3, stores individual findings in DynamoDB and
 * exposes paginated queries and pre-signed download links for the reports.
 *
 * S3 access goes through the AWS SDK v2 `S3` client, DynamoDB access through
 * the v3 document client.
 */
export default class ReportController {
  static readonly ASH_FILE_NAME = "codescan-report.txt";
  static readonly PROWLER_FILE_NAME = "report";
  static readonly MAX_PAGE_SIZE = 50;

  /** Maximum number of write requests sent in a single BatchWrite call. */
  private static readonly DYNAMO_BATCH_SIZE = 25;
  /** Number of times unprocessed BatchWrite items are re-submitted before giving up. */
  private static readonly MAX_BATCH_RETRIES = 5;
  /** Lifetime of pre-signed download URLs, in seconds (one hour). */
  private static readonly SIGNED_URL_TTL_SECONDS = 3600;

  readonly s3Client = new S3({});
  readonly bucketName: string;
  readonly reportTableName: string;
  readonly documentClient: DynamoDBDocumentClient;

  /**
   * @param reportTableName DynamoDB table that stores report items.
   * @param bucketName S3 bucket that stores the report files.
   */
  constructor(reportTableName: string, bucketName: string) {
    this.reportTableName = reportTableName;
    this.documentClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));
    this.bucketName = bucketName;
  }

  // ---------------------------------------------------------------------
  // S3 key helpers
  // ---------------------------------------------------------------------

  /** S3 key of the code-scan report for one target. */
  static getAshPath = (
    customerId: string,
    reportId: number,
    targetId: number
  ) => {
    return `${customerId}/${reportId}/${targetId}/code/${ReportController.ASH_FILE_NAME}`;
  };

  /** S3 key prefix (`customerId/reportId/targetId`) shared by a target's report files. */
  static getProwlerPathWithoutFilename = (
    customerId: string,
    reportId: number,
    targetId: number
  ) => {
    return `${customerId}/${reportId}/${targetId}`;
  };

  /** S3 key of the OpenVAS CSV results for one target. */
  static getOpenvasPath = (
    customerId: string,
    reportId: number,
    targetId: number
  ) => {
    return `${customerId}/${reportId}/${targetId}/openvas/results.csv`;
  };

  /** S3 key of the Prowler OCSF JSON report for one target. */
  static getProwlerPathWithFilename = (
    customerId: string,
    reportId: number,
    targetId: number
  ) => {
    const prefix = ReportController.getProwlerPathWithoutFilename(
      customerId,
      reportId,
      targetId
    );
    return `${prefix}/json-ocsf/${ReportController.PROWLER_FILE_NAME}.ocsf.json`;
  };

  /** S3 key of the aggregated report, derived from a scan's customer and report ids. */
  static getFullReportKeyFromScan = (scan: Scan): string => {
    return ReportController.getFullReportKey(scan.CustomerId, scan.ReportId);
  };

  /** S3 key of the aggregated report for a customer/report pair. */
  static getFullReportKey = (customerId: string, reportId: number): string => {
    return `${customerId}/${reportId}/full-report.json`;
  };

  /** S3 key of one compliance framework's report file for a target. */
  static getComplianceReportKey = (
    customerId: string,
    reportId: number,
    targetId: number,
    compliance: Compliance
  ) => {
    return `${customerId}/${reportId}/${targetId}/compliance/${compliance.reportFile}`;
  };

  // ---------------------------------------------------------------------
  // DynamoDB reads
  // ---------------------------------------------------------------------

  /**
   * Queries one page (at most `MAX_PAGE_SIZE` items) of report items.
   *
   * @param customerId Customer part of the composite partition key.
   * @param reportId Report part of the composite partition key.
   * @param lastId `Id` of the item the previous page ended on; when set it is
   *   used to build the `ExclusiveStartKey`.
   * @param paginateBackwards When true the index is read in descending order
   *   and the returned page is reversed.
   * @param filters `scanType` is part of the partition key; `severity`, when
   *   present, switches to the severity index and adds a key condition.
   * @returns `{}` when the response carries no `Items`; otherwise the raw
   *   query output with `Items` in display order and a `HasMore` flag.
   */
  getReport = async (
    customerId: string,
    reportId: number,
    lastId?: any,
    paginateBackwards?: boolean,
    filters?: { severity?: string; scanType: string }
  ) => {
    // NOTE(review): when `filters` is omitted the key ends in the literal
    // text "undefined" — callers are presumably expected to always pass a
    // scanType; confirm.
    const customerReportIdAndScanType = `${customerId}#${reportId}#${filters?.scanType}`;
    const expressionAttributeValues: any = {
      ":CustomerReportIdAndScanType": { S: customerReportIdAndScanType }, // Composite partition key
    };

    // The start key is only built when the caller supplies the last seen id.
    const lastEvaluatedKey: any | undefined = lastId
      ? {
          CustomerReportIdAndScanType: { S: customerReportIdAndScanType }, // Composite partition key
          CustomerId: { S: customerId },
          Id: { S: lastId }, // Sort key (UUID)
        }
      : undefined;

    let indexName = "CustomerReportIdScanTypeIndex";
    let keyCondition =
      "CustomerReportIdAndScanType = :CustomerReportIdAndScanType";

    if (filters?.severity) {
      indexName = "CustomerReportScanTypeSeverityIndex";
      keyCondition += " AND Severity = :SeverityFilter";
      expressionAttributeValues[":SeverityFilter"] = { S: filters.severity };

      // The start key must carry Severity as well when the severity index is used.
      if (lastEvaluatedKey) {
        lastEvaluatedKey["Severity"] = { S: filters.severity };
      }
    }

    const result = await this.documentClient.send(
      new QueryCommand({
        TableName: this.reportTableName,
        IndexName: indexName,
        KeyConditionExpression: keyCondition,
        Limit: ReportController.MAX_PAGE_SIZE, // Page size
        ExclusiveStartKey: lastEvaluatedKey,
        ExpressionAttributeValues: expressionAttributeValues,
        ScanIndexForward: !paginateBackwards, // Control pagination direction
      })
    );

    if (!result.Items) {
      return {};
    }

    // A backwards query returns items in descending order; flip them back.
    const items = paginateBackwards ? result.Items.reverse() : result.Items;

    return {
      ...result,
      Items: items,
      // A full page suggests more data; a LastEvaluatedKey also signals it
      // when DynamoDB stopped before the page filled up.
      HasMore:
        items.length === ReportController.MAX_PAGE_SIZE ||
        result.LastEvaluatedKey !== undefined,
    };
  };

  /**
   * Fetches a single report item by its primary key.
   *
   * @returns The stored item, or `{}` when no item exists for the key.
   */
  getItem = async (customerId: string, itemId: string) => {
    const result = await this.documentClient.send(
      new GetCommand({
        TableName: this.reportTableName,
        Key: {
          CustomerId: customerId.toString(),
          Id: itemId,
        },
      })
    );

    if (!result.Item) {
      return {};
    }

    return result.Item;
  };

  // ---------------------------------------------------------------------
  // Pre-signed downloads
  // ---------------------------------------------------------------------

  /** Returns a one-hour pre-signed GET URL for a compliance report file. */
  downloadComplianceReport = async (
    customerId: string,
    reportId: number,
    targetId: number,
    compliance: Compliance
  ): Promise<string | undefined> => {
    return this.presignGetObject(
      ReportController.getComplianceReportKey(
        customerId,
        reportId,
        targetId,
        compliance
      )
    );
  };

  /** Returns a one-hour pre-signed GET URL for the aggregated report. */
  downloadFullReport = async (
    customerId: string,
    reportId: number
  ): Promise<string | undefined> => {
    return this.presignGetObject(
      ReportController.getFullReportKey(customerId, reportId)
    );
  };

  /** Returns a one-hour pre-signed GET URL for the code-scan report. */
  downloadCodescanReport = async (
    customerId: string,
    reportId: number,
    targetId: number
  ): Promise<string | undefined> => {
    return this.presignGetObject(
      ReportController.getAshPath(customerId, reportId, targetId)
    );
  };

  /**
   * Signs a GET request for `key` in the report bucket.
   *
   * Uses the promise variant so credentials are resolved before signing; the
   * synchronous `getSignedUrl` is only reliable with already-resolved
   * credentials.
   */
  private presignGetObject = (key: string): Promise<string> => {
    return this.s3Client.getSignedUrlPromise("getObject", {
      Bucket: this.bucketName,
      Key: key,
      Expires: ReportController.SIGNED_URL_TTL_SECONDS,
    });
  };

  // ---------------------------------------------------------------------
  // Aggregation
  // ---------------------------------------------------------------------

  /**
   * Streams every scan file of a report through its adapter, writes the
   * resulting items to DynamoDB and uploads the merged output to S3 under the
   * full-report key.
   *
   * Scans whose type has no adapter, or whose S3 object cannot be read, are
   * logged and skipped.
   *
   * @returns `undefined` when the request has no scans; otherwise the number
   *   of ingested items per severity.
   */
  aggregateReports = async (request: AggregatedReportRequestProps) => {
    if (!request.scans || request.scans.length === 0) {
      return;
    }

    const digest = {
      Critical: 0,
      High: 0,
      Medium: 0,
      Low: 0,
      Log: 0,
      Informational: 0,
    };

    const outputStream = new PassThrough();
    const mergedStream = mergeStream();

    // A source error is re-emitted on the merged stream. Without a listener it
    // would be thrown; instead fail the upload so the caller is not left
    // waiting on a stream that will never end.
    mergedStream.on("error", (err: Error) => {
      console.error("Error while merging report streams:", err);
      mergedStream.unpipe(outputStream);
      outputStream.destroy(err);
    });

    let sourceCount = 0;

    for (const scan of request.scans) {
      // Resolve the adapter first so no S3 request is opened for a scan that
      // cannot be processed.
      const adapterPair = Object.entries(SupportedAdapters).find(
        (entry) => scan.Type === entry[0]
      );
      if (!adapterPair) {
        console.error(`${scan.Type} for ${scan.Key} was not found! Failing`);
        continue;
      }

      try {
        const objectResponse = this.s3Client
          .getObject({
            Bucket: this.bucketName,
            Key: scan.Key,
          })
          .createReadStream() as PassThrough;

        mergedStream.add(
          this.processStream(
            request.CustomerId,
            request.ReportId,
            scan.TargetId,
            objectResponse,
            adapterPair[1],
            digest
          )
        );
        sourceCount++;
      } catch (e) {
        console.error(`${scan.Key} ${e}`);
      }
    }

    // With no sources the merged stream never ends, so an upload would hang.
    if (sourceCount === 0) {
      console.error("No processable scans found; skipping full report upload");
      return digest;
    }

    // Pipe merged results to output stream and upload to S3
    mergedStream.pipe(outputStream);
    await this.streamResultsToS3(
      outputStream,
      ReportController.getFullReportKey(request.CustomerId, request.ReportId)
    );

    return digest;
  };

  /**
   * Tallies PASS / FAIL / other rows of every compliance report file that
   * exists in S3 for the given target.
   *
   * Frameworks whose file is missing (or whose HEAD request fails) are logged
   * and left out of the result. A read or parse error after that point rejects
   * the returned promise.
   *
   * @returns One result per successfully processed framework, in completion
   *   order.
   */
  computeCompliance = async (
    customerId: string,
    reportId: number,
    targetId: number
  ) => {
    const complianceResults: any[] = [];

    // One promise per framework so all files are processed concurrently.
    const compliancePromises = AWS_COMPLIANCES.map(async (compliance) => {
      const complianceResult: ComplianceResult = {
        Id: compliance.id,
        ComplianceFramework: compliance.name,
        Passes: 0,
        Failures: 0,
        Other: 0,
      };

      const prefix = ReportController.getProwlerPathWithoutFilename(
        customerId,
        reportId,
        targetId
      );
      const key = `${prefix}/compliance/${compliance.reportFile}`;

      try {
        // The request is only sent once `.promise()` is called; it rejects
        // when the object does not exist, which skips this framework.
        await this.s3Client
          .headObject({
            Bucket: this.bucketName,
            Key: key,
          })
          .promise();

        const objectResponse = this.s3Client
          .getObject({
            Bucket: this.bucketName,
            Key: key,
          })
          .createReadStream() as PassThrough;

        const adapter = new ProwlerComplianceAdapter();
        const pipe = adapter.getPipe();

        // Settles when the parser ends or when either stream errors.
        return new Promise((resolve, reject) => {
          // `pipe()` does not forward source errors, so listen explicitly.
          objectResponse.on("error", (err: Error) => {
            console.error(`Error reading ${key} from S3:`, err);
            reject(err);
          });

          pipe.on("data", (row: { [x: string]: any }) => {
            const reportItem = adapter.onDataIngest(row);

            if (reportItem === "PASS") {
              complianceResult.Passes++;
            } else if (reportItem === "FAIL") {
              complianceResult.Failures++;
            } else {
              complianceResult.Other++;
            }
          });

          pipe.on("end", () => {
            complianceResults.push(complianceResult);
            resolve(null); // Resolve when processing is done
          });

          pipe.on("error", (err) => {
            console.error("Error processing CSV:", err);
            reject(err); // Reject the promise if there's an error
          });

          objectResponse.pipe(pipe);
        });
      } catch (e) {
        console.error(`Key ${key}`);
        console.error(e);
        return null;
      }
    });

    // Wait for all compliance checks to complete
    await Promise.all(compliancePromises);

    return complianceResults;
  };

  // ---------------------------------------------------------------------
  // Internals
  // ---------------------------------------------------------------------

  /**
   * Feeds `inputStream` through the adapter's parser, converts each parsed row
   * into a report item, and for every item: counts its severity in `digest`,
   * queues it for DynamoDB and writes it to the returned stream.
   *
   * The returned stream is ended only after every queued DynamoDB write has
   * settled.
   *
   * @param digest Mutable severity counter, updated in place.
   * @returns Stream carrying the serialized report items.
   */
  private processStream = (
    customerId: string,
    reportId: number,
    targetId: number,
    inputStream: PassThrough,
    adapter: DataAdapter,
    digest: any
  ) => {
    const transformStream = stringify("[", ",", "]"); // JSONStream to handle JSON formatting
    const pipe = adapter.getPipe();
    const batchSize = ReportController.DYNAMO_BATCH_SIZE;

    let batchBuffer: any[] = []; // Items waiting to be written
    let pendingWrites: Promise<void> = Promise.resolve(); // Serialized write chain
    let finished = false;

    // Hand the current buffer to the write chain. The buffer is swapped
    // synchronously so rows arriving while a write is in flight land in a
    // fresh array and are neither lost nor written twice.
    const flush = () => {
      if (batchBuffer.length === 0) {
        return;
      }
      const batch = batchBuffer;
      batchBuffer = [];
      // writeBatchToDB logs and swallows its own errors, so the chain never rejects.
      pendingWrites = pendingWrites.then(() => this.writeBatchToDB(batch));
    };

    // Flush what is left and close the output once all writes have settled.
    const finish = () => {
      if (finished) {
        return;
      }
      finished = true;
      flush();
      void pendingWrites.then(() => transformStream.end());
    };

    pipe.on("data", (row: { [x: string]: any }) => {
      if (finished) {
        return;
      }

      try {
        const reportItem = adapter.onDataIngest(
          customerId,
          reportId,
          targetId,
          row
        );

        if (!reportItem) {
          return;
        }

        // Update the digest; severities not pre-seeded start from zero.
        const severity = reportItem.Severity.toString();
        digest[severity] = (digest[severity] ?? 0) + 1;

        batchBuffer.push({
          PutRequest: {
            Item: reportItem,
          },
        });

        // NOTE(review): stringify() presumably serializes each chunk itself,
        // so passing a pre-serialized string may double-encode entries in the
        // report — confirm against consumers before changing the format.
        transformStream.write(JSON.stringify(reportItem));

        if (batchBuffer.length >= batchSize) {
          flush();
        }
      } catch (err) {
        // One malformed row must not take the whole aggregation down.
        console.error("Error ingesting report row:", err);
      }
    });

    pipe.on("end", finish);

    pipe.on("error", (err) => {
      console.error("Error processing CSV:", err);
      transformStream.emit("error", err);
    });

    // `pipe()` does not forward source errors. Treat an unreadable object as
    // the end of this scan so the other scans can still be aggregated.
    inputStream.on("error", (err: Error) => {
      console.error("Error reading report object from S3:", err);
      finish();
    });

    inputStream.pipe(pipe);

    return transformStream;
  };

  /**
   * Writes `batch` to the report table in chunks of at most 25 requests.
   *
   * Items DynamoDB reports as unprocessed are re-submitted with exponential
   * backoff, a bounded number of times. Errors are logged, not thrown, so the
   * returned promise never rejects.
   *
   * @param batch Write requests (`{ PutRequest: { Item } }`).
   * @param rateLimitMs Pause between chunks and base delay for retries.
   */
  private writeBatchToDB = async (batch: any[], rateLimitMs = 200) => {
    const delay = (ms: number) =>
      new Promise((resolve) => setTimeout(resolve, ms));
    const chunkSize = ReportController.DYNAMO_BATCH_SIZE;

    try {
      for (let i = 0; i < batch.length; i += chunkSize) {
        let pending: any[] = batch.slice(i, i + chunkSize);
        let attempt = 0;

        while (pending.length > 0) {
          const response = await this.documentClient.send(
            new BatchWriteCommand({
              RequestItems: {
                [this.reportTableName]: pending,
              },
            })
          );

          // Throttled requests come back here instead of raising an error.
          pending = response.UnprocessedItems?.[this.reportTableName] ?? [];
          if (pending.length === 0) {
            break;
          }

          attempt++;
          if (attempt > ReportController.MAX_BATCH_RETRIES) {
            console.error(
              `Giving up on ${pending.length} unprocessed DynamoDB item(s) after ${attempt} attempts`
            );
            break;
          }
          await delay(rateLimitMs * 2 ** (attempt - 1));
        }

        // Introduce rate limit between batch writes
        if (i + chunkSize < batch.length) {
          await delay(rateLimitMs);
        }
      }
    } catch (err) {
      console.error("Error writing batch to DynamoDB:", err);
    }
  };

  /**
   * Uploads `outputStream` to the report bucket under `outputKey`.
   * Upload failures are logged and not rethrown.
   */
  private streamResultsToS3 = async (
    outputStream: PassThrough,
    outputKey: string
  ) => {
    const params = {
      Bucket: this.bucketName,
      Key: outputKey,
      Body: outputStream,
    };

    try {
      const uploadResult = await this.s3Client.upload(params).promise();
      console.log("Upload successful:", uploadResult.Location);
    } catch (error) {
      console.error("Error uploading aggregated data to S3:", error);
    }
  };
}
