/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.cluster.service;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.StreamNodeConnectionsService;
import org.opensearch.cluster.TimeoutClusterStateListener;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.SourcePrioritizedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.StopWatch;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

@PublicApi(since="1.0.0")
public class ClusterApplierService
extends AbstractLifecycleComponent
implements ClusterApplier {
    private static final Logger logger = LogManager.getLogger(ClusterApplierService.class);
    public static final Setting<TimeValue> CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting("cluster.service.slow_task_logging_threshold", TimeValue.timeValueSeconds((long)30L), Setting.Property.Dynamic, Setting.Property.NodeScope);
    public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";
    private final ClusterSettings clusterSettings;
    protected final ThreadPool threadPool;
    private volatile TimeValue slowTaskLoggingThreshold;
    private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor;
    private final Collection<ClusterStateApplier> highPriorityStateAppliers = new CopyOnWriteArrayList<ClusterStateApplier>();
    private final Collection<ClusterStateApplier> normalPriorityStateAppliers = new CopyOnWriteArrayList<ClusterStateApplier>();
    private final Collection<ClusterStateApplier> lowPriorityStateAppliers = new CopyOnWriteArrayList<ClusterStateApplier>();
    private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
    private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<TimeoutClusterStateListener, NotifyTimeout>();
    private final AtomicReference<ClusterState> preCommitState = new AtomicReference();
    private final AtomicReference<ClusterState> state;
    private final String nodeName;
    private NodeConnectionsService nodeConnectionsService;
    private NodeConnectionsService streamNodeConnectionsService;
    private final ClusterManagerMetrics clusterManagerMetrics;

    public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
        this(nodeName, settings, clusterSettings, threadPool, new ClusterManagerMetrics((MetricsRegistry)NoopMetricsRegistry.INSTANCE));
    }

    public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, ClusterManagerMetrics clusterManagerMetrics) {
        this.clusterSettings = clusterSettings;
        this.threadPool = threadPool;
        this.state = new AtomicReference();
        this.nodeName = nodeName;
        this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
        this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold);
        this.clusterManagerMetrics = clusterManagerMetrics;
    }

    private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
        this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
    }

    public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
        assert (this.nodeConnectionsService == null) : "nodeConnectionsService is already set";
        this.nodeConnectionsService = nodeConnectionsService;
    }

    public synchronized void setStreamNodeConnectionsService(StreamNodeConnectionsService streamNodeConnectionsService) {
        assert (this.streamNodeConnectionsService == null) : "streamNodeConnectionsService is already set";
        this.streamNodeConnectionsService = streamNodeConnectionsService;
    }

    @Override
    public void setInitialState(ClusterState initialState) {
        if (this.lifecycle.started()) {
            throw new IllegalStateException("can't set initial state when started");
        }
        assert (this.state.get() == null) : "state is already set";
        this.state.set(initialState);
    }

    protected synchronized void doStart() {
        Objects.requireNonNull(this.nodeConnectionsService, "please set the node connection service before starting");
        Objects.requireNonNull(this.state.get(), "please set initial state before starting");
        this.threadPoolExecutor = this.createThreadPoolExecutor();
    }

    protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
        return OpenSearchExecutors.newSinglePrioritizing(this.nodeName + "/clusterApplierService#updateTask", OpenSearchExecutors.daemonThreadFactory(this.nodeName, CLUSTER_UPDATE_THREAD_NAME), this.threadPool.getThreadContext(), this.threadPool.scheduler());
    }

    protected synchronized void doStop() {
        for (Map.Entry<TimeoutClusterStateListener, NotifyTimeout> onGoingTimeout : this.timeoutClusterStateListeners.entrySet()) {
            try {
                onGoingTimeout.getValue().cancel();
                onGoingTimeout.getKey().onClose();
            }
            catch (Exception ex) {
                logger.debug("failed to notify listeners on shutdown", (Throwable)ex);
            }
        }
        ThreadPool.terminate(this.threadPoolExecutor, 10L, TimeUnit.SECONDS);
    }

    protected synchronized void doClose() {
    }

    public ClusterState state() {
        assert (ClusterApplierService.assertNotCalledFromClusterStateApplier("the applied cluster state is not yet available"));
        ClusterState clusterState = this.state.get();
        assert (clusterState != null) : "initial cluster state not set yet";
        return clusterState;
    }

    public boolean isStateInitialised() {
        return this.state.get() != null;
    }

    public boolean isInitialClusterStateSet() {
        return Objects.nonNull(this.state.get());
    }

    public void addHighPriorityApplier(ClusterStateApplier applier) {
        this.highPriorityStateAppliers.add(applier);
    }

    public void addLowPriorityApplier(ClusterStateApplier applier) {
        this.lowPriorityStateAppliers.add(applier);
    }

    public void addStateApplier(ClusterStateApplier applier) {
        this.normalPriorityStateAppliers.add(applier);
    }

    public void removeApplier(ClusterStateApplier applier) {
        this.normalPriorityStateAppliers.remove(applier);
        this.highPriorityStateAppliers.remove(applier);
        this.lowPriorityStateAppliers.remove(applier);
    }

    public void addListener(ClusterStateListener listener) {
        this.clusterStateListeners.add(listener);
    }

    public void removeListener(ClusterStateListener listener) {
        this.clusterStateListeners.remove(listener);
    }

    public void removeTimeoutListener(TimeoutClusterStateListener listener) {
        NotifyTimeout timeout = this.timeoutClusterStateListeners.remove(listener);
        if (timeout != null) {
            timeout.cancel();
        }
    }

    public void addLocalNodeClusterManagerListener(LocalNodeClusterManagerListener listener) {
        this.addListener(listener);
    }

    public void addTimeoutListener(final @Nullable TimeValue timeout, final TimeoutClusterStateListener listener) {
        if (this.lifecycle.stoppedOrClosed()) {
            listener.onClose();
            return;
        }
        try {
            this.threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_"){

                @Override
                public void run() {
                    NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
                    NotifyTimeout previous = ClusterApplierService.this.timeoutClusterStateListeners.put(listener, notifyTimeout);
                    assert (previous == null) : "Added same listener [" + String.valueOf(listener) + "]";
                    if (ClusterApplierService.this.lifecycle.stoppedOrClosed()) {
                        listener.onClose();
                        return;
                    }
                    if (timeout != null) {
                        notifyTimeout.cancellable = ClusterApplierService.this.threadPool.schedule(notifyTimeout, timeout, "generic");
                    }
                    listener.postAdded();
                }
            });
        }
        catch (OpenSearchRejectedExecutionException e) {
            if (this.lifecycle.stoppedOrClosed()) {
                listener.onClose();
            }
            throw e;
        }
    }

    public void runOnApplierThread(String source, Consumer<ClusterState> clusterStateConsumer, ClusterApplier.ClusterApplyListener listener, Priority priority) {
        this.submitStateUpdateTask(source, ClusterStateTaskConfig.build(priority), clusterState -> {
            clusterStateConsumer.accept((ClusterState)clusterState);
            return clusterState;
        }, listener);
    }

    public void runOnApplierThread(String source, Consumer<ClusterState> clusterStateConsumer, ClusterApplier.ClusterApplyListener listener) {
        this.runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
    }

    public ThreadPool threadPool() {
        return this.threadPool;
    }

    @Override
    public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplier.ClusterApplyListener listener) {
        Function<ClusterState, ClusterState> applyFunction = currentState -> {
            ClusterState nextState = (ClusterState)clusterStateSupplier.get();
            if (nextState != null) {
                return nextState;
            }
            return currentState;
        };
        this.submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener);
    }

    public void updateClusterState(String source, Function<ClusterState, ClusterState> updateFunction, ClusterApplier.ClusterApplyListener listener) {
        this.submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), updateFunction, listener);
    }

    private void submitStateUpdateTask(String source, ClusterStateTaskConfig config, Function<ClusterState, ClusterState> executor, ClusterApplier.ClusterApplyListener listener) {
        block11: {
            if (!this.lifecycle.started()) {
                return;
            }
            ThreadContext threadContext = this.threadPool.getThreadContext();
            Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
            try (ThreadContext.StoredContext ignore = threadContext.stashContext();){
                ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
                UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, supplier, logger), executor);
                if (config.timeout() != null) {
                    this.threadPoolExecutor.execute(updateTask, config.timeout(), () -> this.threadPool.generic().execute(() -> listener.onFailure(source, (Exception)((Object)new ProcessClusterEventTimeoutException(config.timeout(), source)))));
                } else {
                    this.threadPoolExecutor.execute(updateTask);
                }
            }
            catch (OpenSearchRejectedExecutionException e) {
                if (this.lifecycle.stoppedOrClosed()) break block11;
                throw e;
            }
        }
    }

    public static boolean assertNotClusterStateUpdateThread(String reason) {
        assert (!Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME)) : "Expected current thread [" + String.valueOf(Thread.currentThread()) + "] to not be the cluster state update thread. Reason: [" + reason + "]";
        return true;
    }

    private static boolean assertNotCalledFromClusterStateApplier(String reason) {
        if (Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME)) {
            for (StackTraceElement element : Thread.currentThread().getStackTrace()) {
                String className = element.getClassName();
                String methodName = element.getMethodName();
                if (className.equals(ClusterStateObserver.class.getName())) {
                    return true;
                }
                if (className.equals(ClusterApplierService.class.getName()) && methodName.equals("callClusterStateAppliers")) {
                    throw new AssertionError((Object)("should not be called by a cluster state applier. reason [" + reason + "]"));
                }
            }
        }
        return true;
    }

    private void runTask(UpdateTask task) {
        ClusterState newClusterState;
        if (!this.lifecycle.started()) {
            logger.debug("processing [{}]: ignoring, cluster applier service not started", (Object)task.source);
            return;
        }
        logger.debug("processing [{}]: execute", (Object)task.source);
        ClusterState previousClusterState = this.state.get();
        long startTimeMS = this.currentTimeInMillis();
        StopWatch stopWatch = new StopWatch();
        try (StopWatch.TimingHandle ignored = stopWatch.timing("running task [" + task.source + "]");){
            newClusterState = task.apply(previousClusterState);
        }
        catch (Exception e) {
            TimeValue executionTime = TimeValue.timeValueMillis((long)Math.max(0L, this.currentTimeInMillis() - startTimeMS));
            logger.trace(() -> new ParameterizedMessage("failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}", new Object[]{executionTime, previousClusterState.version(), task.source, previousClusterState}), (Throwable)e);
            this.warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
            task.listener.onFailure(task.source, e);
            return;
        }
        if (previousClusterState == newClusterState) {
            executionTime = TimeValue.timeValueMillis((long)Math.max(0L, this.currentTimeInMillis() - startTimeMS));
            logger.debug("processing [{}]: took [{}] no change in cluster state", (Object)task.source, (Object)executionTime);
            this.warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
            task.listener.onSuccess(task.source);
        } else {
            if (logger.isTraceEnabled()) {
                logger.debug("cluster state updated, version [{}], source [{}]\n{}", (Object)newClusterState.version(), (Object)task.source, (Object)newClusterState);
            } else {
                logger.debug("cluster state updated, version [{}], source [{}]", (Object)newClusterState.version(), (Object)task.source);
            }
            try {
                this.applyChanges(task, previousClusterState, newClusterState, stopWatch);
                executionTime = TimeValue.timeValueMillis((long)Math.max(0L, this.currentTimeInMillis() - startTimeMS));
                logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", (Object)task.source, (Object)executionTime, (Object)newClusterState.version(), (Object)newClusterState.stateUUID());
                this.warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
                task.listener.onSuccess(task.source);
            }
            catch (Exception e) {
                TimeValue executionTime = TimeValue.timeValueMillis((long)Math.max(0L, this.currentTimeInMillis() - startTimeMS));
                if (logger.isTraceEnabled()) {
                    logger.warn((Message)new ParameterizedMessage("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", new Object[]{executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source, newClusterState}), (Throwable)e);
                } else {
                    logger.warn((Message)new ParameterizedMessage("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]", new Object[]{executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source}), (Throwable)e);
                }
                assert (this.applicationMayFail());
                task.listener.onFailure(task.source, e);
            }
        }
    }

    private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState, StopWatch stopWatch) {
        String summary;
        ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
        DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
        if (nodesDelta.hasChanges() && logger.isInfoEnabled() && (summary = nodesDelta.shortSummary()).length() > 0) {
            logger.info("{}, term: {}, version: {}, reason: {}", (Object)summary, (Object)newClusterState.term(), (Object)newClusterState.version(), (Object)task.source);
        }
        logger.trace("connecting to nodes of cluster state with version {}", (Object)newClusterState.version());
        try (StopWatch.TimingHandle ignored = stopWatch.timing("connecting to new nodes");){
            this.connectToNodesAndWait(newClusterState);
        }
        if (!clusterChangedEvent.state().blocks().disableStatePersistence() && clusterChangedEvent.metadataChanged()) {
            logger.debug("applying settings from cluster state with version {}", (Object)newClusterState.version());
            Settings incomingSettings = clusterChangedEvent.state().metadata().settings();
            try (StopWatch.TimingHandle ignored = stopWatch.timing("applying settings");){
                this.clusterSettings.applySettings(incomingSettings);
            }
        }
        logger.debug("apply cluster state with version {}", (Object)newClusterState.version());
        this.callClusterStateAppliers(clusterChangedEvent, stopWatch);
        logger.debug("completed calling appliers of cluster state for version {}", (Object)newClusterState.version());
        this.nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
        if (this.streamNodeConnectionsService != null) {
            this.streamNodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
        }
        assert (newClusterState.coordinationMetadata().getLastAcceptedConfiguration().equals(newClusterState.coordinationMetadata().getLastCommittedConfiguration())) : String.valueOf(newClusterState.coordinationMetadata().getLastAcceptedConfiguration()) + " vs " + String.valueOf(newClusterState.coordinationMetadata().getLastCommittedConfiguration()) + " on " + String.valueOf(newClusterState.nodes().getLocalNode());
        logger.debug("set locally applied cluster state to version {}", (Object)newClusterState.version());
        this.state.set(newClusterState);
        this.callClusterStateListeners(clusterChangedEvent, stopWatch);
        logger.debug("completed calling listeners of cluster state for version {}", (Object)newClusterState.version());
    }

    protected void connectToNodesAndWait(ClusterState newClusterState) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.nodeConnectionsService.connectToNodes(newClusterState.nodes(), countDownLatch::countDown);
        try {
            countDownLatch.await();
        }
        catch (InterruptedException e) {
            logger.debug("interrupted while connecting to nodes, continuing", (Throwable)e);
            Thread.currentThread().interrupt();
        }
        CountDownLatch streamNodeLatch = new CountDownLatch(1);
        if (this.streamNodeConnectionsService != null) {
            this.streamNodeConnectionsService.connectToNodes(newClusterState.nodes(), streamNodeLatch::countDown);
            try {
                streamNodeLatch.await();
            }
            catch (InterruptedException e) {
                logger.debug("interrupted while connecting to nodes, continuing", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        }
    }

    private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
        this.callClusterStateAppliers(clusterChangedEvent, stopWatch, this.highPriorityStateAppliers);
        this.callClusterStateAppliers(clusterChangedEvent, stopWatch, this.normalPriorityStateAppliers);
        this.callClusterStateAppliers(clusterChangedEvent, stopWatch, this.lowPriorityStateAppliers);
    }

    private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch, Collection<ClusterStateApplier> clusterStateAppliers) {
        for (ClusterStateApplier applier : clusterStateAppliers) {
            logger.trace("calling [{}] with change to version [{}]", (Object)applier, (Object)clusterChangedEvent.state().version());
            StopWatch.TimingHandle ignored = stopWatch.timing("running applier [" + String.valueOf(applier) + "]");
            try {
                long applierStartTimeNS = System.nanoTime();
                applier.applyClusterState(clusterChangedEvent);
                this.clusterManagerMetrics.recordLatency(this.clusterManagerMetrics.clusterStateAppliersHistogram, Double.valueOf(Math.max(0L, TimeValue.nsecToMSec((long)(System.nanoTime() - applierStartTimeNS)))), Optional.of(Tags.create().addTag("Operation", applier.getClass().getSimpleName())));
            }
            finally {
                if (ignored == null) continue;
                ignored.close();
            }
        }
    }

    private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
        this.callClusterStateListener(clusterChangedEvent, stopWatch, this.clusterStateListeners);
        this.callClusterStateListener(clusterChangedEvent, stopWatch, this.timeoutClusterStateListeners.keySet());
    }

    private void callClusterStateListener(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch, Collection<? extends ClusterStateListener> listeners) {
        for (ClusterStateListener clusterStateListener : listeners) {
            try {
                logger.trace("calling [{}] with change to version [{}]", (Object)clusterStateListener, (Object)clusterChangedEvent.state().version());
                StopWatch.TimingHandle ignored = stopWatch.timing("notifying listener [" + String.valueOf(clusterStateListener) + "]");
                try {
                    long listenerStartTimeNS = System.nanoTime();
                    clusterStateListener.clusterChanged(clusterChangedEvent);
                    this.clusterManagerMetrics.recordLatency(this.clusterManagerMetrics.clusterStateListenersHistogram, Double.valueOf(Math.max(0L, TimeValue.nsecToMSec((long)(System.nanoTime() - listenerStartTimeNS)))), Optional.of(Tags.create().addTag("Operation", clusterStateListener.getClass().getSimpleName())));
                }
                finally {
                    if (ignored == null) continue;
                    ignored.close();
                }
            }
            catch (Exception ex) {
                logger.warn("failed to notify ClusterStateListener", (Throwable)ex);
            }
        }
    }

    private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source, StopWatch stopWatch) {
        if (executionTime.getMillis() > this.slowTaskLoggingThreshold.getMillis()) {
            logger.warn("cluster state applier task [{}] took [{}] which is above the warn threshold of [{}]: {}", (Object)source, (Object)executionTime, (Object)this.slowTaskLoggingThreshold, (Object)Arrays.stream(stopWatch.taskInfo()).map(ti -> "[" + ti.getTaskName() + "] took [" + ti.getTime().millis() + "ms]").collect(Collectors.joining(", ")));
        }
    }

    protected long currentTimeInMillis() {
        return this.threadPool.relativeTimeInMillis();
    }

    protected boolean applicationMayFail() {
        return false;
    }

    public ClusterState preCommitState() {
        return this.preCommitState.get();
    }

    @Override
    public void setPreCommitState(ClusterState clusterState) {
        this.preCommitState.set(clusterState);
    }

    private class NotifyTimeout
    implements Runnable {
        final TimeoutClusterStateListener listener;
        @Nullable
        final TimeValue timeout;
        volatile Scheduler.Cancellable cancellable;

        NotifyTimeout(@Nullable TimeoutClusterStateListener listener, TimeValue timeout) {
            this.listener = listener;
            this.timeout = timeout;
        }

        public void cancel() {
            if (this.cancellable != null) {
                this.cancellable.cancel();
            }
        }

        @Override
        public void run() {
            assert (this.timeout != null) : "This should only ever execute if there's an actual timeout set";
            if (this.cancellable != null && this.cancellable.isCancelled()) {
                return;
            }
            if (ClusterApplierService.this.lifecycle.stoppedOrClosed()) {
                this.listener.onClose();
            } else {
                this.listener.onTimeout(this.timeout);
            }
        }
    }

    class UpdateTask
    extends SourcePrioritizedRunnable
    implements Function<ClusterState, ClusterState> {
        final ClusterApplier.ClusterApplyListener listener;
        final Function<ClusterState, ClusterState> updateFunction;

        UpdateTask(Priority priority, String source, ClusterApplier.ClusterApplyListener listener, Function<ClusterState, ClusterState> updateFunction) {
            super(priority, source);
            this.listener = listener;
            this.updateFunction = updateFunction;
        }

        @Override
        public ClusterState apply(ClusterState clusterState) {
            return this.updateFunction.apply(clusterState);
        }

        @Override
        public void run() {
            ClusterApplierService.this.runTask(this);
        }
    }

    private static class SafeClusterApplyListener
    implements ClusterApplier.ClusterApplyListener {
        private final ClusterApplier.ClusterApplyListener listener;
        protected final Supplier<ThreadContext.StoredContext> context;
        private final Logger logger;

        SafeClusterApplyListener(ClusterApplier.ClusterApplyListener listener, Supplier<ThreadContext.StoredContext> context, Logger logger) {
            this.listener = listener;
            this.context = context;
            this.logger = logger;
        }

        @Override
        public void onFailure(String source, Exception e) {
            try (ThreadContext.StoredContext ignore = this.context.get();){
                this.listener.onFailure(source, e);
            }
            catch (Exception inner) {
                inner.addSuppressed(e);
                this.logger.error((Message)new ParameterizedMessage("exception thrown by listener notifying of failure from [{}]", (Object)source), (Throwable)inner);
            }
        }

        @Override
        public void onSuccess(String source) {
            try (ThreadContext.StoredContext ignore = this.context.get();){
                this.listener.onSuccess(source);
            }
            catch (Exception e) {
                this.logger.error((Message)new ParameterizedMessage("exception thrown by listener while notifying of cluster state processed from [{}]", (Object)source), (Throwable)e);
            }
        }
    }
}

