/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.benchmark;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang.SystemUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BenchBookie {
    static final Logger LOG = LoggerFactory.getLogger(BenchBookie.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static long getValidLedgerId(String zkServers) throws IOException, BKException, KeeperException, InterruptedException {
        BookKeeper bkc = null;
        LedgerHandle lh = null;
        long id = 0L;
        try {
            bkc = new BookKeeper(zkServers);
            lh = bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32, new byte[20]);
            long l = id = lh.getId();
            return l;
        }
        finally {
            if (lh != null) {
                lh.close();
            }
            if (bkc != null) {
                bkc.close();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ParseException, IOException, BKException, KeeperException {
        NioEventLoopGroup eventLoop;
        Options options = new Options();
        options.addOption("host", true, "Hostname or IP of bookie to benchmark");
        options.addOption("port", true, "Port of bookie to benchmark (default 3181)");
        options.addOption("zookeeper", true, "Zookeeper ensemble, (default \"localhost:2181\")");
        options.addOption("size", true, "Size of message to send, in bytes (default 1024)");
        options.addOption("warmupCount", true, "Number of messages in warmup phase (default 999)");
        options.addOption("latencyCount", true, "Number of messages in latency phase (default 5000)");
        options.addOption("throughputCount", true, "Number of messages in throughput phase (default 50000)");
        options.addOption("help", false, "This message");
        PosixParser parser = new PosixParser();
        CommandLine cmd = parser.parse(options, args);
        if (cmd.hasOption("help") || !cmd.hasOption("host")) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("BenchBookie <options>", options);
            System.exit(-1);
        }
        String addr = cmd.getOptionValue("host");
        int port = Integer.parseInt(cmd.getOptionValue("port", "3181"));
        int size = Integer.parseInt(cmd.getOptionValue("size", "1024"));
        String servers = cmd.getOptionValue("zookeeper", "localhost:2181");
        int warmUpCount = Integer.parseInt(cmd.getOptionValue("warmupCount", "999"));
        int latencyCount = Integer.parseInt(cmd.getOptionValue("latencyCount", "5000"));
        int throughputCount = Integer.parseInt(cmd.getOptionValue("throughputCount", "50000"));
        if (SystemUtils.IS_OS_LINUX) {
            try {
                eventLoop = new EpollEventLoopGroup();
            }
            catch (Throwable t) {
                LOG.warn("Could not use Netty Epoll event loop for benchmark {}", (Object)t.getMessage());
                eventLoop = new NioEventLoopGroup();
            }
        } else {
            eventLoop = new NioEventLoopGroup();
        }
        OrderedExecutor executor = OrderedExecutor.newBuilder().name("BenchBookieClientScheduler").numThreads(1).build();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("BookKeeperClientScheduler"));
        ClientConfiguration conf = new ClientConfiguration();
        BookieClient bc = new BookieClient(conf, (EventLoopGroup)eventLoop, executor, scheduler, (StatsLogger)NullStatsLogger.INSTANCE);
        LatencyCallback lc = new LatencyCallback();
        ThroughputCallback tc = new ThroughputCallback();
        long ledger = BenchBookie.getValidLedgerId(servers);
        for (long entry = 0L; entry < (long)warmUpCount; ++entry) {
            ByteBuf toSend = Unpooled.buffer((int)size);
            toSend.resetReaderIndex();
            toSend.resetWriterIndex();
            toSend.writeLong(ledger);
            toSend.writeLong(entry);
            toSend.writerIndex(toSend.capacity());
            bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20], entry, ByteBufList.get((ByteBuf)toSend), (BookkeeperInternalCallbacks.WriteCallback)tc, null, 0);
        }
        LOG.info("Waiting for warmup");
        tc.waitFor(warmUpCount);
        ledger = BenchBookie.getValidLedgerId(servers);
        LOG.info("Benchmarking latency");
        long startTime = System.nanoTime();
        for (long entry = 0L; entry < (long)latencyCount; ++entry) {
            ByteBuf toSend = Unpooled.buffer((int)size);
            toSend.resetReaderIndex();
            toSend.resetWriterIndex();
            toSend.writeLong(ledger);
            toSend.writeLong(entry);
            toSend.writerIndex(toSend.capacity());
            lc.resetComplete();
            bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20], entry, ByteBufList.get((ByteBuf)toSend), (BookkeeperInternalCallbacks.WriteCallback)lc, null, 0);
            lc.waitForComplete();
        }
        long endTime = System.nanoTime();
        LOG.info("Latency: " + (double)(endTime - startTime) / (double)latencyCount / 1000000.0);
        ledger = BenchBookie.getValidLedgerId(servers);
        LOG.info("Benchmarking throughput");
        startTime = System.currentTimeMillis();
        tc = new ThroughputCallback();
        for (long entry = 0L; entry < (long)throughputCount; ++entry) {
            ByteBuf toSend = Unpooled.buffer((int)size);
            toSend.resetReaderIndex();
            toSend.resetWriterIndex();
            toSend.writeLong(ledger);
            toSend.writeLong(entry);
            toSend.writerIndex(toSend.capacity());
            bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20], entry, ByteBufList.get((ByteBuf)toSend), (BookkeeperInternalCallbacks.WriteCallback)tc, null, 0);
        }
        tc.waitFor(throughputCount);
        endTime = System.currentTimeMillis();
        LOG.info("Throughput: " + (long)throughputCount * 1000L / (endTime - startTime));
        bc.close();
        scheduler.shutdown();
        eventLoop.shutdownGracefully();
        executor.shutdown();
    }

    static class ThroughputCallback
    implements BookkeeperInternalCallbacks.WriteCallback {
        int count;
        int waitingCount = Integer.MAX_VALUE;

        ThroughputCallback() {
        }

        public synchronized void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
            if (rc != 0) {
                LOG.error("Got error " + rc);
            }
            ++this.count;
            if (this.count >= this.waitingCount) {
                this.notifyAll();
            }
        }

        public synchronized void waitFor(int count) throws InterruptedException {
            while (this.count < count) {
                this.waitingCount = count;
                this.wait(1000L);
            }
            this.waitingCount = Integer.MAX_VALUE;
        }
    }

    static class LatencyCallback
    implements BookkeeperInternalCallbacks.WriteCallback {
        boolean complete;

        LatencyCallback() {
        }

        public synchronized void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
            if (rc != 0) {
                LOG.error("Got error " + rc);
            }
            this.complete = true;
            this.notifyAll();
        }

        public synchronized void resetComplete() {
            this.complete = false;
        }

        public synchronized void waitForComplete() throws InterruptedException {
            while (!this.complete) {
                this.wait();
            }
        }
    }
}

