/*
 * Decompiled with CFR 0.152.
 */
package fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import fr.ens.biologie.genomique.eoulsan.CommonHadoop;
import fr.ens.biologie.genomique.eoulsan.EoulsanException;
import fr.ens.biologie.genomique.eoulsan.annotations.HadoopOnly;
import fr.ens.biologie.genomique.eoulsan.bio.io.hadoop.FastqInputFormat;
import fr.ens.biologie.genomique.eoulsan.bio.io.hadoop.SAMOutputFormat;
import fr.ens.biologie.genomique.eoulsan.core.InputPorts;
import fr.ens.biologie.genomique.eoulsan.core.InputPortsBuilder;
import fr.ens.biologie.genomique.eoulsan.core.Modules;
import fr.ens.biologie.genomique.eoulsan.core.Parameter;
import fr.ens.biologie.genomique.eoulsan.core.StepConfigurationContext;
import fr.ens.biologie.genomique.eoulsan.core.TaskContext;
import fr.ens.biologie.genomique.eoulsan.core.TaskResult;
import fr.ens.biologie.genomique.eoulsan.core.TaskStatus;
import fr.ens.biologie.genomique.eoulsan.data.Data;
import fr.ens.biologie.genomique.eoulsan.data.DataFile;
import fr.ens.biologie.genomique.eoulsan.data.DataFormat;
import fr.ens.biologie.genomique.eoulsan.data.DataFormats;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.AbstractFilterAndMapReadsModule;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.HadoopMappingUtils;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.PairedEndFastqToTfq;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.ReadsFilterHadoopModule;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.ReadsFilterMapper;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.ReadsMapperHadoopModule;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.ReadsMapperMapper;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.SAMFilterMapper;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.SAMFilterReducer;
import fr.ens.biologie.genomique.eoulsan.util.hadoop.MapReduceUtils;
import fr.ens.biologie.genomique.kenetre.bio.FastqFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Hadoop implementation of the "filter and map reads" module.
 *
 * <p>Reads filtering, reads mapping and alignments filtering are chained in a single MapReduce
 * job: {@link ReadsFilterMapper}, {@link ReadsMapperMapper} and {@link SAMFilterMapper} run as a
 * map chain, followed by {@link SAMFilterReducer}, and the output is written with
 * {@link SAMOutputFormat}. Paired-end FASTQ input is first converted into a single TFQ file by a
 * separate conversion job.
 */
@HadoopOnly
public class FilterAndMapReadsHadoopModule
extends AbstractFilterAndMapReadsModule {

    /**
     * Interval passed to {@code MapReduceUtils.submitAndWaitForJob} between two job completion
     * checks (presumably in milliseconds — confirm against MapReduceUtils).
     */
    private static final int JOB_COMPLETION_CHECK_INTERVAL = 5000;

    /**
     * Returns the input ports of the parent module, all marked as required in the working
     * directory.
     *
     * @return the input ports of this module
     */
    @Override
    public InputPorts getInputPorts() {
        return InputPortsBuilder.allPortsRequiredInWorkingDirectory(super.getInputPorts());
    }

    /**
     * Configures the module and rejects mappers that do not allow input splits, because this
     * Hadoop implementation maps the reads split by split.
     *
     * @param context the step configuration context
     * @param stepParameters the step parameters
     * @throws EoulsanException if the configuration is invalid
     */
    @Override
    public void configure(StepConfigurationContext context, Set<Parameter> stepParameters) throws EoulsanException {
        super.configure(context, stepParameters);
        if (!getMapper().isSplitsAllowed()) {
            Modules.invalidConfiguration(context, "The selected mapper cannot be used in hadoop mode as computation cannot be parallelized: " + getMapper().getName());
        }
    }

    /**
     * Runs the filter-and-map job on the input reads.
     *
     * <p>Single-end reads (one FASTQ file) are processed directly. Paired-end reads (two FASTQ
     * files) are first merged into one TFQ file by a {@link PairedEndFastqToTfq} job, and that
     * file is then used as the input of the filter-and-map job.
     *
     * @param context the task context
     * @param status the task status used to track the submitted jobs
     * @return the task result; an error result when a job fails or the input is invalid
     */
    @Override
    public TaskResult execute(TaskContext context, TaskStatus status) {
        final Configuration conf = CommonHadoop.createConfiguration();
        try {
            final Data readsData = context.getInputData(DataFormats.READS_FASTQ);
            final String dataName = readsData.getName();
            final DataFile samFile = context.getOutputData(DataFormats.MAPPER_RESULTS_SAM, readsData).getDataFile();
            final DataFile mapperIndex = context.getInputData("mapperindex").getDataFile();
            final FastqFormat fastqFormat = readsData.getMetadata().getFastqFormat();
            final int fileCount = readsData.getDataFileCount();

            final Job job;
            if (fileCount == 1) {
                // Single-end: the FASTQ file is the direct input of the job
                final DataFile inFile = readsData.getDataFile(0);
                final List<String> filenames = Collections.singletonList(inFile.getName());
                job = createJobConf(conf, context, dataName, inFile, filenames, false, DataFormats.READS_FASTQ, fastqFormat, mapperIndex, samFile);
            } else if (fileCount == 2) {
                // Paired-end: both FASTQ files are first merged into a single TFQ file,
                // presumably so that both mates of a pair reach the same mapper call
                final DataFile inFile1 = readsData.getDataFile(0);
                final DataFile inFile2 = readsData.getDataFile(1);
                final List<String> filenames = Lists.newArrayList(inFile1.getName(), inFile2.getName());
                final DataFile tfqFile = new DataFile(inFile1.getParent(), inFile1.getBasename() + DataFormats.READS_TFQ.getDefaultExtension());
                MapReduceUtils.submitAndWaitForJob(PairedEndFastqToTfq.convert(conf, inFile1, inFile2, tfqFile, getReducerTaskCount()), dataName, JOB_COMPLETION_CHECK_INTERVAL, status, getCounterGroup());
                // NOTE(review): the intermediate TFQ file is never removed after the mapping job
                job = createJobConf(conf, context, dataName, tfqFile, filenames, true, DataFormats.READS_TFQ, fastqFormat, mapperIndex, samFile);
            } else {
                // Previously any count other than 1 was treated as paired-end, silently ignoring
                // extra files (or failing obscurely when there was no file at all)
                throw new EoulsanException("Unexpected number of FASTQ files for " + dataName + ": " + fileCount + " (expected 1 or 2)");
            }

            MapReduceUtils.submitAndWaitForJob(job, dataName, JOB_COMPLETION_CHECK_INTERVAL, status, getCounterGroup());
            return status.createTaskResult();
        }
        catch (EoulsanException | IOException e) {
            return status.createTaskResult(e, "Error while running job: " + e.getMessage());
        }
    }

    /**
     * Creates the chained filter/map/filter Hadoop job.
     *
     * @param parentConf the base Hadoop configuration, copied for the job
     * @param context the task context, used for the ZooKeeper settings
     * @param dataName the name of the processed data, used in the job name
     * @param inFile the job input file (FASTQ for single-end, TFQ for paired-end)
     * @param filenames the original input file names, used in the job name
     * @param pairedEnd true if the input reads are paired-end
     * @param inputFormat the format of {@code inFile}; selects the Hadoop input format class
     * @param fastqFormat the FASTQ quality format of the reads
     * @param genomeIndexFile the mapper index archive, added to the job cache files
     * @param outFile the SAM output path
     * @return the configured, not yet submitted, job
     * @throws IOException if the index checksum cannot be computed or the job cannot be created
     */
    private Job createJobConf(Configuration parentConf, TaskContext context, String dataName, DataFile inFile, List<String> filenames, boolean pairedEnd, DataFormat inputFormat, FastqFormat fastqFormat, DataFile genomeIndexFile, DataFile outFile) throws IOException {
        final Configuration jobConf = new Configuration(parentConf);

        // Counter group used by the job
        jobConf.set(CommonHadoop.COUNTER_GROUP_KEY, getCounterGroup());

        // Reads filter settings
        jobConf.set(ReadsFilterMapper.FASTQ_FORMAT_KEY, fastqFormat.getName());
        HadoopMappingUtils.addParametersToJobConf(getReadFilterParameters(), ReadsFilterMapper.READ_FILTER_PARAMETER_KEY_PREFIX, jobConf);

        // Reads mapper settings
        jobConf.set(ReadsMapperMapper.MAPPER_NAME_KEY, getMapperName());
        jobConf.set(ReadsMapperMapper.MAPPER_VERSION_KEY, getMapperVersion());
        jobConf.set(ReadsMapperMapper.MAPPER_FLAVOR_KEY, getMapperFlavor());
        jobConf.set(ReadsMapperMapper.PAIR_END_KEY, Boolean.toString(pairedEnd));
        // Only forward a positive thread count; otherwise the key is left unset.
        // (The previous "< 0" test dropped every valid user setting.)
        if (getMapperHadoopThreads() > 0) {
            jobConf.set(ReadsMapperMapper.MAPPER_THREADS_KEY, String.valueOf(getMapperHadoopThreads()));
        }
        if (getMapperArguments() != null) {
            jobConf.set(ReadsMapperMapper.MAPPER_ARGS_KEY, getMapperArguments());
        }
        // NOTE(review): the filter key above uses getName() while this one uses toString();
        // confirm both yield the value expected by the respective mappers
        jobConf.set(ReadsMapperMapper.FASTQ_FORMAT_KEY, "" + fastqFormat);
        jobConf.set(ReadsMapperMapper.INDEX_CHECKSUM_KEY, ReadsMapperHadoopModule.computeZipCheckSum(genomeIndexFile, parentConf));

        // Hadoop resources for the map tasks
        jobConf.set("mapreduce.task.timeout", "3600000");
        jobConf.set("mapreduce.job.jvm.numtasks", "1");
        jobConf.set("mapreduce.map.memory.mb", "" + getMapperHadoopMemoryRequired());
        // NOTE(review): the heap size is fixed while the container memory above is
        // configurable; confirm getMapperHadoopMemoryRequired() always leaves room for it
        jobConf.set("mapreduce.map.java.opts", "-Xmx4096M");
        ReadsMapperHadoopModule.setZooKeeperJobConfiguration(jobConf, context);

        // Alignments filter settings
        HadoopMappingUtils.addParametersToJobConf(getAlignmentsFilterParameters(), SAMFilterReducer.MAP_FILTER_PARAMETER_KEY_PREFIX, jobConf);

        final Job job = Job.getInstance(jobConf, "Filter and map reads (" + dataName + ", " + Joiner.on(", ").join(filenames) + ")");
        job.setJarByClass(FilterAndMapReadsHadoopModule.class);

        FileInputFormat.addInputPath(job, new Path(inFile.getSource()));
        // Ship the mapper index archive with the job as a cache file
        job.addCacheFile(new Path(genomeIndexFile.getSource()).toUri());

        if (inputFormat == DataFormats.READS_FASTQ) {
            job.setInputFormatClass(FastqInputFormat.class);
        } else {
            // TFQ (paired-end) input is read as key/value text lines
            job.setInputFormatClass(KeyValueTextInputFormat.class);
        }

        // Map chain: reads filter -> reads mapper -> alignments filter
        ChainMapper.addMapper(job, ReadsFilterMapper.class, Text.class, Text.class, Text.class, Text.class, jobConf);
        ChainMapper.addMapper(job, ReadsMapperMapper.class, Text.class, Text.class, Text.class, Text.class, jobConf);
        ChainMapper.addMapper(job, SAMFilterMapper.class, Text.class, Text.class, Text.class, Text.class, jobConf);

        job.setReducerClass(SAMFilterReducer.class);
        job.setOutputFormatClass(SAMOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(outFile.getSource()));
        return job;
    }
}

