/*
 * Decompiled with CFR 0.152.
 */
package fr.ens.biologie.genomique.eoulsan.modules.mgmt.upload;

import com.google.common.collect.Lists;
import fr.ens.biologie.genomique.eoulsan.EoulsanLogger;
import fr.ens.biologie.genomique.eoulsan.EoulsanRuntime;
import fr.ens.biologie.genomique.eoulsan.EoulsanRuntimeException;
import fr.ens.biologie.genomique.eoulsan.HadoopEoulsanRuntime;
import fr.ens.biologie.genomique.eoulsan.data.DataFile;
import fr.ens.biologie.genomique.eoulsan.data.DataFormatConverter;
import fr.ens.biologie.genomique.eoulsan.util.hadoop.PathUtils;
import fr.ens.biologie.genomique.kenetre.util.StringUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Copies a set of files using a Hadoop MapReduce job.
 *
 * <p>For every source/destination pair, {@link #copy(Map)} writes one small
 * task file containing the line {@code source<TAB>destination} into a
 * temporary input directory, then runs a job whose mapper
 * ({@link DistCpMapper}) performs the copy described by each input line.
 */
public class DataFileDistCp {
    /** Charset used to write the task files read by the job. */
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    /**
     * Maximum duration of a single file copy, in milliseconds (2 hours). The
     * same value is set as {@code mapreduce.task.timeout} for the job.
     */
    private static final long MAX_COPY_DURATION = 2L * 60L * 60L * 1000L;

    private final Configuration conf;
    private final Path jobPath;

    /**
     * Creates a new copier.
     *
     * @param conf Hadoop configuration used for file system access and for the job
     * @param jobPath parent path under which the temporary job directories are created
     * @throws NullPointerException if {@code conf} or {@code jobPath} is null
     */
    public DataFileDistCp(Configuration conf, Path jobPath) {
        this.conf = Objects.requireNonNull(conf, "The configuration is null");
        this.jobPath = Objects.requireNonNull(jobPath, "The job Path is null");
    }

    /**
     * Copies every key of {@code entries} to its associated value.
     *
     * <p>Does nothing when {@code entries} is null or empty. The temporary
     * input and output directories of the job are removed whether the copy
     * succeeds or fails.
     *
     * @param entries map of source file to destination file
     * @throws IOException if a task file cannot be written, if the temporary
     *     directories cannot be removed after the job ran, or if the job did not succeed
     * @throws EoulsanRuntimeException if waiting for the job is interrupted or a job
     *     class cannot be found
     */
    public void copy(Map<DataFile, DataFile> entries) throws IOException {
        if (entries == null || entries.isEmpty()) {
            return;
        }
        final Configuration conf = this.conf;
        final Path tmpInputDir = PathUtils.createTempPath(this.jobPath, "distcp-in-", "", conf);
        final Path tmpOutputDir = PathUtils.createTempPath(this.jobPath, "distcp-out-", "", conf);

        Job job;
        try {
            writeTaskFiles(entries, tmpInputDir, conf);
            job = createJobConf(conf, tmpInputDir, tmpOutputDir);
            runJob(job);
        } catch (IOException | RuntimeException e) {
            // Do not leave temporary directories behind, but keep the original
            // failure as the exception reported to the caller.
            deleteAfterFailure(e, conf, tmpInputDir, tmpOutputDir);
            throw e;
        }

        PathUtils.fullyDelete(tmpInputDir, conf);
        PathUtils.fullyDelete(tmpOutputDir, conf);
        if (!job.isSuccessful()) {
            throw new IOException("Unable to copy files using DataFileDistCp.");
        }
    }

    /**
     * Writes one task file per entry into {@code tmpInputDir}. Files are
     * numbered following the order given by
     * {@link #sortInFilesByDescSize(List)}, i.e. largest source first.
     */
    private void writeTaskFiles(Map<DataFile, DataFile> entries, Path tmpInputDir, Configuration conf) throws IOException {
        final FileSystem fs = tmpInputDir.getFileSystem(conf);
        fs.mkdirs(tmpInputDir);

        final List<DataFile> inFiles = new ArrayList<>(entries.keySet());
        sortInFilesByDescSize(inFiles);

        // Zero-pad the task number so that all task file names have the same length.
        final NumberFormat nf = NumberFormat.getInstance();
        nf.setMinimumIntegerDigits(Integer.toString(inFiles.size()).length());
        nf.setGroupingUsed(false);

        int count = 0;
        for (DataFile inFile : inFiles) {
            final DataFile outFile = entries.get(inFile);
            final Path f = new Path(tmpInputDir, "distcp-" + nf.format(++count) + ".cp");
            EoulsanLogger.getLogger().info("Task copy " + inFile + " in " + f.toString());
            // try-with-resources: the stream is closed even if write() fails.
            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(f), CHARSET))) {
                bw.write(inFile.getSource() + "\t" + outFile.getSource() + "\n");
            }
        }
    }

    /** Runs the job and waits for its completion, converting checked failures to runtime exceptions. */
    private static void runJob(Job job) throws IOException {
        try {
            job.waitForCompletion(false);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new EoulsanRuntimeException("Error while distcp: " + e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new EoulsanRuntimeException("Error while distcp: " + e.getMessage(), e);
        }
    }

    /** Deletes the given paths, attaching any deletion failure to {@code primary} as suppressed. */
    private static void deleteAfterFailure(Throwable primary, Configuration conf, Path... paths) {
        for (Path path : paths) {
            try {
                PathUtils.fullyDelete(path, conf);
            } catch (IOException | RuntimeException e) {
                primary.addSuppressed(e);
            }
        }
    }

    /**
     * Sorts the files by decreasing content length. Files whose metadata
     * cannot be read are given a size of -1 and therefore end up last.
     *
     * <p>The size of each file is looked up once before sorting rather than
     * on every comparison.
     *
     * @param inFiles list to sort in place
     */
    private void sortInFilesByDescSize(List<DataFile> inFiles) {
        final Map<DataFile, Long> sizes = new HashMap<>();
        for (DataFile file : inFiles) {
            long size;
            try {
                size = file.getMetaData().getContentLength();
            } catch (IOException ignored) {
                // Size unknown: treat the file as the smallest one.
                size = -1L;
            }
            sizes.put(file, size);
        }
        inFiles.sort((f1, f2) -> Long.compare(sizes.get(f2), sizes.get(f1)));
    }

    /**
     * Creates the copy job.
     *
     * @param parentConf configuration copied to build the job configuration
     * @param cpEntriesPath directory containing the task files (job input)
     * @param outputPath job output directory
     * @return the configured, not yet submitted, job
     * @throws IOException if the job cannot be created
     */
    private static Job createJobConf(Configuration parentConf, Path cpEntriesPath, Path outputPath) throws IOException {
        final Configuration jobConf = new Configuration(parentConf);
        jobConf.set("mapreduce.task.timeout", Long.toString(MAX_COPY_DURATION));

        final Job job = Job.getInstance(jobConf, "DataFileDistcp");
        job.setJarByClass(DataFileDistCp.class);
        FileInputFormat.addInputPath(job, cpEntriesPath);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(DistCpMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, outputPath);
        return job;
    }

    /**
     * Mapper that copies one file per input line. Each line is expected to be
     * {@code source<TAB>destination}; lines without a tab are ignored.
     */
    public static final class DistCpMapper
    extends Mapper<LongWritable, Text, Text, Text> {
        private static final String COUNTER_GROUP_NAME = "DataSourceDistCp";

        /** Maximum time, in milliseconds, to wait for the copy thread between two counter updates. */
        private static final long POLL_INTERVAL = 5000L;

        /** Initializes the Eoulsan runtime from the task configuration if it is not already set. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            if (!EoulsanRuntime.isRuntime()) {
                HadoopEoulsanRuntime.newEoulsanRuntime(context.getConfiguration());
            }
        }

        /**
         * Copies the file described by {@code value} and records the source
         * and destination sizes in the job counters.
         *
         * @param key offset of the line (unused)
         * @param value line of the form {@code source<TAB>destination}
         * @param context task context
         * @throws IOException if the copy fails or exceeds the maximum duration
         * @throws InterruptedException if the task is interrupted while copying
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            final String val = value.toString();
            final int tabPos = val.indexOf('\t');
            if (tabPos == -1) {
                return;
            }

            final Configuration conf = context.getConfiguration();
            final String srcPathname = val.substring(0, tabPos);
            final Path srcPath = new Path(srcPathname);
            final Path destPath = new Path(val.substring(tabPos + 1));
            final FileSystem srcFs = srcPath.getFileSystem(conf);
            final FileSystem destFs = destPath.getFileSystem(conf);

            final FileStatus fStatusSrc = srcFs.getFileStatus(srcPath);
            final long srcSize = fStatusSrc == null ? 0L : fStatusSrc.getLen();
            EoulsanLogger.getLogger().info("Start copy " + srcPathname + " to " + destPath + " (" + srcSize + " bytes)\n");

            final long startTime = System.currentTimeMillis();
            copyFile(new DataFile(srcPathname), new DataFile(destPath.toString()), context);
            final long duration = System.currentTimeMillis() - startTime;

            final FileStatus fStatusDest = destFs.getFileStatus(destPath);
            final long destSize = fStatusDest == null ? 0L : fStatusDest.getLen();
            // Guard against a 0 ms duration (division by zero) and keep the
            // result in a long so that fast copies are not clamped to int range.
            final long speed = duration > 0L ? (long) ((double) destSize / (double) duration * 1000.0) : 0L;
            EoulsanLogger.getLogger().info("End copy " + srcPathname + " to " + destPath + " in " + StringUtils.toTimeHumanReadable(duration) + " (" + destSize + " bytes, " + speed + " bytes/s)\n");

            context.getCounter(COUNTER_GROUP_NAME, "Input file size").increment(srcSize);
            context.getCounter(COUNTER_GROUP_NAME, "Output file size").increment(destSize);
        }

        /**
         * Copies {@code src} to {@code dest} in a separate thread while this
         * thread periodically increments a counter and enforces the maximum
         * copy duration.
         *
         * @param src source file
         * @param dest destination file
         * @param context task context, used for the counter
         * @throws IOException if the copy fails or is still running after the maximum duration
         * @throws InterruptedException if this thread is interrupted while waiting
         */
        private static void copyFile(DataFile src, DataFile dest, Context context) throws InterruptedException, IOException {
            final MyIOExceptionWrapper failure = new MyIOExceptionWrapper();
            final Thread worker = new Thread(() -> {
                try {
                    new DataFormatConverter(src, dest).convert();
                } catch (IOException e) {
                    failure.ioexception = e;
                } catch (RuntimeException e) {
                    // Without this, an unchecked failure would be lost and the
                    // copy would be reported as successful.
                    failure.runtimeException = e;
                }
            });
            worker.start();

            // NOTE(review): the counter update presumably also serves as a
            // progress signal for the task - confirm against Hadoop behaviour.
            final Counter counter = context.getCounter(COUNTER_GROUP_NAME, "5_seconds");
            final long startTime = System.currentTimeMillis();
            try {
                while (worker.isAlive()) {
                    // join() returns as soon as the copy ends instead of
                    // always waiting for the full interval.
                    worker.join(POLL_INTERVAL);
                    counter.increment(1L);
                    final long elapsed = System.currentTimeMillis() - startTime;
                    if (worker.isAlive() && elapsed > MAX_COPY_DURATION) {
                        // Ask the copy to stop; whether it reacts depends on
                        // the underlying I/O honouring interruption.
                        worker.interrupt();
                        throw new IOException("Copy timeout, copy exceed " + (MAX_COPY_DURATION / 1000L) + " seconds.");
                    }
                }
            } catch (InterruptedException e) {
                worker.interrupt();
                throw e;
            }

            // The worker has terminated here, so its writes to the holder are visible.
            if (failure.ioexception != null) {
                throw failure.ioexception;
            }
            if (failure.runtimeException != null) {
                throw failure.runtimeException;
            }
        }

        /** Holder used to hand a failure of the copy thread back to the waiting thread. */
        private static final class MyIOExceptionWrapper {
            public IOException ioexception;
            public RuntimeException runtimeException;

            private MyIOExceptionWrapper() {
            }
        }
    }
}

