/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached.protocol;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Base {@link MemcachedNode} implementation that talks to a server over a
 * single {@link SocketChannel}.
 *
 * <p>Operations flow through three queues:
 * <ol>
 *   <li>{@link #addOp(Operation)} places an operation on the input queue;</li>
 *   <li>{@link #copyInputQueue()} moves as many as fit onto the write queue;</li>
 *   <li>{@link #fillWriteBuffer(boolean)} copies the bytes of the current write
 *       operation(s) into the node's write buffer and, as it starts on each
 *       operation, also places that operation on the read queue.</li>
 * </ol>
 *
 * <p>Subclasses supply {@link #optimize()}, which may set
 * {@link #optimizedOp}; when set, that operation is treated as the head of
 * the write side in preference to the write queue.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to the buffers
 * or to {@code toWrite}; presumably a single I/O thread drives it - confirm
 * against the caller.
 */
public abstract class TCPMemcachedNodeImpl
extends SpyObject
implements MemcachedNode {
    private final SocketAddress socketAddress;
    private final ByteBuffer rbuf;
    private final ByteBuffer wbuf;
    /** Operations waiting to have their bytes copied into the write buffer. */
    protected final BlockingQueue<Operation> writeQ;
    /** Operations whose writing has started (see {@link #fillWriteBuffer(boolean)}). */
    private final BlockingQueue<Operation> readQ;
    /** Operations handed to {@link #addOp(Operation)} but not yet moved to {@link #writeQ}. */
    private final BlockingQueue<Operation> inputQueue;
    // Starts at 1 so that isActive() is false until connected() resets it to 0.
    private volatile int reconnectAttempt = 1;
    private SocketChannel channel;
    /** Number of bytes in {@link #wbuf} that still have to be written to the channel. */
    private int toWrite = 0;
    /** When non-null, takes precedence over the head of {@link #writeQ}. */
    protected Operation optimizedOp = null;
    private volatile SelectionKey sk = null;

    /**
     * Creates a node for the given address and channel.
     *
     * <p>Arguments are checked with {@code assert} only, so the checks are
     * active only when assertions are enabled.
     *
     * @param sa      address of the server; must not be {@code null}
     * @param c       channel used to talk to the server; must not be {@code null}
     * @param bufSize size in bytes of both the read and the write buffer; must be positive
     * @param rq      read queue; must not be {@code null}
     * @param wq      write queue; must not be {@code null}
     * @param iq      input queue; must not be {@code null}
     */
    public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize, BlockingQueue<Operation> rq, BlockingQueue<Operation> wq, BlockingQueue<Operation> iq) {
        assert (sa != null) : "No SocketAddress";
        assert (c != null) : "No SocketChannel";
        assert (bufSize > 0) : "Invalid buffer size: " + bufSize;
        assert (rq != null) : "No operation read queue";
        assert (wq != null) : "No operation write queue";
        assert (iq != null) : "No input queue";
        this.socketAddress = sa;
        this.setChannel(c);
        this.rbuf = ByteBuffer.allocate(bufSize);
        this.wbuf = ByteBuffer.allocate(bufSize);
        this.getWbuf().clear();
        this.readQ = rq;
        this.writeQ = wq;
        this.inputQueue = iq;
    }

    /**
     * Moves operations from the input queue to the write queue, taking at
     * most as many as the write queue reported room for.
     */
    @Override
    public final void copyInputQueue() {
        Collection<Operation> tmp = new ArrayList<Operation>();
        // Drain no more than the write queue can currently accept so that
        // addAll() below is not handed more elements than there is room for.
        this.inputQueue.drainTo(tmp, this.writeQ.remainingCapacity());
        this.writeQ.addAll(tmp);
    }

    /**
     * Removes every operation from the input queue.
     *
     * @return the operations that were removed, in drain order; never {@code null}
     */
    @Override
    public Collection<Operation> destroyInputQueue() {
        ArrayList<Operation> rv = new ArrayList<Operation>();
        this.inputQueue.drainTo(rv);
        return rv;
    }

    /**
     * Prepares this node for writing again from the current write operation:
     * resets that operation's buffer, cancels every operation on the read
     * queue other than the current write operation, and clears both I/O
     * buffers.
     */
    @Override
    public final void setupResend() {
        // First, rewind the current write op so it is sent again from its mark.
        Operation op = this.getCurrentWriteOp();
        if (op != null) {
            ByteBuffer buf = op.getBuffer();
            if (buf != null) {
                // NOTE(review): reset() throws InvalidMarkException when no mark
                // was set; presumably the Operation marks its buffer - confirm.
                buf.reset();
            } else {
                this.getLogger().info("No buffer for current write op, removing");
                this.removeCurrentWriteOp();
            }
        }
        // Now empty the read queue. The current write op may also be on the
        // read queue (fillWriteBuffer adds it there); it is removed from the
        // read queue but not cancelled because it is about to be resent.
        while (this.hasReadOp()) {
            op = this.removeCurrentReadOp();
            if (op != this.getCurrentWriteOp()) {
                this.getLogger().warn("Discarding partially completed op: %s", op);
                op.cancel();
            }
        }
        this.getWbuf().clear();
        this.getRbuf().clear();
        this.toWrite = 0;
    }

    /**
     * Pulls in newly queued operations and drops cancelled operations from
     * the head of the write side.
     *
     * @return {@code true} if a non-cancelled write operation is available
     */
    private boolean preparePending() {
        this.copyInputQueue();
        Operation nextOp = this.getCurrentWriteOp();
        while (nextOp != null && nextOp.isCancelled()) {
            this.getLogger().info("Removing cancelled operation: %s", nextOp);
            this.removeCurrentWriteOp();
            nextOp = this.getCurrentWriteOp();
        }
        return nextOp != null;
    }

    /**
     * Copies bytes from pending write operations into the write buffer.
     *
     * <p>Nothing is done unless the previous buffer contents have been fully
     * written ({@code toWrite == 0}) and the read queue has room. Otherwise
     * the buffer is cleared, filled from the current write operation(s) until
     * it is full or no operation is left, and flipped so that
     * {@link #writeSome()} can drain it.
     *
     * @param shouldOptimize whether to call {@link #optimize()} each time an
     *                       operation has been completely copied
     */
    @Override
    public final void fillWriteBuffer(boolean shouldOptimize) {
        if (this.toWrite == 0 && this.readQ.remainingCapacity() > 0) {
            this.getWbuf().clear();
            Operation o = this.getCurrentWriteOp();
            while (o != null && this.toWrite < this.getWbuf().capacity()) {
                assert (o.getState() == OperationState.WRITING);
                // An op that did not fit in one buffer is seen again on the
                // next call; only enqueue it for reading once.
                if (!this.readQ.contains(o)) {
                    this.readQ.add(o);
                }
                ByteBuffer obuf = o.getBuffer();
                assert (obuf != null) : "Didn't get a write buffer from " + o;
                int bytesToCopy = Math.min(this.getWbuf().remaining(), obuf.remaining());
                byte[] b = new byte[bytesToCopy];
                obuf.get(b);
                this.getWbuf().put(b);
                this.getLogger().debug("After copying stuff from %s: %s", o, this.getWbuf());
                if (!o.getBuffer().hasRemaining()) {
                    // This op is fully copied; move on to the next one.
                    o.writeComplete();
                    this.transitionWriteItem();
                    this.preparePending();
                    if (shouldOptimize) {
                        this.optimize();
                    }
                    o = this.getCurrentWriteOp();
                }
                this.toWrite += bytesToCopy;
            }
            this.getWbuf().flip();
            assert (this.toWrite <= this.getWbuf().capacity()) : "toWrite exceeded capacity: " + this;
            assert (this.toWrite == this.getWbuf().remaining()) : "Expected " + this.toWrite + " remaining, got " + this.getWbuf().remaining();
        } else {
            this.getLogger().debug("Buffer is full, skipping");
        }
    }

    /**
     * Removes the current write operation now that it has been completely
     * copied into the write buffer.
     */
    @Override
    public final void transitionWriteItem() {
        Operation op = this.removeCurrentWriteOp();
        assert (op != null) : "There is no write item to transition";
        this.getLogger().debug("Finished writing %s", op);
    }

    /**
     * Hook invoked by {@link #fillWriteBuffer(boolean)} after an operation has
     * been fully copied, when optimization was requested. Implementations may
     * set {@link #optimizedOp}.
     */
    protected abstract void optimize();

    /**
     * @return the head of the read queue, or {@code null} if it is empty
     */
    @Override
    public final Operation getCurrentReadOp() {
        return this.readQ.peek();
    }

    /**
     * Removes and returns the head of the read queue.
     *
     * @return the removed operation
     * @throws java.util.NoSuchElementException if the read queue is empty
     */
    @Override
    public final Operation removeCurrentReadOp() {
        return this.readQ.remove();
    }

    /**
     * @return {@link #optimizedOp} if one is set, otherwise the head of the
     *         write queue, or {@code null} if there is neither
     */
    @Override
    public final Operation getCurrentWriteOp() {
        if (this.optimizedOp != null) {
            return this.optimizedOp;
        }
        return this.writeQ.peek();
    }

    /**
     * Removes and returns the current write operation: {@link #optimizedOp}
     * if one is set (it is cleared), otherwise the head of the write queue.
     *
     * @return the removed operation
     * @throws java.util.NoSuchElementException if no optimized operation is
     *         set and the write queue is empty
     */
    @Override
    public final Operation removeCurrentWriteOp() {
        Operation rv = this.optimizedOp;
        if (rv == null) {
            rv = this.writeQ.remove();
        } else {
            this.optimizedOp = null;
        }
        return rv;
    }

    /**
     * @return {@code true} if the read queue is not empty
     */
    @Override
    public final boolean hasReadOp() {
        return !this.readQ.isEmpty();
    }

    /**
     * @return {@code true} if an optimized operation is set or the write
     *         queue is not empty
     */
    @Override
    public final boolean hasWriteOp() {
        return this.optimizedOp != null || !this.writeQ.isEmpty();
    }

    /**
     * Queues an operation on the input queue.
     *
     * @param op the operation to queue
     * @throws IllegalStateException if the input queue cannot accept the
     *         operation (the contract of {@link BlockingQueue#add(Object)})
     */
    @Override
    public final void addOp(Operation op) {
        boolean added = this.inputQueue.add(op);
        assert (added);
    }

    /**
     * Computes the selector interest set for this node's current state.
     *
     * @return {@link SelectionKey#OP_CONNECT} if the channel is not
     *         connected; otherwise the bitwise OR of
     *         {@link SelectionKey#OP_READ} (when there is a read operation)
     *         and {@link SelectionKey#OP_WRITE} (when bytes are still
     *         buffered or there is a write operation), possibly {@code 0}
     */
    @Override
    public final int getSelectionOps() {
        int rv = 0;
        if (this.getChannel().isConnected()) {
            if (this.hasReadOp()) {
                rv |= SelectionKey.OP_READ;
            }
            if (this.toWrite > 0 || this.hasWriteOp()) {
                rv |= SelectionKey.OP_WRITE;
            }
        } else {
            rv = SelectionKey.OP_CONNECT;
        }
        return rv;
    }

    /**
     * @return the buffer allocated for reading
     */
    @Override
    public final ByteBuffer getRbuf() {
        return this.rbuf;
    }

    /**
     * @return the buffer that {@link #fillWriteBuffer(boolean)} fills and
     *         {@link #writeSome()} drains
     */
    @Override
    public final ByteBuffer getWbuf() {
        return this.wbuf;
    }

    /**
     * @return the address this node was created with
     */
    @Override
    public final SocketAddress getSocketAddress() {
        return this.socketAddress;
    }

    /**
     * @return {@code true} if {@link #connected()} has been called since the
     *         last {@link #reconnecting()} and the channel is non-null and
     *         connected
     */
    @Override
    public final boolean isActive() {
        return this.reconnectAttempt == 0 && this.getChannel() != null && this.getChannel().isConnected();
    }

    /**
     * Records another reconnect attempt.
     */
    @Override
    public final void reconnecting() {
        // NOTE(review): ++ on a volatile int is not atomic; presumably only
        // one thread ever calls this - confirm.
        ++this.reconnectAttempt;
    }

    /**
     * Resets the reconnect counter, marking the node as connected.
     */
    @Override
    public final void connected() {
        this.reconnectAttempt = 0;
    }

    /**
     * @return the number of reconnect attempts since the last
     *         {@link #connected()} call (initially 1)
     */
    @Override
    public final int getReconnectCount() {
        return this.reconnectAttempt;
    }

    /**
     * Returns a diagnostic summary: address, queue sizes, head operations,
     * pending byte count and the selection key's interest set ({@code 0} when
     * there is no valid key).
     *
     * <p>A set {@link #optimizedOp} is counted under {@code #Wops}, matching
     * {@link #getCurrentWriteOp()} and {@link #hasWriteOp()}, which treat it
     * as the head write operation.
     */
    @Override
    public final String toString() {
        int sops = 0;
        if (this.getSk() != null && this.getSk().isValid()) {
            sops = this.getSk().interestOps();
        }
        int rsize = this.readQ.size();
        int wsize = this.writeQ.size() + (this.optimizedOp == null ? 0 : 1);
        int isize = this.inputQueue.size();
        return "{QA sa=" + this.getSocketAddress() + ", #Rops=" + rsize + ", #Wops=" + wsize + ", #iq=" + isize + ", topRop=" + this.getCurrentReadOp() + ", topWop=" + this.getCurrentWriteOp() + ", toWrite=" + this.toWrite + ", interested=" + sops + "}";
    }

    /**
     * Replaces both the channel and its selection key.
     *
     * @param ch   the new channel
     * @param skey the selection key registered for {@code ch}
     */
    @Override
    public final void registerChannel(SocketChannel ch, SelectionKey skey) {
        this.setChannel(ch);
        this.setSk(skey);
    }

    /**
     * Sets the channel. With assertions enabled, fails if the current
     * channel is still open.
     *
     * @param to the new channel
     */
    @Override
    public final void setChannel(SocketChannel to) {
        assert (this.channel == null || !this.channel.isOpen()) : "Attempting to overwrite channel";
        this.channel = to;
    }

    /**
     * @return the current channel
     */
    @Override
    public final SocketChannel getChannel() {
        return this.channel;
    }

    /**
     * @param to the selection key to associate with this node
     */
    @Override
    public final void setSk(SelectionKey to) {
        this.sk = to;
    }

    /**
     * @return the selection key associated with this node, or {@code null}
     *         if none has been set
     */
    @Override
    public final SelectionKey getSk() {
        return this.sk;
    }

    /**
     * @return the number of buffered bytes not yet written to the channel
     */
    @Override
    public final int getBytesRemainingToWrite() {
        return this.toWrite;
    }

    /**
     * Writes as much of the write buffer to the channel as the channel
     * accepts and lowers the pending byte count accordingly.
     *
     * @return the number of bytes written, possibly zero
     * @throws IOException if the channel write fails
     */
    @Override
    public final int writeSome() throws IOException {
        int wrote = this.channel.write(this.wbuf);
        assert (wrote >= 0) : "Wrote negative bytes?";
        this.toWrite -= wrote;
        assert (this.toWrite >= 0) : "toWrite went negative after writing " + wrote + " bytes for " + this;
        this.getLogger().debug("Wrote %d bytes", wrote);
        return wrote;
    }

    /**
     * Updates the selection key's interest set to match
     * {@link #getSelectionOps()}; does nothing but log when there is no
     * valid key.
     */
    @Override
    public final void fixupOps() {
        // Read the volatile field once so the null/validity check and the
        // update operate on the same key.
        SelectionKey s = this.sk;
        if (s != null && s.isValid()) {
            int iops = this.getSelectionOps();
            this.getLogger().debug("Setting interested opts to %d", iops);
            s.interestOps(iops);
        } else {
            this.getLogger().debug("Selection key is not valid.");
        }
    }
}

