/*
 * Decompiled with CFR 0.152.
 */
package fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop;

import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import fr.ens.biologie.genomique.eoulsan.CommonHadoop;
import fr.ens.biologie.genomique.eoulsan.EoulsanException;
import fr.ens.biologie.genomique.eoulsan.Settings;
import fr.ens.biologie.genomique.eoulsan.annotations.HadoopOnly;
import fr.ens.biologie.genomique.eoulsan.bio.io.hadoop.FastqInputFormat;
import fr.ens.biologie.genomique.eoulsan.bio.io.hadoop.SAMOutputFormat;
import fr.ens.biologie.genomique.eoulsan.core.InputPorts;
import fr.ens.biologie.genomique.eoulsan.core.InputPortsBuilder;
import fr.ens.biologie.genomique.eoulsan.core.Modules;
import fr.ens.biologie.genomique.eoulsan.core.Parameter;
import fr.ens.biologie.genomique.eoulsan.core.StepConfigurationContext;
import fr.ens.biologie.genomique.eoulsan.core.TaskContext;
import fr.ens.biologie.genomique.eoulsan.core.TaskResult;
import fr.ens.biologie.genomique.eoulsan.core.TaskStatus;
import fr.ens.biologie.genomique.eoulsan.data.Data;
import fr.ens.biologie.genomique.eoulsan.data.DataFile;
import fr.ens.biologie.genomique.eoulsan.data.DataFormat;
import fr.ens.biologie.genomique.eoulsan.data.DataFormats;
import fr.ens.biologie.genomique.eoulsan.data.MapperIndexDataFormat;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.AbstractReadsMapperModule;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.PairedEndFastqToTfq;
import fr.ens.biologie.genomique.eoulsan.modules.mapping.hadoop.ReadsMapperMapper;
import fr.ens.biologie.genomique.eoulsan.util.hadoop.MapReduceUtils;
import fr.ens.biologie.genomique.kenetre.bio.FastqFormat;
import fr.ens.biologie.genomique.kenetre.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

@HadoopOnly
public class ReadsMapperHadoopModule
extends AbstractReadsMapperModule {
    @Override
    public InputPorts getInputPorts() {
        InputPortsBuilder builder = new InputPortsBuilder();
        builder.addPort("reads", DataFormats.READS_FASTQ, true);
        builder.addPort("mapperindex", (DataFormat)new MapperIndexDataFormat(this.getMapper()), true);
        return builder.create();
    }

    @Override
    public void configure(StepConfigurationContext context, Set<Parameter> stepParameters) throws EoulsanException {
        super.configure(context, stepParameters);
        if (!this.getMapper().isSplitsAllowed()) {
            Modules.invalidConfiguration(context, "The selected mapper cannot be used in Hadoop mode as computation cannot be parallelized: " + this.getMapper().getName());
        }
        if (!this.isUseBundledBinaries()) {
            Modules.invalidConfiguration(context, "Non bundled mapper binaries cannot be used in Hadoop mode");
        }
        if (!this.getMapperDockerImage().isEmpty()) {
            Modules.invalidConfiguration(context, "Cannot use a mapper Docker image in Hadoop mode");
        }
    }

    @Override
    public TaskResult execute(TaskContext context, TaskStatus status) {
        Configuration conf = CommonHadoop.createConfiguration();
        try {
            Job job;
            Data readsData = context.getInputData(DataFormats.READS_FASTQ);
            String dataName = readsData.getName();
            DataFile mapperIndexFile = context.getInputData(new MapperIndexDataFormat(this.getMapper())).getDataFile();
            DataFile outFile = context.getOutputData(DataFormats.MAPPER_RESULTS_SAM, readsData).getDataFile();
            DataFile tfqFile = null;
            FastqFormat fastqFormat = readsData.getMetadata().getFastqFormat();
            if (readsData.getDataFileCount() == 1) {
                job = this.createJobConf(conf, context, dataName, readsData.getDataFile(0), false, DataFormats.READS_FASTQ, fastqFormat, mapperIndexFile, outFile);
            } else {
                DataFile inFile1 = readsData.getDataFile(0);
                DataFile inFile2 = readsData.getDataFile(1);
                tfqFile = new DataFile(inFile1.getParent(), inFile1.getBasename() + DataFormats.READS_TFQ.getDefaultExtension());
                MapReduceUtils.submitAndWaitForJob(PairedEndFastqToTfq.convert(conf, inFile1, inFile2, tfqFile, this.getReducerTaskCount()), readsData.getName(), 5000, status, "reads_mapping");
                job = this.createJobConf(conf, context, dataName, tfqFile, true, DataFormats.READS_TFQ, fastqFormat, mapperIndexFile, outFile);
            }
            MapReduceUtils.submitAndWaitForJob(job, readsData.getName(), 5000, status, "reads_mapping");
            if (tfqFile != null) {
                FileSystem fs = FileSystem.get((Configuration)conf);
                fs.delete(new Path(tfqFile.getSource()), true);
            }
            return status.createTaskResult();
        }
        catch (EoulsanException | IOException e) {
            return status.createTaskResult(e, "Error while running job: " + e.getMessage());
        }
    }

    private Job createJobConf(Configuration parentConf, TaskContext context, String dataName, DataFile readsFile, boolean pairedEnd, DataFormat inputFormat, FastqFormat fastqFormat, DataFile mapperIndexFile, DataFile outFile) throws IOException {
        Configuration jobConf = new Configuration(parentConf);
        Path inputPath = new Path(readsFile.getSource());
        jobConf.set(ReadsMapperMapper.MAPPER_NAME_KEY, this.getMapperName());
        jobConf.set(ReadsMapperMapper.MAPPER_VERSION_KEY, this.getMapperVersion());
        jobConf.set(ReadsMapperMapper.MAPPER_FLAVOR_KEY, this.getMapperFlavor());
        jobConf.set(ReadsMapperMapper.PAIR_END_KEY, Boolean.toString(pairedEnd));
        if (this.getMapperLocalThreads() > 0) {
            jobConf.set(ReadsMapperMapper.MAPPER_THREADS_KEY, "" + this.getMapperHadoopThreads());
        }
        if (this.getMapperArguments() != null) {
            jobConf.set(ReadsMapperMapper.MAPPER_ARGS_KEY, StringUtils.doubleQuotes((String)this.getMapperArguments()));
        }
        jobConf.set(ReadsMapperMapper.FASTQ_FORMAT_KEY, "" + fastqFormat);
        jobConf.set(ReadsMapperMapper.INDEX_CHECKSUM_KEY, ReadsMapperHadoopModule.computeZipCheckSum(mapperIndexFile, parentConf));
        jobConf.set(CommonHadoop.COUNTER_GROUP_KEY, "reads_mapping");
        jobConf.set("mapreduce.task.timeout", "3600000");
        jobConf.set("mapreduce.job.jvm.numtasks", "1");
        jobConf.set("mapreduce.map.memory.mb", "" + this.getMapperHadoopMemoryRequired());
        jobConf.set("mapreduce.map.java.opts", "-Xmx4096M");
        ReadsMapperHadoopModule.setZooKeeperJobConfiguration(jobConf, context);
        Job job = Job.getInstance((Configuration)jobConf, (String)("Mapping reads in " + fastqFormat + " with " + this.getMapperName() + " (" + dataName + ", " + readsFile.getName() + ")"));
        Path genomeIndex = new Path(mapperIndexFile.getSource());
        job.addCacheFile(genomeIndex.toUri());
        job.setJarByClass(ReadsMapperHadoopModule.class);
        FileInputFormat.addInputPath((Job)job, (Path)inputPath);
        if (inputFormat == DataFormats.READS_FASTQ) {
            job.setInputFormatClass(FastqInputFormat.class);
        } else {
            job.setInputFormatClass(KeyValueTextInputFormat.class);
        }
        job.setMapperClass(ReadsMapperMapper.class);
        job.setOutputFormatClass(SAMOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        FileOutputFormat.setOutputPath((Job)job, (Path)new Path(outFile.getSource()));
        return job;
    }

    static void setZooKeeperJobConfiguration(Configuration jobConf, TaskContext context) {
        Settings settings = context.getSettings();
        Object connectString = settings.getZooKeeperConnectString();
        if (connectString == null) {
            connectString = jobConf.get("yarn.resourcemanager.hostname").split(":")[0] + ":" + settings.getZooKeeperDefaultPort();
        }
        jobConf.set(ReadsMapperMapper.ZOOKEEPER_CONNECT_STRING_KEY, (String)connectString);
        jobConf.set(ReadsMapperMapper.ZOOKEEPER_SESSION_TIMEOUT_KEY, "" + settings.getZooKeeperSessionTimeout());
    }

    static String computeZipCheckSum(DataFile file, Configuration conf) throws IOException {
        Path path = new Path(file.getSource());
        FileSystem fs = FileSystem.get((URI)path.toUri(), (Configuration)conf);
        FileChecksum checksum = fs.getFileChecksum(path);
        if (checksum != null) {
            return new BigInteger(1, checksum.getBytes()).toString(16);
        }
        return ReadsMapperHadoopModule.computeZipCheckSum(file.open());
    }

    private static String computeZipCheckSum(InputStream in) throws IOException {
        ZipArchiveEntry e;
        ZipArchiveInputStream zais = new ZipArchiveInputStream(in);
        Hasher hs = Hashing.md5().newHasher();
        HashMap<String, long[]> map = new HashMap<String, long[]>();
        while ((e = zais.getNextZipEntry()) != null) {
            map.put(e.getName(), new long[]{e.getSize(), e.getCrc()});
        }
        zais.close();
        for (String filename : new TreeSet(map.keySet())) {
            hs.putString((CharSequence)filename, StandardCharsets.UTF_8);
            for (long l : (long[])map.get(filename)) {
                hs.putLong(l);
            }
        }
        return hs.hash().toString();
    }
}

