/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.benchmark;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BenchBookie {
    static Logger LOG = LoggerFactory.getLogger(BenchBookie.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static long getValidLedgerId(String zkServers) throws IOException, BKException, KeeperException, InterruptedException {
        BookKeeper bkc = null;
        LedgerHandle lh = null;
        long id = 0L;
        try {
            bkc = new BookKeeper(zkServers);
            lh = bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32, new byte[20]);
            long l = id = lh.getId();
            return l;
        }
        finally {
            if (lh != null) {
                lh.close();
            }
            if (bkc != null) {
                bkc.close();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ParseException, IOException, BKException, KeeperException {
        Options options = new Options();
        options.addOption("host", true, "Hostname or IP of bookie to benchmark");
        options.addOption("port", true, "Port of bookie to benchmark (default 3181)");
        options.addOption("zookeeper", true, "Zookeeper ensemble, (default \"localhost:2181\")");
        options.addOption("size", true, "Size of message to send, in bytes (default 1024)");
        options.addOption("help", false, "This message");
        PosixParser parser = new PosixParser();
        CommandLine cmd = parser.parse(options, args);
        if (cmd.hasOption("help") || !cmd.hasOption("host")) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("BenchBookie <options>", options);
            System.exit(-1);
        }
        String addr = cmd.getOptionValue("host");
        int port = Integer.valueOf(cmd.getOptionValue("port", "3181"));
        int size = Integer.valueOf(cmd.getOptionValue("size", "1024"));
        String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
        NioClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory((Executor)Executors.newCachedThreadPool(), (Executor)Executors.newCachedThreadPool());
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
        ClientConfiguration conf = new ClientConfiguration();
        BookieClient bc = new BookieClient(conf, (ClientSocketChannelFactory)channelFactory, executor);
        LatencyCallback lc = new LatencyCallback();
        ThroughputCallback tc = new ThroughputCallback();
        int warmUpCount = 999;
        long ledger = BenchBookie.getValidLedgerId(servers);
        for (long entry = 0L; entry < (long)warmUpCount; ++entry) {
            ChannelBuffer toSend = ChannelBuffers.buffer((int)size);
            toSend.resetReaderIndex();
            toSend.resetWriterIndex();
            toSend.writeLong(ledger);
            toSend.writeLong(entry);
            toSend.writerIndex(toSend.capacity());
            bc.addEntry(new InetSocketAddress(addr, port), ledger, new byte[20], entry, toSend, (BookkeeperInternalCallbacks.WriteCallback)tc, null, 0);
        }
        LOG.info("Waiting for warmup");
        tc.waitFor(warmUpCount);
        ledger = BenchBookie.getValidLedgerId(servers);
        LOG.info("Benchmarking latency");
        int entryCount = 5000;
        long startTime = System.nanoTime();
        for (long entry = 0L; entry < (long)entryCount; ++entry) {
            ChannelBuffer toSend = ChannelBuffers.buffer((int)size);
            toSend.resetReaderIndex();
            toSend.resetWriterIndex();
            toSend.writeLong(ledger);
            toSend.writeLong(entry);
            toSend.writerIndex(toSend.capacity());
            lc.resetComplete();
            bc.addEntry(new InetSocketAddress(addr, port), ledger, new byte[20], entry, toSend, (BookkeeperInternalCallbacks.WriteCallback)lc, null, 0);
            lc.waitForComplete();
        }
        long endTime = System.nanoTime();
        LOG.info("Latency: " + (double)(endTime - startTime) / (double)entryCount / 1000000.0);
        entryCount = 50000;
        ledger = BenchBookie.getValidLedgerId(servers);
        LOG.info("Benchmarking throughput");
        startTime = System.currentTimeMillis();
        tc = new ThroughputCallback();
        for (long entry = 0L; entry < (long)entryCount; ++entry) {
            ChannelBuffer toSend = ChannelBuffers.buffer((int)size);
            toSend.resetReaderIndex();
            toSend.resetWriterIndex();
            toSend.writeLong(ledger);
            toSend.writeLong(entry);
            toSend.writerIndex(toSend.capacity());
            bc.addEntry(new InetSocketAddress(addr, port), ledger, new byte[20], entry, toSend, (BookkeeperInternalCallbacks.WriteCallback)tc, null, 0);
        }
        tc.waitFor(entryCount);
        endTime = System.currentTimeMillis();
        LOG.info("Throughput: " + (long)entryCount * 1000L / (endTime - startTime));
        bc.close();
        channelFactory.releaseExternalResources();
        executor.shutdown();
    }

    static class ThroughputCallback
    implements BookkeeperInternalCallbacks.WriteCallback {
        int count;
        int waitingCount = Integer.MAX_VALUE;

        ThroughputCallback() {
        }

        public synchronized void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
            if (rc != 0) {
                LOG.error("Got error " + rc);
            }
            ++this.count;
            if (this.count >= this.waitingCount) {
                this.notifyAll();
            }
        }

        public synchronized void waitFor(int count) throws InterruptedException {
            while (this.count < count) {
                this.waitingCount = count;
                this.wait(1000L);
            }
            this.waitingCount = Integer.MAX_VALUE;
        }
    }

    static class LatencyCallback
    implements BookkeeperInternalCallbacks.WriteCallback {
        boolean complete;

        LatencyCallback() {
        }

        public synchronized void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
            if (rc != 0) {
                LOG.error("Got error " + rc);
            }
            this.complete = true;
            this.notifyAll();
        }

        public synchronized void resetComplete() {
            this.complete = false;
        }

        public synchronized void waitForComplete() throws InterruptedException {
            while (!this.complete) {
                this.wait();
            }
        }
    }
}

