/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaOperatorTestBase;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.apex.malhar.kafka.KafkaTestProducer;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class KafkaInputOperatorTest
extends KafkaOperatorTestBase {
    private int totalBrokers = 0;
    private String partition = null;
    private static final Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
    private static List<String> tupleCollection = new LinkedList<String>();
    private static CountDownLatch latch;
    private static boolean hasFailure;
    private static int failureTrigger;
    private static int k;

    @Parameterized.Parameters(name="multi-cluster: {0}, multi-partition: {1}, partition: {2}")
    public static Collection<Object[]> testScenario() {
        return Arrays.asList({true, false, "one_to_one"}, {true, false, "one_to_many"}, {true, true, "one_to_one"}, {true, true, "one_to_many"}, {false, true, "one_to_one"}, {false, true, "one_to_many"}, {false, false, "one_to_one"}, {false, false, "one_to_many"});
    }

    public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition) {
        this.hasMultiCluster = hasMultiCluster;
        this.hasMultiPartition = hasMultiPartition;
        int cluster = 1 + (hasMultiCluster ? 1 : 0);
        this.totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
        this.partition = partition;
    }

    @Test
    public void testPartitionableInputOperator() throws Exception {
        hasFailure = false;
        this.testInputOperator(false);
    }

    @Test
    public void testPartitionableInputOperatorWithFailure() throws Exception {
        hasFailure = true;
        this.testInputOperator(true);
    }

    public void testInputOperator(boolean hasFailure) throws Exception {
        latch = new CountDownLatch(this.totalBrokers);
        int totalCount = 10000;
        KafkaTestProducer p = new KafkaTestProducer("testtopic", this.hasMultiPartition, this.hasMultiCluster);
        p.setSendCount(totalCount);
        Thread t = new Thread(p);
        t.start();
        LocalMode lma = LocalMode.newInstance();
        DAG dag = lma.getDAG();
        KafkaSinglePortInputOperator node = (KafkaSinglePortInputOperator)dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
        node.setInitialPartitionCount(1);
        node.setTopics("testtopic");
        node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        node.setClusters(this.getClusterConfig());
        node.setStrategy(this.partition);
        CollectorModule collector = (CollectorModule)dag.addOperator("TestMessageCollector", (Operator)new CollectorModule());
        dag.addStream("Kafka message", (Operator.OutputPort)node.outputPort, (Operator.InputPort)collector.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        LocalMode.Controller lc = lma.getController();
        lc.setHeartbeatMonitoringEnabled(false);
        if (hasFailure) {
            this.setupHasFailureTest(node, dag);
        }
        lc.runAsync();
        boolean notTimeout = latch.await(40000L, TimeUnit.MILLISECONDS);
        Assert.assertTrue((String)("TIMEOUT: 40s Collected " + tupleCollection), (boolean)notTimeout);
        Assert.assertEquals((String)"Tuple count", (long)totalCount, (long)tupleCollection.size());
        logger.debug(String.format("Number of emitted tuples: %d", tupleCollection.size()));
        t.join();
        p.close();
        lc.shutdown();
    }

    private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag) {
        operator.setHoldingBufferSize(5000);
        dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, (Object)1);
        operator.setMaxTuplesPerWindow(500);
    }

    private String getClusterConfig() {
        String l = "localhost:";
        return l + TEST_KAFKA_BROKER_PORT[0][0] + (this.hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + (this.hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + (this.hasMultiCluster && this.hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
    }

    static {
        hasFailure = false;
        failureTrigger = 3000;
        k = 0;
    }

    public static class CollectorInputPort
    extends DefaultInputPort<byte[]> {
        public void process(byte[] bt) {
            String tuple = new String(bt);
            if (hasFailure && k++ == failureTrigger) {
                hasFailure = false;
                throw new RuntimeException();
            }
            if (tuple.equals("END_TUPLE")) {
                if (latch != null) {
                    latch.countDown();
                }
                return;
            }
            tupleCollection.add(tuple);
        }

        public void setConnected(boolean flag) {
            if (flag) {
                tupleCollection.clear();
            }
        }
    }

    public static class CollectorModule
    extends BaseOperator {
        public final transient CollectorInputPort inputPort = new CollectorInputPort();
    }
}

