package com.netflix.turbine.handler;

import com.netflix.turbine.data.TurbineData;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.monitor.TurbineDataMonitor;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.codehaus.janino.Descriptor;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/turbine-core-1.0.0.jar:com/netflix/turbine/handler/TurbineDataDispatcher.class */
public class TurbineDataDispatcher<K extends TurbineData> {
    /** Logger shared by all dispatcher instances. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) TurbineDataDispatcher.class);
    /** Name supplied at construction time; returned by {@link #getName()}. */
    private final String name;
    /** Set to {@code true} by {@link #stopDispatcher()}; every {@code pushData} overload checks it first and delivers nothing once set. */
    private volatile boolean stopped = false;
    /** Per-instance registrations: instance -> (handler name -> handler tuple). */
    private final ConcurrentHashMap<Instance, ConcurrentHashMap<String, HandlerQueueTuple<K>>> eventHandlersForHosts = new ConcurrentHashMap<>();
    /** Per-instance counter of consecutive {@code pushData} calls that found an empty handler map for that instance. */
    private ConcurrentHashMap<Instance, AtomicInteger> iterationsWithoutHandlers = new ConcurrentHashMap<>();
    /** Handler name -> the tuple created for that handler; one tuple per name is reused across instances. */
    private ConcurrentHashMap<String, HandlerQueueTuple<K>> handlerTuples = new ConcurrentHashMap<>();

    /* loaded from: input_file:BOOT-INF/lib/turbine-core-1.0.0.jar:com/netflix/turbine/handler/TurbineDataDispatcher$UnitTest.class */
    /**
     * JUnit tests for {@link TurbineDataDispatcher}, using Mockito mocks for the handlers.
     *
     * <p>All tests share a single {@link Instance} and a {@link PerformanceCriteria} stub that
     * reports non-critical, a max queue size of 1 and a single thread.
     */
    public static class UnitTest {
        Instance instance = new Instance("test", "cluster", true);
        private static PerformanceCriteria perfCriteria = new PerformanceCriteria() {
            @Override
            public boolean isCritical() {
                return false;
            }

            @Override
            public int getMaxQueueSize() {
                return 1;
            }

            @Override
            public int numThreads() {
                return 1;
            }
        };

        /** Minimal {@link TurbineData} whose attribute accessors all return {@code null}. */
        public class TestData extends TurbineData {
            public TestData(String str, String str2) {
                super(null, str, str2);
            }

            @Override
            public HashMap<String, Long> getNumericAttributes() {
                return null;
            }

            @Override
            public HashMap<String, String> getStringAttributes() {
                return null;
            }

            @Override
            public HashMap<String, Map<String, ? extends Number>> getNestedMapAttributes() {
                return null;
            }
        }

        /**
         * Builds a mocked handler that reports the given name and the shared
         * {@link #perfCriteria}.
         */
        @SuppressWarnings("unchecked") // Mockito.mock(Class) can only hand back the raw handler type
        private static TurbineDataHandler<TestData> mockHandler(String handlerName) {
            TurbineDataHandler<TestData> handler = (TurbineDataHandler<TestData>) Mockito.mock(TurbineDataHandler.class);
            Mockito.when(handler.getName()).thenReturn(handlerName);
            Mockito.when(handler.getCriteria()).thenReturn(perfCriteria);
            return handler;
        }

        /** Registering the same handler twice for one instance must throw a RuntimeException. */
        @Test(expected = RuntimeException.class)
        public void testRegisterDuplicateHandler() throws Exception {
            TurbineDataDispatcher<TestData> dispatcher = new TurbineDataDispatcher<>("TEST");
            TurbineDataHandler<TestData> handler = mockHandler("h1");
            dispatcher.registerEventHandler(this.instance, handler);
            dispatcher.registerEventHandler(this.instance, handler);
        }

        /**
         * Verifies that a deregistered handler receives no further data, and that once no
         * handlers remain for the instance {@code pushData} keeps returning {@code true} for
         * five calls and {@code false} afterwards.
         */
        @Test
        public void testRegsiterAndDeregister() throws Exception {
            List<TestData> payload = Collections.singletonList(new TestData(null, null));
            // Stubbed as in the original test; this mock is never handed to the dispatcher.
            Mockito.when(((TurbineDataMonitor) Mockito.mock(TurbineDataMonitor.class)).getName()).thenReturn("publisher");
            TurbineDataDispatcher<TestData> dispatcher = new TurbineDataDispatcher<>("TEST");
            TurbineDataHandler<TestData> h1 = mockHandler("h1");
            TurbineDataHandler<TestData> h2 = mockHandler("h2");
            dispatcher.registerEventHandler(this.instance, h1);
            dispatcher.registerEventHandler(this.instance, h2);

            dispatcher.pushData(this.instance, payload);
            // Give the handler's worker time to drain its queue before verifying delivery.
            Thread.sleep(1000L);
            Mockito.verify(h1, Mockito.times(1)).handleData(Matchers.anyList());

            dispatcher.deregisterEventHandler(h1);
            dispatcher.pushData(this.instance, payload);
            Mockito.verify(h1, Mockito.times(1)).handleData(Matchers.anyList());

            // Deregistering an already-removed handler must be harmless.
            dispatcher.deregisterEventHandler(h1);
            dispatcher.deregisterEventHandler(h2);
            for (int i = 0; i < 5; i++) {
                Assert.assertTrue(dispatcher.pushData(this.instance, payload));
            }
            Mockito.verify(h1, Mockito.times(1)).handleData(Matchers.anyList());
            for (int i = 0; i < 4; i++) {
                Assert.assertFalse(dispatcher.pushData(this.instance, payload));
            }
            dispatcher.stopDispatcher();
        }

        /**
         * Hammers the dispatcher from 100 threads for three seconds while one handler is slow.
         * The test makes no assertions; it passes as long as nothing throws on the test thread.
         */
        @Test
        public void testEventRejection() throws Exception {
            final List<TestData> payload = Collections.singletonList(new TestData(null, null));
            // Stubbed as in the original test; this mock is never handed to the dispatcher.
            Mockito.when(((TurbineDataMonitor) Mockito.mock(TurbineDataMonitor.class)).getName()).thenReturn("publisher");
            final TurbineDataDispatcher<TestData> dispatcher = new TurbineDataDispatcher<>("TEST");
            TurbineDataHandler<TestData> slowHandler = mockHandler("h1");
            TurbineDataHandler<TestData> otherHandler = mockHandler("h2");
            dispatcher.registerEventHandler(this.instance, slowHandler);
            dispatcher.registerEventHandler(this.instance, otherHandler);
            Mockito.doAnswer(new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
                    Thread.sleep(10L);
                    return null;
                }
            }).when(slowHandler).handleData(payload);

            final AtomicLong pushCount = new AtomicLong(0L);
            ExecutorService pushers = Executors.newFixedThreadPool(100);
            for (int i = 0; i < 100; i++) {
                pushers.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        // Loops until shutdownNow() interrupts the sleep below.
                        while (true) {
                            dispatcher.pushData(UnitTest.this.instance, payload);
                            pushCount.incrementAndGet();
                            Thread.sleep(10L);
                        }
                    }
                });
            }
            Thread.sleep(3000L);
            pushers.shutdownNow();
            dispatcher.stopDispatcher();
        }
    }

    /**
     * Creates a dispatcher with no registered handlers.
     *
     * @param str the name of this dispatcher, later returned by {@link #getName()}
     */
    public TurbineDataDispatcher(String str) {
        this.name = str;
    }

    /**
     * @return the name this dispatcher was constructed with
     */
    public String getName() {
        return this.name;
    }

    /**
     * Drops every handler registration held for {@code instance}.
     *
     * <p>The instance's handler map is removed from the dispatcher, each handler in it is told
     * about the loss via {@code handleHostLost(instance)}, and the map is then cleared. The
     * instance's no-handler counter is removed as well.
     *
     * @param instance the host that is no longer available
     */
    public void handleHostLost(Instance instance) {
        ConcurrentHashMap<String, HandlerQueueTuple<K>> handlersForHost = this.eventHandlersForHosts.remove(instance);
        if (handlersForHost != null) {
            for (HandlerQueueTuple<K> tuple : handlersForHost.values()) {
                tuple.getHandler().handleHostLost(instance);
            }
            handlersForHost.clear();
        }
        this.iterationsWithoutHandlers.remove(instance);
    }

    /**
     * Associates {@code turbineDataHandler} with {@code instance} so that data pushed for that
     * instance is forwarded to the handler's tuple.
     *
     * <p>If the tuple obtained for the handler reports that it was previously stopped, the
     * association is skipped and only an info message is logged.
     *
     * @param instance           the host whose data the handler should receive
     * @param turbineDataHandler the handler to register; looked up by its name
     * @throws RuntimeException if a handler with the same name is already registered for the instance
     */
    public void registerEventHandler(Instance instance, TurbineDataHandler<K> turbineDataHandler) {
        ConcurrentHashMap<String, HandlerQueueTuple<K>> handlersForHost = this.eventHandlersForHosts.get(instance);
        if (handlersForHost == null) {
            ConcurrentHashMap<String, HandlerQueueTuple<K>> fresh = new ConcurrentHashMap<>();
            ConcurrentHashMap<String, HandlerQueueTuple<K>> raced = this.eventHandlersForHosts.putIfAbsent(instance, fresh);
            // If another thread installed a map first, use theirs.
            handlersForHost = (raced != null) ? raced : fresh;
        }

        if (handlersForHost.get(turbineDataHandler.getName()) != null) {
            throw new RuntimeException("Handler has already been registered: " + turbineDataHandler.getName() + ", existing handlers: " + handlersForHost.keySet());
        }

        HandlerQueueTuple<K> tuple = getHandlerQueueTuple(turbineDataHandler);
        if (tuple.previouslyStopped()) {
            logger.info("Found handler tuple to be stopped: " + turbineDataHandler.getName() + " will not associate with host : " + instance.getHostname());
            return;
        }

        HandlerQueueTuple<K> alreadyThere = handlersForHost.putIfAbsent(turbineDataHandler.getName(), tuple);
        if (alreadyThere != null) {
            // Lost a race with a concurrent registration of the same handler name.
            throw new RuntimeException("Handler has already been registered: " + turbineDataHandler.getName() + ", existing handlers: " + handlersForHost.keySet());
        }
    }

    /**
     * Returns the tuple stored under the handler's name, creating and starting one if none
     * exists yet.
     *
     * <p>Only the thread whose {@code putIfAbsent} succeeds starts the new tuple; any other
     * caller gets the tuple that is already in the map.
     *
     * @param turbineDataHandler the handler whose tuple is wanted
     * @return the existing or newly started tuple
     * @throws RuntimeException wrapping any exception thrown while starting a new tuple; the
     *                          handler is deregistered before the exception is rethrown
     */
    private HandlerQueueTuple<K> getHandlerQueueTuple(TurbineDataHandler<K> turbineDataHandler) {
        HandlerQueueTuple<K> known = this.handlerTuples.get(turbineDataHandler.getName());
        if (known == null) {
            HandlerQueueTuple<K> created = new HandlerQueueTuple<>(turbineDataHandler);
            known = this.handlerTuples.putIfAbsent(turbineDataHandler.getName(), created);
            if (known == null) {
                logger.info("\n\nJust added and starting handler tuple: " + turbineDataHandler.getName());
                try {
                    created.start();
                } catch (Exception e) {
                    logger.error("Caught failure when registering handler");
                    deregisterEventHandler(turbineDataHandler);
                    throw new RuntimeException(e);
                }
                return created;
            }
        }
        return known;
    }

    /**
     * Stops and removes the handler registered under the given name, then detaches it from
     * every instance it was associated with.
     *
     * <p>For each instance that still held the handler, the handler's
     * {@code handleHostLost(instance)} is invoked. Calling this for a name that is not
     * registered only logs the removal attempt.
     *
     * @param str the name of the handler to remove
     */
    public void deregisterEventHandler(String str) {
        logger.info("Removing event handler: " + str);
        HandlerQueueTuple<K> tuple = this.handlerTuples.get(str);
        if (tuple != null) {
            tuple.stop();
            HandlerQueueTuple<K> removed = this.handlerTuples.remove(str);
            logger.info(" tuples : " + this.handlerTuples.keySet());
            logger.info("Removed handler queue tuple for handler: " + removed);
        }
        // Iterate entries rather than keys: looking each key up again with get() could return
        // null (and NPE) if the instance is removed concurrently, e.g. by handleHostLost.
        for (Map.Entry<Instance, ConcurrentHashMap<String, HandlerQueueTuple<K>>> hostEntry : this.eventHandlersForHosts.entrySet()) {
            HandlerQueueTuple<K> detached = hostEntry.getValue().remove(str);
            if (detached != null) {
                detached.getHandler().handleHostLost(hostEntry.getKey());
            }
        }
    }

    /**
     * Convenience overload that deregisters a handler by its name.
     *
     * @param turbineDataHandler the handler to remove; only its name is used
     */
    public void deregisterEventHandler(TurbineDataHandler<K> turbineDataHandler) {
        String handlerName = turbineDataHandler.getName();
        deregisterEventHandler(handlerName);
    }

    /**
     * Looks up the handler registered for an instance under a given name.
     *
     * @param instance the host to look in
     * @param str      the handler name
     * @return the handler, or {@code null} when the instance has no registrations or none
     *         under that name
     */
    public TurbineDataHandler<K> findHandlerForHost(Instance instance, String str) {
        ConcurrentHashMap<String, HandlerQueueTuple<K>> handlersForHost = this.eventHandlersForHosts.get(instance);
        if (handlersForHost == null) {
            return null;
        }
        HandlerQueueTuple<K> tuple = handlersForHost.get(str);
        return (tuple != null) ? tuple.getHandler() : null;
    }

    /**
     * Forwards a batch of data to every handler registered for {@code instance}.
     *
     * <p>When the instance's handler map is empty, a per-instance counter of consecutive
     * empty calls is incremented; the counter is reset to zero as soon as a call finds at
     * least one handler.
     *
     * @param instance   the host the data came from
     * @param collection the data to deliver
     * @return {@code false} if the dispatcher is stopped, if the instance has no handler map,
     *         or if more than five consecutive calls found the map empty; {@code true} otherwise
     */
    public boolean pushData(Instance instance, Collection<K> collection) {
        if (this.stopped) {
            return false;
        }
        ConcurrentHashMap<String, HandlerQueueTuple<K>> handlersForHost = this.eventHandlersForHosts.get(instance);
        if (handlersForHost == null) {
            return false;
        }
        for (HandlerQueueTuple<K> tuple : handlersForHost.values()) {
            tuple.pushData(collection);
        }
        AtomicInteger idleIterations = getIterationWithoutHandlerCount(instance);
        if (!handlersForHost.isEmpty()) {
            idleIterations.set(0);
            return true;
        }
        // Use the value returned by incrementAndGet(): a separate get() could observe an
        // increment made by another thread in between.
        if (idleIterations.incrementAndGet() <= 5) {
            return true;
        }
        logger.info("We no longer have handlers to dispatch to");
        return false;
    }

    /**
     * Forwards a single data item to every handler registered for {@code instance}.
     *
     * <p>When the instance's handler map is empty, a per-instance counter of consecutive
     * empty calls is incremented; the counter is reset to zero as soon as a call finds at
     * least one handler.
     *
     * @param instance the host the data came from
     * @param k        the data item to deliver
     * @return {@code false} if the dispatcher is stopped, if the instance has no handler map,
     *         or if more than five consecutive calls found the map empty; {@code true} otherwise
     */
    public boolean pushData(Instance instance, K k) {
        if (this.stopped) {
            return false;
        }
        ConcurrentHashMap<String, HandlerQueueTuple<K>> handlersForHost = this.eventHandlersForHosts.get(instance);
        if (handlersForHost == null) {
            return false;
        }
        for (HandlerQueueTuple<K> tuple : handlersForHost.values()) {
            // Pass the item itself; K is not a Collection, so this selects the single-item overload.
            tuple.pushData(k);
        }
        AtomicInteger idleIterations = getIterationWithoutHandlerCount(instance);
        if (!handlersForHost.isEmpty()) {
            idleIterations.set(0);
            return true;
        }
        // Use the value returned by incrementAndGet(): a separate get() could observe an
        // increment made by another thread in between.
        if (idleIterations.incrementAndGet() <= 5) {
            return true;
        }
        logger.info("We no longer have handlers to dispatch to");
        return false;
    }

    /**
     * Delivers a batch of data directly to the named handlers, regardless of instance.
     *
     * <p>Names with no registered tuple are skipped. Nothing is delivered once the
     * dispatcher has been stopped.
     *
     * @param collection  names of the handlers to deliver to
     * @param collection2 the data to deliver
     */
    public void pushData(Collection<String> collection, Collection<K> collection2) {
        if (!this.stopped) {
            for (String handlerName : collection) {
                HandlerQueueTuple<K> tuple = this.handlerTuples.get(handlerName);
                if (tuple == null) {
                    continue;
                }
                tuple.pushData(collection2);
            }
        }
    }

    /**
     * Delivers a single data item directly to the named handlers, regardless of instance.
     *
     * <p>Names with no registered tuple are skipped. Once the dispatcher has been stopped,
     * nothing is delivered and an info message is logged instead.
     *
     * @param collection names of the handlers to deliver to
     * @param k          the data item to deliver
     */
    public void pushData(Collection<String> collection, K k) {
        if (this.stopped) {
            logger.info("Dispatcher has been stopped, will not deliver data");
            return;
        }
        for (String handlerName : collection) {
            HandlerQueueTuple<K> tuple = this.handlerTuples.get(handlerName);
            if (tuple != null) {
                // Pass the item itself; K is not a Collection, so this selects the single-item overload.
                tuple.pushData(k);
            }
        }
    }

    /**
     * Shuts the dispatcher down.
     *
     * <p>Marks the dispatcher as stopped, stops and removes every handler tuple, then runs
     * {@link #handleHostLost(Instance)} for every instance that still has registrations and
     * finally empties both maps.
     */
    public void stopDispatcher() {
        this.stopped = true;

        Iterator<HandlerQueueTuple<K>> tuples = this.handlerTuples.values().iterator();
        while (tuples.hasNext()) {
            HandlerQueueTuple<K> tuple = tuples.next();
            tuple.stop();
            String handlerName = tuple.getHandler().getName();
            HandlerQueueTuple<K> removed = this.handlerTuples.remove(handlerName);
            logger.info(" tuples : " + this.handlerTuples.keySet());
            logger.info("Key: " + removed);
        }
        this.handlerTuples.clear();

        Iterator<Instance> hosts = this.eventHandlersForHosts.keySet().iterator();
        while (hosts.hasNext()) {
            Instance host = hosts.next();
            handleHostLost(host);
            // handleHostLost removes the host's map; this only matters if one reappeared meanwhile.
            ConcurrentHashMap<String, HandlerQueueTuple<K>> leftover = this.eventHandlersForHosts.get(host);
            if (leftover != null) {
                leftover.clear();
            }
        }
        this.eventHandlersForHosts.clear();
    }

    /**
     * Returns the no-handler counter for {@code instance}, installing a zero-valued one if
     * the instance has none yet.
     *
     * @param instance the host whose counter is wanted
     * @return the counter stored in the map for that instance (never {@code null})
     */
    private AtomicInteger getIterationWithoutHandlerCount(Instance instance) {
        AtomicInteger current = this.iterationsWithoutHandlers.get(instance);
        if (current != null) {
            return current;
        }
        AtomicInteger fresh = new AtomicInteger(0);
        AtomicInteger raced = this.iterationsWithoutHandlers.putIfAbsent(instance, fresh);
        // If another thread installed a counter first, share theirs.
        return (raced == null) ? fresh : raced;
    }

    /**
     * Reports whether any registered handler tuple is currently running.
     *
     * @return {@code true} as soon as one tuple reports {@code running()}; {@code false} if
     *         none do or no tuples are registered
     */
    public boolean running() {
        for (HandlerQueueTuple<K> tuple : this.handlerTuples.values()) {
            if (tuple.running()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the names of all handlers that currently have a tuple in this dispatcher.
     *
     * <p>The result is a read-only view backed by the internal map, so it reflects later
     * registrations and removals, but callers cannot remove handlers through it (which would
     * drop a tuple without stopping it).
     *
     * @return an unmodifiable live view of the registered handler names
     */
    public Set<String> getAllHandlerNames() {
        return Collections.unmodifiableSet(this.handlerTuples.keySet());
    }
}
