/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server.quorum;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.quorum.Election;
import org.apache.zookeeper.server.quorum.LeaderElectionBean;
import org.apache.zookeeper.server.quorum.QuorumCnxManager;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.Vote;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class FastLeaderElection
implements Election {
    private static final Logger LOG = Logger.getLogger(FastLeaderElection.class);
    static final int finalizeWait = 200;
    static final int maxNotificationInterval = 60000;
    QuorumCnxManager manager;
    LinkedBlockingQueue<ToSend> sendqueue;
    LinkedBlockingQueue<Notification> recvqueue;
    QuorumPeer self;
    Messenger messenger;
    volatile long logicalclock;
    long proposedLeader;
    long proposedZxid;
    volatile boolean stop = false;

    public long getLogicalClock() {
        return this.logicalclock;
    }

    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
        this.manager = manager;
        this.starter(self, manager);
    }

    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        this.proposedLeader = -1L;
        this.proposedZxid = -1L;
        this.sendqueue = new LinkedBlockingQueue();
        this.recvqueue = new LinkedBlockingQueue();
        this.messenger = new Messenger(manager);
    }

    private void leaveInstance() {
        this.recvqueue.clear();
    }

    public QuorumCnxManager getCnxManager() {
        return this.manager;
    }

    @Override
    public void shutdown() {
        this.stop = true;
        LOG.debug((Object)"Shutting down connection manager");
        this.manager.halt();
        LOG.debug((Object)"Shutting down messenger");
        this.messenger.halt();
        LOG.debug((Object)"FLE is down");
    }

    private void sendNotifications() {
        for (QuorumPeer.QuorumServer server : this.self.getVotingView().values()) {
            long sid = server.id;
            ToSend notmsg = new ToSend(ToSend.mType.notification, this.proposedLeader, this.proposedZxid, this.logicalclock, QuorumPeer.ServerState.LOOKING, sid);
            this.sendqueue.offer(notmsg);
        }
    }

    private boolean totalOrderPredicate(long newId, long newZxid, long curId, long curZxid) {
        LOG.debug((Object)("id: " + newId + ", proposed id: " + curId + ", zxid: " + newZxid + ", proposed zxid: " + curZxid));
        if (this.self.getQuorumVerifier().getWeight(newId) == 0L) {
            return false;
        }
        return newZxid > curZxid || newZxid == curZxid && newId > curId;
    }

    private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        HashSet<Long> set = new HashSet<Long>();
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (!vote.equals(entry.getValue())) continue;
            set.add(entry.getKey());
        }
        return this.self.getQuorumVerifier().containsQuorum(set);
    }

    private boolean checkLeader(HashMap<Long, Vote> votes, long leader, long epoch) {
        boolean predicate = true;
        if (leader != this.self.getId()) {
            if (votes.get(leader) == null) {
                predicate = false;
            } else if (votes.get((Object)Long.valueOf((long)leader)).state != QuorumPeer.ServerState.LEADING) {
                predicate = false;
            }
        }
        return predicate;
    }

    synchronized void updateProposal(long leader, long zxid) {
        this.proposedLeader = leader;
        this.proposedZxid = zxid;
    }

    synchronized Vote getVote() {
        return new Vote(this.proposedLeader, this.proposedZxid);
    }

    private QuorumPeer.ServerState learningState() {
        if (this.self.getPeerType() == QuorumPeer.LearnerType.PARTICIPANT) {
            LOG.debug((Object)("I'm a participant: " + this.self.getId()));
            return QuorumPeer.ServerState.FOLLOWING;
        }
        LOG.debug((Object)("I'm an observer: " + this.self.getId()));
        return QuorumPeer.ServerState.OBSERVING;
    }

    private long getInitId() {
        if (this.self.getPeerType() == QuorumPeer.LearnerType.PARTICIPANT) {
            return this.self.getId();
        }
        return Long.MIN_VALUE;
    }

    private long getInitLastLoggedZxid() {
        if (this.self.getPeerType() == QuorumPeer.LearnerType.PARTICIPANT) {
            return this.self.getLastLoggedZxid();
        }
        return Long.MIN_VALUE;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Vote lookForLeader() throws InterruptedException {
        try {
            this.self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean);
        }
        catch (Exception e) {
            LOG.warn((Object)"Failed to register with JMX", (Throwable)e);
            this.self.jmxLeaderElectionBean = null;
        }
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
            int notTimeout = 200;
            FastLeaderElection fastLeaderElection = this;
            synchronized (fastLeaderElection) {
                ++this.logicalclock;
                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid());
            }
            LOG.info((Object)("New election. My id =  " + this.self.getId() + ", Proposed zxid = " + this.proposedZxid));
            this.sendNotifications();
            while (this.self.getPeerState() == QuorumPeer.ServerState.LOOKING && !this.stop) {
                Notification n = this.recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                if (n == null) {
                    if (this.manager.haveDelivered()) {
                        this.sendNotifications();
                    } else {
                        this.manager.connectAll();
                    }
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = tmpTimeOut < 60000 ? tmpTimeOut : 60000;
                    LOG.info((Object)("Notification time out: " + notTimeout));
                    continue;
                }
                switch (n.state) {
                    case LOOKING: {
                        LOG.info((Object)("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + this.self.getId() + ", " + (Object)((Object)this.self.getPeerState()) + ", " + (Object)((Object)n.state) + ", " + n.sid));
                        if (n.epoch > this.logicalclock) {
                            this.logicalclock = n.epoch;
                            recvset.clear();
                            if (this.totalOrderPredicate(n.leader, n.zxid, this.getInitId(), this.getInitLastLoggedZxid())) {
                                this.updateProposal(n.leader, n.zxid);
                            } else {
                                this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid());
                            }
                            this.sendNotifications();
                        } else {
                            if (n.epoch < this.logicalclock) {
                                if (!LOG.isDebugEnabled()) break;
                                LOG.debug((Object)("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch + ", Logical clock" + this.logicalclock));
                                break;
                            }
                            if (this.totalOrderPredicate(n.leader, n.zxid, this.proposedLeader, this.proposedZxid)) {
                                LOG.info((Object)"Updating proposal");
                                this.updateProposal(n.leader, n.zxid);
                                this.sendNotifications();
                            }
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Adding vote: From = " + n.sid + ", Proposed leader = " + n.leader + ", Porposed zxid = " + n.zxid + ", Proposed epoch = " + n.epoch));
                        }
                        if (!this.self.getVotingView().containsKey(n.sid)) break;
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
                        if (this.self.getVotingView().size() == recvset.size() && this.self.getQuorumVerifier().getWeight(this.proposedLeader) != 0L) {
                            this.self.setPeerState(this.proposedLeader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                            this.leaveInstance();
                            Vote tmpTimeOut = new Vote(this.proposedLeader, this.proposedZxid);
                            return tmpTimeOut;
                        }
                        if (!this.termPredicate(recvset, new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock))) break;
                        while ((n = this.recvqueue.poll(200L, TimeUnit.MILLISECONDS)) != null) {
                            if (!this.totalOrderPredicate(n.leader, n.zxid, this.proposedLeader, this.proposedZxid)) continue;
                            this.recvqueue.put(n);
                            break;
                        }
                        if (n != null) break;
                        this.self.setPeerState(this.proposedLeader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("About to leave FLE instance: Leader= " + this.proposedLeader + ", Zxid = " + this.proposedZxid + ", My id = " + this.self.getId() + ", My state = " + (Object)((Object)this.self.getPeerState())));
                        }
                        this.leaveInstance();
                        Vote tmpTimeOut = new Vote(this.proposedLeader, this.proposedZxid);
                        return tmpTimeOut;
                    }
                    case OBSERVING: {
                        LOG.debug((Object)("Notification from observer: " + n.sid));
                        break;
                    }
                    default: {
                        if (n.epoch == this.logicalclock) {
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
                            if (n.state == QuorumPeer.ServerState.LEADING || this.termPredicate(recvset, new Vote(n.leader, n.zxid, n.epoch, n.state)) && this.checkLeader(outofelection, n.leader, n.epoch)) {
                                this.self.setPeerState(n.leader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                                this.leaveInstance();
                                Vote tmpTimeOut = new Vote(n.leader, n.zxid);
                                return tmpTimeOut;
                            }
                        }
                        LOG.info((Object)("Notification: " + n.leader + ", " + n.zxid + ", " + n.epoch + ", " + this.self.getId() + ", " + (Object)((Object)this.self.getPeerState()) + ", " + (Object)((Object)n.state) + ", " + n.sid));
                        outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
                        if (!this.termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state)) || !this.checkLeader(outofelection, n.leader, n.epoch)) break;
                        Object tmpTimeOut = this;
                        synchronized (tmpTimeOut) {
                            this.logicalclock = n.epoch;
                            this.self.setPeerState(n.leader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState());
                        }
                        this.leaveInstance();
                        tmpTimeOut = new Vote(n.leader, n.zxid);
                        return tmpTimeOut;
                    }
                }
            }
            fastLeaderElection = null;
            return fastLeaderElection;
        }
        finally {
            try {
                if (this.self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean);
                }
            }
            catch (Exception e) {
                LOG.warn((Object)"Failed to unregister with JMX", (Throwable)e);
            }
            this.self.jmxLeaderElectionBean = null;
        }
    }

    private class Messenger {
        WorkerSender ws;
        WorkerReceiver wr;

        public boolean queueEmpty() {
            return FastLeaderElection.this.sendqueue.isEmpty() || FastLeaderElection.this.recvqueue.isEmpty();
        }

        Messenger(QuorumCnxManager manager) {
            this.ws = new WorkerSender(manager);
            Thread t = new Thread((Runnable)this.ws, "WorkerSender Thread");
            t.setDaemon(true);
            t.start();
            this.wr = new WorkerReceiver(manager);
            t = new Thread((Runnable)this.wr, "WorkerReceiver Thread");
            t.setDaemon(true);
            t.start();
        }

        void halt() {
            this.ws.stop = true;
            this.wr.stop = true;
        }

        class WorkerSender
        implements Runnable {
            volatile boolean stop = false;
            QuorumCnxManager manager;

            WorkerSender(QuorumCnxManager manager) {
                this.manager = manager;
            }

            public void run() {
                while (!this.stop) {
                    try {
                        ToSend m = FastLeaderElection.this.sendqueue.poll(3000L, TimeUnit.MILLISECONDS);
                        if (m == null) continue;
                        this.process(m);
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                        break;
                    }
                }
                LOG.info((Object)"WorkerSender is down");
            }

            private void process(ToSend m) {
                byte[] requestBytes = new byte[28];
                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
                requestBuffer.clear();
                requestBuffer.putInt(m.state.ordinal());
                requestBuffer.putLong(m.leader);
                requestBuffer.putLong(m.zxid);
                requestBuffer.putLong(m.epoch);
                this.manager.toSend(m.sid, requestBuffer);
            }
        }

        class WorkerReceiver
        implements Runnable {
            volatile boolean stop = false;
            QuorumCnxManager manager;

            WorkerReceiver(QuorumCnxManager manager) {
                this.manager = manager;
            }

            public void run() {
                while (!this.stop) {
                    try {
                        ToSend notmsg;
                        QuorumCnxManager.Message response = this.manager.recvQueue.poll(3000L, TimeUnit.MILLISECONDS);
                        if (response == null) continue;
                        if (!FastLeaderElection.this.self.getVotingView().containsKey(response.sid)) {
                            Vote current = FastLeaderElection.this.self.getCurrentVote();
                            ToSend notmsg2 = new ToSend(ToSend.mType.notification, current.id, current.zxid, FastLeaderElection.this.logicalclock, FastLeaderElection.this.self.getPeerState(), response.sid);
                            FastLeaderElection.this.sendqueue.offer(notmsg2);
                            continue;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Receive new notification message. My id = " + FastLeaderElection.this.self.getId()));
                        }
                        if (response.buffer.capacity() < 28) {
                            LOG.error((Object)("Got a short response: " + response.buffer.capacity()));
                            continue;
                        }
                        response.buffer.clear();
                        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                        switch (response.buffer.getInt()) {
                            case 0: {
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            }
                            case 1: {
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            }
                            case 2: {
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            }
                            case 3: {
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                            }
                        }
                        Notification n = new Notification();
                        n.leader = response.buffer.getLong();
                        n.zxid = response.buffer.getLong();
                        n.epoch = response.buffer.getLong();
                        n.state = ackstate;
                        n.sid = response.sid;
                        if (FastLeaderElection.this.self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                            FastLeaderElection.this.recvqueue.offer(n);
                            if (ackstate != QuorumPeer.ServerState.LOOKING || n.epoch >= FastLeaderElection.this.logicalclock) continue;
                            Vote v = FastLeaderElection.this.getVote();
                            notmsg = new ToSend(ToSend.mType.notification, v.id, v.zxid, FastLeaderElection.this.logicalclock, FastLeaderElection.this.self.getPeerState(), response.sid);
                            FastLeaderElection.this.sendqueue.offer(notmsg);
                            continue;
                        }
                        Vote current = FastLeaderElection.this.self.getCurrentVote();
                        if (ackstate != QuorumPeer.ServerState.LOOKING) continue;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug((Object)("Sending new notification. My id =  " + FastLeaderElection.this.self.getId() + ", Recipient = " + response.sid));
                        }
                        notmsg = new ToSend(ToSend.mType.notification, current.id, current.zxid, FastLeaderElection.this.logicalclock, FastLeaderElection.this.self.getPeerState(), response.sid);
                        FastLeaderElection.this.sendqueue.offer(notmsg);
                    }
                    catch (InterruptedException e) {
                        System.out.println("Interrupted Exception while waiting for new message" + e.toString());
                    }
                }
                LOG.info((Object)"WorkerReceiver is down");
            }
        }
    }

    public static class ToSend {
        long leader;
        long zxid;
        long epoch;
        QuorumPeer.ServerState state;
        long sid;

        ToSend(mType type, long leader, long zxid, long epoch, QuorumPeer.ServerState state, long sid) {
            this.leader = leader;
            this.zxid = zxid;
            this.epoch = epoch;
            this.state = state;
            this.sid = sid;
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        static enum mType {
            crequest,
            challenge,
            notification,
            ack;

        }
    }

    public static class Notification {
        long leader;
        long zxid;
        long epoch;
        QuorumPeer.ServerState state;
        long sid;
    }
}

