/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.flume.discovery;

import com.datatorrent.api.Component;
import com.google.common.base.Throwables;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.ObjectWriter;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKAssistedDiscovery
implements Discovery<byte[]>,
Component<com.datatorrent.api.Context>,
Configurable,
Serializable {
    @NotNull
    private String serviceName = "ApexFlume";
    @NotNull
    private String connectionString;
    @NotNull
    private String basePath;
    private int connectionTimeoutMillis = 1000;
    private int connectionRetryCount = 10;
    private int conntectionRetrySleepMillis = 500;
    private transient InstanceSerializerFactory instanceSerializerFactory;
    private transient CuratorFramework curatorFramework;
    private transient ServiceDiscovery<byte[]> discovery;
    private static final long serialVersionUID = 201401221145L;
    private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscovery.class);

    @Override
    public void unadvertise(Discovery.Service<byte[]> service) {
        this.doAdvertise(service, false);
    }

    @Override
    public void advertise(Discovery.Service<byte[]> service) {
        this.doAdvertise(service, true);
    }

    public void doAdvertise(Discovery.Service<byte[]> service, boolean flag) {
        try {
            new EnsurePath(this.basePath).ensure(this.curatorFramework.getZookeeperClient());
            ServiceInstance<byte[]> instance = this.getInstance(service);
            if (flag) {
                this.discovery.registerService(instance);
            } else {
                this.discovery.unregisterService(instance);
            }
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    @Override
    public Collection<Discovery.Service<byte[]>> discover() {
        try {
            new EnsurePath(this.basePath).ensure(this.curatorFramework.getZookeeperClient());
            Collection services = this.discovery.queryForInstances(this.serviceName);
            ArrayList<Discovery.Service<byte[]>> returnable = new ArrayList<Discovery.Service<byte[]>>(services.size());
            for (final ServiceInstance service : services) {
                returnable.add(new Discovery.Service<byte[]>(){

                    @Override
                    public String getHost() {
                        return service.getAddress();
                    }

                    @Override
                    public int getPort() {
                        return service.getPort();
                    }

                    @Override
                    public byte[] getPayload() {
                        return (byte[])service.getPayload();
                    }

                    @Override
                    public String getId() {
                        return service.getId();
                    }

                    public String toString() {
                        return "{" + this.getId() + " => " + this.getHost() + ':' + this.getPort() + '}';
                    }
                });
            }
            return returnable;
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public String toString() {
        return "ZKAssistedDiscovery{serviceName=" + this.serviceName + ", connectionString=" + this.connectionString + ", basePath=" + this.basePath + ", connectionTimeoutMillis=" + this.connectionTimeoutMillis + ", connectionRetryCount=" + this.connectionRetryCount + ", conntectionRetrySleepMillis=" + this.conntectionRetrySleepMillis + '}';
    }

    public int hashCode() {
        int hash = 7;
        hash = 47 * hash + this.serviceName.hashCode();
        hash = 47 * hash + this.connectionString.hashCode();
        hash = 47 * hash + this.basePath.hashCode();
        hash = 47 * hash + this.connectionTimeoutMillis;
        hash = 47 * hash + this.connectionRetryCount;
        hash = 47 * hash + this.conntectionRetrySleepMillis;
        return hash;
    }

    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (this.getClass() != obj.getClass()) {
            return false;
        }
        ZKAssistedDiscovery other = (ZKAssistedDiscovery)obj;
        if (!this.serviceName.equals(other.serviceName)) {
            return false;
        }
        if (!this.connectionString.equals(other.connectionString)) {
            return false;
        }
        if (!this.basePath.equals(other.basePath)) {
            return false;
        }
        if (this.connectionTimeoutMillis != other.connectionTimeoutMillis) {
            return false;
        }
        if (this.connectionRetryCount != other.connectionRetryCount) {
            return false;
        }
        return this.conntectionRetrySleepMillis == other.conntectionRetrySleepMillis;
    }

    ServiceInstance<byte[]> getInstance(Discovery.Service<byte[]> service) throws Exception {
        return ServiceInstance.builder().name(this.serviceName).address(service.getHost()).port(service.getPort()).id(service.getId()).payload((Object)service.getPayload()).build();
    }

    private ServiceDiscovery<byte[]> getDiscovery(CuratorFramework curatorFramework) {
        return ServiceDiscoveryBuilder.builder(byte[].class).basePath(this.basePath).client(curatorFramework).serializer(this.instanceSerializerFactory.getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>(){})).build();
    }

    InstanceSerializerFactory getInstanceSerializerFactory() {
        return this.instanceSerializerFactory;
    }

    public String getConnectionString() {
        return this.connectionString;
    }

    public void setConnectionString(String connectionString) {
        this.connectionString = connectionString;
    }

    public String getBasePath() {
        return this.basePath;
    }

    public void setBasePath(String basePath) {
        this.basePath = basePath;
    }

    public int getConnectionTimeoutMillis() {
        return this.connectionTimeoutMillis;
    }

    public void setConnectionTimeoutMillis(int connectionTimeoutMillis) {
        this.connectionTimeoutMillis = connectionTimeoutMillis;
    }

    public int getConnectionRetryCount() {
        return this.connectionRetryCount;
    }

    public void setConnectionRetryCount(int connectionRetryCount) {
        this.connectionRetryCount = connectionRetryCount;
    }

    public int getConntectionRetrySleepMillis() {
        return this.conntectionRetrySleepMillis;
    }

    public void setConntectionRetrySleepMillis(int conntectionRetrySleepMillis) {
        this.conntectionRetrySleepMillis = conntectionRetrySleepMillis;
    }

    public String getServiceName() {
        return this.serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public void configure(Context context) {
        this.serviceName = context.getString("serviceName", "ApexFlume");
        this.connectionString = context.getString("connectionString");
        this.basePath = context.getString("basePath");
        this.connectionTimeoutMillis = context.getInteger("connectionTimeoutMillis", Integer.valueOf(1000));
        this.connectionRetryCount = context.getInteger("connectionRetryCount", Integer.valueOf(10));
        this.conntectionRetrySleepMillis = context.getInteger("connectionRetrySleepMillis", Integer.valueOf(500));
    }

    public void setup(com.datatorrent.api.Context context) {
        ObjectMapper om = new ObjectMapper();
        this.instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer());
        this.curatorFramework = CuratorFrameworkFactory.builder().connectionTimeoutMs(this.connectionTimeoutMillis).retryPolicy((RetryPolicy)new RetryNTimes(this.connectionRetryCount, this.conntectionRetrySleepMillis)).connectString(this.connectionString).build();
        this.curatorFramework.start();
        this.discovery = this.getDiscovery(this.curatorFramework);
        try {
            this.discovery.start();
        }
        catch (Exception ex) {
            Throwables.propagate((Throwable)ex);
        }
    }

    public void teardown() {
        try {
            this.discovery.close();
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        finally {
            this.curatorFramework.close();
            this.curatorFramework = null;
        }
    }

    public class InstanceSerializerFactory {
        private final ObjectReader objectReader;
        private final ObjectWriter objectWriter;

        InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter) {
            this.objectReader = objectReader;
            this.objectWriter = objectWriter;
        }

        public <T> InstanceSerializer<T> getInstanceSerializer(TypeReference<ServiceInstance<T>> typeReference) {
            return new JacksonInstanceSerializer<T>(this.objectReader, this.objectWriter, typeReference);
        }

        final class JacksonInstanceSerializer<T>
        implements InstanceSerializer<T> {
            private final TypeReference<ServiceInstance<T>> typeRef;
            private final ObjectWriter objectWriter;
            private final ObjectReader objectReader;

            JacksonInstanceSerializer(ObjectReader objectReader, ObjectWriter objectWriter, TypeReference<ServiceInstance<T>> typeRef) {
                this.objectReader = objectReader;
                this.objectWriter = objectWriter;
                this.typeRef = typeRef;
            }

            public ServiceInstance<T> deserialize(byte[] bytes) throws Exception {
                return (ServiceInstance)this.objectReader.withType(this.typeRef).readValue(bytes);
            }

            public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                this.objectWriter.writeValue((OutputStream)out, serviceInstance);
                return out.toByteArray();
            }
        }
    }
}

