/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.common.zookeeper.coordination;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.discovery.ResolvingDiscoverable;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.zookeeper.coordination.AssignmentChangeListener;
import co.cask.cdap.common.zookeeper.coordination.AssignmentStrategy;
import co.cask.cdap.common.zookeeper.coordination.BalancedAssignmentStrategy;
import co.cask.cdap.common.zookeeper.coordination.PartitionReplica;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinator;
import co.cask.cdap.common.zookeeper.coordination.ResourceCoordinatorClient;
import co.cask.cdap.common.zookeeper.coordination.ResourceHandler;
import co.cask.cdap.common.zookeeper.coordination.ResourceRequirement;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceCoordinatorTest {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceCoordinatorTest.class);
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    private static InMemoryZKServer zkServer;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAssignment() throws InterruptedException, ExecutionException {
        CConfiguration cConf = CConfiguration.create();
        cConf.set("zookeeper.quorum", zkServer.getConnectionStr());
        String serviceName = "test-assignment";
        Injector injector = Guice.createInjector((Module[])new Module[]{new ConfigModule(cConf), new ZKClientModule(), new DiscoveryRuntimeModule().getDistributedModules()});
        ZKClientService zkClient = (ZKClientService)injector.getInstance(ZKClientService.class);
        zkClient.startAndWait();
        DiscoveryService discoveryService = (DiscoveryService)injector.getInstance(DiscoveryService.class);
        try {
            ResourceCoordinator coordinator = new ResourceCoordinator((ZKClient)zkClient, (DiscoveryServiceClient)injector.getInstance(DiscoveryServiceClient.class), (AssignmentStrategy)new BalancedAssignmentStrategy());
            coordinator.startAndWait();
            try {
                ResourceCoordinatorClient client = new ResourceCoordinatorClient((ZKClient)zkClient);
                client.startAndWait();
                try {
                    ResourceRequirement requirement = ResourceRequirement.builder((String)serviceName).addPartitions("p", 5, 1).build();
                    client.submitRequirement(requirement).get();
                    Assert.assertEquals((Object)requirement, (Object)client.fetchRequirement(requirement.getName()).get());
                    Discoverable discoverable1 = this.createDiscoverable(serviceName, 10000);
                    Cancellable cancelDiscoverable1 = discoveryService.register((Discoverable)ResolvingDiscoverable.of((Discoverable)discoverable1));
                    SynchronousQueue<Collection<PartitionReplica>> assignmentQueue = new SynchronousQueue<Collection<PartitionReplica>>();
                    Semaphore finishSemaphore = new Semaphore(0);
                    Cancellable cancelSubscribe1 = this.subscribe(client, discoverable1, assignmentQueue, finishSemaphore);
                    Collection assigned = (Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)assigned);
                    Assert.assertEquals((long)5L, (long)assigned.size());
                    cancelDiscoverable1.cancel();
                    Assert.assertTrue((boolean)((Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS)).isEmpty());
                    cancelDiscoverable1 = discoveryService.register((Discoverable)ResolvingDiscoverable.of((Discoverable)discoverable1));
                    assigned = (Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)assigned);
                    Assert.assertEquals((long)5L, (long)assigned.size());
                    Discoverable discoverable2 = this.createDiscoverable(serviceName, 10001);
                    Cancellable cancelDiscoverable2 = discoveryService.register((Discoverable)ResolvingDiscoverable.of((Discoverable)discoverable2));
                    assigned = (Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)assigned);
                    Assert.assertEquals((long)3L, (long)assigned.size());
                    cancelDiscoverable1.cancel();
                    Assert.assertTrue((boolean)((Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS)).isEmpty());
                    cancelSubscribe1.cancel();
                    Assert.assertTrue((boolean)finishSemaphore.tryAcquire(2L, TimeUnit.SECONDS));
                    Cancellable cancelSubscribe2 = this.subscribe(client, discoverable2, assignmentQueue, finishSemaphore);
                    assigned = (Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)assigned);
                    Assert.assertEquals((long)5L, (long)assigned.size());
                    client.submitRequirement(ResourceRequirement.builder((String)serviceName).build());
                    Assert.assertTrue((boolean)((Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS)).isEmpty());
                    client.submitRequirement(ResourceRequirement.builder((String)serviceName).addPartitions("p", 1, 1).build());
                    assigned = (Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS);
                    Assert.assertNotNull((Object)assigned);
                    Assert.assertEquals((long)1L, (long)assigned.size());
                    client.deleteRequirement(requirement.getName());
                    Assert.assertTrue((boolean)((Collection)assignmentQueue.poll(30L, TimeUnit.SECONDS)).isEmpty());
                    cancelSubscribe2.cancel();
                    Assert.assertTrue((boolean)finishSemaphore.tryAcquire(2L, TimeUnit.SECONDS));
                    cancelDiscoverable2.cancel();
                }
                finally {
                    client.stopAndWait();
                }
            }
            finally {
                coordinator.stopAndWait();
            }
        }
        finally {
            zkClient.stopAndWait();
        }
    }

    @BeforeClass
    public static void init() throws IOException {
        zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
        zkServer.startAndWait();
    }

    @AfterClass
    public static void finish() {
        zkServer.stopAndWait();
    }

    private Cancellable subscribe(ResourceCoordinatorClient client, final Discoverable discoverable, final BlockingQueue<Collection<PartitionReplica>> assignmentQueue, final Semaphore finishSemaphore) {
        return client.subscribe(discoverable.getName(), (AssignmentChangeListener)new ResourceHandler(discoverable){

            public void onChange(Collection<PartitionReplica> partitionReplicas) {
                try {
                    LOG.debug("Discoverable {} Received: {}", (Object)discoverable.getSocketAddress().getPort(), partitionReplicas);
                    assignmentQueue.put(partitionReplicas);
                }
                catch (InterruptedException e) {
                    LOG.error("Interrupted.", (Throwable)e);
                }
            }

            public void finished(Throwable failureCause) {
                LOG.debug("Finished on {}", (Object)discoverable.getSocketAddress().getPort());
                if (failureCause == null) {
                    finishSemaphore.release();
                } else {
                    LOG.error("Finished with failure for {}", (Object)discoverable.getSocketAddress().getPort(), (Object)failureCause);
                }
            }
        });
    }

    private Discoverable createDiscoverable(String serviceName, int port) {
        InetSocketAddress address;
        try {
            address = new InetSocketAddress(InetAddress.getLocalHost(), port);
        }
        catch (UnknownHostException e) {
            address = new InetSocketAddress(port);
        }
        return new Discoverable(serviceName, address);
    }
}

