/*
 * Decompiled with CFR 0.152.
 */
package fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop;

import fr.ens.biologie.genomique.eoulsan.CommonHadoop;
import fr.ens.biologie.genomique.eoulsan.EoulsanException;
import fr.ens.biologie.genomique.eoulsan.annotations.HadoopOnly;
import fr.ens.biologie.genomique.eoulsan.bio.io.hadoop.SAMInputFormat;
import fr.ens.biologie.genomique.eoulsan.bio.io.hadoop.SAMOutputFormat;
import fr.ens.biologie.genomique.eoulsan.core.InputPorts;
import fr.ens.biologie.genomique.eoulsan.core.InputPortsBuilder;
import fr.ens.biologie.genomique.eoulsan.core.TaskContext;
import fr.ens.biologie.genomique.eoulsan.core.TaskResult;
import fr.ens.biologie.genomique.eoulsan.core.TaskStatus;
import fr.ens.biologie.genomique.eoulsan.data.Data;
import fr.ens.biologie.genomique.eoulsan.data.DataFormats;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.AbstractSAMFilterModule;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.HadoopMappingUtils;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.ReadsMapperHadoopModule;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.SAMFilterMapper;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.SAMFilterReducer;
import fr.ens.biologie.genomique.eoulsan.util.hadoop.MapReduceUtils;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

@HadoopOnly
public class SAMFilterHadoopModule
extends AbstractSAMFilterModule {
    @Override
    public InputPorts getInputPorts() {
        return InputPortsBuilder.allPortsRequiredInWorkingDirectory(super.getInputPorts());
    }

    @Override
    public TaskResult execute(TaskContext context, TaskStatus status) {
        Configuration conf = CommonHadoop.createConfiguration();
        try {
            Data inData = context.getInputData(DataFormats.MAPPER_RESULTS_SAM);
            Data outData = context.getOutputData(DataFormats.MAPPER_RESULTS_SAM, inData);
            Job job = this.createJob(conf, inData, outData);
            MapReduceUtils.submitAndWaitForJob(job, inData.getName(), 5000, status, "sam_filtering");
            return status.createTaskResult();
        }
        catch (EoulsanException | IOException e) {
            return status.createTaskResult(e, "Error while running job: " + e.getMessage());
        }
    }

    private Job createJob(Configuration parentConf, Data inData, Data outData) throws IOException {
        Configuration jobConf = new Configuration(parentConf);
        Path inputPath = new Path(inData.getDataFile().getSource());
        jobConf.set(CommonHadoop.COUNTER_GROUP_KEY, "sam_filtering");
        HadoopMappingUtils.addParametersToJobConf(this.getAlignmentsFilterParameters(), SAMFilterReducer.MAP_FILTER_PARAMETER_KEY_PREFIX, jobConf);
        jobConf.set("mapreduce.task.timeout", "1800000");
        Job job = Job.getInstance((Configuration)jobConf, (String)("Filter SAM file (" + inData.getName() + ", " + inputPath.getName() + ")"));
        job.setJarByClass(ReadsMapperHadoopModule.class);
        FileInputFormat.addInputPath((Job)job, (Path)inputPath);
        job.setInputFormatClass(SAMInputFormat.class);
        job.setMapperClass(SAMFilterMapper.class);
        job.setReducerClass(SAMFilterReducer.class);
        if (this.getReducerTaskCount() > 0) {
            job.setNumReduceTasks(this.getReducerTaskCount());
        }
        job.setOutputFormatClass(SAMOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath((Job)job, (Path)new Path(outData.getDataFile().getSource()));
        return job;
    }
}

