/*
 * Decompiled with CFR 0.152.
 */
package fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import fr.ens.biologie.genomique.eoulsan.CommonHadoop;
import fr.ens.biologie.genomique.eoulsan.EoulsanException;
import fr.ens.biologie.genomique.eoulsan.Globals;
import fr.ens.biologie.genomique.eoulsan.annotations.HadoopOnly;
import fr.ens.biologie.genomique.eoulsan.bio.io.hadoop.FastqInputFormat;
import fr.ens.biologie.genomique.eoulsan.bio.io.hadoop.FastqOutputFormat;
import fr.ens.biologie.genomique.eoulsan.core.InputPorts;
import fr.ens.biologie.genomique.eoulsan.core.InputPortsBuilder;
import fr.ens.biologie.genomique.eoulsan.core.TaskContext;
import fr.ens.biologie.genomique.eoulsan.core.TaskResult;
import fr.ens.biologie.genomique.eoulsan.core.TaskStatus;
import fr.ens.biologie.genomique.eoulsan.data.Data;
import fr.ens.biologie.genomique.eoulsan.data.DataFile;
import fr.ens.biologie.genomique.eoulsan.data.DataFormat;
import fr.ens.biologie.genomique.eoulsan.data.DataFormats;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.AbstractReadsFilterModule;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.HadoopMappingUtils;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.PairedEndFastqToTfq;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.ReadsFilterMapper;
import fr.ens.biologie.genomique.eoulsan.util.hadoop.MapReduceUtils;
import fr.ens.biologie.genomique.kenetre.bio.FastqFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Hadoop implementation of the reads filter module.
 *
 * <p>Single-end FASTQ data is filtered directly by a map-only job. Paired-end
 * data is first converted into a TFQ file with {@link PairedEndFastqToTfq}. That
 * TFQ file is then filtered by a map-only job writing both output files into a
 * temporary directory. Finally, the output files are moved to their expected
 * locations and the intermediate files are removed.
 */
@HadoopOnly
public class ReadsFilterHadoopModule
extends AbstractReadsFilterModule {

    /** Job configuration key holding the name of the first output file (paired-end only). */
    static final String OUTPUT_FILE1_KEY = Globals.PARAMETER_PREFIX + ".filter.reads.output.file1";

    /** Job configuration key holding the name of the second output file (paired-end only). */
    static final String OUTPUT_FILE2_KEY = Globals.PARAMETER_PREFIX + ".filter.reads.output.file2";

    /** Suffix of the temporary output directory used by the paired-end filter job. */
    private static final String TEMP_DIR_SUFFIX = ".tmp";

    /**
     * Counter group name. It is stored in the job configuration and also passed
     * to {@code MapReduceUtils.submitAndWaitForJob()}.
     */
    private static final String COUNTER_GROUP = "reads_filtering";

    /**
     * Completion-check interval passed to {@code MapReduceUtils.submitAndWaitForJob()}
     * (presumably milliseconds; confirm against MapReduceUtils).
     */
    private static final int CHECK_COMPLETION_TIME = 5000;

    /**
     * Returns the parent module's input ports, passed through
     * {@code InputPortsBuilder.allPortsRequiredInWorkingDirectory()}.
     *
     * @return the input ports of this module
     */
    @Override
    public InputPorts getInputPorts() {
        return InputPortsBuilder.allPortsRequiredInWorkingDirectory(super.getInputPorts());
    }

    /**
     * Filters the reads of the input FASTQ data.
     *
     * <p>Data with exactly one file is processed as single-end. Any other file
     * count is processed as paired-end, using the first two files.
     *
     * @param context task context giving access to the input and output data
     * @param status task status used to follow the jobs and build the result
     * @return a successful task result, or a failed one (with the message
     *     {@code "Error while running job: ..."}) when an
     *     {@link EoulsanException} or an {@link IOException} is raised
     */
    @Override
    public TaskResult execute(TaskContext context, TaskStatus status) {
        Configuration conf = CommonHadoop.createConfiguration();
        try {
            Data inData = context.getInputData(DataFormats.READS_FASTQ);
            Data outData = context.getOutputData(DataFormats.READS_FASTQ, inData);
            String dataName = inData.getName();
            FastqFormat fastqFormat = inData.getMetadata().getFastqFormat();

            if (inData.getDataFileCount() == 1) {
                this.filterSingleEnd(conf, status, dataName, fastqFormat,
                        inData.getDataFile(0), outData.getDataFile(0));
            } else {
                this.filterPairedEnd(conf, status, dataName, fastqFormat,
                        inData.getDataFile(0), inData.getDataFile(1),
                        outData.getDataFile(0), outData.getDataFile(1));
            }
            return status.createTaskResult();
        }
        catch (EoulsanException | IOException e) {
            return status.createTaskResult(e, "Error while running job: " + e.getMessage());
        }
    }

    /**
     * Runs the filter job on a single FASTQ file and waits for its completion.
     *
     * @param conf parent Hadoop configuration
     * @param status task status used to follow the job
     * @param dataName name of the processed data
     * @param fastqFormat FASTQ format of the input reads
     * @param inFile input FASTQ file
     * @param outFile output FASTQ file
     * @throws EoulsanException if the job cannot be run
     * @throws IOException if an I/O error occurs
     */
    private void filterSingleEnd(Configuration conf, TaskStatus status, String dataName,
            FastqFormat fastqFormat, DataFile inFile, DataFile outFile)
            throws EoulsanException, IOException {
        List<String> filenames = Collections.singletonList(inFile.getName());
        Job job = this.createJobConf(conf, dataName, inFile, filenames,
                DataFormats.READS_FASTQ, fastqFormat, outFile);
        MapReduceUtils.submitAndWaitForJob(job, dataName, CHECK_COMPLETION_TIME, status, COUNTER_GROUP);
    }

    /**
     * Filters a pair of FASTQ files.
     *
     * <p>The two files are first converted into an intermediate TFQ file, which
     * is the input of the filter job. The filter job writes both output files
     * into a temporary directory. The files are then moved to their final
     * location. Finally, the temporary directory and the TFQ file are deleted.
     *
     * @param conf parent Hadoop configuration
     * @param status task status used to follow the jobs
     * @param dataName name of the processed data
     * @param fastqFormat FASTQ format of the input reads
     * @param inFile1 first input FASTQ file
     * @param inFile2 second input FASTQ file
     * @param outFile1 first output FASTQ file
     * @param outFile2 second output FASTQ file
     * @throws EoulsanException if a job cannot be run
     * @throws IOException if an I/O error occurs
     */
    private void filterPairedEnd(Configuration conf, TaskStatus status, String dataName,
            FastqFormat fastqFormat, DataFile inFile1, DataFile inFile2,
            DataFile outFile1, DataFile outFile2) throws EoulsanException, IOException {
        List<String> filenames = Lists.newArrayList(inFile1.getName(), inFile2.getName());

        // Convert the two FASTQ files into one intermediate TFQ file
        DataFile tfqFile = new DataFile(inFile1.getParent(),
                inFile1.getBasename() + DataFormats.READS_TFQ.getDefaultExtension());
        MapReduceUtils.submitAndWaitForJob(
                PairedEndFastqToTfq.convert(conf, inFile1, inFile2, tfqFile, this.getReducerTaskCount()),
                dataName, CHECK_COMPLETION_TIME, status, COUNTER_GROUP);

        // Filter the TFQ file
        Job job = this.createJobConf(conf, dataName, tfqFile, filenames,
                DataFormats.READS_TFQ, fastqFormat, outFile1, outFile2);
        MapReduceUtils.submitAndWaitForJob(job, dataName, CHECK_COMPLETION_TIME, status, COUNTER_GROUP);

        // The filter job wrote its output into a temporary directory: move the
        // two files to their final location, then remove that directory
        DataFile tmpDir = new DataFile(outFile1.getSource() + TEMP_DIR_SUFFIX);
        new DataFile(tmpDir, outFile1.getName()).renameTo(outFile1);
        new DataFile(tmpDir, outFile2.getName()).renameTo(outFile2);
        tmpDir.delete(true);

        // The TFQ file is only an intermediate of this module; without this it
        // would remain in the working directory after every paired-end run
        tfqFile.delete(true);
    }

    /**
     * Creates the map-only filter job.
     *
     * @param parentConf configuration copied into the job configuration
     * @param dataName name of the processed data (used in the job name)
     * @param inFile input file of the job (FASTQ or TFQ)
     * @param filenames original input file names (used in the job name)
     * @param inputFormat {@code DataFormats.READS_FASTQ} selects
     *     {@link FastqInputFormat}; any other format selects
     *     {@link KeyValueTextInputFormat}
     * @param fastqFormat FASTQ format of the reads
     * @param outFiles one output file (single-end) or two output files
     *     (paired-end); with two files, the job output goes to the first file's
     *     source path suffixed with {@value #TEMP_DIR_SUFFIX}
     * @return the configured, not yet submitted, job
     * @throws IOException if the job cannot be created
     */
    private Job createJobConf(Configuration parentConf, String dataName, DataFile inFile, List<String> filenames, DataFormat inputFormat, FastqFormat fastqFormat, DataFile ... outFiles) throws IOException {
        Configuration jobConf = new Configuration(parentConf);
        boolean pairedEnd = outFiles.length > 1;

        // Settings must be applied before Job.getInstance(), which takes its own
        // copy of the configuration
        jobConf.set(CommonHadoop.COUNTER_GROUP_KEY, COUNTER_GROUP);
        jobConf.set(ReadsFilterMapper.FASTQ_FORMAT_KEY, fastqFormat.getName());
        HadoopMappingUtils.addParametersToJobConf(this.getReadFilterParameters(),
                ReadsFilterMapper.READ_FILTER_PARAMETER_KEY_PREFIX, jobConf);
        if (pairedEnd) {
            jobConf.set(OUTPUT_FILE1_KEY, outFiles[0].getName());
            jobConf.set(OUTPUT_FILE2_KEY, outFiles[1].getName());
        }

        Job job = Job.getInstance(jobConf,
                "Filter reads (" + dataName + ", " + Joiner.on(", ").join(filenames) + ")");
        job.setJarByClass(ReadsFilterHadoopModule.class);

        FileInputFormat.addInputPath(job, new Path(inFile.getSource()));
        if (inputFormat == DataFormats.READS_FASTQ) {
            job.setInputFormatClass(FastqInputFormat.class);
        } else {
            job.setInputFormatClass(KeyValueTextInputFormat.class);
        }

        job.setMapperClass(ReadsFilterMapper.class);
        job.setOutputFormatClass(FastqOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        // Paired-end output goes to a temporary directory; the caller moves the
        // files to their final location once the job is done
        String outputPath = outFiles[0].getSource() + (pairedEnd ? TEMP_DIR_SUFFIX : "");
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        return job;
    }
}

