import React, { useCallback, useContext, useEffect, useRef, useState } from 'react';
import { cleanUpJobs, getJobsStatus, startJob, updateJob } from '../_shared/jobs/apis';
import {
  WORKPAPER_JOB_STATUS_COMPLETED,
  WORKPAPER_JOB_STATUS_FAILED,
  WORKPAPER_JOB_STATUS_CANCELLED,
} from '../_shared/jobs/jobStatus';
import { DATAFLOW_FILE_IMPORT_FILE_JOB_TYPE } from '../_shared/jobs/jobTypes';
import { mapJobToProcess } from '../_shared/jobs/jobHelpers';

// Client-side lifecycle statuses for jobs tracked by DataFlowJobsContextProvider.
// They are stored on each entry of the `jobs` list and passed to status-change listeners.

// A job that was just added through addJob and has not been started yet.
export const JOB_PROCESSOR_STATUS_PENDING = 'pending';
// Reported by removeJob after the job has been dropped from the list.
export const JOB_PROCESSOR_STATUS_REMOVED = 'removed';
// Set by cancelJob; the provider then marks the server-side job as cancelled.
export const JOB_PROCESSOR_STATUS_CANCELLING = 'cancelling';
// Set once the cancellation request has settled (whether or not the server call succeeded).
export const JOB_PROCESSOR_STATUS_CANCELLED = 'cancelled';
// Set by runJob after the job callback and the server-side completion update both succeeded.
export const JOB_PROCESSOR_STATUS_COMPLETED = 'completed';
// Set by runJob when starting the job, running its callback, or completing it threw.
export const JOB_PROCESSOR_STATUS_FAILED = 'failed';
// Set when a pending job is picked up and handed to runJob.
export const JOB_PROCESSOR_STATUS_RUNNING = 'running';
// Set when a job reached "completed" although a cancellation had been queued for it.
export const JOB_PROCESSOR_STATUS_RETRY_CANCELLED = 'retry-cancel';
// Not referenced in this file.
// NOTE(review): presumably a server-side status code for a freshly created polling job — confirm with consumers.
export const JOB_PROCESSOR_POOLING_JOB_CREATED = 5;

// Context carrying the job-processor API; the default value is null when no provider is mounted.
export const DataFlowJobsContext = React.createContext(null);

// Delay after which a started job is automatically asked to cancel.
const DEFAULT_RETENTION_TIMEOUT = 1000 * 60 * 20; // 20 minutes
// Only server jobs of these types are considered by the status heartbeat.
const POOLING_JOB_TYPES = [DATAFLOW_FILE_IMPORT_FILE_JOB_TYPE];

/**
 * Provider that owns two job mechanisms and exposes them through DataFlowJobsContext:
 *  - "processor" jobs: added with addJob, started on the server, executed through their callback
 *    and tracked in the `jobs` state with a JOB_PROCESSOR_STATUS_* status;
 *  - "pooling" jobs: started with startPoolingJob and followed through a periodic
 *    getJobsStatus heartbeat that fires the callbacks bound per job id and server status.
 *
 * @param {object} props
 * @param {React.ReactNode} props.children - Subtree that can consume the context.
 */
export function DataFlowJobsContextProvider({ children }) {
  // Processor jobs currently tracked, each carrying its client-side status.
  const [jobs, setJobs] = useState([]);
  // Jobs for which a cancellation was requested; used to detect jobs that completed anyway.
  const [cancelledJobs, setCancelledJobs] = useState([]);
  // Map of processId -> listener invoked as listener(status, error) on every status change.
  const [statusChangeCallbacks, setStatusChangeCallbacks] = useState(new Map());
  // Map of processId -> retention timer id (kept in a ref so updates do not re-render).
  const jobTimeouts = useRef(new Map());
  const retentionTimeout = DEFAULT_RETENTION_TIMEOUT;

  /* Start Job pooling states and refs */
  // When set, the heartbeat only asks for this batch and mirrors the result into the pooling* states.
  const [activeBatchId, setActiveBatchId] = useState(null);
  const [poolingProcesses, setPoolingProcesses] = useState([]);
  const [poolingActiveProcesses, setPoolingActiveProcesses] = useState(0);
  const [poolingFailedProcesses, setPoolingFailedProcesses] = useState(0);
  // Flag raised by consumers to ask the next heartbeat to call cleanUpJobs.
  const [poolingRequestCleanupJobs, setPoolingRequestCleanupJobs] = useState(false);
  // Shape: { [jobId]: { [serverStatus]: { callback, hasBeenCalled } } }.
  const [onPoolingJobResolvedCallbacks, setOnPoolingJobResolvedCallbacks] = useState({});
  // Only ever set to false in this file; it is listed in the heartbeat effect dependencies.
  const [hasActivePooling, setHasActivePooling] = useState(false);
  // Map of server jobId -> timer id returned when the pooling job callback was scheduled.
  const jobPoolingTimeouts = useRef(new Map());

  // Handle of the heartbeat interval (null once the heartbeat has been stopped).
  const jobsHeartbeatCheckTimeout = useRef(null);
  // Heartbeat period in milliseconds.
  const jobsHeartbeatCheckInterval = 1000;
  /* Ends Job pooling states and refs */

  /**
   * Store a new status for a job and notify the listener bound to its processId.
   *
   * The entry of `jobs` that has the same processId, batchId and JSON-equal jobPayload is replaced
   * by the supplied job with `status` set to newStatus. The listener is notified whether or not
   * an entry matched.
   *
   * @param {object} targetJob - Job whose status changes; also the new content of the stored entry
   * @param {string} newStatus - One of the JOB_PROCESSOR_STATUS_* values
   * @param {Error} [error] - Forwarded to the listener as its second argument
   */
  const updateJobStatus = useCallback(
    (targetJob, newStatus, error) => {
      const isSameJob = candidate =>
        targetJob.processId === candidate.processId &&
        targetJob.batchId === candidate.batchId &&
        JSON.stringify(targetJob.jobPayload) === JSON.stringify(candidate.jobPayload);

      setJobs(prevJobs =>
        prevJobs.map(candidate => (isSameJob(candidate) ? { ...targetJob, status: newStatus } : candidate))
      );

      if (!statusChangeCallbacks.has(targetJob.processId)) {
        return;
      }
      const notify = statusChangeCallbacks.get(targetJob.processId);
      notify(newStatus, error);
    },
    [statusChangeCallbacks]
  );

  /**
   * Queue a new job in the pending state and tell the listener bound to its processId, if any.
   *
   * @param {object} job
   * @param {string} job.processId - Id of the process that owns the job
   * @param {string} job.type - Job type sent to the server when the job starts
   * @param {string} job.batchId - Batch the job belongs to
   * @param {object} job.jobPayload - Payload sent to the server when the job starts
   * @param {function} job.callback - Work executed once the server job exists
   * @param {Array} job.params - Arguments passed to the callback ahead of the server job id
   */
  const addJob = useCallback(
    ({ processId, type, batchId, jobPayload, callback, params }) => {
      const pendingJob = {
        processId,
        type,
        batchId,
        jobPayload,
        callback,
        params,
        status: JOB_PROCESSOR_STATUS_PENDING,
      };
      setJobs(prevJobs => [...prevJobs, pendingJob]);

      if (!statusChangeCallbacks.has(processId)) {
        return;
      }
      const notify = statusChangeCallbacks.get(processId);
      notify(JOB_PROCESSOR_STATUS_PENDING);
    },
    [statusChangeCallbacks]
  );

  /**
   * Cancel and forget the retention timer registered for a process, if there is one.
   *
   * @param {string} processId - Key of the timer in jobTimeouts
   */
  const removeJobTimeout = useCallback(processId => {
    const pendingTimeouts = jobTimeouts.current;
    if (!pendingTimeouts.has(processId)) {
      return;
    }
    clearTimeout(pendingTimeouts.get(processId));
    pendingTimeouts.delete(processId);
  }, []);

  /**
   * Merge extra properties into every stored job whose JSON serialisation equals that of lookupJob.
   * Properties that JSON.stringify drops (functions, undefined) therefore do not take part in the match.
   *
   * @param {object} lookupJob - Job used to find the entries to update
   * @param {object} params - Properties merged over each matching entry
   */
  const updateJobsByProperties = useCallback((lookupJob, params) => {
    const matchesLookup = candidate => JSON.stringify(candidate) === JSON.stringify(lookupJob);
    const applyPatch = candidate => (matchesLookup(candidate) ? { ...candidate, ...params } : candidate);

    setJobs(prevJobs => prevJobs.map(candidate => applyPatch(candidate)));
  }, []);

  /**
   * Find the first stored job of a process that is either running or completed.
   *
   * @param {string} processId - Id of the owning process
   * @returns {object|undefined} The matching job, or undefined when there is none
   */
  const getCurrentActiveJobByProcessId = useCallback(
    processId => {
      const activeStatuses = [JOB_PROCESSOR_STATUS_COMPLETED, JOB_PROCESSOR_STATUS_RUNNING];
      return jobs.find(job => job.processId === processId && activeStatuses.includes(job.status));
    },
    [jobs]
  );

  /**
   * Drop every stored job of a process, clear its retention timer and report the removal.
   * Does nothing when the process has no stored job.
   *
   * @param {string} processId - Id of the owning process
   */
  const removeJob = useCallback(
    processId => {
      // Looked up first so the removed job can still be reported after it left the list
      const jobToRemove = jobs.find(job => job.processId === processId);
      if (!jobToRemove) {
        return;
      }

      removeJobTimeout(processId);
      setJobs(prevJobs => prevJobs.filter(job => job.processId !== processId));
      updateJobStatus(jobToRemove, JOB_PROCESSOR_STATUS_REMOVED);
    },
    [jobs, removeJobTimeout, updateJobStatus]
  );

  /**
   * Request the cancellation of the running or completed job of a process.
   *
   * The job is queued in `cancelledJobs` and its status becomes "cancelling". When the process has
   * no running or completed job the call is a no-op: there is nothing to cancel, and queuing a job
   * without a processId would only leave an entry in `cancelledJobs` that can never be matched.
   *
   * @param {object} request
   * @param {string} request.processId - Id of the process whose job must be cancelled
   * @param {function} [request.callback] - Stored on the cancelling job in place of the original callback
   * @param {Array} [request.params] - Stored on the cancelling job in place of the original params
   */
  const cancelJob = useCallback(
    ({ processId, callback, params }) => {
      // Only a job that is currently running or completed can be cancelled
      const currentJob = getCurrentActiveJobByProcessId(processId);
      if (!currentJob) {
        return;
      }

      const newJob = { ...currentJob, callback, params, status: JOB_PROCESSOR_STATUS_CANCELLING };

      setCancelledJobs(prevCancelledJobs => [...prevCancelledJobs, newJob]);
      updateJobStatus(newJob, JOB_PROCESSOR_STATUS_CANCELLING);
      // TODO: combine the cancelled-jobs queue update and the status update into one operation
    },
    [updateJobStatus, setCancelledJobs, getCurrentActiveJobByProcessId]
  );

  /**
   * Carry out a requested cancellation: mark the server job as cancelled, then — whether or not
   * that call succeeded — report the job as cancelled locally and clear its retention timer.
   * A failure of the server call still rejects the returned promise.
   *
   * @param {object} job - Job in the cancelling state; its jobId and processId are used
   * @returns {Promise<void>}
   */
  const cancelingJob = useCallback(
    async job => {
      const { jobId: serverJobId, processId: ownerProcessId } = job;
      // The job callback (file removal from storage) is intentionally not invoked here
      try {
        await updateJob(serverJobId, WORKPAPER_JOB_STATUS_CANCELLED);
      } finally {
        updateJobStatus(job, JOB_PROCESSOR_STATUS_CANCELLED);
        removeJobTimeout(ownerProcessId);
      }
    },
    [updateJobStatus, removeJobTimeout]
  );

  /**
   * Execute one job: create it on the server, run its callback, mark it completed on the server
   * and report the outcome through updateJobStatus.
   *
   * On any failure the retention timer is cleared, the server job (when it was created) is marked
   * failed, and the job is reported as failed with the error that caused the failure. The local
   * failure is reported even when the server-side "failed" update itself cannot be delivered.
   *
   * @param {object} job - Job to run (processId, type, batchId, jobPayload, callback, params)
   * @returns {Promise<void>} Resolves once the outcome has been reported
   */
  const runJob = useCallback(
    async job => {
      const { processId, type, batchId, jobPayload, callback, params } = job;
      // Stays undefined until the server has acknowledged the job creation
      let serverJobId;

      try {
        const { jobId } = await startJob({
          entityId: processId,
          jobType: type,
          batchId,
          payload: jobPayload,
        });
        serverJobId = jobId;

        // Attach the server job id to the stored job so it can be cancelled later
        updateJobsByProperties(job, { jobId });
        // The callback receives the job params followed by the server job id
        await callback(...(params ?? []), jobId);
        await updateJob(serverJobId, WORKPAPER_JOB_STATUS_COMPLETED, processId);
        removeJobTimeout(processId);
        updateJobStatus({ ...job, jobId }, JOB_PROCESSOR_STATUS_COMPLETED);
      } catch (error) {
        removeJobTimeout(processId);
        // Keep the server job id on the stored job, as the success path does
        const failedJob = serverJobId ? { ...job, jobId: serverJobId } : job;
        try {
          if (serverJobId) {
            await updateJob(serverJobId, WORKPAPER_JOB_STATUS_FAILED, processId);
          }
        } catch (reportError) {
          // The server could not be told about the failure. The original error is the one worth
          // reporting, and the local status must not stay "running", so this one is dropped.
        }
        updateJobStatus(failedJob, JOB_PROCESSOR_STATUS_FAILED, error);
      }
    },
    [updateJobStatus, removeJobTimeout, updateJobsByProperties]
  );

  /**
   * Register (or replace) the listener notified of every status change of a process' jobs.
   *
   * @param {string} processId - Id of the process to listen to
   * @param {function} callback - Invoked as callback(status, error)
   */
  const bindOnStatusChanged = useCallback((processId, callback) => {
    setStatusChangeCallbacks(previous => {
      // Copy first: the Map held in state must not be mutated in place
      const next = new Map(previous);
      next.set(processId, callback);
      return next;
    });
  }, []);

  // Always points at the latest cancelJob. A retention timer fires long after it was scheduled;
  // going through the ref lets it see the current job list instead of the one captured back then.
  const cancelJobRef = useRef(cancelJob);
  useEffect(() => {
    cancelJobRef.current = cancelJob;
  }, [cancelJob]);

  /**
   * Drive the job state machine every time the job list changes:
   *  - pending jobs are marked running, started through runJob and given a retention timer that
   *    requests their cancellation once `retentionTimeout` has elapsed;
   *  - cancelling jobs are handed to cancelingJob.
   *
   * Retention timers are deliberately NOT cleared when this effect re-runs: marking a job as
   * running changes `jobs`, so clearing them here would cancel every timer right after it was
   * created. Timers are cleared per job by removeJobTimeout and, all at once, on unmount.
   */
  useEffect(() => {
    jobs.forEach(job => {
      if (job.status === JOB_PROCESSOR_STATUS_PENDING) {
        const updatedJob = { ...job, status: JOB_PROCESSOR_STATUS_RUNNING };
        updateJobStatus(job, JOB_PROCESSOR_STATUS_RUNNING);
        runJob(updatedJob).catch(error => {
          if (statusChangeCallbacks.has(job.processId)) {
            const callback = statusChangeCallbacks.get(job.processId);
            callback(JOB_PROCESSOR_STATUS_FAILED, error);
          }
        });

        // Never keep two timers for the same process
        removeJobTimeout(job.processId);
        const timeoutId = setTimeout(() => {
          jobTimeouts.current.delete(job.processId);
          // cancelJob destructures its argument, so it must receive an object
          cancelJobRef.current({ processId: job.processId });
        }, retentionTimeout);
        jobTimeouts.current.set(job.processId, timeoutId);
      } else if (job.status === JOB_PROCESSOR_STATUS_CANCELLING) {
        cancelingJob(job);
      }
    });
  }, [jobs, retentionTimeout, runJob, updateJobStatus, statusChangeCallbacks, cancelingJob, removeJobTimeout]);

  // Clear every outstanding retention timer when the provider unmounts
  useEffect(() => {
    const jobTimeoutsCurrent = jobTimeouts.current;
    return () => {
      jobTimeoutsCurrent.forEach(timeoutId => clearTimeout(timeoutId));
      jobTimeoutsCurrent.clear();
    };
  }, []);

  // A job may reach "completed" although its cancellation had already been queued. When that
  // happens, flag the job as "retry-cancel" and take its process out of the cancellation queue.
  useEffect(() => {
    const hasQueuedCancellation = job =>
      cancelledJobs?.some(
        cancelled =>
          job.processId === cancelled.processId &&
          job.status === JOB_PROCESSOR_STATUS_COMPLETED &&
          job.batchId === cancelled.batchId &&
          JSON.stringify(job.jobPayload) === JSON.stringify(cancelled.jobPayload)
      );

    const escapeJob = jobs.find(job => hasQueuedCancellation(job));
    if (!escapeJob) {
      return;
    }

    updateJobStatus(escapeJob, JOB_PROCESSOR_STATUS_RETRY_CANCELLED);
    setCancelledJobs(prevCancelledJobs =>
      prevCancelledJobs.filter(cancelled => cancelled.processId !== escapeJob.processId)
    );
  }, [cancelledJobs, jobs, updateJobStatus]);

  /**
   * Job Pooling Section
   */

  /**
   * Build a binder that registers a one-shot callback for a given server job status.
   *
   * The binder has the signature (jobId, callback, ...args). The registered callback is later
   * invoked with the resolved job followed by `args`, and is stored under
   * onPoolingJobResolvedCallbacks[jobId][jobStatus] with `hasBeenCalled: false`.
   *
   * @param {string} jobStatus - The server job status the callback is bound to
   * @returns {function} - Binder that registers a callback for a job id under that status
   */
  const addPoolingJobResolvedCallback =
    (jobStatus = WORKPAPER_JOB_STATUS_COMPLETED) =>
    (jobId, callback = function () {}, ...args) => {
      const registration = {
        callback: params => callback(params, ...args),
        hasBeenCalled: false,
      };

      setOnPoolingJobResolvedCallbacks(resolvedCallbacksState => {
        const callbacksForJob = { ...resolvedCallbacksState[jobId], [jobStatus]: registration };
        return { ...resolvedCallbacksState, [jobId]: callbacksForJob };
      });
    };

  /**
   * Register a callback fired once when the heartbeat sees the job with status
   * WORKPAPER_JOB_STATUS_COMPLETED.
   * @param {string} jobId - The job id to bind the callback to
   * @param {function} callback - Called with the resolved job, followed by the extra arguments
   * @param {...any} args - Extra arguments appended to the callback call
   */
  const bindOnPoolingJobCompleted = addPoolingJobResolvedCallback(WORKPAPER_JOB_STATUS_COMPLETED);
  /**
   * Register a callback fired once when the heartbeat sees the job with status
   * WORKPAPER_JOB_STATUS_FAILED.
   * @param {string} jobId - The job id to bind the callback to
   * @param {function} callback - Called with the resolved job, followed by the extra arguments
   * @param {...any} args - Extra arguments appended to the callback call
   */
  const bindOnPoolingJobFailed = addPoolingJobResolvedCallback(WORKPAPER_JOB_STATUS_FAILED);
  /**
   * Register a callback fired once when the heartbeat sees the job with status
   * WORKPAPER_JOB_STATUS_CANCELLED.
   * @param {string} jobId - The job id to bind the callback to
   * @param {function} callback - Called with the resolved job, followed by the extra arguments
   * @param {...any} args - Extra arguments appended to the callback call
   */
  const bindOnPoolingJobCancelled = addPoolingJobResolvedCallback(WORKPAPER_JOB_STATUS_CANCELLED);

  /**
   * On unmount, clear every timer registered for pooling jobs and empty the registry.
   */
  useEffect(() => {
    // Captured now so the cleanup works on the Map that existed while the provider was mounted
    const registeredTimeouts = jobPoolingTimeouts.current;

    return () => {
      for (const timeoutId of registeredTimeouts.values()) {
        clearTimeout(timeoutId);
      }
      registeredTimeouts.clear();
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);

  /**
   * Create a job on the server and schedule its callback on a zero-delay timer.
   *
   * Nothing is thrown to the caller: a failure to start the job, an exception raised by
   * onJobCreatedCallback, and a failure of the scheduled callback (thrown synchronously or
   * through a rejected promise) are all reported to onJobErrorCallback when one is provided.
   *
   * @param {string} entityId - The entity id, it could be the dataflow id
   * @param {string} jobType - The job type
   * @param {string} batchId - The batch id, it could be the block id
   * @param {object} payload - The job payload
   * @param {function} callback - Called as callback(...params, jobId) once the job exists
   * @param {function} [onJobCreatedCallback] - Called with the job id right after the job is created
   * @param {function} [onJobErrorCallback] - Called with the error when anything above fails
   * @param {Array} [params] - Arguments passed to the callback ahead of the job id
   * @returns {Promise<{jobId: string, timeoutId: *}|undefined>} - The job id and the timer id, or
   *   undefined when the job could not be started
   */
  const startJobAndRunCallback = async (
    entityId,
    jobType,
    batchId,
    payload,
    callback,
    onJobCreatedCallback,
    onJobErrorCallback,
    params
  ) => {
    try {
      const { jobId } = await startJob({ entityId, jobType, batchId, payload });
      if (onJobCreatedCallback) {
        onJobCreatedCallback(jobId);
      }

      const timeoutId = setTimeout(async () => {
        try {
          // Awaited so that a rejected promise is reported exactly like a synchronous throw
          await callback(...(params ?? []), jobId);
        } catch (callbackError) {
          if (onJobErrorCallback) onJobErrorCallback(callbackError);
        }
      }, 0);

      return { jobId, timeoutId };
    } catch (error) {
      if (onJobErrorCallback) onJobErrorCallback(error);
      return undefined;
    }
  };

  /**
   * Cancel a pooling job that was started through this provider.
   *
   * Jobs without a registered timer are ignored. Otherwise the server job is marked as cancelled
   * and, whether or not that call succeeds, the timer is cleared and unregistered.
   *
   * @param {string} jobId - The job id to cancel
   * @throws {Error} - Rejects when the server-side cancellation fails
   */
  const cancelPoolJob = async jobId => {
    if (!jobId) {
      return;
    }

    const timeoutId = jobPoolingTimeouts.current.get(jobId);
    if (!timeoutId) {
      return;
    }

    try {
      await updateJob(jobId, WORKPAPER_JOB_STATUS_CANCELLED);
    } finally {
      clearTimeout(timeoutId);
      jobPoolingTimeouts.current.delete(jobId);
    }
  };

  /**
   * Start a pooling job and register the timer of its scheduled callback under the job id,
   * which is what later allows cancelPoolJob to act on the job.
   *
   * When the job cannot be started, startJobAndRunCallback has already reported the error to
   * onJobErrorCallback and returns nothing; in that case there is nothing to register.
   *
   * @param {object} options
   * @param {string} options.entityId - The entity id
   * @param {string} options.jobType - The job type
   * @param {string} options.batchId - The batch id
   * @param {object} options.payload - The job payload
   * @param {function} options.callback - The callback function to run
   * @param {function} [options.onJobCreatedCallback] - The callback function to run when the job is created
   * @param {function} [options.onJobErrorCallback] - The callback function to run when the job errors
   * @param {Array} [options.params] - The parameters to pass to the callback
   * @returns {Promise<void>}
   */
  const startPoolingJob = async ({
    entityId,
    jobType,
    batchId,
    payload,
    callback,
    onJobCreatedCallback,
    onJobErrorCallback,
    params,
  }) => {
    const started = await startJobAndRunCallback(
      entityId,
      jobType,
      batchId,
      payload,
      callback,
      onJobCreatedCallback,
      onJobErrorCallback,
      params
    );
    // Nothing is returned when the job failed to start
    const { jobId, timeoutId } = started ?? {};
    if (jobId && timeoutId) {
      jobPoolingTimeouts.current.set(jobId, timeoutId);
    }
  };

  /**
   * Fire, at most once, the callback registered for a job's current server status.
   *
   * @param {object} onPoolingJobResolvedCallbacks - Snapshot of the registered callbacks,
   *   shaped as { [jobId]: { [status]: { callback, hasBeenCalled } } }
   * @param {object} job - The job reported by the server; its jobId and status select the callback
   */
  const onPoolingJobResolved = (onPoolingJobResolvedCallbacks, job) => {
    const onJobResolved = onPoolingJobResolvedCallbacks[job.jobId]?.[job.status];
    if (!onJobResolved || onJobResolved.hasBeenCalled) {
      return;
    }

    onJobResolved.callback(job);
    // Flagged on the shared registration object as well, so that another heartbeat response still
    // holding this snapshot does not fire the callback a second time.
    onJobResolved.hasBeenCalled = true;
    // Functional update: the snapshot may be older than the state (callbacks can be registered
    // while a status request is in flight) and writing it back would drop those registrations.
    setOnPoolingJobResolvedCallbacks(currentCallbacks => ({
      ...currentCallbacks,
      [job.jobId]: { ...currentCallbacks[job.jobId], [job.status]: onJobResolved },
    }));
  };

  /**
   * One heartbeat tick: ask the server for the job statuses (restricted to the active batch when
   * there is one), fire the callbacks bound to the reported jobs, mirror the counters into state
   * and stop the heartbeat interval when nothing is left to watch.
   *
   * Failures of the status request or of the clean-up request are logged instead of being left
   * as unhandled promise rejections.
   */
  const jobsHeartbeatCheck = () => {
    // The handle comes from setInterval, so it is released with clearInterval
    const stopHeartbeat = () => {
      clearInterval(jobsHeartbeatCheckTimeout.current);
      jobsHeartbeatCheckTimeout.current = null;
    };

    const batchIds = activeBatchId ? [activeBatchId] : null;
    getJobsStatus({
      batchIds,
    })
      .then(serverJobs => {
        // Only keep jobs with pooling available
        const processList = serverJobs
          .filter(({ jobType }) => POOLING_JOB_TYPES.includes(jobType))
          .map(mapJobToProcess);

        const jobsInProgress = [];
        const jobsFailed = [];
        const jobsCompleted = [];

        for (const job of processList) {
          if (onPoolingJobResolvedCallbacks[job.jobId]) {
            onPoolingJobResolved(onPoolingJobResolvedCallbacks, job);
          }

          // NOTE(review): every status other than completed/failed — including cancelled — is
          // counted as in progress. Confirm that this is intended.
          if (job.status === WORKPAPER_JOB_STATUS_COMPLETED) {
            jobsCompleted.push(job);
          } else if (job.status === WORKPAPER_JOB_STATUS_FAILED) {
            jobsFailed.push(job);
          } else {
            jobsInProgress.push(job);
          }
        }

        if (activeBatchId) {
          setPoolingProcesses(processList);
          setPoolingActiveProcesses(jobsInProgress.length);
          setPoolingFailedProcesses(jobsFailed.length);
        }

        // We need to update the spreadsheet-api to clean by dataflow Id
        if (poolingRequestCleanupJobs && processList.length) {
          cleanUpJobs()
            .then(() => {
              // Clean up pending active process
              setPoolingProcesses([]);
              setPoolingActiveProcesses(0);
              setPoolingFailedProcesses(0);
              setPoolingRequestCleanupJobs(false);
            })
            .catch(error => {
              console.error('Failed to clean up jobs', error);
            });
        }

        // The heartbeat is not needed anymore once no job is in progress
        if (!jobsInProgress.length || serverJobs.length === jobsCompleted.length + jobsFailed.length) {
          setHasActivePooling(false);
          stopHeartbeat();
        }
      })
      .catch(error => {
        console.error('Failed to check the status of the jobs', error);
      });

    // Evaluated right after the request is sent, against the state captured by this render
    if (!poolingProcesses.length || poolingProcesses.length === poolingActiveProcesses + poolingFailedProcesses) {
      stopHeartbeat();
    }
  };

  /**
   * (Re)start the heartbeat interval whenever one of the values it reads changes, and clear the
   * interval currently stored in the ref before the next run and on unmount.
   */
  useEffect(() => {
    jobsHeartbeatCheckTimeout.current = setInterval(jobsHeartbeatCheck, jobsHeartbeatCheckInterval);

    return () => {
      clearInterval(jobsHeartbeatCheckTimeout.current);
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [poolingProcesses, poolingRequestCleanupJobs, onPoolingJobResolvedCallbacks, hasActivePooling, activeBatchId]);

  // Public API handed to consumers of DataFlowJobsContext (see useJobProcessor):
  // processor jobs (addJob, cancelJob, removeJob, jobs, setJobs, bindOnStatusChanged) and
  // pooling jobs (setActiveBatchId, bindOnPoolingJob*, startPoolingJob, cancelPoolJob,
  // setPoolingRequestCleanupJobs).
  // NOTE(review): the value object is rebuilt on every render, so its identity changes each time
  // the provider renders — consider memoising it if consumer re-renders become a problem.
  return (
    <DataFlowJobsContext.Provider
      value={{
        addJob,
        cancelJob,
        removeJob,
        jobs,
        setJobs,
        bindOnStatusChanged,
        setActiveBatchId,
        bindOnPoolingJobCompleted,
        bindOnPoolingJobFailed,
        bindOnPoolingJobCancelled,
        startPoolingJob,
        setPoolingRequestCleanupJobs,
        cancelPoolJob,
      }}
    >
      {children}
    </DataFlowJobsContext.Provider>
  );
}

/**
 * Hook giving access to the job-processor API published by DataFlowJobsContextProvider.
 *
 * @returns {object|null} The context value, or null when no provider is mounted above the caller
 */
export const useJobProcessor = () => useContext(DataFlowJobsContext);
