/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LegacyStickyTaskAssignor
implements LegacyTaskAssignor {
    private static final Logger log = LoggerFactory.getLogger(LegacyStickyTaskAssignor.class);
    private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
    private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;
    private Map<ProcessId, ClientState> clients;
    private Set<TaskId> allTaskIds;
    private Set<TaskId> statefulTaskIds;
    private final Map<TaskId, ProcessId> previousActiveTaskAssignment = new HashMap<TaskId, ProcessId>();
    private final Map<TaskId, Set<ProcessId>> previousStandbyTaskAssignment = new HashMap<TaskId, Set<ProcessId>>();
    private RackAwareTaskAssignor rackAwareTaskAssignor;
    private AssignmentConfigs configs;
    private TaskPairs taskPairs;
    private final boolean mustPreserveActiveTaskAssignment;

    public LegacyStickyTaskAssignor() {
        this(false);
    }

    LegacyStickyTaskAssignor(boolean mustPreserveActiveTaskAssignment) {
        this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment;
    }

    @Override
    public boolean assign(Map<ProcessId, ClientState> clients, Set<TaskId> allTaskIds, Set<TaskId> statefulTaskIds, RackAwareTaskAssignor rackAwareTaskAssignor, AssignmentConfigs configs) {
        this.clients = clients;
        this.allTaskIds = allTaskIds;
        this.statefulTaskIds = statefulTaskIds;
        this.rackAwareTaskAssignor = rackAwareTaskAssignor;
        this.configs = configs;
        int maxPairs = allTaskIds.size() * (allTaskIds.size() - 1) / 2;
        this.taskPairs = new TaskPairs(maxPairs);
        this.mapPreviousTaskAssignment(clients);
        this.assignActive();
        this.optimizeActive();
        this.assignStandby(configs.numStandbyReplicas());
        this.optimizeStandby();
        return false;
    }

    private void optimizeStandby() {
        if (this.configs.numStandbyReplicas() > 0 && this.rackAwareTaskAssignor != null && this.rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
            int trafficCost = this.configs.rackAwareTrafficCost().orElse(1);
            int nonOverlapCost = this.configs.rackAwareNonOverlapCost().orElse(10);
            TreeMap<ProcessId, ClientState> clientStates = new TreeMap<ProcessId, ClientState>(this.clients);
            this.rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, (s, d, t, c) -> true);
        }
    }

    private void optimizeActive() {
        if (this.rackAwareTaskAssignor != null && this.rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
            int trafficCost = this.configs.rackAwareTrafficCost().orElse(1);
            int nonOverlapCost = this.configs.rackAwareNonOverlapCost().orElse(10);
            TreeSet<TaskId> statefulTasks = new TreeSet<TaskId>(this.statefulTaskIds);
            TreeMap<ProcessId, ClientState> clientStates = new TreeMap<ProcessId, ClientState>(this.clients);
            this.rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
            TreeSet statelessTasks = (TreeSet)Utils.diff(TreeSet::new, this.allTaskIds, statefulTasks);
            this.rackAwareTaskAssignor.optimizeActiveTasks(statelessTasks, clientStates, 1, 0);
        }
    }

    private void assignStandby(int numStandbyReplicas) {
        block0: for (TaskId taskId : this.statefulTaskIds) {
            for (int i = 0; i < numStandbyReplicas; ++i) {
                Set<ProcessId> ids = this.findClientsWithoutAssignedTask(taskId);
                if (ids.isEmpty()) {
                    log.warn("Unable to assign {} of {} standby tasks for task [{}]. There is not enough available capacity. You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.", new Object[]{numStandbyReplicas - i, numStandbyReplicas, taskId});
                    continue block0;
                }
                this.allocateTaskWithClientCandidates(taskId, ids, false);
            }
        }
    }

    private void assignActive() {
        TaskId taskId;
        int totalCapacity = this.sumCapacity(this.clients.values());
        if (totalCapacity == 0) {
            throw new IllegalStateException("`totalCapacity` should never be zero.");
        }
        int tasksPerThread = this.allTaskIds.size() / totalCapacity;
        HashSet<TaskId> assigned = new HashSet<TaskId>();
        for (Map.Entry<TaskId, ProcessId> entry : this.previousActiveTaskAssignment.entrySet()) {
            taskId = entry.getKey();
            if (!this.allTaskIds.contains(taskId)) continue;
            ClientState client = this.clients.get(entry.getValue());
            if (!this.mustPreserveActiveTaskAssignment && !client.hasUnfulfilledQuota(tasksPerThread)) continue;
            this.assignTaskToClient(assigned, taskId, client);
        }
        HashSet<TaskId> unassigned = new HashSet<TaskId>(this.allTaskIds);
        unassigned.removeAll(assigned);
        Iterator iterator = unassigned.iterator();
        block1: while (iterator.hasNext()) {
            taskId = (TaskId)iterator.next();
            Set<ProcessId> clientIds = this.previousStandbyTaskAssignment.get(taskId);
            if (clientIds == null) continue;
            for (ProcessId clientId : clientIds) {
                ClientState client = this.clients.get(clientId);
                if (!client.hasUnfulfilledQuota(tasksPerThread)) continue;
                this.assignTaskToClient(assigned, taskId, client);
                iterator.remove();
                continue block1;
            }
        }
        ArrayList<TaskId> sortedTasks = new ArrayList<TaskId>(unassigned);
        Collections.sort(sortedTasks);
        for (TaskId taskId2 : sortedTasks) {
            this.allocateTaskWithClientCandidates(taskId2, this.clients.keySet(), true);
        }
    }

    private void allocateTaskWithClientCandidates(TaskId taskId, Set<ProcessId> clientsWithin, boolean active) {
        ClientState client = this.findClient(taskId, clientsWithin);
        this.taskPairs.addPairs(taskId, client.assignedTasks());
        if (active) {
            client.assignActive(taskId);
        } else {
            client.assignStandby(taskId);
        }
    }

    private void assignTaskToClient(Set<TaskId> assigned, TaskId taskId, ClientState client) {
        this.taskPairs.addPairs(taskId, client.assignedTasks());
        client.assignActive(taskId);
        assigned.add(taskId);
    }

    private Set<ProcessId> findClientsWithoutAssignedTask(TaskId taskId) {
        HashSet<ProcessId> clientIds = new HashSet<ProcessId>();
        for (Map.Entry<ProcessId, ClientState> client : this.clients.entrySet()) {
            if (client.getValue().hasAssignedTask(taskId)) continue;
            clientIds.add(client.getKey());
        }
        return clientIds;
    }

    private ClientState findClient(TaskId taskId, Set<ProcessId> clientsWithin) {
        if (clientsWithin.size() == 1) {
            return this.clients.get(clientsWithin.iterator().next());
        }
        ClientState previous = this.findClientsWithPreviousAssignedTask(taskId, clientsWithin);
        if (previous == null) {
            return this.leastLoaded(taskId, clientsWithin);
        }
        if (this.shouldBalanceLoad(previous)) {
            ClientState standby = this.findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
            if (standby == null || this.shouldBalanceLoad(standby)) {
                return this.leastLoaded(taskId, clientsWithin);
            }
            return standby;
        }
        return previous;
    }

    private boolean shouldBalanceLoad(ClientState client) {
        return client.reachedCapacity() && this.hasClientsWithMoreAvailableCapacity(client);
    }

    private boolean hasClientsWithMoreAvailableCapacity(ClientState client) {
        for (ClientState clientState : this.clients.values()) {
            if (!clientState.hasMoreAvailableCapacityThan(client)) continue;
            return true;
        }
        return false;
    }

    private ClientState findClientsWithPreviousAssignedTask(TaskId taskId, Set<ProcessId> clientsWithin) {
        ProcessId previous = this.previousActiveTaskAssignment.get(taskId);
        if (previous != null && clientsWithin.contains(previous)) {
            return this.clients.get(previous);
        }
        return this.findLeastLoadedClientWithPreviousStandByTask(taskId, clientsWithin);
    }

    private ClientState findLeastLoadedClientWithPreviousStandByTask(TaskId taskId, Set<ProcessId> clientsWithin) {
        Set<ProcessId> ids = this.previousStandbyTaskAssignment.get(taskId);
        if (ids == null) {
            return null;
        }
        HashSet<ProcessId> constrainTo = new HashSet<ProcessId>(ids);
        constrainTo.retainAll(clientsWithin);
        return this.leastLoaded(taskId, constrainTo);
    }

    private ClientState leastLoaded(TaskId taskId, Set<ProcessId> clientIds) {
        ClientState leastLoaded = this.findLeastLoaded(taskId, clientIds, true);
        if (leastLoaded == null) {
            return this.findLeastLoaded(taskId, clientIds, false);
        }
        return leastLoaded;
    }

    private ClientState findLeastLoaded(TaskId taskId, Set<ProcessId> clientIds, boolean checkTaskPairs) {
        ClientState leastLoaded = null;
        for (ProcessId id : clientIds) {
            ClientState client = this.clients.get(id);
            if (client.assignedTaskCount() == 0) {
                return client;
            }
            if (leastLoaded != null && !client.hasMoreAvailableCapacityThan(leastLoaded)) continue;
            if (!checkTaskPairs) {
                leastLoaded = client;
                continue;
            }
            if (!this.taskPairs.hasNewPair(taskId, client.assignedTasks())) continue;
            leastLoaded = client;
        }
        return leastLoaded;
    }

    private void mapPreviousTaskAssignment(Map<ProcessId, ClientState> clients) {
        for (Map.Entry<ProcessId, ClientState> clientState : clients.entrySet()) {
            for (TaskId activeTask : clientState.getValue().prevActiveTasks()) {
                this.previousActiveTaskAssignment.put(activeTask, clientState.getKey());
            }
            for (TaskId prevAssignedTask : clientState.getValue().prevStandbyTasks()) {
                this.previousStandbyTaskAssignment.computeIfAbsent(prevAssignedTask, t -> new HashSet());
                this.previousStandbyTaskAssignment.get(prevAssignedTask).add(clientState.getKey());
            }
        }
    }

    private int sumCapacity(Collection<ClientState> values) {
        int capacity = 0;
        for (ClientState client : values) {
            capacity += client.capacity();
        }
        return capacity;
    }

    private static class TaskPairs {
        private final Set<Pair> pairs;
        private final int maxPairs;

        TaskPairs(int maxPairs) {
            this.maxPairs = maxPairs;
            this.pairs = new HashSet<Pair>(maxPairs);
        }

        boolean hasNewPair(TaskId task1, Set<TaskId> taskIds) {
            if (this.pairs.size() == this.maxPairs) {
                return false;
            }
            for (TaskId taskId : taskIds) {
                if (this.pairs.contains(this.pair(task1, taskId))) continue;
                return true;
            }
            return false;
        }

        void addPairs(TaskId taskId, Set<TaskId> assigned) {
            for (TaskId id : assigned) {
                this.pairs.add(this.pair(id, taskId));
            }
        }

        Pair pair(TaskId task1, TaskId task2) {
            if (task1.compareTo(task2) < 0) {
                return new Pair(task1, task2);
            }
            return new Pair(task2, task1);
        }

        private static class Pair {
            private final TaskId task1;
            private final TaskId task2;

            Pair(TaskId task1, TaskId task2) {
                this.task1 = task1;
                this.task2 = task2;
            }

            public boolean equals(Object o) {
                if (this == o) {
                    return true;
                }
                if (o == null || this.getClass() != o.getClass()) {
                    return false;
                }
                Pair pair = (Pair)o;
                return Objects.equals(this.task1, pair.task1) && Objects.equals(this.task2, pair.task2);
            }

            public int hashCode() {
                return Objects.hash(this.task1, this.task2);
            }
        }
    }
}

