/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sysml.runtime.matrix;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.NLineInputFormat;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.io.IOUtilFunctions;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.LocalFileUtils;

public class CleanupMR {
    private static final Log LOG = LogFactory.getLog(CleanupMR.class.getName());

    private CleanupMR() {
    }

    public static boolean runJob(DMLConfig conf) throws Exception {
        boolean ret = false;
        try {
            JobConf job = new JobConf(CleanupMR.class);
            job.setJobName("Cleanup-MR");
            String dir = conf.getTextValue("sysml.localtmpdir");
            MRJobConfiguration.setSystemMLLocalTmpDir(job, dir);
            int numNodes = InfrastructureAnalyzer.getRemoteParallelNodes();
            job.setMapperClass(CleanupMapper.class);
            job.setNumMapTasks(numNodes);
            job.setNumReduceTasks(0);
            String inFileName = conf.getTextValue("sysml.scratch") + "/cleanup_tasks";
            job.setInputFormat(NLineInputFormat.class);
            job.setOutputFormat(NullOutputFormat.class);
            Path path = new Path(inFileName);
            FileInputFormat.setInputPaths(job, path);
            CleanupMR.writeCleanupTasksToFile(path, numNodes);
            job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
            job.setMapSpeculativeExecution(false);
            RunningJob runjob = JobClient.runJob(job);
            ret = runjob.isSuccessful();
        }
        catch (Exception ex) {
            LOG.error("Failed to run cleanup MR job. ", ex);
        }
        return ret;
    }

    private static void writeCleanupTasksToFile(Path path, int numTasks) throws IOException {
        FileSystem fs = IOUtilFunctions.getFileSystem(path);
        try (BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path, true)));){
            for (int i = 1; i <= numTasks; ++i) {
                br.write(String.valueOf("CLEANUP TASK " + i) + "\n");
            }
        }
        catch (Exception ex) {
            throw new DMLRuntimeException("Error writing cleanup tasks to taskfile " + path.toString(), ex);
        }
    }

    public static class CleanupMapper
    implements Mapper<LongWritable, Text, Writable, Writable> {
        private static final Log LOG = LogFactory.getLog(CleanupMapper.class.getName());
        protected String _dir = null;

        public void map(LongWritable key, Text value, OutputCollector<Writable, Writable> out, Reporter reporter) throws IOException {
            try {
                String task = value.toString();
                LOG.info("Running cleanup task: " + task + " (" + this._dir + ") ... ");
                int count = LocalFileUtils.cleanupRcWorkingDirectory(this._dir);
                LOG.info("Done - deleted " + count + " files.");
            }
            catch (Exception ex) {
                throw new IOException("Failed to execute cleanup task.", ex);
            }
        }

        public void configure(JobConf job) {
            this._dir = MRJobConfiguration.getSystemMLLocalTmpDir(job);
        }

        public void close() {
        }
    }
}

