/*
 * Decompiled with CFR 0.152.
 */
package fr.ens.biologie.genomique.eoulsan.core.schedulers.clusters;

import fr.ens.biologie.genomique.eoulsan.EoulsanException;
import fr.ens.biologie.genomique.eoulsan.EoulsanLogger;
import fr.ens.biologie.genomique.eoulsan.Main;
import fr.ens.biologie.genomique.eoulsan.core.Step;
import fr.ens.biologie.genomique.eoulsan.core.schedulers.AbstractTaskScheduler;
import fr.ens.biologie.genomique.eoulsan.core.schedulers.clusters.ClusterTaskScheduler;
import fr.ens.biologie.genomique.eoulsan.core.workflow.TaskContextImpl;
import fr.ens.biologie.genomique.eoulsan.core.workflow.TaskResultImpl;
import fr.ens.biologie.genomique.eoulsan.core.workflow.TaskRunner;
import fr.ens.biologie.genomique.kenetre.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

public abstract class AbstractClusterTaskScheduler
extends AbstractTaskScheduler
implements ClusterTaskScheduler {
    /**
     * Status update delay; the value is 1000. NOTE(review): no code in this
     * class refers to the constant by name, but
     * StatusUpdateWaitingQueue.waitTurn() pauses for the same 1000 ms, so the
     * compiler presumably inlined it there - confirm against the original source.
     */
    private static final int STATUS_UPDATE_DELAY = 1000;

    /**
     * Task threads registered by submit(). Each thread removes itself when its
     * run() method finishes, and stop() asks every remaining thread to stop its
     * job and leaves the queue empty.
     */
    private final Queue<TaskThread> queue = new LinkedBlockingQueue<>();

    /** Waiting line every task thread goes through before each job status query. */
    private final StatusUpdateWaitingQueue statusUpdateQueue = new StatusUpdateWaitingQueue();

    /**
     * Submits a task: lets the superclass handle the submission first, then
     * creates, registers and starts the thread dedicated to this task.
     *
     * @param step the step the task belongs to
     * @param context the context of the task to run
     */
    @Override
    public void submit(Step step, TaskContextImpl context) {
        super.submit(step, context);

        // Register the thread before starting it, so that the removal performed
        // at the end of its run() method can never precede the insertion.
        final TaskThread taskThread = new TaskThread(context);
        this.queue.add(taskThread);
        taskThread.start();
    }

    /**
     * Asks every task thread still registered in this scheduler to stop its
     * cluster job, and leaves the queue of registered threads empty.
     */
    @Override
    public void stop() {
        // Drain the queue with poll() instead of iterating and then calling
        // clear(): with the two-step form, a thread added by a concurrent
        // submit() after the iterator has passed would be dropped by clear()
        // without ever receiving stopThread(). Here every thread that leaves
        // the queue is also asked to stop.
        TaskThread thread;
        while ((thread = this.queue.poll()) != null) {
            thread.stopThread();
        }
    }

    private final class TaskThread
    extends Thread {
        private final TaskContextImpl context;
        private final File taskDir;
        private final String taskPrefix;
        private String jobId;

        private List<String> createJobCommand() throws IOException {
            File taskContextFile = new File(this.taskDir, this.taskPrefix + ".task.context");
            this.context.serialize(taskContextFile);
            ArrayList<String> command = new ArrayList<String>();
            File eoulsanScriptFile = new File(Main.getInstance().getEoulsanScriptPath());
            command.add(eoulsanScriptFile.getAbsolutePath());
            command.add("-j");
            command.add(System.getProperty("java.home"));
            command.add("-w");
            command.add(System.getProperty("user.dir"));
            String logLevel = Main.getInstance().getLogLevelArgument();
            if (logLevel != null) {
                command.add("-loglevel");
                command.add(logLevel);
            }
            command.add("clustertask");
            command.add(taskContextFile.getAbsolutePath());
            return Collections.unmodifiableList(command);
        }

        private String getJobName() {
            return this.context.getJobId() + "-" + this.taskPrefix;
        }

        private TaskResultImpl loadResult() throws EoulsanException, IOException {
            File taskDoneFile = new File(this.taskDir, this.taskPrefix + ".task.done");
            if (!taskDoneFile.exists()) {
                throw new EoulsanException("No done file found for task #" + this.context.getId() + " in step " + AbstractClusterTaskScheduler.this.getStep(this.context).getId());
            }
            File taskResultFile = new File(this.taskDir, this.taskPrefix + ".task.result");
            this.context.deserializeOutputData(new File(this.taskDir, this.taskPrefix + ".task.data"));
            return TaskResultImpl.deserialize(taskResultFile);
        }

        private void createJobIdFile() throws IOException {
            File taskResultFile = new File(this.taskDir, this.taskPrefix + ".task.job.id");
            try (PrintWriter out = new PrintWriter(taskResultFile);){
                out.println(this.jobId);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            TaskResultImpl result = null;
            try {
                AbstractClusterTaskScheduler.this.beforeExecuteTask(this.context);
                File taskFile = this.context.getTaskOutputDirectory().toFile();
                int requiredMemory = this.getRequiredMemory();
                int requiredProcessors = this.context.getCurrentStep().getRequiredProcessors();
                this.jobId = AbstractClusterTaskScheduler.this.submitJob(this.getJobName(), this.createJobCommand(), taskFile, this.context.getId(), requiredMemory, requiredProcessors);
                this.createJobIdFile();
                ClusterTaskScheduler.StatusResult status = null;
                boolean completed = false;
                do {
                    AbstractClusterTaskScheduler.this.statusUpdateQueue.waitTurn(this);
                    status = AbstractClusterTaskScheduler.this.statusJob(this.jobId);
                    switch (status.getStatusValue()) {
                        case COMPLETE: {
                            completed = true;
                            break;
                        }
                    }
                } while (!completed);
                if (status.getExitCode() != 0) {
                    throw new EoulsanException("Invalid task exit code: " + status.getExitCode() + " for task #" + this.context.getId() + " in step " + AbstractClusterTaskScheduler.this.getStep(this.context).getId());
                }
                result = this.loadResult();
                TaskRunner.sendTokens(this.context, result);
            }
            catch (EoulsanException | IOException | InterruptedException e) {
                result = TaskRunner.createStepResult(this.context, e);
            }
            finally {
                if (result == null) {
                    result = TaskRunner.createStepResult(this.context, new IllegalStateException("Result is null for task #" + this.context.getId() + " in step " + AbstractClusterTaskScheduler.this.getStep(this.context).getId()));
                }
                AbstractClusterTaskScheduler.this.afterExecuteTask(this.context, result);
                AbstractClusterTaskScheduler.this.queue.remove(this);
            }
        }

        private int getRequiredMemory() {
            int result = this.context.getCurrentStep().getRequiredMemory();
            if (result > 0) {
                return result;
            }
            result = this.context.getSettings().getDefaultClusterMemoryRequired();
            if (result > 0) {
                return result;
            }
            return Main.getInstance().getEoulsanMemory();
        }

        public void stopThread() {
            if (this.jobId != null) {
                try {
                    AbstractClusterTaskScheduler.this.stopJob(this.jobId);
                }
                catch (IOException e) {
                    EoulsanLogger.getLogger().severe("Error while stopping job " + this.jobId + ": " + e.getMessage());
                }
            }
        }

        TaskThread(TaskContextImpl context) {
            Objects.requireNonNull(context, "context argument cannot be null");
            this.context = context;
            this.taskDir = context.getTaskOutputDirectory().toFile();
            this.taskPrefix = context.getTaskFilePrefix();
            this.setName("TaskThead " + this.getJobName());
        }
    }

    /**
     * Thread that hands an input stream and an output stream to
     * FileUtils.copy(). An I/O failure during the copy is logged, not propagated.
     * NOTE(review): judging by the name, this is meant to forward the output of
     * a process - confirm with the callers.
     */
    public final class ProcessThreadOutput extends Thread {
        final InputStream in;
        final OutputStream out;

        /**
         * Creates the copying thread.
         *
         * @param in the stream to read from
         * @param out the stream to write to
         */
        public ProcessThreadOutput(InputStream in, OutputStream out) {
            this.in = in;
            this.out = out;
        }

        /** Performs the copy and logs the message of any I/O error. */
        @Override
        public void run() {
            try {
                FileUtils.copy(this.in, this.out);
            } catch (IOException copyFailure) {
                EoulsanLogger.getLogger().severe(copyFailure.getMessage());
            }
        }
    }

    /**
     * First-in first-out waiting line for task threads. A thread leaves the
     * line only after it has seen itself at the head and then paused once more,
     * so two departures are separated by at least one pause.
     */
    private static class StatusUpdateWaitingQueue {
        /** Pause, in milliseconds, after each look at the head of the line. */
        private static final long WAIT_DELAY_MS = 1000L;

        private final BlockingQueue<TaskThread> queue = new LinkedBlockingDeque<>();

        private StatusUpdateWaitingQueue() {
        }

        /**
         * Blocks the calling thread until its turn comes.
         *
         * @param l the task thread that waits for its turn
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public void waitTurn(TaskThread l) throws InterruptedException {
            this.queue.add(l);
            try {
                TaskThread head;
                do {
                    // Read the head first, then pause: the thread always waits
                    // one full delay after finding itself at the head.
                    head = this.queue.element();
                    Thread.sleep(WAIT_DELAY_MS);
                } while (head != l);
            } finally {
                // Always leave the line, including on interruption. Otherwise an
                // interrupted thread would stay queued forever and, once at the
                // head, block every thread waiting behind it.
                this.queue.remove(l);
            }
        }
    }
}

