package com.netflix.turbine.handler;

import com.netflix.turbine.data.AggDataFromCluster;
import com.netflix.turbine.data.ConcurrentEventQueue;
import com.netflix.turbine.data.DataFromSingleInstance;
import com.netflix.turbine.data.EventQueue;
import com.netflix.turbine.data.StatsRollingNumber;
import com.netflix.turbine.data.TurbineData;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.utils.WorkerThread;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.backoff.ExponentialBackOff;

/* loaded from: input_file:BOOT-INF/lib/turbine-core-1.0.0.jar:com/netflix/turbine/handler/HandlerQueueTuple.class */
public class HandlerQueueTuple<K extends TurbineData> {
    /**
     * Logger shared by the tuple and its dispatcher workers. It is registered under this
     * class so that messages are attributed to {@code HandlerQueueTuple}.
     */
    private static final Logger logger = LoggerFactory.getLogger(HandlerQueueTuple.class);

    /** Handler that receives the data drained from {@link #queue}. */
    private final TurbineDataHandler<K> eventHandler;

    /** Queue between producers calling {@code pushData} and the dispatcher workers. */
    private final EventQueue<K> queue;

    /** Worker threads created by {@code start()} and released by {@code stop()}. */
    private final List<WorkerThread> workerThreads;

    /** Number of dispatcher threads, taken from the handler's performance criteria. */
    private final int numThreads;

    /** Set once by {@code stop()}; a stopped tuple drops pushed data and cannot be restarted. */
    private volatile boolean stopped;

    /** Counts accepted and discarded events; only updated for critical handlers. */
    private final StatsRollingNumber counter = new StatsRollingNumber(10000, 10);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/turbine-core-1.0.0.jar:com/netflix/turbine/handler/HandlerQueueTuple$PerHandlerDispatcher.class */
    /**
     * Worker that drains the tuple's queue and forwards what it reads to the handler.
     * Each call to {@link #doWork()} delivers one event, wrapped in a single-element list.
     */
    public class PerHandlerDispatcher implements WorkerThread.Worker {
        /** Consecutive empty polls tolerated before the worker backs off. */
        private static final int MAX_EMPTY_POLLS = 100;
        /** How long the worker sleeps, in milliseconds, once the empty-poll limit is exceeded. */
        private static final long IDLE_SLEEP_MILLIS = 100L;

        private final EventQueue<K> queue;
        private final TurbineDataHandler<K> eventHandler;

        /**
         * Delegating constructor kept for signature compatibility.
         *
         * @param handlerQueueTuple unused
         * @param eventQueue queue to read events from
         * @param turbineDataHandler handler that receives the events
         */
        private PerHandlerDispatcher(HandlerQueueTuple<?> handlerQueueTuple, EventQueue<K> eventQueue, TurbineDataHandler<K> turbineDataHandler) {
            this(eventQueue, turbineDataHandler, 1);
        }

        /**
         * @param eventQueue queue to read events from
         * @param turbineDataHandler handler that receives the events
         * @param i unused; retained because callers pass a third argument
         */
        private PerHandlerDispatcher(EventQueue<K> eventQueue, TurbineDataHandler<K> turbineDataHandler, int i) {
            this.queue = eventQueue;
            this.eventHandler = turbineDataHandler;
        }

        @Override // com.netflix.turbine.utils.WorkerThread.Worker
        public void init() throws Exception {
            HandlerQueueTuple.logger.info("Per handler dispacher started for: " + this.eventHandler.getName());
        }

        /**
         * Polls the queue until one event is available, then passes it to the handler.
         *
         * <p>While the queue keeps returning {@code null}, misses are counted; after more than
         * {@code MAX_EMPTY_POLLS} consecutive misses the thread sleeps for
         * {@code IDLE_SLEEP_MILLIS} and the count starts over.
         *
         * <p>An exception thrown by the handler is never propagated. It is logged as a warning
         * only when the handler's criteria mark it as critical; otherwise it is dropped.
         *
         * @throws Exception if the thread is interrupted while backing off, or if reading
         *         from the queue fails
         */
        @Override // com.netflix.turbine.utils.WorkerThread.Worker
        public void doWork() throws Exception {
            List<K> batch = new ArrayList<K>();
            int emptyPolls = 0;
            boolean gotEvent = false;
            do {
                K event = this.queue.readEvent();
                if (event == null) {
                    emptyPolls++;
                    if (emptyPolls > MAX_EMPTY_POLLS) {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                        // Restart the count so the next back-off again needs a full run of misses.
                        emptyPolls = 0;
                    }
                } else {
                    batch.add(event);
                    emptyPolls = 0;
                    gotEvent = true;
                }
            } while (!gotEvent);

            try {
                this.eventHandler.handleData(batch);
            } catch (Exception e) {
                // Failures of non-critical handlers are intentionally not logged.
                if (this.eventHandler.getCriteria().isCritical()) {
                    HandlerQueueTuple.logger.warn("Could not publish event to event handler for " + this.eventHandler.getName(), e);
                }
            }
        }

        @Override // com.netflix.turbine.utils.WorkerThread.Worker
        public void cleanup() throws Exception {
            // Nothing to release.
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/turbine-core-1.0.0.jar:com/netflix/turbine/handler/HandlerQueueTuple$UnitTest.class */
    /**
     * Concurrency test: ten producer threads push random-sized batches into one tuple for
     * three seconds, and the test then checks that the handler saw exactly as many items
     * as the producers reported pushing.
     */
    public static class UnitTest {
        @Test
        public void testProcessWithMultipleThreads() throws Exception {
            // Total number of items the handler has been given.
            final AtomicInteger handledCount = new AtomicInteger(0);
            TurbineDataHandler<TurbineData> handler = new TurbineDataHandler<TurbineData>() { // from class: com.netflix.turbine.handler.HandlerQueueTuple.UnitTest.1
                @Override // com.netflix.turbine.handler.TurbineDataHandler
                public String getName() {
                    return "testHandler";
                }

                @Override // com.netflix.turbine.handler.TurbineDataHandler
                public void handleData(Collection<TurbineData> collection) {
                    handledCount.addAndGet(collection.size());
                }

                @Override // com.netflix.turbine.handler.TurbineDataHandler
                public void handleHostLost(Instance instance) {
                }

                @Override // com.netflix.turbine.handler.TurbineDataHandler
                public PerformanceCriteria getCriteria() {
                    return new PerformanceCriteria() { // from class: com.netflix.turbine.handler.HandlerQueueTuple.UnitTest.1.1
                        @Override // com.netflix.turbine.handler.PerformanceCriteria
                        public boolean isCritical() {
                            return false;
                        }

                        @Override // com.netflix.turbine.handler.PerformanceCriteria
                        public int getMaxQueueSize() {
                            return 10000;
                        }

                        @Override // com.netflix.turbine.handler.PerformanceCriteria
                        public int numThreads() {
                            return 1;
                        }
                    };
                }
            };
            // Left raw on purpose: the attribute-map type expected by DataFromSingleInstance
            // is not visible from this file.
            final HashMap attributes = new HashMap();
            final Instance host = new Instance("host", "cluster", true);
            final HandlerQueueTuple<TurbineData> tuple = new HandlerQueueTuple<TurbineData>(handler);
            tuple.start();

            final AtomicBoolean stopProducers = new AtomicBoolean(false);
            ExecutorService producerPool = Executors.newFixedThreadPool(10);
            List<Future<Integer>> pushedCounts = new ArrayList<Future<Integer>>();
            for (int i = 0; i < 10; i++) {
                pushedCounts.add(producerPool.submit(new Callable<Integer>() { // from class: com.netflix.turbine.handler.HandlerQueueTuple.UnitTest.2
                    final AtomicInteger count = new AtomicInteger(0);
                    final Random random = new Random();

                    @Override // java.util.concurrent.Callable
                    public Integer call() throws Exception {
                        while (!stopProducers.get()) {
                            Collection<TurbineData> batch = getRandomData();
                            tuple.pushData(batch);
                            this.count.addAndGet(batch.size());
                            Thread.sleep(50L);
                        }
                        return Integer.valueOf(this.count.get());
                    }

                    /** Builds a batch of 0 to 9 identical data items. */
                    private Collection<TurbineData> getRandomData() {
                        int size = this.random.nextInt(10);
                        List<TurbineData> batch = new ArrayList<TurbineData>();
                        for (int j = 0; j < size; j++) {
                            batch.add(new DataFromSingleInstance(null, "type", "name", host, attributes, 0L));
                        }
                        return batch;
                    }
                }));
            }

            // Let the producers run, then ask them to finish and collect their totals.
            Thread.sleep(3000L);
            stopProducers.set(true);
            int totalPushed = 0;
            for (Future<Integer> pushed : pushedCounts) {
                totalPushed += pushed.get().intValue();
            }
            producerPool.shutdownNow();

            // Give the dispatcher time to drain the queue before stopping the tuple.
            Thread.sleep(2000L);
            tuple.stop();
            Thread.sleep(1000L);
            Assert.assertEquals(totalPushed, handledCount.get());
        }
    }

    /**
     * Creates a tuple for the given handler. The queue capacity and the number of
     * dispatcher threads are read from the handler's performance criteria. No threads are
     * started until {@link #start()} is called.
     *
     * @param turbineDataHandler handler that will receive the queued data; must not be null
     */
    public HandlerQueueTuple(TurbineDataHandler<K> turbineDataHandler) {
        this.eventHandler = turbineDataHandler;
        this.queue = new ConcurrentEventQueue<K>(turbineDataHandler.getCriteria().getMaxQueueSize());
        this.numThreads = turbineDataHandler.getCriteria().numThreads();
        this.workerThreads = new ArrayList<WorkerThread>(this.numThreads);
        this.stopped = false;
    }

    /**
     * Creates and starts {@code numThreads} dispatcher workers. If the tuple has already
     * been stopped, nothing is started and an informational message is logged instead.
     *
     * @throws Exception if a worker thread cannot be created or started
     */
    public void start() throws Exception {
        if (!this.stopped) {
            int launched = 0;
            while (launched < this.numThreads) {
                PerHandlerDispatcher dispatcher = new PerHandlerDispatcher(this.queue, this.eventHandler, this.numThreads);
                WorkerThread worker = new WorkerThread(dispatcher, -1, false);
                this.workerThreads.add(worker);
                worker.start();
                launched++;
            }
            return;
        }
        logger.info("\n\nTuple already stopped, will not start again, need to create new tuple");
    }

    /**
     * Stops the tuple: marks it stopped, stops each worker via {@code stopAndBlock()},
     * and empties the worker list. Calling it again after the first stop does nothing.
     */
    public void stop() {
        if (!this.stopped) {
            this.stopped = true;
            for (WorkerThread worker : this.workerThreads) {
                worker.stopAndBlock();
            }
            this.workerThreads.clear();
            // running() is evaluated after the list has been cleared, so it reports false here.
            String handlerName = this.eventHandler.getName();
            logger.info("\n\nRemoving tuple for : " + handlerName + " tuple running: " + running());
        }
    }

    /**
     * @return {@code true} once {@link #stop()} has been called on this tuple
     */
    public boolean previouslyStopped() {
        final boolean alreadyStopped = this.stopped;
        return alreadyStopped;
    }

    /**
     * @return {@code true} if at least one tracked worker thread reports that it is running;
     *         {@code false} if none does or no workers are tracked
     */
    public boolean running() {
        boolean anyRunning = false;
        for (WorkerThread worker : this.workerThreads) {
            if (worker.isRunning()) {
                anyRunning = true;
                break;
            }
        }
        return anyRunning;
    }

    /**
     * @return whether the handler's performance criteria mark it as critical
     */
    private boolean isCritical() {
        PerformanceCriteria criteria = this.eventHandler.getCriteria();
        return criteria.isCritical();
    }

    /**
     * @return the handler this tuple dispatches data to
     */
    public TurbineDataHandler<K> getHandler() {
        return eventHandler;
    }

    /**
     * @return the queue that buffers data between producers and the dispatcher workers
     */
    public EventQueue<K> getQueue() {
        return queue;
    }

    /**
     * Queues every element of the collection, in iteration order, through
     * {@link #pushData(TurbineData)}. The whole call does nothing once the tuple has been
     * stopped; if the tuple is stopped part-way through, the remaining elements are dropped
     * by the single-item overload.
     *
     * @param collection items to queue; must not be null
     */
    public void pushData(Collection<K> collection) {
        if (this.stopped) {
            return;
        }
        for (K item : collection) {
            // Declared as K, so this resolves to the single-item overload without a cast.
            pushData(item);
        }
    }

    /**
     * Offers a single item to the queue, unless the tuple has been stopped. For critical
     * handlers the outcome is recorded on the rolling counter: {@code EVENT_PROCESSED} when
     * the queue accepted the item, {@code EVENT_DISCARDED} when it did not.
     *
     * @param k item to queue
     */
    public void pushData(K k) {
        if (!this.stopped) {
            final boolean accepted = this.queue.writeEvent(k);
            if (!isCritical()) {
                // Nothing is counted for non-critical handlers.
                return;
            }
            if (!accepted) {
                this.counter.increment(StatsRollingNumber.Type.EVENT_DISCARDED);
                return;
            }
            this.counter.increment(StatsRollingNumber.Type.EVENT_PROCESSED);
        }
    }

    /**
     * @return a short description of this tuple containing the handler's string form
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("HandlerQueueTuple [eventHandler=");
        text.append(this.eventHandler);
        text.append("]");
        return text.toString();
    }
}
