/*
 * Decompiled with CFR 0.152.
 */
package fr.ens.biologie.genomique.eoulsan.core.schedulers;

import com.google.common.collect.Queues;
import fr.ens.biologie.genomique.eoulsan.CommonHadoop;
import fr.ens.biologie.genomique.eoulsan.EoulsanException;
import fr.ens.biologie.genomique.eoulsan.EoulsanLogger;
import fr.ens.biologie.genomique.eoulsan.EoulsanRuntime;
import fr.ens.biologie.genomique.eoulsan.HadoopEoulsanRuntime;
import fr.ens.biologie.genomique.eoulsan.core.Step;
import fr.ens.biologie.genomique.eoulsan.core.schedulers.AbstractTaskScheduler;
import fr.ens.biologie.genomique.eoulsan.core.workflow.TaskContextImpl;
import fr.ens.biologie.genomique.eoulsan.core.workflow.TaskResultImpl;
import fr.ens.biologie.genomique.eoulsan.core.workflow.TaskRunner;
import fr.ens.biologie.genomique.eoulsan.core.workflow.TaskSerializationUtils;
import fr.ens.biologie.genomique.eoulsan.data.DataFile;
import fr.ens.biologie.genomique.eoulsan.util.hadoop.HadoopJobEmergencyStopTask;
import fr.ens.biologie.genomique.kenetre.util.StringUtils;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Queue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

/**
 * Task scheduler that runs every submitted task as its own map-only Hadoop job.
 *
 * <p>Each call to {@link #submit(Step, TaskContextImpl)} starts a dedicated worker thread. The
 * worker serializes the task context into a per-task directory, submits a Hadoop job whose only
 * input line is the location of that serialized context, waits for the job to finish and then
 * reads the task result back from the same directory.
 */
public class HadoopCompatibleTaskScheduler
extends AbstractTaskScheduler {
    /** Base Hadoop configuration; each job works on its own copy. */
    private final Configuration conf;

    /** Worker threads whose task has been submitted and not yet completed. */
    private final Queue<TaskThread> queue = Queues.newLinkedBlockingQueue();

    /**
     * Creates the scheduler with a configuration obtained from
     * {@code CommonHadoop.createConfiguration()}.
     */
    HadoopCompatibleTaskScheduler() {
        this.conf = CommonHadoop.createConfiguration();
    }

    /**
     * Submits a task and immediately starts a worker thread that runs it as a Hadoop job.
     *
     * @param step step the task belongs to
     * @param context context of the task to execute
     */
    @Override
    public void submit(Step step, TaskContextImpl context) {
        super.submit(step, context);
        TaskThread worker = new TaskThread(this.conf, context);
        this.queue.add(worker);
        worker.start();
    }

    /**
     * Stops the scheduler: asks every pending worker to kill its Hadoop job, then forgets them.
     */
    @Override
    public void stop() {
        super.stop();
        for (TaskThread worker : this.queue) {
            worker.stopThread();
        }
        this.queue.clear();
    }

    /**
     * Mapper executed by the Hadoop job. Each input value is the location of a serialized task
     * context; the mapper hands it to {@code TaskSerializationUtils.execute} and logs the outcome.
     * Nothing is emitted to the job output.
     */
    public static final class HadoopCompatibleMapper
    extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
        /**
         * Initializes logging and, if no Eoulsan runtime exists yet in this JVM, creates a Hadoop
         * one from the job configuration.
         */
        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            EoulsanLogger.initConsoleHandler();
            EoulsanLogger.getLogger().info("Start of setup()");
            Configuration jobConf = context.getConfiguration();
            if (!EoulsanRuntime.isRuntime()) {
                HadoopEoulsanRuntime.newEoulsanRuntime(jobConf);
            }
            EoulsanLogger.getLogger().info("End of setup()");
        }

        /**
         * Executes the task whose serialized context is located at {@code value}.
         *
         * <p>A task that reports failure is only logged here; the mapper still returns normally.
         *
         * @param key input key, unused
         * @param value location of the serialized task context
         * @param context Hadoop mapper context, unused
         * @throws IOException if the task execution raises an {@code EoulsanException}
         */
        protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            EoulsanLogger.getLogger().info("Start of map()");
            EoulsanLogger.getLogger().info("Task context file: " + value);
            try {
                TaskResultImpl result = TaskSerializationUtils.execute(new DataFile(value.toString()));
                if (result != null) {
                    boolean success = result.isSuccess();
                    EoulsanLogger.getLogger().info("Task result: " + (success ? "SUCCESS" : "FAIL"));
                    EoulsanLogger.getLogger().info("Task Duration: " + StringUtils.toTimeHumanReadable((long)result.getDuration()));
                    if (!success) {
                        EoulsanLogger.getLogger().severe("Task error message: " + result.getErrorMessage());
                        if (result.getException() != null) {
                            result.getException().printStackTrace();
                        }
                    }
                }
            }
            catch (EoulsanException e) {
                throw new IOException(e);
            }
            EoulsanLogger.getLogger().info("End of map()");
        }
    }

    /**
     * Worker thread that drives one task through its Hadoop job.
     */
    private final class TaskThread
    extends Thread {
        private static final String SUBMIT_FILE_NAME = "submitfile";
        private final TaskContextImpl context;
        private final Configuration conf;
        private final DataFile taskDir;
        private final String taskPrefix;

        // Both fields are written by this thread and read by the thread calling stopThread(),
        // hence volatile. jobId is only set once the job has actually been submitted.
        private volatile String jobId;
        private volatile Job hadoopJob;

        /**
         * @param conf base Hadoop configuration, must not be null
         * @param context context of the task to run, must not be null
         */
        TaskThread(Configuration conf, TaskContextImpl context) {
            Objects.requireNonNull(conf, "conf argument cannot be null");
            Objects.requireNonNull(context, "context argument cannot be null");
            DataFile hadoopWorkDir = context.getHadoopWorkingPathname();
            this.conf = conf;
            this.context = context;
            this.taskDir = new DataFile(hadoopWorkDir, "eoulsan-hadoop-compatible-task-" + this.context.getId());
            this.taskPrefix = context.getTaskFilePrefix();
        }

        /**
         * Builds (without submitting) the map-only job that executes the task.
         *
         * @param conf base configuration; copied, never modified
         * @param submitFile job input file
         * @param requiredMemory memory requested for the map task in MB; values &lt;= 0 leave the
         *     memory settings untouched
         * @param jobDescription job name
         * @return the configured job
         * @throws IOException if the job cannot be created
         */
        private Job createHadoopJob(Configuration conf, DataFile submitFile, int requiredMemory, String jobDescription) throws IOException {
            Configuration jobConf = new Configuration(conf);
            // One input line per map task.
            jobConf.set("mapreduce.input.lineinputformat.linespermap", "1");
            if (requiredMemory > 0) {
                jobConf.set("mapreduce.map.memory.mb", Integer.toString(requiredMemory));
                // Keep 128 MB of the container outside the JVM heap when there is room for it.
                int jvmMemory = requiredMemory - 128;
                if (jvmMemory <= 0) {
                    jvmMemory = requiredMemory;
                }
                jobConf.set("mapreduce.map.java.opts", "-Xmx" + jvmMemory + "M");
            }
            Job job = Job.getInstance(jobConf, jobDescription);
            job.setJarByClass(HadoopCompatibleTaskScheduler.class);
            FileInputFormat.addInputPath(job, new Path(submitFile.getSource()));
            job.setInputFormatClass(NLineInputFormat.class);
            job.setMapperClass(HadoopCompatibleMapper.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(NullWritable.class);
            job.setOutputFormatClass(NullOutputFormat.class);
            job.setNumReduceTasks(0);
            return job;
        }

        /**
         * Writes the job input file: a single line holding the location of the task context file.
         *
         * @param taskContextFile serialized task context
         * @return the created submit file, located next to {@code taskContextFile}
         * @throws IOException if the file cannot be written
         */
        private DataFile createSubmitFile(DataFile taskContextFile) throws IOException {
            DataFile submitFile = new DataFile(taskContextFile.getParent(), SUBMIT_FILE_NAME);
            // UTF-8 is what Hadoop's Text decodes on the mapper side; try-with-resources makes
            // sure the stream is closed even when the write fails.
            try (Writer writer = new OutputStreamWriter(submitFile.create(), StandardCharsets.UTF_8)) {
                writer.write(taskContextFile.getSource());
            }
            return submitFile;
        }

        /**
         * Reads back what the task left in the task directory.
         *
         * @return the deserialized task result
         * @throws EoulsanException if the {@code .task.done} marker is missing
         * @throws IOException if a file cannot be read
         */
        private TaskResultImpl loadResult() throws EoulsanException, IOException {
            DataFile taskDoneFile = new DataFile(this.taskDir, this.taskPrefix + ".task.done");
            if (!taskDoneFile.exists()) {
                throw new EoulsanException("No done file found for task #" + this.context.getId() + " in step " + HadoopCompatibleTaskScheduler.this.getStep(this.context).getId());
            }
            DataFile taskResultFile = new DataFile(this.taskDir, this.taskPrefix + ".task.result");
            this.context.deserializeOutputData(new DataFile(this.taskDir, this.taskPrefix + ".task.data"));
            return TaskResultImpl.deserialize(taskResultFile);
        }

        /**
         * Serializes the context, runs the Hadoop job, loads the result and reports it to the
         * scheduler. Returns silently, without reporting, whenever the scheduler has been stopped.
         */
        @Override
        public void run() {
            TaskResultImpl result = null;
            try {
                this.taskDir.mkdir();
                DataFile taskContextFile = new DataFile(this.taskDir, this.taskPrefix + ".task.context");
                this.context.serialize(taskContextFile);
                if (HadoopCompatibleTaskScheduler.this.isStopped()) {
                    return;
                }
                HadoopCompatibleTaskScheduler.this.beforeExecuteTask(this.context);
                DataFile submitFile = this.createSubmitFile(taskContextFile);
                Job job = this.createHadoopJob(this.conf, submitFile, this.context.getCurrentStep().getRequiredMemory(), "Eoulsan Step " + this.context.getCurrentStep().getId() + " (" + this.context.getCurrentStep().getModuleName() + ") Task #" + this.context.getId() + " (" + this.context.getContextName() + ")");
                this.hadoopJob = job;
                job.submit();
                // Publishing the id marks the job as submitted, which is what stopThread() checks.
                this.jobId = String.valueOf(job.getJobID());
                // stop() may have walked the queue before the id was published; kill the job
                // here so it is not left running on the cluster.
                if (HadoopCompatibleTaskScheduler.this.isStopped()) {
                    this.stopThread();
                }
                HadoopJobEmergencyStopTask.addHadoopJobEmergencyStopTask(job);
                try {
                    job.waitForCompletion(false);
                }
                finally {
                    // Always unregister, including when waiting fails or is interrupted.
                    HadoopJobEmergencyStopTask.removeHadoopJobEmergencyStopTask(job);
                }
                boolean jobSucceeded = job.isSuccessful();
                try {
                    // A failed job may still have left a readable result describing the failure,
                    // so the result is loaded (once) whatever the job status is.
                    result = this.loadResult();
                }
                catch (EoulsanException | IOException e) {
                    if (jobSucceeded) {
                        throw e;
                    }
                    EoulsanException failure = new EoulsanException("Error while running Hadoop job for Eoulsan task #" + this.context.getId() + "(" + this.context.getContextName() + ")");
                    // Keep the underlying reason available to whoever inspects the failure.
                    failure.initCause(e);
                    throw failure;
                }
                this.taskDir.delete(true);
                if (HadoopCompatibleTaskScheduler.this.isStopped()) {
                    return;
                }
                TaskRunner.sendTokens(this.context, result);
            }
            catch (EoulsanException | IOException | ClassNotFoundException | InterruptedException e) {
                result = TaskRunner.createStepResult(this.context, e);
            }
            if (HadoopCompatibleTaskScheduler.this.isStopped()) {
                return;
            }
            try {
                HadoopCompatibleTaskScheduler.this.afterExecuteTask(this.context, result);
            }
            finally {
                HadoopCompatibleTaskScheduler.this.queue.remove(this);
            }
        }

        /**
         * Kills the Hadoop job of this task if it has been submitted; does nothing otherwise.
         * Failures to kill the job are logged, not propagated.
         */
        public void stopThread() {
            String submittedJobId = this.jobId;
            Job job = this.hadoopJob;
            if (submittedJobId == null || job == null) {
                return;
            }
            try {
                job.killJob();
            }
            catch (IOException e) {
                EoulsanLogger.getLogger().severe("Error while stopping job " + submittedJobId + ": " + e.getMessage());
            }
        }
    }
}

