/*
 * Decompiled with CFR 0.152.
 */
package co.cask.tephra.distributed;

import co.cask.tephra.TransactionServiceMain;
import co.cask.tephra.distributed.CloseableThriftClient;
import co.cask.tephra.distributed.PooledClientProvider;
import co.cask.tephra.runtime.ConfigModule;
import co.cask.tephra.runtime.DiscoveryModules;
import co.cask.tephra.runtime.TransactionClientModule;
import co.cask.tephra.runtime.TransactionModules;
import co.cask.tephra.runtime.ZKModule;
import com.google.common.base.Throwables;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class PooledClientProviderTest {
    public static final int MAX_CLIENT_COUNT = 3;
    public static final long CLIENT_OBTAIN_TIMEOUT = 10L;
    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testClientConnectionPoolMaximumNumberOfClients() throws Exception {
        InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
        zkServer.startAndWait();
        try {
            Configuration conf = new Configuration();
            conf.set("data.tx.zookeeper.quorum", zkServer.getConnectionStr());
            conf.set("data.tx.snapshot.dir", tmpFolder.newFolder().getAbsolutePath());
            conf.set("data.tx.client.count", Integer.toString(3));
            conf.set("data.tx.client.obtain.timeout", Long.toString(10L));
            final TransactionServiceMain main = new TransactionServiceMain(conf);
            final CountDownLatch latch = new CountDownLatch(1);
            Thread t = new Thread(){

                @Override
                public void run() {
                    try {
                        main.start();
                        latch.countDown();
                    }
                    catch (Exception e) {
                        throw Throwables.propagate((Throwable)e);
                    }
                }
            };
            try {
                t.start();
                latch.await();
                this.startClientAndTestPool(conf);
            }
            finally {
                main.stop();
                t.join();
            }
        }
        finally {
            zkServer.stopAndWait();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startClientAndTestPool(Configuration conf) throws Exception {
        Injector injector = Guice.createInjector((Module[])new Module[]{new ConfigModule(conf), new ZKModule(), new DiscoveryModules().getDistributedModules(), new TransactionModules().getDistributedModules(), new TransactionClientModule()});
        ZKClientService zkClient = (ZKClientService)injector.getInstance(ZKClientService.class);
        zkClient.startAndWait();
        PooledClientProvider clientProvider = new PooledClientProvider(conf, (DiscoveryServiceClient)injector.getInstance(DiscoveryServiceClient.class));
        CloseableThriftClient closeableThriftClient = clientProvider.getCloseableClient();
        closeableThriftClient.close();
        ArrayList<Future<Integer>> clientIds = new ArrayList<Future<Integer>>();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; ++i) {
            clientIds.add(executor.submit(new RetrieveClient(clientProvider, 5L, countDownLatch)));
        }
        countDownLatch.countDown();
        HashSet ids = new HashSet();
        for (Future future : clientIds) {
            ids.add(future.get());
        }
        Assert.assertEquals((long)3L, (long)ids.size());
        countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 4; ++i) {
            clientIds.add(executor.submit(new RetrieveClient(clientProvider, 20L, countDownLatch)));
        }
        countDownLatch.countDown();
        int numTimeoutExceptions = 0;
        for (Future future : clientIds) {
            try {
                future.get();
            }
            catch (ExecutionException expected) {
                Assert.assertEquals(TimeoutException.class, expected.getCause().getClass());
                ++numTimeoutExceptions;
            }
        }
        Assert.assertEquals((String)String.format("Expected one thread to not obtain a client within %s milliseconds.", 10L), (long)1L, (long)numTimeoutExceptions);
        executor.shutdown();
    }

    private static class RetrieveClient
    implements Callable<Integer> {
        private final PooledClientProvider pool;
        private final long holdClientMs;
        private final CountDownLatch begin;

        public RetrieveClient(PooledClientProvider pool, long holdClientMs, CountDownLatch begin) {
            this.pool = pool;
            this.holdClientMs = holdClientMs;
            this.begin = begin;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Integer call() throws Exception {
            this.begin.await();
            CloseableThriftClient client = this.pool.getCloseableClient();
            try {
                int id = System.identityHashCode(client.getThriftClient());
                Thread.sleep(this.holdClientMs);
                Integer n = id;
                return n;
            }
            finally {
                client.close();
            }
        }
    }
}

