/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sysml.runtime.controlprogram.parfor;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.DataPartitioner;
import org.apache.sysml.runtime.controlprogram.parfor.DataPartitionerRemoteMapper;
import org.apache.sysml.runtime.controlprogram.parfor.DataPartitionerRemoteReducer;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.utils.Statistics;
import org.apache.sysml.yarn.DMLAppMasterUtils;

public class DataPartitionerRemoteMR
extends DataPartitioner {
    private long _pfid = -1L;
    private int _numReducers = -1;
    private int _replication = -1;
    private boolean _jvmReuse = false;
    private boolean _keepIndexes = false;

    public DataPartitionerRemoteMR(ParForProgramBlock.PartitionFormat dpf, long pfid, int numRed, int replication, boolean jvmReuse, boolean keepIndexes) {
        super(dpf._dpf, dpf._N);
        this._pfid = pfid;
        this._numReducers = numRed;
        this._replication = replication;
        this._jvmReuse = jvmReuse;
        this._keepIndexes = keepIndexes;
    }

    @Override
    protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen) {
        String jobname = "ParFor-DPMR";
        long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        JobConf job = new JobConf(DataPartitionerRemoteMR.class);
        if (this._pfid >= 0L) {
            job.setJobName(jobname + this._pfid);
        } else {
            job.setJobName("Partition-MR");
        }
        Statistics.incrementNoOfCompiledMRJobs();
        try {
            in.exportData();
            Path path = new Path(in.getFileName());
            MRJobConfiguration.setPartitioningInfo(job, rlen, clen, brlen, bclen, ii, oi, this._format, this._n, fnameNew, this._keepIndexes);
            job.setMapperClass(DataPartitionerRemoteMapper.class);
            job.setReducerClass(DataPartitionerRemoteReducer.class);
            if (oi == OutputInfo.TextCellOutputInfo) {
                job.setMapOutputKeyClass(LongWritable.class);
                job.setMapOutputValueClass(PairWritableCell.class);
            } else if (oi == OutputInfo.BinaryCellOutputInfo) {
                job.setMapOutputKeyClass(LongWritable.class);
                job.setMapOutputValueClass(PairWritableCell.class);
            } else if (oi == OutputInfo.BinaryBlockOutputInfo) {
                job.setMapOutputKeyClass(LongWritable.class);
                job.setMapOutputValueClass(PairWritableBlock.class);
                if (this._format == ParForProgramBlock.PDataPartitionFormat.ROW_BLOCK_WISE_N && rlen > (long)this._n && this._n % brlen != 0 || this._format == ParForProgramBlock.PDataPartitionFormat.COLUMN_BLOCK_WISE_N && clen > (long)this._n && this._n % bclen != 0) {
                    throw new DMLRuntimeException("Data partitioning format " + (Object)((Object)this._format) + " requires aligned blocks.");
                }
            }
            job.setInputFormat(ii.inputFormatClass);
            FileInputFormat.setInputPaths(job, path);
            MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
            job.setOutputFormat(NullOutputFormat.class);
            long reducerGroups = -1L;
            switch (this._format) {
                case ROW_WISE: {
                    reducerGroups = rlen;
                    break;
                }
                case COLUMN_WISE: {
                    reducerGroups = clen;
                    break;
                }
                case ROW_BLOCK_WISE: {
                    reducerGroups = rlen / (long)brlen + (long)(rlen % (long)brlen == 0L ? 0 : 1);
                    break;
                }
                case COLUMN_BLOCK_WISE: {
                    reducerGroups = clen / (long)bclen + (long)(clen % (long)bclen == 0L ? 0 : 1);
                    break;
                }
                case ROW_BLOCK_WISE_N: {
                    reducerGroups = rlen / (long)this._n + (long)(rlen % (long)this._n == 0L ? 0 : 1);
                    break;
                }
                case COLUMN_BLOCK_WISE_N: {
                    reducerGroups = clen / (long)this._n + (long)(clen % (long)this._n == 0L ? 0 : 1);
                    break;
                }
            }
            job.setNumReduceTasks((int)Math.min((long)this._numReducers, reducerGroups));
            job.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
            job.setMapSpeculativeExecution(false);
            MRJobConfiguration.addBinaryBlockSerializationFramework(job);
            if (this._jvmReuse) {
                job.setNumTasksToExecutePerJvm(-1);
            }
            job.setInt("dfs.replication", this._replication);
            DMLConfig config = ConfigurationManager.getDMLConfig();
            DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
            MRJobConfiguration.setupCustomMRConfigurations(job, config);
            MRJobConfiguration.setUniqueWorkingDir(job);
            JobClient.runJob(job);
            Statistics.incrementNoOfExecutedMRJobs();
        }
        catch (Exception ex) {
            throw new DMLRuntimeException(ex);
        }
        if (DMLScript.STATISTICS && this._pfid >= 0L) {
            long t1 = System.nanoTime();
            Statistics.maintainCPHeavyHitters("MR-Job_" + jobname, t1 - t0);
        }
    }
}

