/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.common.runtime.Serializer;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
import org.apache.kafka.coordinator.group.PartitionMetadataClient;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateResult;
import org.apache.kafka.server.share.persister.PartitionErrorData;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData;
import org.apache.kafka.server.share.persister.PartitionStateSummaryData;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;

public class GroupCoordinatorService
implements GroupCoordinator {
    private final Logger log;
    private final GroupCoordinatorConfig config;
    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
    private final GroupCoordinatorMetrics groupCoordinatorMetrics;
    private final GroupConfigManager groupConfigManager;
    private final Persister persister;
    private final Timer timer;
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final Set<String> consumerGroupAssignors;
    private final PartitionMetadataClient partitionMetadataClient;
    private volatile int numPartitions = -1;
    private volatile CoordinatorMetadataImage metadataImage = null;

    GroupCoordinatorService(LogContext logContext, GroupCoordinatorConfig config, CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime, GroupCoordinatorMetrics groupCoordinatorMetrics, GroupConfigManager groupConfigManager, Persister persister, Timer timer, PartitionMetadataClient partitionMetadataClient) {
        this.log = logContext.logger(GroupCoordinatorService.class);
        this.config = config;
        this.runtime = runtime;
        this.groupCoordinatorMetrics = groupCoordinatorMetrics;
        this.groupConfigManager = groupConfigManager;
        this.persister = persister;
        this.timer = timer;
        this.consumerGroupAssignors = config.consumerGroupAssignors().stream().map(PartitionAssignor::name).collect(Collectors.toSet());
        this.partitionMetadataClient = partitionMetadataClient;
    }

    private void throwIfNotActive() {
        if (!this.isActive.get()) {
            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
        }
    }

    private TopicPartition topicPartitionFor(String groupId) {
        return new TopicPartition("__consumer_offsets", this.partitionFor(groupId));
    }

    @Override
    public int partitionFor(String groupId) {
        this.throwIfNotActive();
        return org.apache.kafka.common.utils.Utils.abs((int)groupId.hashCode()) % this.numPartitions;
    }

    private void throwIfConsumerGroupHeartbeatRequestIsInvalid(ConsumerGroupHeartbeatRequestData request, int apiVersion) throws InvalidRequestException, UnsupportedAssignorException {
        if (apiVersion >= 1 || request.memberEpoch() > 0 || request.memberEpoch() == -1) {
            Utils.throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
        }
        Utils.throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
        Utils.throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
        Utils.throwIfEmptyString(request.rackId(), "RackId can't be empty.");
        if (request.memberEpoch() == 0) {
            if (request.rebalanceTimeoutMs() == -1) {
                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
            }
            if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) {
                throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
            }
            if (request.subscribedTopicNames() == null && request.subscribedTopicRegex() == null) {
                throw new InvalidRequestException("Either SubscribedTopicNames or SubscribedTopicRegex must be non-null when (re-)joining.");
            }
        } else if (request.memberEpoch() == -2) {
            Utils.throwIfNull(request.instanceId(), "InstanceId can't be null.");
        } else if (request.memberEpoch() < -2) {
            throw new InvalidRequestException("MemberEpoch is invalid.");
        }
        if (request.serverAssignor() != null && !this.consumerGroupAssignors.contains(request.serverAssignor())) {
            throw new UnsupportedAssignorException("ServerAssignor " + request.serverAssignor() + " is not supported. Supported assignors: " + String.join((CharSequence)", ", this.consumerGroupAssignors) + ".");
        }
    }

    @Override
    public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(AuthorizableRequestContext context, ConsumerGroupHeartbeatRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        try {
            this.throwIfConsumerGroupHeartbeatRequestIsInvalid(request, context.requestVersion());
        }
        catch (Throwable ex) {
            ApiError apiError = ApiError.fromThrowable((Throwable)ex);
            return CompletableFuture.completedFuture(new ConsumerGroupHeartbeatResponseData().setErrorCode(apiError.error().code()).setErrorMessage(apiError.message()));
        }
        return this.runtime.scheduleWriteOperation("consumer-group-heartbeat", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.consumerGroupHeartbeat(context, request)).exceptionally(exception -> (ConsumerGroupHeartbeatResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"consumer-group-heartbeat", (Object)request, (Throwable)exception, (error, message) -> new ConsumerGroupHeartbeatResponseData().setErrorCode(error.code()).setErrorMessage(message), (Logger)this.log));
    }

    private static void throwIfInvalidTopology(StreamsGroupHeartbeatRequestData.Topology topology) throws StreamsInvalidTopologyException {
        for (StreamsGroupHeartbeatRequestData.Subtopology subtopology : topology.subtopologies()) {
            for (StreamsGroupHeartbeatRequestData.TopicInfo topicInfo : subtopology.stateChangelogTopics()) {
                if (topicInfo.partitions() == 0) continue;
                throw new StreamsInvalidTopologyException(String.format("Changelog topic %s must have an undefined partition count, but it is set to %d.", topicInfo.name(), topicInfo.partitions()));
            }
        }
    }

    private static void throwIfStreamsGroupHeartbeatRequestIsInvalid(StreamsGroupHeartbeatRequestData request) throws InvalidRequestException {
        Utils.throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
        Utils.throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
        Utils.throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
        Utils.throwIfEmptyString(request.rackId(), "RackId can't be empty.");
        if (request.memberEpoch() == 0) {
            if (request.rebalanceTimeoutMs() == -1) {
                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
            }
            Utils.throwIfNotEmptyCollection(request.activeTasks(), "ActiveTasks must be empty when (re-)joining.");
            Utils.throwIfNotEmptyCollection(request.standbyTasks(), "StandbyTasks must be empty when (re-)joining.");
            Utils.throwIfNotEmptyCollection(request.warmupTasks(), "WarmupTasks must be empty when (re-)joining.");
            Utils.throwIfNull(request.topology(), "Topology must be non-null when (re-)joining.");
            if (request.topology() != null) {
                GroupCoordinatorService.throwIfInvalidTopology(request.topology());
            }
        } else if (request.memberEpoch() == -2) {
            Utils.throwIfNull(request.instanceId(), "InstanceId can't be null.");
        } else if (request.memberEpoch() < -2) {
            throw new InvalidRequestException(String.format("MemberEpoch is %d, but must be greater than or equal to -2.", request.memberEpoch()));
        }
        if (request.activeTasks() != null || request.standbyTasks() != null || request.warmupTasks() != null) {
            Utils.throwIfNull(request.activeTasks(), "If one task-type is non-null, all must be non-null.");
            Utils.throwIfNull(request.standbyTasks(), "If one task-type is non-null, all must be non-null.");
            Utils.throwIfNull(request.warmupTasks(), "If one task-type is non-null, all must be non-null.");
        }
        if (request.memberEpoch() != 0) {
            Utils.throwIfNotNull(request.topology(), "Topology can only be provided when (re-)joining.");
        }
    }

    private static void throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(StreamsGroupHeartbeatRequestData request) throws InvalidRequestException {
        Utils.throwIfNotNull(request.instanceId(), "Static membership is not yet supported.");
        Utils.throwIfNotNull(request.taskOffsets(), "TaskOffsets are not supported yet.");
        Utils.throwIfNotNull(request.taskEndOffsets(), "TaskEndOffsets are not supported yet.");
        Utils.throwIfNotNullOrEmpty(request.warmupTasks(), "WarmupTasks are not supported yet.");
        if (request.topology() != null) {
            for (StreamsGroupHeartbeatRequestData.Subtopology subtopology : request.topology().subtopologies()) {
                Utils.throwIfNotEmptyCollection(subtopology.sourceTopicRegex(), "Regular expressions for source topics are not supported yet.");
            }
        }
    }

    @Override
    public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(AuthorizableRequestContext context, StreamsGroupHeartbeatRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), Map.of()));
        }
        try {
            GroupCoordinatorService.throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(request);
            GroupCoordinatorService.throwIfStreamsGroupHeartbeatRequestIsInvalid(request);
        }
        catch (Throwable ex) {
            ApiError apiError = ApiError.fromThrowable((Throwable)ex);
            return CompletableFuture.completedFuture(new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData().setErrorCode(apiError.error().code()).setErrorMessage(apiError.message()), Map.of()));
        }
        return this.runtime.scheduleWriteOperation("streams-group-heartbeat", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.streamsGroupHeartbeat(context, request)).exceptionally(exception -> (StreamsGroupHeartbeatResult)CoordinatorOperationExceptionHelper.handleOperationException((String)"streams-group-heartbeat", (Object)request, (Throwable)exception, (error, message) -> new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData().setErrorCode(error.code()).setErrorMessage(message), Map.of()), (Logger)this.log));
    }

    private static void throwIfShareGroupHeartbeatRequestIsInvalid(ShareGroupHeartbeatRequestData request) throws InvalidRequestException {
        Utils.throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
        Utils.throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
        Utils.throwIfEmptyString(request.rackId(), "RackId can't be empty.");
        if (request.memberEpoch() == 0) {
            if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
            }
        } else if (request.memberEpoch() < -1) {
            throw new InvalidRequestException("MemberEpoch is invalid.");
        }
    }

    @Override
    public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(AuthorizableRequestContext context, ShareGroupHeartbeatRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new ShareGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        try {
            GroupCoordinatorService.throwIfShareGroupHeartbeatRequestIsInvalid(request);
        }
        catch (Throwable ex) {
            ApiError apiError = ApiError.fromThrowable((Throwable)ex);
            return CompletableFuture.completedFuture(new ShareGroupHeartbeatResponseData().setErrorCode(apiError.error().code()).setErrorMessage(apiError.message()));
        }
        return ((CompletableFuture)this.runtime.scheduleWriteOperation("share-group-heartbeat", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.shareGroupHeartbeat(context, request)).thenCompose(result -> {
            if (((Optional)result.getValue()).isPresent()) {
                this.timer.add(new TimerTask(0L, (Map.Entry)result){
                    final /* synthetic */ Map.Entry val$result;
                    {
                        this.val$result = entry;
                        super(arg0);
                    }

                    public void run() {
                        GroupCoordinatorService.this.persisterInitialize((InitializeShareGroupStateParameters)((Optional)this.val$result.getValue()).get(), (ShareGroupHeartbeatResponseData)this.val$result.getKey()).whenComplete((__, exp) -> {
                            if (exp != null) {
                                GroupCoordinatorService.this.log.error("Persister initialization failed", exp);
                            }
                        });
                    }
                });
            }
            return CompletableFuture.completedFuture((ShareGroupHeartbeatResponseData)result.getKey());
        })).exceptionally(exception -> (ShareGroupHeartbeatResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"share-group-heartbeat", (Object)request, (Throwable)exception, (error, message) -> new ShareGroupHeartbeatResponseData().setErrorCode(error.code()).setErrorMessage(message), (Logger)this.log));
    }

    CompletableFuture<AlterShareGroupOffsetsResponseData> persisterInitialize(InitializeShareGroupStateParameters request, AlterShareGroupOffsetsResponseData response) {
        if (request.groupTopicPartitionData().topicsData().isEmpty()) {
            return CompletableFuture.completedFuture(response);
        }
        return this.persister.initializeState(request).handle((result, exp) -> {
            if (exp == null) {
                if (result.errorCounts().isEmpty()) {
                    this.handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), (InitializeShareGroupStateResult)result, new ShareGroupHeartbeatResponseData());
                    return response;
                }
                return this.buildErrorResponse(response, (InitializeShareGroupStateResult)result);
            }
            return this.buildErrorResponse(request, response, (Throwable)exp);
        });
    }

    private AlterShareGroupOffsetsResponseData buildErrorResponse(AlterShareGroupOffsetsResponseData response, InitializeShareGroupStateResult result) {
        AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData();
        Map topicPartitionErrorsMap = result.getErrors();
        data.setResponses(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream().map(topic -> {
            AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName(topic.topicName()).setTopicId(topic.topicId());
            topic.partitions().forEach(partition -> {
                if (partition.errorCode() != Errors.NONE.code()) {
                    topicData.partitions().add(partition);
                    return;
                }
                Map partitionErrors = Optional.ofNullable(topicPartitionErrorsMap).map(map -> (Map)map.get(topic.topicId())).orElse(Collections.emptyMap());
                PartitionErrorData error = (PartitionErrorData)partitionErrors.get(partition.partitionIndex());
                AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partitionData = error == null ? partition.duplicate() : new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(partition.partitionIndex()).setErrorCode(error.errorCode()).setErrorMessage(error.errorMessage());
                topicData.partitions().add(partitionData);
            });
            return topicData;
        }).iterator()));
        return data;
    }

    private AlterShareGroupOffsetsResponseData buildErrorResponse(InitializeShareGroupStateParameters request, AlterShareGroupOffsetsResponseData response, Throwable exp) {
        AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData();
        GroupTopicPartitionData gtp = request.groupTopicPartitionData();
        this.log.error("Unable to initialize share group state for {}, {} while altering share group offsets", new Object[]{gtp.groupId(), gtp.topicsData(), exp});
        Errors error = Errors.forException((Throwable)exp);
        data.setErrorCode(error.code()).setErrorMessage(exp.getMessage()).setResponses(response.responses());
        data.setResponses(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream().map(topic -> {
            AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName(topic.topicName()).setTopicId(topic.topicId());
            topic.partitions().forEach(partition -> {
                AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partitionData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(partition.partitionIndex()).setErrorCode(error.code()).setErrorMessage(error.message());
                topicData.partitions().add(partitionData);
            });
            return topicData;
        }).iterator()));
        return data;
    }

    CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize(InitializeShareGroupStateParameters request, ShareGroupHeartbeatResponseData defaultResponse) {
        return ((CompletableFuture)this.persister.initializeState(request).handle((response, exp) -> {
            if (exp == null) {
                return this.handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), (InitializeShareGroupStateResult)response, defaultResponse);
            }
            GroupTopicPartitionData gtp = request.groupTopicPartitionData();
            this.log.error("Unable to initialize share group state {}, {}", new Object[]{gtp.groupId(), gtp.topicsData(), exp});
            Errors error = Errors.forException((Throwable)exp);
            HashMap<Uuid, Set<Integer>> topicPartitionMap = new HashMap<Uuid, Set<Integer>>();
            gtp.topicsData().forEach(topicData -> topicPartitionMap.computeIfAbsent(topicData.topicId(), k -> new HashSet()).addAll(topicData.partitions().stream().map(PartitionIdData::partition).collect(Collectors.toSet())));
            return this.uninitializeShareGroupState(error, gtp.groupId(), topicPartitionMap);
        })).thenCompose(resp -> resp);
    }

    private CompletableFuture<ShareGroupHeartbeatResponseData> handlePersisterInitializeResponse(String groupId, InitializeShareGroupStateResult persisterInitializeResult, ShareGroupHeartbeatResponseData defaultResponse) {
        Errors persisterError = Errors.NONE;
        for (TopicData topicData : persisterInitializeResult.topicsData()) {
            Optional<PartitionErrorData> errData = topicData.partitions().stream().filter(partition -> partition.errorCode() != Errors.NONE.code()).findAny();
            if (!errData.isPresent()) continue;
            persisterError = Errors.forCode((short)errData.get().errorCode());
            break;
        }
        HashMap<Uuid, Set<Integer>> topicPartitionMap = new HashMap<Uuid, Set<Integer>>();
        for (TopicData topicData : persisterInitializeResult.topicsData()) {
            topicPartitionMap.put(topicData.topicId(), topicData.partitions().stream().map(PartitionIdData::partition).collect(Collectors.toSet()));
        }
        if (persisterError.code() == Errors.NONE.code()) {
            if (topicPartitionMap.isEmpty()) {
                return CompletableFuture.completedFuture(defaultResponse);
            }
            return this.performShareGroupStateMetadataInitialize(groupId, topicPartitionMap, defaultResponse);
        }
        this.log.error("Received error while calling initialize state for {} on persister, errorCode: {}.", (Object)groupId, (Object)persisterError.code());
        return this.uninitializeShareGroupState(persisterError, groupId, topicPartitionMap);
    }

    private CompletableFuture<ShareGroupHeartbeatResponseData> uninitializeShareGroupState(Errors error, String groupId, Map<Uuid, Set<Integer>> topicPartitionMap) {
        return ((CompletableFuture)this.runtime.scheduleWriteOperation("uninitialize-share-group-state", this.topicPartitionFor(groupId), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.uninitializeShareGroupState(groupId, topicPartitionMap)).thenApply(__ -> new ShareGroupHeartbeatResponseData().setErrorCode(error.code()).setErrorMessage(error.message()))).exceptionally(exception -> {
            this.log.error("Unable to cleanup topic partitions from share group state metadata", exception);
            Errors err = Errors.forException((Throwable)new IllegalStateException("Unable to cleanup topic partitions from share group state metadata", (Throwable)exception));
            return new ShareGroupHeartbeatResponseData().setErrorCode(err.code()).setErrorMessage(err.message());
        });
    }

    private CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStateMetadataInitialize(String groupId, Map<Uuid, Set<Integer>> topicPartitionMap, ShareGroupHeartbeatResponseData defaultResponse) {
        return ((CompletableFuture)this.runtime.scheduleWriteOperation("initialize-share-group-state", this.topicPartitionFor(groupId), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.initializeShareGroupState(groupId, topicPartitionMap)).handle((__, exp) -> {
            if (exp == null) {
                return CompletableFuture.completedFuture(defaultResponse);
            }
            this.log.error("Unable to initialize share group state partition metadata for {}.", (Object)groupId, exp);
            Errors error = Errors.forException((Throwable)exp);
            return this.uninitializeShareGroupState(error, groupId, topicPartitionMap);
        })).thenCompose(resp -> resp);
    }

    @Override
    public CompletableFuture<JoinGroupResponseData> joinGroup(AuthorizableRequestContext context, JoinGroupRequestData request, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new JoinGroupResponseData().setMemberId(request.memberId()).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        if (!GroupCoordinatorService.isGroupIdNotEmpty(request.groupId())) {
            return CompletableFuture.completedFuture(new JoinGroupResponseData().setMemberId(request.memberId()).setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }
        if (request.sessionTimeoutMs() < this.config.classicGroupMinSessionTimeoutMs() || request.sessionTimeoutMs() > this.config.classicGroupMaxSessionTimeoutMs()) {
            return CompletableFuture.completedFuture(new JoinGroupResponseData().setMemberId(request.memberId()).setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()));
        }
        CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<JoinGroupResponseData>();
        this.runtime.scheduleWriteOperation("classic-group-join", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.classicGroupJoin(context, request, responseFuture)).exceptionally(exception -> {
            if (!responseFuture.isDone()) {
                responseFuture.complete((JoinGroupResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"classic-group-join", (Object)request, (Throwable)exception, (error, __) -> new JoinGroupResponseData().setErrorCode(error.code()), (Logger)this.log));
            }
            return null;
        });
        return responseFuture;
    }

    @Override
    public CompletableFuture<SyncGroupResponseData> syncGroup(AuthorizableRequestContext context, SyncGroupRequestData request, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new SyncGroupResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        if (!GroupCoordinatorService.isGroupIdNotEmpty(request.groupId())) {
            return CompletableFuture.completedFuture(new SyncGroupResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }
        CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<SyncGroupResponseData>();
        this.runtime.scheduleWriteOperation("classic-group-sync", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.classicGroupSync(context, request, responseFuture)).exceptionally(exception -> {
            if (!responseFuture.isDone()) {
                responseFuture.complete((SyncGroupResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"classic-group-sync", (Object)request, (Throwable)exception, (error, __) -> new SyncGroupResponseData().setErrorCode(error.code()), (Logger)this.log));
            }
            return null;
        });
        return responseFuture;
    }

    @Override
    public CompletableFuture<HeartbeatResponseData> heartbeat(AuthorizableRequestContext context, HeartbeatRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new HeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        if (!GroupCoordinatorService.isGroupIdNotEmpty(request.groupId())) {
            return CompletableFuture.completedFuture(new HeartbeatResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }
        return this.runtime.scheduleWriteOperation("classic-group-heartbeat", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.classicGroupHeartbeat(context, request)).exceptionally(exception -> (HeartbeatResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"classic-group-heartbeat", (Object)request, (Throwable)exception, (error, __) -> {
            if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                return new HeartbeatResponseData().setErrorCode(Errors.NONE.code());
            }
            return new HeartbeatResponseData().setErrorCode(error.code());
        }, (Logger)this.log));
    }

    @Override
    public CompletableFuture<LeaveGroupResponseData> leaveGroup(AuthorizableRequestContext context, LeaveGroupRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new LeaveGroupResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        if (!GroupCoordinatorService.isGroupIdNotEmpty(request.groupId())) {
            return CompletableFuture.completedFuture(new LeaveGroupResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }
        return this.runtime.scheduleWriteOperation("classic-group-leave", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.classicGroupLeave(context, request)).exceptionally(exception -> (LeaveGroupResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"classic-group-leave", (Object)request, (Throwable)exception, (error, __) -> {
            if (error == Errors.UNKNOWN_MEMBER_ID) {
                List<LeaveGroupResponseData.MemberResponse> memberResponses = request.members().stream().map(member -> new LeaveGroupResponseData.MemberResponse().setMemberId(member.memberId()).setGroupInstanceId(member.groupInstanceId()).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())).toList();
                return new LeaveGroupResponseData().setMembers(memberResponses);
            }
            return new LeaveGroupResponseData().setErrorCode(error.code());
        }, (Logger)this.log));
    }

    @Override
    public CompletableFuture<ListGroupsResponseData> listGroups(AuthorizableRequestContext context, ListGroupsRequestData request) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new ListGroupsResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        List futures = FutureUtils.mapExceptionally((List)this.runtime.scheduleReadAllOperation("list-groups", (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), request.typesFilter(), lastCommittedOffset)), exception -> {
            if ((exception = Errors.maybeUnwrapException((Throwable)exception)) instanceof NotCoordinatorException) {
                return List.of();
            }
            throw new CompletionException((Throwable)exception);
        });
        return ((CompletableFuture)FutureUtils.combineFutures((List)futures, ArrayList::new, List::addAll).thenApply(groups -> new ListGroupsResponseData().setGroups(groups))).exceptionally(exception -> (ListGroupsResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"list-groups", (Object)request, (Throwable)exception, (error, __) -> new ListGroupsResponseData().setErrorCode(error.code()), (Logger)this.log));
    }

    @Override
    public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(AuthorizableRequestContext context, List<String> groupIds) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(groupIds, (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        ArrayList futures = new ArrayList(groupIds.size());
        HashMap<TopicPartition, List> groupsByTopicPartition = new HashMap<TopicPartition, List>();
        groupIds.forEach(groupId -> {
            if (GroupCoordinatorService.isGroupIdNotEmpty(groupId)) {
                groupsByTopicPartition.computeIfAbsent(this.topicPartitionFor((String)groupId), __ -> new ArrayList()).add(groupId);
            } else {
                futures.add(CompletableFuture.completedFuture(List.of(new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId("").setErrorCode(Errors.INVALID_GROUP_ID.code()))));
            }
        });
        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
            CompletionStage future = this.runtime.scheduleReadOperation("consumer-group-describe", topicPartition, (coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe((List<String>)groupList, lastCommittedOffset)).exceptionally(exception -> (List)CoordinatorOperationExceptionHelper.handleOperationException((String)"consumer-group-describe", (Object)groupList, (Throwable)exception, (error, __) -> ConsumerGroupDescribeRequest.getErrorDescribedGroupList((List)groupList, (Errors)error), (Logger)this.log));
            futures.add(future);
        });
        return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
    }

    @Override
    public CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(AuthorizableRequestContext context, List<String> groupIds) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupIds, (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        ArrayList futures = new ArrayList(groupIds.size());
        HashMap<TopicPartition, List> groupsByTopicPartition = new HashMap<TopicPartition, List>();
        groupIds.forEach(groupId -> {
            if (GroupCoordinatorService.isGroupIdNotEmpty(groupId)) {
                groupsByTopicPartition.computeIfAbsent(this.topicPartitionFor((String)groupId), __ -> new ArrayList()).add(groupId);
            } else {
                futures.add(CompletableFuture.completedFuture(List.of(new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId("").setErrorCode(Errors.INVALID_GROUP_ID.code()))));
            }
        });
        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
            CompletionStage future = this.runtime.scheduleReadOperation("streams-group-describe", topicPartition, (coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe((List<String>)groupList, lastCommittedOffset)).exceptionally(exception -> (List)CoordinatorOperationExceptionHelper.handleOperationException((String)"streams-group-describe", (Object)groupList, (Throwable)exception, (error, __) -> StreamsGroupDescribeRequest.getErrorDescribedGroupList((List)groupList, (Errors)error), (Logger)this.log));
            futures.add(future);
        });
        return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
    }

    @Override
    public CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> shareGroupDescribe(AuthorizableRequestContext context, List<String> groupIds) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(ShareGroupDescribeRequest.getErrorDescribedGroupList(groupIds, (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        ArrayList futures = new ArrayList(groupIds.size());
        HashMap<TopicPartition, List> groupsByTopicPartition = new HashMap<TopicPartition, List>();
        groupIds.forEach(groupId -> {
            if (GroupCoordinatorService.isGroupIdNotEmpty(groupId)) {
                groupsByTopicPartition.computeIfAbsent(this.topicPartitionFor((String)groupId), __ -> new ArrayList()).add(groupId);
            } else {
                futures.add(CompletableFuture.completedFuture(List.of(new ShareGroupDescribeResponseData.DescribedGroup().setGroupId("").setErrorCode(Errors.INVALID_GROUP_ID.code()))));
            }
        });
        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
            CompletionStage future = this.runtime.scheduleReadOperation("share-group-describe", topicPartition, (coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe((List<String>)groupList, lastCommittedOffset)).exceptionally(exception -> (List)CoordinatorOperationExceptionHelper.handleOperationException((String)"share-group-describe", (Object)groupList, (Throwable)exception, (error, __) -> ShareGroupDescribeRequest.getErrorDescribedGroupList((List)groupList, (Errors)error), (Logger)this.log));
            futures.add(future);
        });
        return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
    }

    @Override
    public CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffsets(AuthorizableRequestContext context, String groupId, AlterShareGroupOffsetsRequestData request) {
        if (!this.isActive.get() || this.metadataImage == null) {
            return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData((Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        if (groupId == null || groupId.isEmpty()) {
            return CompletableFuture.completedFuture(AlterShareGroupOffsetsRequest.getErrorResponseData((Errors)Errors.INVALID_GROUP_ID));
        }
        if (request.topics() == null) {
            return CompletableFuture.completedFuture(new AlterShareGroupOffsetsResponseData());
        }
        return ((CompletableFuture)this.runtime.scheduleWriteOperation("share-group-offsets-alter", this.topicPartitionFor(groupId), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.alterShareGroupOffsets(groupId, request)).thenCompose(result -> this.persisterInitialize((InitializeShareGroupStateParameters)result.getValue(), (AlterShareGroupOffsetsResponseData)result.getKey()))).exceptionally(exception -> (AlterShareGroupOffsetsResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"share-group-offsets-alter", (Object)request, (Throwable)exception, AlterShareGroupOffsetsRequest::getErrorResponseData, (Logger)this.log));
    }

    @Override
    public CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroups(AuthorizableRequestContext context, List<String> groupIds) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(DescribeGroupsRequest.getErrorDescribedGroupList(groupIds, (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        ArrayList futures = new ArrayList(groupIds.size());
        HashMap<TopicPartition, List> groupsByTopicPartition = new HashMap<TopicPartition, List>();
        groupIds.forEach(groupId -> {
            if (groupId == null) {
                futures.add(CompletableFuture.completedFuture(List.of(new DescribeGroupsResponseData.DescribedGroup().setGroupId("").setErrorCode(Errors.INVALID_GROUP_ID.code()))));
            } else {
                TopicPartition topicPartition = this.topicPartitionFor((String)groupId);
                groupsByTopicPartition.computeIfAbsent(topicPartition, __ -> new ArrayList()).add(groupId);
            }
        });
        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
            CompletionStage future = this.runtime.scheduleReadOperation("describe-groups", topicPartition, (coordinator, lastCommittedOffset) -> coordinator.describeGroups(context, (List<String>)groupList, lastCommittedOffset)).exceptionally(exception -> (List)CoordinatorOperationExceptionHelper.handleOperationException((String)"describe-groups", (Object)groupList, (Throwable)exception, (error, __) -> DescribeGroupsRequest.getErrorDescribedGroupList((List)groupList, (Errors)error), (Logger)this.log));
            futures.add(future);
        });
        return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
    }

    @Override
    public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteGroups(AuthorizableRequestContext context, List<String> groupIds, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(groupIds, (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        ArrayList futures = new ArrayList(groupIds.size());
        HashMap<TopicPartition, List> groupsByTopicPartition = new HashMap<TopicPartition, List>();
        groupIds.forEach(groupId -> {
            if (groupId == null) {
                futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(Collections.singletonList(null), (Errors)Errors.INVALID_GROUP_ID)));
            } else {
                TopicPartition topicPartition = this.topicPartitionFor((String)groupId);
                groupsByTopicPartition.computeIfAbsent(topicPartition, __ -> new ArrayList()).add(groupId);
            }
        });
        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
            CompletionStage future = this.deleteShareGroups((TopicPartition)topicPartition, (List<String>)groupList).thenCompose(groupErrMap -> {
                DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults = new DeleteGroupsResponseData.DeletableGroupResultCollection();
                List<String> retainedGroupIds = this.updateResponseAndGetNonErrorGroupList((Map<String, Errors>)groupErrMap, (List<String>)groupList, deletableGroupResults);
                if (retainedGroupIds.isEmpty()) {
                    return CompletableFuture.completedFuture(deletableGroupResults);
                }
                return ((CompletableFuture)this.handleDeleteGroups(context, (TopicPartition)topicPartition, retainedGroupIds).whenComplete((resp, __) -> resp.forEach(result -> deletableGroupResults.add((ImplicitLinkedHashCollection.Element)result.duplicate())))).thenApply(__ -> deletableGroupResults);
            });
            futures.add(future);
        });
        return FutureUtils.combineFutures(futures, DeleteGroupsResponseData.DeletableGroupResultCollection::new, (accumulator, newResults) -> newResults.forEach(result -> accumulator.add((ImplicitLinkedHashCollection.Element)result.duplicate())));
    }

    private List<String> updateResponseAndGetNonErrorGroupList(Map<String, Errors> shareGroupErrMap, List<String> groupList, DeleteGroupsResponseData.DeletableGroupResultCollection deletableGroupResults) {
        ArrayList errGroupIds = new ArrayList();
        shareGroupErrMap.forEach((groupId, error) -> {
            if (error.code() != Errors.NONE.code()) {
                this.log.error("Error deleting share group {} due to error {}", groupId, error);
                errGroupIds.add(groupId);
                deletableGroupResults.add((ImplicitLinkedHashCollection.Element)new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId).setErrorCode(error.code()));
            }
        });
        HashSet<String> groupSet = new HashSet<String>(groupList);
        errGroupIds.forEach(groupSet::remove);
        return groupSet.stream().toList();
    }

    private CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> handleDeleteGroups(AuthorizableRequestContext context, TopicPartition topicPartition, List<String> groupIds) {
        return this.runtime.scheduleWriteOperation("delete-groups", topicPartition, Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.deleteGroups(context, groupIds)).exceptionally(exception -> (DeleteGroupsResponseData.DeletableGroupResultCollection)CoordinatorOperationExceptionHelper.handleOperationException((String)"delete-groups", (Object)groupIds, (Throwable)exception, (error, __) -> DeleteGroupsRequest.getErrorResultCollection((List)groupIds, (Errors)error), (Logger)this.log));
    }

    private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
        return ((CompletableFuture)this.runtime.scheduleWriteOperation("delete-share-groups", topicPartition, Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.sharePartitionDeleteRequests(groupList)).thenCompose(this::performShareGroupsDeletion)).exceptionally(exception -> (Map)CoordinatorOperationExceptionHelper.handleOperationException((String)"delete-share-groups", (Object)groupList, (Throwable)exception, (error, __) -> {
            HashMap errors = new HashMap();
            groupList.forEach(group -> errors.put(group, error));
            return errors;
        }, (Logger)this.log));
    }

    private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> deleteRequests) {
        ArrayList<CompletableFuture<Map.Entry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<CompletableFuture<Map.Entry<String, DeleteShareGroupStateResult>>>(deleteRequests.size());
        HashMap errorMap = new HashMap();
        deleteRequests.forEach((groupId, valPair) -> {
            if (valPair.getValue() == Errors.NONE) {
                futures.add(this.deleteShareGroup((DeleteShareGroupStateParameters)valPair.getKey()));
            } else {
                errorMap.put(groupId, (Errors)valPair.getValue());
            }
        });
        return this.persisterDeleteToGroupIdErrorMap(futures).thenApply(respErrMap -> {
            errorMap.putAll(respErrMap);
            return errorMap;
        });
    }

    private CompletableFuture<Map.Entry<String, DeleteShareGroupStateResult>> deleteShareGroup(DeleteShareGroupStateParameters deleteRequest) {
        String groupId = deleteRequest.groupTopicPartitionData().groupId();
        return ((CompletableFuture)this.persister.deleteState(deleteRequest).thenCompose(result -> CompletableFuture.completedFuture(Map.entry(groupId, result)))).exceptionally(exception -> {
            this.log.error("Unable to delete share group partition(s) - {} using request {}", new Object[]{groupId, deleteRequest, exception});
            List<TopicData> respTopicData = deleteRequest.groupTopicPartitionData().topicsData().stream().map(reqTopicData -> new TopicData(reqTopicData.topicId(), reqTopicData.partitions().stream().map(reqPartData -> {
                Errors err = Errors.forException((Throwable)exception);
                return PartitionFactory.newPartitionErrorData((int)reqPartData.partition(), (short)err.code(), (String)err.message());
            }).toList())).toList();
            return Map.entry(groupId, new DeleteShareGroupStateResult.Builder().setTopicsData(respTopicData).build());
        });
    }

    private CompletableFuture<Map<String, Errors>> persisterDeleteToGroupIdErrorMap(List<CompletableFuture<Map.Entry<String, DeleteShareGroupStateResult>>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenCompose(v -> {
            HashMap<String, Errors> groupIds = new HashMap<String, Errors>();
            for (CompletableFuture future : futures) {
                Map.Entry entry = future.getNow(null);
                groupIds.putIfAbsent((String)entry.getKey(), Errors.NONE);
                for (TopicData topicData : ((DeleteShareGroupStateResult)entry.getValue()).topicsData()) {
                    Optional<PartitionErrorData> errItem = topicData.partitions().stream().filter(errData -> errData.errorCode() != Errors.NONE.code()).findAny();
                    errItem.ifPresent(val -> {
                        this.log.error("Received error while deleting share group {} - {}", entry.getKey(), val);
                        groupIds.put((String)entry.getKey(), Errors.forCode((short)val.errorCode()));
                    });
                }
            }
            return CompletableFuture.completedFuture(groupIds);
        });
    }

    @Override
    public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchOffsets(AuthorizableRequestContext context, OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(OffsetFetchResponse.groupError((OffsetFetchRequestData.OffsetFetchRequestGroup)request, (Errors)Errors.COORDINATOR_NOT_AVAILABLE, (int)context.requestVersion()));
        }
        if (request.groupId() == null) {
            return CompletableFuture.completedFuture(OffsetFetchResponse.groupError((OffsetFetchRequestData.OffsetFetchRequestGroup)request, (Errors)Errors.INVALID_GROUP_ID, (int)context.requestVersion()));
        }
        if (requireStable) {
            return this.runtime.scheduleWriteOperation("fetch-offsets", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> new CoordinatorResult(List.of(), (Object)coordinator.fetchOffsets(request, Long.MAX_VALUE))).exceptionally(exception -> this.handleOffsetFetchException("fetch-offsets", context, request, (Throwable)exception));
        }
        return this.runtime.scheduleReadOperation("fetch-offsets", this.topicPartitionFor(request.groupId()), (coordinator, offset) -> coordinator.fetchOffsets(request, offset));
    }

    @Override
    public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffsets(AuthorizableRequestContext context, OffsetFetchRequestData.OffsetFetchRequestGroup request, boolean requireStable) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(OffsetFetchResponse.groupError((OffsetFetchRequestData.OffsetFetchRequestGroup)request, (Errors)Errors.COORDINATOR_NOT_AVAILABLE, (int)context.requestVersion()));
        }
        if (request.groupId() == null) {
            return CompletableFuture.completedFuture(OffsetFetchResponse.groupError((OffsetFetchRequestData.OffsetFetchRequestGroup)request, (Errors)Errors.INVALID_GROUP_ID, (int)context.requestVersion()));
        }
        if (requireStable) {
            return this.runtime.scheduleWriteOperation("fetch-all-offsets", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> new CoordinatorResult(List.of(), (Object)coordinator.fetchAllOffsets(request, Long.MAX_VALUE))).exceptionally(exception -> this.handleOffsetFetchException("fetch-all-offsets", context, request, (Throwable)exception));
        }
        return this.runtime.scheduleReadOperation("fetch-all-offsets", this.topicPartitionFor(request.groupId()), (coordinator, offset) -> coordinator.fetchAllOffsets(request, offset));
    }

    @Override
    public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupOffsets(AuthorizableRequestContext context, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(DescribeShareGroupOffsetsRequest.getErrorDescribedGroup((String)requestData.groupId(), (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        if (this.metadataImage == null) {
            return CompletableFuture.completedFuture(DescribeShareGroupOffsetsRequest.getErrorDescribedGroup((String)requestData.groupId(), (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        HashMap<Uuid, String> requestTopicIdToNameMapping = new HashMap<Uuid, String>();
        ArrayList readStateSummaryData = new ArrayList(requestData.topics().size());
        ArrayList<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList = new ArrayList<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>(requestData.topics().size());
        requestData.topics().forEach(topic -> {
            Optional topicMetadataOpt = this.metadataImage.topicMetadata(topic.topicName());
            if (topicMetadataOpt.isPresent()) {
                Uuid topicId = ((CoordinatorMetadataImage.TopicMetadata)topicMetadataOpt.get()).id();
                requestTopicIdToNameMapping.put(topicId, topic.topicName());
                readStateSummaryData.add(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(topic.partitions().stream().map(partitionIndex -> new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex.intValue())).toList()));
            } else {
                describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName(topic.topicName()).setTopicId(Uuid.ZERO_UUID).setPartitions(topic.partitions().stream().map(partition -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(partition.intValue()).setStartOffset(-1L)).toList()));
            }
        });
        if (readStateSummaryData.isEmpty()) {
            return CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(requestData.groupId()).setTopics(describeShareGroupOffsetsResponseTopicList));
        }
        ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData().setGroupId(requestData.groupId()).setTopics(readStateSummaryData);
        return this.readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
    }

    @Override
    public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupAllOffsets(AuthorizableRequestContext context, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(DescribeShareGroupOffsetsRequest.getErrorDescribedGroup((String)requestData.groupId(), (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        if (this.metadataImage == null) {
            return CompletableFuture.completedFuture(DescribeShareGroupOffsetsRequest.getErrorDescribedGroup((String)requestData.groupId(), (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        return this.runtime.scheduleReadOperation("share-group-initialized-partitions", this.topicPartitionFor(requestData.groupId()), (coordinator, offset) -> coordinator.initializedShareGroupPartitions(requestData.groupId())).thenCompose(topicPartitionMap -> {
            HashMap<Uuid, String> requestTopicIdToNameMapping = new HashMap<Uuid, String>();
            ArrayList<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList = new ArrayList<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>(topicPartitionMap.size());
            ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData().setGroupId(requestData.groupId());
            topicPartitionMap.forEach((topicId, partitionSet) -> this.metadataImage.topicMetadata(topicId).ifPresent(topicMetadata -> {
                requestTopicIdToNameMapping.put((Uuid)topicId, topicMetadata.name());
                readSummaryRequestData.topics().add(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData().setTopicId(topicId).setPartitions(partitionSet.stream().map(partitionIndex -> new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex.intValue())).toList()));
            }));
            return this.readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
        });
    }

    private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> readShareGroupStateSummary(ReadShareGroupStateSummaryRequestData readSummaryRequestData, Map<Uuid, String> requestTopicIdToNameMapping, List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList) {
        if (readSummaryRequestData.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(readSummaryRequestData.groupId()).setTopics(describeShareGroupOffsetsResponseTopicList));
        }
        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future = new CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>();
        this.persister.readSummary(ReadShareGroupStateSummaryParameters.from((ReadShareGroupStateSummaryRequestData)readSummaryRequestData)).whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("Failed to read summary of the share partition", error);
                future.completeExceptionally((Throwable)error);
                return;
            }
            if (result == null || result.topicsData() == null) {
                this.log.error("Result is null for the read state summary");
                future.completeExceptionally(new IllegalStateException("Result is null for the read state summary"));
                return;
            }
            this.computeShareGroupLagAndBuildResponse((ReadShareGroupStateSummaryResult)result, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList, future, readSummaryRequestData.groupId());
        });
        return future;
    }

    private void computeShareGroupLagAndBuildResponse(ReadShareGroupStateSummaryResult readSummaryResult, Map<Uuid, String> requestTopicIdToNameMapping, List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList, CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture, String groupId) {
        HashSet<TopicPartition> partitionsToComputeLag = new HashSet<TopicPartition>();
        readSummaryResult.topicsData().forEach(topicData -> topicData.partitions().forEach(partitionData -> {
            if (this.shouldComputeSharePartitionLag((PartitionStateSummaryData)partitionData)) {
                partitionsToComputeLag.add(new TopicPartition((String)requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
            }
        }));
        Map partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? Map.of() : this.partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseGroup = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(groupId);
        ArrayList responseTopics = new ArrayList();
        CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture[0])).whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("Failed to retrieve partition end offsets while calculating share partitions lag for share group - {}", (Object)groupId, error);
                responseFuture.completeExceptionally((Throwable)error);
                return;
            }
            readSummaryResult.topicsData().forEach(topicData -> {
                DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topic = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicId(topicData.topicId()).setTopicName((String)requestTopicIdToNameMapping.get(topicData.topicId()));
                ArrayList partitionResponses = new ArrayList();
                topicData.partitions().forEach(partitionData -> {
                    TopicPartition tp = new TopicPartition((String)requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
                    if (!this.shouldComputeSharePartitionLag((PartitionStateSummaryData)partitionData)) {
                        partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(partitionData.partition()).setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : -1L).setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : 0).setLag(-1L));
                    } else {
                        PartitionMetadataClient.OffsetResponse offsetResponse = (PartitionMetadataClient.OffsetResponse)((CompletableFuture)partitionLatestOffsets.get(tp)).join();
                        if (offsetResponse.error().code() != Errors.NONE.code()) {
                            this.log.error("Partition end offset fetch failed for topicPartition {}", (Object)tp);
                            partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(partitionData.partition()).setErrorCode(offsetResponse.error().code()).setErrorMessage(offsetResponse.error().message()));
                        } else {
                            long lag = offsetResponse.offset() - partitionData.startOffset() - (long)partitionData.deliveryCompleteCount();
                            partitionResponses.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(partitionData.partition()).setStartOffset(partitionData.startOffset()).setLeaderEpoch(partitionData.leaderEpoch()).setLag(lag));
                        }
                    }
                });
                topic.setPartitions(partitionResponses);
                responseTopics.add(topic);
            });
            responseTopics.addAll(describeShareGroupOffsetsResponseTopicList);
            responseGroup.setTopics(responseTopics);
            responseFuture.complete(responseGroup);
        });
    }

    private boolean shouldComputeSharePartitionLag(PartitionStateSummaryData partitionData) {
        return partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != -1L && partitionData.deliveryCompleteCount() != -1;
    }

    @Override
    public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets(AuthorizableRequestContext context, DeleteShareGroupOffsetsRequestData requestData) {
        if (!this.isActive.get() || this.metadataImage == null) {
            return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData((Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        String groupId = requestData.groupId();
        if (!GroupCoordinatorService.isGroupIdNotEmpty(groupId)) {
            return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData((Errors)Errors.INVALID_GROUP_ID));
        }
        if (requestData.topics() == null) {
            return CompletableFuture.completedFuture(new DeleteShareGroupOffsetsResponseData());
        }
        return ((CompletableFuture)this.runtime.scheduleWriteOperation("initiate-delete-share-group-offsets", this.topicPartitionFor(groupId), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.initiateDeleteShareGroupOffsets(groupId, requestData)).thenCompose(resultHolder -> this.deleteShareGroupOffsetsState(groupId, (GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder)resultHolder))).exceptionally(exception -> (DeleteShareGroupOffsetsResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"initiate-delete-share-group-offsets", (Object)groupId, (Throwable)exception, DeleteShareGroupOffsetsRequest::getErrorDeleteResponseData, (Logger)this.log));
    }

    private CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsetsState(String groupId, GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder resultHolder) {
        ArrayList errorTopicResponseList;
        if (resultHolder == null) {
            this.log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", (Object)groupId);
            return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData((Errors)Errors.UNKNOWN_SERVER_ERROR));
        }
        if (resultHolder.topLevelErrorCode() != Errors.NONE.code()) {
            return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData((short)resultHolder.topLevelErrorCode(), (String)resultHolder.topLevelErrorMessage()));
        }
        ArrayList<Object> arrayList = errorTopicResponseList = resultHolder.errorTopicResponseList() == null ? new ArrayList() : new ArrayList<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>(resultHolder.errorTopicResponseList());
        if (resultHolder.deleteStateRequestParameters() == null) {
            return CompletableFuture.completedFuture(new DeleteShareGroupOffsetsResponseData().setResponses(errorTopicResponseList));
        }
        return ((CompletableFuture)this.persister.deleteState(resultHolder.deleteStateRequestParameters()).thenCompose(result -> this.handleDeleteShareGroupOffsetStateResult(groupId, (DeleteShareGroupStateResult)result, errorTopicResponseList))).exceptionally(throwable -> {
            this.log.error("Failed to delete share group state due to: {}", (Object)throwable.getMessage(), throwable);
            return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData((Errors)Errors.forException((Throwable)throwable));
        });
    }

    private CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShareGroupOffsetStateResult(String groupId, DeleteShareGroupStateResult result, List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponses) {
        if (result == null || result.topicsData() == null) {
            this.log.error("Result is null for the delete share group state");
            IllegalStateException exception = new IllegalStateException("Result is null for the delete share group state");
            return CompletableFuture.completedFuture(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData((Errors)Errors.forException((Throwable)exception)));
        }
        HashMap<Uuid, String> successTopics = new HashMap<Uuid, String>();
        result.topicsData().forEach(topicData -> {
            Optional<PartitionErrorData> errItem = topicData.partitions().stream().filter(errData -> errData.errorCode() != Errors.NONE.code()).findAny();
            String topicName = this.metadataImage.topicMetadata(topicData.topicId()).map(CoordinatorMetadataImage.TopicMetadata::name).orElse(null);
            if (errItem.isPresent()) {
                errorTopicResponses.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicId(topicData.topicId()).setTopicName(topicName).setErrorMessage(Errors.forCode((short)errItem.get().errorCode()).message()).setErrorCode(errItem.get().errorCode()));
            } else {
                successTopics.put(topicData.topicId(), topicName);
            }
        });
        if (successTopics.isEmpty()) {
            return CompletableFuture.completedFuture(new DeleteShareGroupOffsetsResponseData().setResponses(errorTopicResponses));
        }
        return this.completeDeleteShareGroupOffsets(groupId, successTopics, errorTopicResponses);
    }

    private CompletableFuture<DeleteShareGroupOffsetsResponseData> completeDeleteShareGroupOffsets(String groupId, Map<Uuid, String> successTopics, List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponses) {
        return this.runtime.scheduleWriteOperation("complete-delete-share-group-offsets", this.topicPartitionFor(groupId), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.completeDeleteShareGroupOffsets(groupId, successTopics, errorTopicResponses)).exceptionally(exception -> (DeleteShareGroupOffsetsResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"complete-delete-share-group-offsets", (Object)groupId, (Throwable)exception, (error, __) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData((Errors)error), (Logger)this.log));
    }

    @Override
    public CompletableFuture<OffsetCommitResponseData> commitOffsets(AuthorizableRequestContext context, OffsetCommitRequestData request, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse((OffsetCommitRequestData)request, (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        if (request.groupId() == null) {
            return CompletableFuture.completedFuture(OffsetCommitRequest.getErrorResponse((OffsetCommitRequestData)request, (Errors)Errors.INVALID_GROUP_ID));
        }
        return this.runtime.scheduleWriteOperation("commit-offset", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.commitOffset(context, request)).exceptionally(exception -> (OffsetCommitResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"commit-offset", (Object)request, (Throwable)exception, (error, __) -> OffsetCommitRequest.getErrorResponse((OffsetCommitRequestData)request, (Errors)error), (Logger)this.log));
    }

    @Override
    public CompletableFuture<TxnOffsetCommitResponseData> commitTransactionalOffsets(AuthorizableRequestContext context, TxnOffsetCommitRequestData request, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(TxnOffsetCommitRequest.getErrorResponse((TxnOffsetCommitRequestData)request, (Errors)Errors.COORDINATOR_NOT_AVAILABLE));
        }
        if (!GroupCoordinatorService.isGroupIdNotEmpty(request.groupId())) {
            return CompletableFuture.completedFuture(TxnOffsetCommitRequest.getErrorResponse((TxnOffsetCommitRequestData)request, (Errors)Errors.INVALID_GROUP_ID));
        }
        return this.runtime.scheduleTransactionalWriteOperation("txn-commit-offset", this.topicPartitionFor(request.groupId()), request.transactionalId(), request.producerId(), request.producerEpoch(), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.commitTransactionalOffset(context, request), context.requestVersion()).exceptionally(exception -> (TxnOffsetCommitResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"txn-commit-offset", (Object)request, (Throwable)exception, (error, __) -> TxnOffsetCommitRequest.getErrorResponse((TxnOffsetCommitRequestData)request, (Errors)error), (Logger)this.log));
    }

    @Override
    public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(AuthorizableRequestContext context, OffsetDeleteRequestData request, BufferSupplier bufferSupplier) {
        if (!this.isActive.get()) {
            return CompletableFuture.completedFuture(new OffsetDeleteResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        if (!GroupCoordinatorService.isGroupIdNotEmpty(request.groupId())) {
            return CompletableFuture.completedFuture(new OffsetDeleteResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code()));
        }
        return this.runtime.scheduleWriteOperation("delete-offsets", this.topicPartitionFor(request.groupId()), Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.deleteOffsets(context, request)).exceptionally(exception -> (OffsetDeleteResponseData)CoordinatorOperationExceptionHelper.handleOperationException((String)"delete-offsets", (Object)request, (Throwable)exception, (error, __) -> new OffsetDeleteResponseData().setErrorCode(error.code()), (Logger)this.log));
    }

    @Override
    public CompletableFuture<Void> completeTransaction(TopicPartition tp, long producerId, short producerEpoch, int coordinatorEpoch, TransactionResult result, short transactionVersion, Duration timeout) {
        if (!this.isActive.get()) {
            return FutureUtils.failedFuture((Throwable)Errors.COORDINATOR_NOT_AVAILABLE.exception());
        }
        if (!tp.topic().equals("__consumer_offsets")) {
            return FutureUtils.failedFuture((Throwable)new IllegalStateException("Completing a transaction for " + String.valueOf(tp) + " is not expected"));
        }
        return this.runtime.scheduleTransactionCompletion("write-txn-marker", tp, producerId, producerEpoch, coordinatorEpoch, result, transactionVersion, timeout);
    }

    @Override
    public void onPartitionsDeleted(List<TopicPartition> topicPartitions, BufferSupplier bufferSupplier) throws ExecutionException, InterruptedException {
        this.throwIfNotActive();
        CompletableFuture.allOf(FutureUtils.mapExceptionally((List)this.runtime.scheduleWriteAllOperation("on-partition-deleted", Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.onPartitionsDeleted(topicPartitions)), exception -> {
            this.log.error("Could not delete offsets for deleted partitions {} due to: {}.", new Object[]{topicPartitions, exception.getMessage(), exception});
            return null;
        }).toArray(new CompletableFuture[0])).get();
        if (this.metadataImage == null || this.metadataImage.equals((Object)CoordinatorMetadataImage.EMPTY)) {
            return;
        }
        Set topicIds = topicPartitions.stream().filter(tp -> this.metadataImage.topicMetadata(tp.topic()).isPresent()).map(tp -> ((CoordinatorMetadataImage.TopicMetadata)this.metadataImage.topicMetadata(tp.topic()).get()).id()).collect(Collectors.toSet());
        if (topicIds.isEmpty()) {
            return;
        }
        CompletableFuture.allOf(FutureUtils.mapExceptionally((List)this.runtime.scheduleWriteAllOperation("maybe-cleanup-share-group-state", Duration.ofMillis(this.config.offsetCommitTimeoutMs()), coordinator -> coordinator.maybeCleanupShareGroupState(topicIds)), exception -> {
            this.log.error("Unable to cleanup state for the deleted topics {}", (Object)topicIds, exception);
            return null;
        }).toArray(new CompletableFuture[0])).get();
    }

    @Override
    public void onElection(int groupMetadataPartitionIndex, int groupMetadataPartitionLeaderEpoch) {
        this.throwIfNotActive();
        this.runtime.scheduleLoadOperation(new TopicPartition("__consumer_offsets", groupMetadataPartitionIndex), groupMetadataPartitionLeaderEpoch);
    }

    @Override
    public void onResignation(int groupMetadataPartitionIndex, OptionalInt groupMetadataPartitionLeaderEpoch) {
        this.throwIfNotActive();
        this.runtime.scheduleUnloadOperation(new TopicPartition("__consumer_offsets", groupMetadataPartitionIndex), groupMetadataPartitionLeaderEpoch);
    }

    @Override
    public void onNewMetadataImage(CoordinatorMetadataImage newImage, CoordinatorMetadataDelta delta) {
        this.throwIfNotActive();
        this.metadataImage = newImage;
        this.runtime.onNewMetadataImage(newImage, delta);
    }

    @Override
    public Properties groupMetadataTopicConfigs() {
        Properties properties = new Properties();
        properties.put("cleanup.policy", "compact");
        properties.put("compression.type", BrokerCompressionType.PRODUCER.name);
        properties.put("segment.bytes", String.valueOf(this.config.offsetsTopicSegmentBytes()));
        return properties;
    }

    @Override
    public Optional<GroupConfig> groupConfig(String groupId) {
        return this.groupConfigManager.groupConfig(groupId);
    }

    @Override
    public void updateGroupConfig(String groupId, Properties newGroupConfig) {
        this.groupConfigManager.updateGroupConfig(groupId, newGroupConfig);
    }

    @Override
    public void startup(IntSupplier groupMetadataTopicPartitionCount) {
        if (!this.isActive.compareAndSet(false, true)) {
            this.log.warn("Group coordinator is already running.");
            return;
        }
        this.log.info("Starting up.");
        this.numPartitions = groupMetadataTopicPartitionCount.getAsInt();
        this.isActive.set(true);
        this.log.info("Startup complete.");
    }

    @Override
    public void shutdown() {
        if (!this.isActive.compareAndSet(true, false)) {
            this.log.warn("Group coordinator is already shutting down.");
            return;
        }
        this.log.info("Shutting down.");
        this.isActive.set(false);
        org.apache.kafka.common.utils.Utils.closeQuietly(this.runtime, (String)"coordinator runtime");
        org.apache.kafka.common.utils.Utils.closeQuietly((AutoCloseable)this.groupCoordinatorMetrics, (String)"group coordinator metrics");
        org.apache.kafka.common.utils.Utils.closeQuietly((AutoCloseable)this.groupConfigManager, (String)"group config manager");
        this.log.info("Shutdown complete.");
    }

    private static boolean isGroupIdNotEmpty(String groupId) {
        return groupId != null && !groupId.isEmpty();
    }

    private OffsetFetchResponseData.OffsetFetchResponseGroup handleOffsetFetchException(String operationName, AuthorizableRequestContext context, OffsetFetchRequestData.OffsetFetchRequestGroup request, Throwable exception) {
        ApiError apiError = ApiError.fromThrowable((Throwable)exception);
        return switch (apiError.error()) {
            case Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NOT_ENOUGH_REPLICAS, Errors.REQUEST_TIMED_OUT -> OffsetFetchResponse.groupError((OffsetFetchRequestData.OffsetFetchRequestGroup)request, (Errors)Errors.NOT_COORDINATOR, (int)context.requestVersion());
            default -> (OffsetFetchResponseData.OffsetFetchResponseGroup)CoordinatorOperationExceptionHelper.handleOperationException((String)operationName, (Object)request, (Throwable)exception, (error, __) -> OffsetFetchResponse.groupError((OffsetFetchRequestData.OffsetFetchRequestGroup)request, (Errors)error, (int)context.requestVersion()), (Logger)this.log);
        };
    }

    private static void requireNonNull(Object obj, String msg) {
        if (obj == null) {
            throw new IllegalArgumentException(msg);
        }
    }

    public static class Builder {
        private final int nodeId;
        private final GroupCoordinatorConfig config;
        private PartitionWriter writer;
        private CoordinatorLoader<CoordinatorRecord> loader;
        private Time time;
        private Timer timer;
        private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
        private GroupCoordinatorMetrics groupCoordinatorMetrics;
        private GroupConfigManager groupConfigManager;
        private Persister persister;
        private Optional<Plugin<Authorizer>> authorizerPlugin;
        private PartitionMetadataClient partitionMetadataClient;

        public Builder(int nodeId, GroupCoordinatorConfig config) {
            this.nodeId = nodeId;
            this.config = config;
        }

        public Builder withWriter(PartitionWriter writer) {
            this.writer = writer;
            return this;
        }

        public Builder withLoader(CoordinatorLoader<CoordinatorRecord> loader) {
            this.loader = loader;
            return this;
        }

        public Builder withTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
            this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics;
            return this;
        }

        public Builder withGroupCoordinatorMetrics(GroupCoordinatorMetrics groupCoordinatorMetrics) {
            this.groupCoordinatorMetrics = groupCoordinatorMetrics;
            return this;
        }

        public Builder withGroupConfigManager(GroupConfigManager groupConfigManager) {
            this.groupConfigManager = groupConfigManager;
            return this;
        }

        public Builder withPersister(Persister persister) {
            this.persister = persister;
            return this;
        }

        public Builder withAuthorizerPlugin(Optional<Plugin<Authorizer>> authorizerPlugin) {
            this.authorizerPlugin = authorizerPlugin;
            return this;
        }

        public Builder withPartitionMetadataClient(PartitionMetadataClient partitionMetadataClient) {
            this.partitionMetadataClient = partitionMetadataClient;
            return this;
        }

        public GroupCoordinatorService build() {
            GroupCoordinatorService.requireNonNull(this.config, "Config must be set.");
            GroupCoordinatorService.requireNonNull(this.writer, "Writer must be set.");
            GroupCoordinatorService.requireNonNull(this.loader, "Loader must be set.");
            GroupCoordinatorService.requireNonNull(this.time, "Time must be set.");
            GroupCoordinatorService.requireNonNull(this.timer, "Timer must be set.");
            GroupCoordinatorService.requireNonNull(this.coordinatorRuntimeMetrics, "CoordinatorRuntimeMetrics must be set.");
            GroupCoordinatorService.requireNonNull(this.groupCoordinatorMetrics, "GroupCoordinatorMetrics must be set.");
            GroupCoordinatorService.requireNonNull(this.groupConfigManager, "GroupConfigManager must be set.");
            GroupCoordinatorService.requireNonNull(this.persister, "Persister must be set.");
            GroupCoordinatorService.requireNonNull(this.authorizerPlugin, "Authorizer must be set.");
            GroupCoordinatorService.requireNonNull(this.partitionMetadataClient, "PartitionMetadataClient must be set.");
            String logPrefix = String.format("GroupCoordinator id=%d", this.nodeId);
            LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
            CoordinatorShardBuilderSupplier supplier = () -> new GroupCoordinatorShard.Builder(this.config, this.groupConfigManager).withAuthorizerPlugin(this.authorizerPlugin);
            MultiThreadedEventProcessor processor = new MultiThreadedEventProcessor(logContext, "group-coordinator-event-processor-", this.config.numThreads(), this.time, this.coordinatorRuntimeMetrics);
            CoordinatorRuntime runtime = new CoordinatorRuntime.Builder().withTime(this.time).withTimer(this.timer).withLogPrefix(logPrefix).withLogContext(logContext).withEventProcessor((CoordinatorEventProcessor)processor).withPartitionWriter(this.writer).withLoader(this.loader).withCoordinatorShardBuilderSupplier(supplier).withDefaultWriteTimeOut(Duration.ofMillis(this.config.offsetCommitTimeoutMs())).withCoordinatorRuntimeMetrics(this.coordinatorRuntimeMetrics).withCoordinatorMetrics((CoordinatorMetrics)this.groupCoordinatorMetrics).withSerializer((Serializer)new GroupCoordinatorRecordSerde()).withCompression(Compression.of((CompressionType)this.config.offsetTopicCompressionType()).build()).withAppendLingerMs(this.config.appendLingerMs()).withExecutorService(Executors.newSingleThreadExecutor()).build();
            return new GroupCoordinatorService(logContext, this.config, (CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord>)runtime, this.groupCoordinatorMetrics, this.groupConfigManager, this.persister, this.timer, this.partitionMetadataClient);
        }
    }
}

