/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.cluster;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.PlainListenableActionFuture;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

@PublicApi(since="1.0.0")
public class NodeConnectionsService
extends AbstractLifecycleComponent {
    private static final Logger logger = LogManager.getLogger(NodeConnectionsService.class);

    /**
     * Node-scoped setting {@code cluster.nodes.reconnect_interval}: the delay used when scheduling the
     * periodic reconnection check. Defaults to ten seconds.
     */
    public static final Setting<TimeValue> CLUSTER_NODE_RECONNECT_INTERVAL_SETTING = Setting.positiveTimeSetting(
        "cluster.nodes.reconnect_interval",
        TimeValue.timeValueSeconds(10),
        Setting.Property.NodeScope
    );

    private final ThreadPool threadPool;
    private final TransportService transportService;

    /** Guards {@link #targetsByNode} and the mutable state held by each {@code ConnectionTarget}. */
    private final Object mutex = new Object();

    /** One tracked target per node this service has been asked to connect to; accessed under {@link #mutex}. */
    private final Map<DiscoveryNode, ConnectionTarget> targetsByNode = new HashMap<>();

    private final TimeValue reconnectInterval;

    /** The active periodic checker, or {@code null} once the service has been stopped. */
    private volatile ConnectionChecker connectionChecker;

    /**
     * Creates the service.
     *
     * @param settings         source of {@link #CLUSTER_NODE_RECONNECT_INTERVAL_SETTING}
     * @param threadPool       used to schedule the periodic reconnection check
     * @param transportService used to open, query and close node connections
     */
    @Inject
    public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
        this.reconnectInterval = CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
        this.transportService = transportService;
        this.threadPool = threadPool;
    }

    /**
     * Starts connecting to every node in {@code discoveryNodes}.
     * <p>
     * {@code onCompletion} waits for the connection attempt of each node that was not tracked before, or
     * whose target was pending disconnection. For nodes that were already tracked, a connection attempt is
     * still requested, but the completion is acknowledged immediately without waiting for it.
     * <p>
     * State is updated while holding the mutex; the resulting actions are executed after it is released.
     *
     * @param discoveryNodes the nodes to connect to
     * @param onCompletion   run once all awaited connection attempts have finished; run immediately if
     *                       {@code discoveryNodes} is empty
     */
    public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
        final int nodeCount = discoveryNodes.getSize();
        if (nodeCount == 0) {
            onCompletion.run();
            return;
        }

        final GroupedActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(onCompletion), nodeCount);
        final ArrayList<Runnable> pendingActions = new ArrayList<>(nodeCount);

        synchronized (mutex) {
            for (final DiscoveryNode node : discoveryNodes) {
                final ConnectionTarget existingTarget = targetsByNode.get(node);
                final ConnectionTarget target;
                final boolean awaitConnection;
                if (existingTarget != null) {
                    target = existingTarget;
                    awaitConnection = existingTarget.isPendingDisconnection();
                } else {
                    target = new ConnectionTarget(node);
                    targetsByNode.put(node, target);
                    awaitConnection = true;
                }

                if (awaitConnection) {
                    pendingActions.add(target.connect(listener));
                } else {
                    // Already-tracked node: request a connection but acknowledge straight away.
                    pendingActions.add(target.connect(null));
                    pendingActions.add(() -> listener.onResponse(null));
                }
            }
        }

        for (final Runnable action : pendingActions) {
            action.run();
        }
    }

    /**
     * Starts disconnecting from every tracked node that is not contained in {@code discoveryNodes}.
     * <p>
     * The set of targets to drop is computed while holding the mutex; the disconnect actions themselves
     * are executed after it is released.
     *
     * @param discoveryNodes the nodes whose connections should be kept
     */
    public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
        final ArrayList<Runnable> disconnections = new ArrayList<>();

        synchronized (mutex) {
            // Start from everything we track, then strike out the nodes that must be kept.
            final HashSet<DiscoveryNode> staleNodes = new HashSet<>(targetsByNode.keySet());
            for (final DiscoveryNode keptNode : discoveryNodes) {
                staleNodes.remove(keptNode);
            }
            for (final DiscoveryNode staleNode : staleNodes) {
                final ConnectionTarget target = targetsByNode.get(staleNode);
                disconnections.add(target.disconnect());
            }
        }

        for (final Runnable disconnection : disconnections) {
            disconnection.run();
        }
    }

    /**
     * Waits for the activity currently in flight on every tracked target, then attempts to connect any
     * target that is idle and not connected.
     *
     * @param onCompletion run once the connection pass has finished
     */
    void ensureConnections(Runnable onCompletion) {
        final Runnable connectPass = () -> connectDisconnectedTargets(onCompletion);
        awaitPendingActivity(connectPass);
    }

    /**
     * Runs {@code onCompletion} once the activity that is in flight on each tracked target, at the time of
     * this call, has finished. Targets that are idle are acknowledged immediately.
     *
     * @param onCompletion run after all tracked targets have reported; run directly when nothing is tracked
     */
    private void awaitPendingActivity(Runnable onCompletion) {
        final ArrayList<Runnable> actions = new ArrayList<>();

        synchronized (mutex) {
            final Collection<ConnectionTarget> targets = targetsByNode.values();
            if (targets.isEmpty() == false) {
                final GroupedActionListener<Void> listener = new GroupedActionListener<>(
                    ActionListener.wrap(onCompletion),
                    targets.size()
                );
                for (final ConnectionTarget target : targets) {
                    actions.add(target.awaitCurrentActivity(listener));
                }
            } else {
                actions.add(onCompletion);
            }
        }

        // Executed outside the mutex.
        for (final Runnable action : actions) {
            action.run();
        }
    }

    /**
     * Asks every tracked target to make sure it is connected and runs {@code onCompletion} once all of
     * them have reported back.
     *
     * @param onCompletion run after all tracked targets have reported; run directly when nothing is tracked
     */
    private void connectDisconnectedTargets(Runnable onCompletion) {
        final ArrayList<Runnable> actions = new ArrayList<>();

        synchronized (mutex) {
            final Collection<ConnectionTarget> targets = targetsByNode.values();
            if (targets.isEmpty() == false) {
                logger.trace("connectDisconnectedTargets: {}", targetsByNode);
                final GroupedActionListener<Void> listener = new GroupedActionListener<>(
                    ActionListener.wrap(onCompletion),
                    targets.size()
                );
                for (final ConnectionTarget target : targets) {
                    actions.add(target.ensureConnected(listener));
                }
            } else {
                actions.add(onCompletion);
            }
        }

        // Executed outside the mutex.
        for (final Runnable action : actions) {
            action.run();
        }
    }

    /**
     * Installs a fresh {@link ConnectionChecker} as the current checker and schedules its first run.
     */
    protected void doStart() {
        final ConnectionChecker checker = new ConnectionChecker();
        this.connectionChecker = checker;
        checker.scheduleNextCheck();
    }

    /**
     * Clears the current checker. A checker only runs or reschedules itself while it is the current one,
     * so any previously scheduled check becomes a no-op.
     */
    protected void doStop() {
        connectionChecker = null;
    }

    /**
     * Nothing to release on close.
     */
    protected void doClose() {
        // intentionally empty
    }

    /**
     * Connects to {@code discoveryNodes}; once that completes, starts disconnecting from all other tracked
     * nodes and then ensures every remaining target is connected.
     *
     * @param discoveryNodes the nodes that should end up connected
     * @param onCompletion   run after the final connection pass has finished
     */
    public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
        final Runnable afterInitialConnect = () -> {
            disconnectFromNodesExcept(discoveryNodes);
            ensureConnections(onCompletion);
        };
        connectToNodes(discoveryNodes, afterInitialConnect);
    }

    /**
     * Tracks the connect/disconnect state of a single {@link DiscoveryNode}.
     * <p>
     * At most one activity ({@link ActivityType#CONNECTING} or {@link ActivityType#DISCONNECTING}) is in
     * flight at a time. Listeners interested in the outcome of the current activity are collected in
     * {@link #future}. If the opposite activity is requested while one is in flight, the waiting listeners
     * are failed with a cancellation exception and the opposite activity is started as soon as the
     * in-flight one reports completion.
     * <p>
     * All mutable state is read and written while holding the enclosing service's mutex. The activities
     * themselves, and all listener notifications, run without it.
     */
    private class ConnectionTarget {
        private final DiscoveryNode discoveryNode;

        /** Listeners waiting for the activity currently in flight; replaced whenever it is drained. */
        private PlainListenableActionFuture<Void> future = PlainListenableActionFuture.newListenableFuture();

        private ActivityType activityType = ActivityType.IDLE;

        /** Number of failed attempts since the last success; only used to pick the log level. */
        private final AtomicInteger consecutiveFailureCount = new AtomicInteger();

        /**
         * Connects to {@link #discoveryNode} unless the transport service reports it as already connected,
         * then reports the outcome through {@link #onCompletion}.
         */
        private final Runnable connectActivity = new AbstractRunnable() {
            // Inside the nested listener a bare onFailure(e) would resolve to the listener itself, so keep
            // an explicit handle on this runnable.
            final AbstractRunnable abstractRunnable = this;

            @Override
            protected void doRun() {
                assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
                if (transportService.nodeConnected(discoveryNode)) {
                    logger.trace("still connected to {}", discoveryNode);
                    onConnected();
                } else {
                    logger.debug("connecting to {}", discoveryNode);
                    transportService.connectToNode(discoveryNode, new ActionListener<Void>() {

                        public void onResponse(Void aVoid) {
                            assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
                            logger.debug("connected to {}", discoveryNode);
                            // Unqualified on purpose: the listener has no onConnected(), so this resolves
                            // to the enclosing runnable's method. "this.onConnected()" would not compile.
                            onConnected();
                        }

                        public void onFailure(Exception e) {
                            abstractRunnable.onFailure(e);
                        }
                    });
                }
            }

            private void onConnected() {
                consecutiveFailureCount.set(0);
                onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
            }

            @Override
            public void onFailure(Exception e) {
                assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
                final int currentFailureCount = consecutiveFailureCount.incrementAndGet();
                // WARN on the 1st, 7th, 13th, ... consecutive failure; DEBUG otherwise.
                final Level level = currentFailureCount % 6 == 1 ? Level.WARN : Level.DEBUG;
                logger.log(
                    level,
                    new ParameterizedMessage("failed to connect to {} (tried [{}] times)", discoveryNode, currentFailureCount),
                    e
                );
                onCompletion(ActivityType.CONNECTING, e, disconnectActivity);
            }

            public String toString() {
                return "connect to " + discoveryNode;
            }
        };

        /**
         * Disconnects from {@link #discoveryNode} and reports the outcome through {@link #onCompletion}.
         */
        private final Runnable disconnectActivity = new AbstractRunnable() {

            @Override
            protected void doRun() {
                assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
                transportService.disconnectFromNode(discoveryNode);
                consecutiveFailureCount.set(0);
                logger.debug("disconnected from {}", discoveryNode);
                onCompletion(ActivityType.DISCONNECTING, null, connectActivity);
            }

            @Override
            public void onFailure(Exception e) {
                assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
                consecutiveFailureCount.incrementAndGet();
                logger.warn(new ParameterizedMessage("failed to disconnect from {}, possible connection leak", discoveryNode), e);
                assert false : "failed to disconnect from " + discoveryNode + ", possible connection leak\n" + e;
                onCompletion(ActivityType.DISCONNECTING, e, connectActivity);
            }
        };

        ConnectionTarget(DiscoveryNode discoveryNode) {
            this.discoveryNode = discoveryNode;
        }

        /**
         * Requests a connection. Must be called with the mutex held.
         *
         * @param listener notified with the outcome of the connection activity; may be {@code null}
         * @return an action to run after releasing the mutex
         */
        Runnable connect(@Nullable ActionListener<Void> listener) {
            return addListenerAndStartActivity(
                listener,
                ActivityType.CONNECTING,
                connectActivity,
                "disconnection cancelled by reconnection"
            );
        }

        /**
         * Requests a disconnection. Must be called with the mutex held.
         *
         * @return an action to run after releasing the mutex
         */
        Runnable disconnect() {
            return addListenerAndStartActivity(
                null,
                ActivityType.DISCONNECTING,
                disconnectActivity,
                "connection cancelled by disconnection"
            );
        }

        /**
         * Makes sure this target is connected. Must be called with the mutex held.
         * <ul>
         *   <li>Idle and already connected: the listener is acknowledged directly.</li>
         *   <li>Idle and not connected: a connection activity is started and the listener waits for it.</li>
         *   <li>Activity in flight: the listener waits for that activity.</li>
         * </ul>
         *
         * @param listener notified as described above; may be {@code null}
         * @return an action to run after releasing the mutex
         */
        Runnable ensureConnected(@Nullable ActionListener<Void> listener) {
            assert Thread.holdsLock(mutex) : "mutex not held";

            if (activityType == ActivityType.IDLE) {
                if (transportService.nodeConnected(discoveryNode)) {
                    // The parameter is declared @Nullable, so do not dereference it blindly.
                    return listener == null ? () -> {} : () -> listener.onResponse(null);
                }
                activityType = ActivityType.CONNECTING;
                addListener(listener);
                return connectActivity;
            }

            addListener(listener);
            return () -> {};
        }

        /**
         * Registers {@code listener} for the activity in flight, or acknowledges it directly if this target
         * is idle. Must be called with the mutex held.
         *
         * @param listener notified once the current activity (if any) has finished
         * @return an action to run after releasing the mutex
         */
        Runnable awaitCurrentActivity(ActionListener<Void> listener) {
            assert Thread.holdsLock(mutex) : "mutex not held";

            if (activityType == ActivityType.IDLE) {
                return () -> listener.onResponse(null);
            }
            addListener(listener);
            return () -> {};
        }

        /** Adds a non-null listener to the future of the activity in flight. */
        private void addListener(@Nullable ActionListener<Void> listener) {
            assert Thread.holdsLock(mutex) : "mutex not held";
            assert activityType != ActivityType.IDLE;
            if (listener != null) {
                future.addListener(listener);
            }
        }

        /** Swaps in a fresh future and returns the previous one so its listeners can be completed. */
        private PlainListenableActionFuture<Void> getAndClearFuture() {
            assert Thread.holdsLock(mutex) : "mutex not held";
            final PlainListenableActionFuture<Void> drainedFuture = future;
            future = PlainListenableActionFuture.newListenableFuture();
            return drainedFuture;
        }

        /**
         * Records a request for {@code newActivityType} and works out what has to run next.
         * <ul>
         *   <li>Idle: the activity is started, i.e. {@code activity} itself is returned.</li>
         *   <li>Same activity already in flight: the listener just joins it.</li>
         *   <li>Opposite activity in flight: the requested type is recorded, the listeners of the in-flight
         *       activity are failed with {@code cancellationMessage}, and the requested activity is started
         *       later by {@link #onCompletion} when the in-flight one reports back.</li>
         * </ul>
         *
         * @return an action to run after releasing the mutex
         */
        private Runnable addListenerAndStartActivity(
            @Nullable ActionListener<Void> listener,
            ActivityType newActivityType,
            Runnable activity,
            String cancellationMessage
        ) {
            assert Thread.holdsLock(mutex) : "mutex not held";
            assert newActivityType != ActivityType.IDLE;

            if (activityType == ActivityType.IDLE) {
                activityType = newActivityType;
                addListener(listener);
                return activity;
            }

            if (activityType == newActivityType) {
                addListener(listener);
                return () -> {};
            }

            activityType = newActivityType;
            final PlainListenableActionFuture<Void> oldFuture = getAndClearFuture();
            addListener(listener);
            return () -> oldFuture.onFailure(new OpenSearchException(cancellationMessage));
        }

        /**
         * Called by an activity when it has finished, without the mutex held.
         * <p>
         * If the finished activity is still the requested one, the target becomes idle, the waiting
         * listeners are completed, and after a disconnection the target is removed from the service's map.
         * Otherwise the opposite activity was requested in the meantime, so that one is run now.
         *
         * @param completedActivityType the type of the activity that just finished
         * @param e                     the failure, or {@code null} on success
         * @param oppositeActivity      the activity to run if the request changed while this one was in flight
         */
        private void onCompletion(ActivityType completedActivityType, @Nullable Exception e, Runnable oppositeActivity) {
            assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";

            final Runnable cleanup;
            synchronized (mutex) {
                assert activityType != ActivityType.IDLE;
                if (activityType == completedActivityType) {
                    final PlainListenableActionFuture<Void> oldFuture = getAndClearFuture();
                    activityType = ActivityType.IDLE;

                    cleanup = e == null ? () -> oldFuture.onResponse(null) : () -> oldFuture.onFailure(e);

                    if (completedActivityType == ActivityType.DISCONNECTING) {
                        final ConnectionTarget removedTarget = targetsByNode.remove(discoveryNode);
                        assert removedTarget == this : removedTarget + " vs " + this;
                    }
                } else {
                    cleanup = oppositeActivity;
                }
            }
            // Notify listeners / start the opposite activity outside the mutex.
            cleanup.run();
        }

        /** Returns whether a disconnection is currently requested. Must be called with the mutex held. */
        boolean isPendingDisconnection() {
            assert Thread.holdsLock(mutex) : "mutex not held";
            return activityType == ActivityType.DISCONNECTING;
        }

        @Override
        public String toString() {
            // activityType is guarded by the mutex, so take it for a consistent read.
            synchronized (mutex) {
                return "ConnectionTarget{discoveryNode=" + discoveryNode + ", activityType=" + activityType + "}";
            }
        }
    }

    private static enum ActivityType {
        IDLE,
        CONNECTING,
        DISCONNECTING;

    }

    class ConnectionChecker
    extends AbstractRunnable {
        ConnectionChecker() {
        }

        @Override
        protected void doRun() {
            if (NodeConnectionsService.this.connectionChecker == this) {
                NodeConnectionsService.this.connectDisconnectedTargets(this::scheduleNextCheck);
            }
        }

        void scheduleNextCheck() {
            if (NodeConnectionsService.this.connectionChecker == this) {
                NodeConnectionsService.this.threadPool.scheduleUnlessShuttingDown(NodeConnectionsService.this.reconnectInterval, "generic", this);
            }
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn("unexpected error while checking for node reconnects", (Throwable)e);
            this.scheduleNextCheck();
        }

        public String toString() {
            return "periodic reconnection checker";
        }
    }
}

