/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.cluster.coordination;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.CoordinationStateRejectedException;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.NodeHealthCheckFailureException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportConnectionListener;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

public class FollowersChecker {
    private static final Logger logger = LogManager.getLogger(FollowersChecker.class);
    public static final String FOLLOWER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/follower_check";
    public static final Setting<TimeValue> FOLLOWER_CHECK_INTERVAL_SETTING = Setting.timeSetting("cluster.fault_detection.follower_check.interval", TimeValue.timeValueMillis((long)1000L), TimeValue.timeValueMillis((long)100L), Setting.Property.NodeScope);
    public static final Setting<TimeValue> FOLLOWER_CHECK_TIMEOUT_SETTING = Setting.timeSetting("cluster.fault_detection.follower_check.timeout", TimeValue.timeValueMillis((long)10000L), TimeValue.timeValueMillis((long)1L), TimeValue.timeValueMillis((long)60000L), Setting.Property.NodeScope, Setting.Property.Dynamic);
    public static final Setting<Integer> FOLLOWER_CHECK_RETRY_COUNT_SETTING = Setting.intSetting("cluster.fault_detection.follower_check.retry_count", 3, 1, Setting.Property.NodeScope);
    private final Settings settings;
    private final TimeValue followerCheckInterval;
    private TimeValue followerCheckTimeout;
    private final int followerCheckRetryCount;
    private final BiConsumer<DiscoveryNode, String> onNodeFailure;
    private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;
    private final Object mutex = new Object();
    private final Map<DiscoveryNode, FollowerChecker> followerCheckers = ConcurrentCollections.newConcurrentMap();
    private final Set<DiscoveryNode> faultyNodes = new HashSet<DiscoveryNode>();
    private final TransportService transportService;
    private final NodeHealthService nodeHealthService;
    private volatile FastResponseState fastResponseState;
    private ClusterManagerMetrics clusterManagerMetrics;

    public FollowersChecker(Settings settings, ClusterSettings clusterSettings, TransportService transportService, Consumer<FollowerCheckRequest> handleRequestAndUpdateState, BiConsumer<DiscoveryNode, String> onNodeFailure, NodeHealthService nodeHealthService, ClusterManagerMetrics clusterManagerMetrics) {
        this.settings = settings;
        this.transportService = transportService;
        this.handleRequestAndUpdateState = handleRequestAndUpdateState;
        this.onNodeFailure = onNodeFailure;
        this.nodeHealthService = nodeHealthService;
        this.followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
        this.followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
        this.followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);
        clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_TIMEOUT_SETTING, this::setFollowerCheckTimeout);
        this.updateFastResponseState(0L, Coordinator.Mode.CANDIDATE);
        transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, "same", false, false, FollowerCheckRequest::new, (request, transportChannel, task) -> this.handleFollowerCheck((FollowerCheckRequest)request, transportChannel));
        transportService.addConnectionListener(new TransportConnectionListener(){

            @Override
            public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
                FollowersChecker.this.handleDisconnectedNode(node);
            }
        });
        this.clusterManagerMetrics = clusterManagerMetrics;
    }

    private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
        this.followerCheckTimeout = followerCheckTimeout;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
        Object object = this.mutex;
        synchronized (object) {
            Predicate<DiscoveryNode> isUnknownNode = n -> !discoveryNodes.nodeExists((DiscoveryNode)n);
            this.followerCheckers.keySet().removeIf(isUnknownNode);
            this.faultyNodes.removeIf(isUnknownNode);
            discoveryNodes.clusterManagersFirstStream().forEach(discoveryNode -> {
                if (!(discoveryNode.equals(discoveryNodes.getLocalNode()) || this.followerCheckers.containsKey(discoveryNode) || this.faultyNodes.contains(discoveryNode))) {
                    FollowerChecker followerChecker = new FollowerChecker((DiscoveryNode)discoveryNode);
                    this.followerCheckers.put((DiscoveryNode)discoveryNode, followerChecker);
                    followerChecker.start();
                }
            });
        }
    }

    public void clearCurrentNodes() {
        this.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
    }

    public void updateFastResponseState(long term, Coordinator.Mode mode) {
        this.fastResponseState = new FastResponseState(term, mode);
    }

    private void handleFollowerCheck(final FollowerCheckRequest request, final TransportChannel transportChannel) throws IOException {
        StatusInfo statusInfo = this.nodeHealthService.getHealth();
        if (statusInfo.getStatus() == StatusInfo.Status.UNHEALTHY) {
            String message = "handleFollowerCheck: node is unhealthy [" + statusInfo.getInfo() + "], rejecting " + statusInfo.getInfo();
            logger.debug(message);
            throw new NodeHealthCheckFailureException(message, new Object[0]);
        }
        FastResponseState responder = this.fastResponseState;
        if (responder.mode == Coordinator.Mode.FOLLOWER && responder.term == request.term) {
            logger.trace("responding to {} on fast path", (Object)request);
            transportChannel.sendResponse((TransportResponse)TransportResponse.Empty.INSTANCE);
            return;
        }
        if (request.term < responder.term) {
            throw new CoordinationStateRejectedException("rejecting " + request + " since local state is " + this, new Object[0]);
        }
        this.transportService.getThreadPool().generic().execute(new AbstractRunnable(){

            @Override
            protected void doRun() throws IOException {
                logger.trace("responding to {} on slow path", (Object)request);
                try {
                    FollowersChecker.this.handleRequestAndUpdateState.accept(request);
                }
                catch (Exception e) {
                    transportChannel.sendResponse(e);
                    return;
                }
                transportChannel.sendResponse((TransportResponse)TransportResponse.Empty.INSTANCE);
            }

            @Override
            public void onFailure(Exception e) {
                logger.debug((Message)new ParameterizedMessage("exception while responding to {}", (Object)request), (Throwable)e);
            }

            public String toString() {
                return "slow path response to " + request;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Set<DiscoveryNode> getFaultyNodes() {
        Object object = this.mutex;
        synchronized (object) {
            return new HashSet<DiscoveryNode>(this.faultyNodes);
        }
    }

    public String toString() {
        return "FollowersChecker{followerCheckInterval=" + this.followerCheckInterval + ", followerCheckTimeout=" + this.followerCheckTimeout + ", followerCheckRetryCount=" + this.followerCheckRetryCount + ", followerCheckers=" + this.followerCheckers + ", faultyNodes=" + this.faultyNodes + ", fastResponseState=" + this.fastResponseState + "}";
    }

    FastResponseState getFastResponseState() {
        return this.fastResponseState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Set<DiscoveryNode> getKnownFollowers() {
        Object object = this.mutex;
        synchronized (object) {
            HashSet<DiscoveryNode> knownFollowers = new HashSet<DiscoveryNode>(this.faultyNodes);
            knownFollowers.addAll(this.followerCheckers.keySet());
            return knownFollowers;
        }
    }

    private void handleDisconnectedNode(DiscoveryNode discoveryNode) {
        FollowerChecker followerChecker = this.followerCheckers.get(discoveryNode);
        if (followerChecker != null) {
            logger.info(() -> new ParameterizedMessage("{} disconnected", (Object)followerChecker));
            followerChecker.failNode("disconnected");
        }
    }

    public static class FollowerCheckRequest
    extends TransportRequest {
        private final long term;
        private final DiscoveryNode sender;

        public long getTerm() {
            return this.term;
        }

        public DiscoveryNode getSender() {
            return this.sender;
        }

        public FollowerCheckRequest(long term, DiscoveryNode sender) {
            this.term = term;
            this.sender = sender;
        }

        public FollowerCheckRequest(StreamInput in) throws IOException {
            super(in);
            this.term = in.readLong();
            this.sender = new DiscoveryNode(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeLong(this.term);
            this.sender.writeTo(out);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FollowerCheckRequest that = (FollowerCheckRequest)o;
            return this.term == that.term && Objects.equals(this.sender, that.sender);
        }

        public String toString() {
            return "FollowerCheckRequest{term=" + this.term + ", sender=" + this.sender + "}";
        }

        public int hashCode() {
            return Objects.hash(this.term, this.sender);
        }
    }

    private class FollowerChecker {
        private final DiscoveryNode discoveryNode;
        private int failureCountSinceLastSuccess;

        FollowerChecker(DiscoveryNode discoveryNode) {
            this.discoveryNode = discoveryNode;
        }

        private boolean running() {
            return this == FollowersChecker.this.followerCheckers.get(this.discoveryNode);
        }

        void start() {
            assert (this.running());
            this.handleWakeUp();
        }

        private void handleWakeUp() {
            if (!this.running()) {
                logger.trace("handleWakeUp: not running");
                return;
            }
            FollowerCheckRequest request = new FollowerCheckRequest(FollowersChecker.this.fastResponseState.term, FollowersChecker.this.transportService.getLocalNode());
            logger.trace("handleWakeUp: checking {} with {}", (Object)this.discoveryNode, (Object)request);
            FollowersChecker.this.transportService.sendRequest(this.discoveryNode, FollowersChecker.FOLLOWER_CHECK_ACTION_NAME, (TransportRequest)request, TransportRequestOptions.builder().withTimeout(FollowersChecker.this.followerCheckTimeout).withType(TransportRequestOptions.Type.PING).build(), new TransportResponseHandler<TransportResponse.Empty>(){

                public TransportResponse.Empty read(StreamInput in) {
                    return TransportResponse.Empty.INSTANCE;
                }

                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    if (!FollowerChecker.this.running()) {
                        logger.trace("{} no longer running", (Object)FollowerChecker.this);
                        return;
                    }
                    FollowerChecker.this.failureCountSinceLastSuccess = 0;
                    logger.trace("{} check successful", (Object)FollowerChecker.this);
                    FollowerChecker.this.scheduleNextWakeUp();
                }

                @Override
                public void handleException(TransportException exp) {
                    String reason;
                    if (!FollowerChecker.this.running()) {
                        logger.debug((Message)new ParameterizedMessage("{} no longer running", (Object)FollowerChecker.this), (Throwable)((Object)exp));
                        return;
                    }
                    ++FollowerChecker.this.failureCountSinceLastSuccess;
                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                        logger.info(() -> new ParameterizedMessage("{} disconnected", (Object)FollowerChecker.this), (Throwable)((Object)exp));
                        reason = "disconnected";
                    } else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
                        logger.info(() -> new ParameterizedMessage("{} health check failed", (Object)FollowerChecker.this), (Throwable)((Object)exp));
                        reason = "health check failed";
                    } else if (FollowerChecker.this.failureCountSinceLastSuccess >= FollowersChecker.this.followerCheckRetryCount) {
                        logger.info(() -> new ParameterizedMessage("{} failed too many times", (Object)FollowerChecker.this), (Throwable)((Object)exp));
                        reason = "followers check retry count exceeded";
                    } else {
                        logger.info(() -> new ParameterizedMessage("{} failed, retrying", (Object)FollowerChecker.this), (Throwable)((Object)exp));
                        FollowerChecker.this.scheduleNextWakeUp();
                        return;
                    }
                    FollowerChecker.this.failNode(reason);
                }

                @Override
                public String executor() {
                    return "same";
                }
            });
        }

        void failNode(final String reason) {
            FollowersChecker.this.clusterManagerMetrics.incrementCounter(FollowersChecker.this.clusterManagerMetrics.followerChecksFailureCounter, 1.0);
            FollowersChecker.this.transportService.getThreadPool().generic().execute(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    Object object = FollowersChecker.this.mutex;
                    synchronized (object) {
                        if (!FollowerChecker.this.running()) {
                            logger.trace("{} no longer running, not marking faulty", (Object)FollowerChecker.this);
                            return;
                        }
                        logger.info("{} marking node as faulty", (Object)FollowerChecker.this);
                        FollowersChecker.this.faultyNodes.add(FollowerChecker.this.discoveryNode);
                        FollowersChecker.this.followerCheckers.remove(FollowerChecker.this.discoveryNode);
                    }
                    FollowersChecker.this.onNodeFailure.accept(FollowerChecker.this.discoveryNode, reason);
                }

                public String toString() {
                    return "detected failure of " + FollowerChecker.this.discoveryNode;
                }
            });
        }

        private void scheduleNextWakeUp() {
            FollowersChecker.this.transportService.getThreadPool().schedule(new Runnable(){

                @Override
                public void run() {
                    FollowerChecker.this.handleWakeUp();
                }

                public String toString() {
                    return FollowerChecker.this + "::handleWakeUp";
                }
            }, FollowersChecker.this.followerCheckInterval, "same");
        }

        public String toString() {
            return "FollowerChecker{discoveryNode=" + this.discoveryNode + ", failureCountSinceLastSuccess=" + this.failureCountSinceLastSuccess + ", [" + FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey() + "]=" + FollowersChecker.this.followerCheckRetryCount + "}";
        }
    }

    static class FastResponseState {
        final long term;
        final Coordinator.Mode mode;

        FastResponseState(long term, Coordinator.Mode mode) {
            this.term = term;
            this.mode = mode;
        }

        public String toString() {
            return "FastResponseState{term=" + this.term + ", mode=" + this.mode + "}";
        }
    }
}

