/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TopologyTestDriverTest {
    // Topic names used by the topologies built in this test.
    private static final String SOURCE_TOPIC_1 = "source-topic-1";
    private static final String SOURCE_TOPIC_2 = "source-topic-2";
    private static final String SINK_TOPIC_1 = "sink-topic-1";
    private static final String SINK_TOPIC_2 = "sink-topic-2";
    // Factory for raw byte[] records; both key and value use ByteArraySerializer.
    private final ConsumerRecordFactory<byte[], byte[]> consumerRecordFactory = new ConsumerRecordFactory((Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    // First fixture record: distinct empty key/value arrays, timestamp 42, for "source-topic-1".
    private final byte[] key1 = new byte[0];
    private final byte[] value1 = new byte[0];
    private final long timestamp1 = 42L;
    private final ConsumerRecord<byte[], byte[]> consumerRecord1 = this.consumerRecordFactory.create("source-topic-1", (Object)this.key1, (Object)this.value1, 42L);
    // Second fixture record: distinct empty key/value arrays, timestamp 43, for "source-topic-2".
    private final byte[] key2 = new byte[0];
    private final byte[] value2 = new byte[0];
    private final long timestamp2 = 43L;
    private final ConsumerRecord<byte[], byte[]> consumerRecord2 = this.consumerRecordFactory.create("source-topic-2", (Object)this.key2, (Object)this.value2, 43L);
    // Driver under test; closed by tearDown() when non-null.
    private TopologyTestDriver testDriver;
    // Driver configuration; the constructor adds "processing.guarantee" when EOS is enabled
    // and setup() adds the default serdes.
    private final Properties config = new Properties(){
        {
            this.put("application.id", "test-TopologyTestDriver");
            this.put("bootstrap.servers", "dummy:1234");
            // Each test instance gets its own temporary state directory.
            this.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        }
    };
    // Handle to the "aggStore" state store; assigned by setup().
    private KeyValueStore<String, Long> store;
    // Deserializers used to read String/Long output records.
    private StringDeserializer stringDeserializer = new StringDeserializer();
    private LongDeserializer longDeserializer = new LongDeserializer();
    // Factory for String/Long input records.
    private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new LongSerializer());
    // Parameter of the current run (see data()).
    private final boolean eosEnabled;
    // Mock processors inspected by the tests via get(index).
    // NOTE(review): presumably populated by MockProcessorSupplier, which is not visible here — confirm.
    private final List<MockProcessor> mockProcessors = new ArrayList<MockProcessor>();

    /**
     * Supplies the parameter sets for the parameterized runner: one run with
     * {@code eosEnabled = true} followed by one with {@code eosEnabled = false}.
     *
     * @return a mutable list holding one single-element {@code Object[]} per run
     */
    @Parameterized.Parameters(name="Eos enabled = {0}")
    public static Collection<Object[]> data() {
        ArrayList<Object[]> parameterSets = new ArrayList<Object[]>();
        parameterSets.add(new Object[]{true});
        parameterSets.add(new Object[]{false});
        return parameterSets;
    }

    /**
     * Creates the test instance for one parameter value.
     *
     * @param eosEnabled when {@code true}, "processing.guarantee" is set to
     *                   "exactly_once" in the driver configuration
     */
    public TopologyTestDriverTest(boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
        if (!eosEnabled) {
            return;
        }
        config.put("processing.guarantee", "exactly_once");
    }

    /**
     * Closes the driver stored in {@code testDriver} after each test, if one is set.
     */
    @After
    public void tearDown() {
        if (testDriver == null) {
            return;
        }
        testDriver.close();
    }

    /**
     * Builds a topology with a single source reading {@code SOURCE_TOPIC_1}
     * wired directly to a sink writing {@code SINK_TOPIC_1}.
     *
     * @return the assembled topology
     */
    private Topology setupSourceSinkTopology() {
        String sourceName = "source";
        Topology sourceToSink = new Topology();
        sourceToSink.addSource(sourceName, SOURCE_TOPIC_1);
        sourceToSink.addSink("sink", SINK_TOPIC_1, sourceName);
        return sourceToSink;
    }

    /**
     * Builds a topology made of two source-to-sink pairs chained through a topic:
     * the first pair copies {@code SOURCE_TOPIC_1} to {@code SINK_TOPIC_1}, and
     * the second reads {@code SINK_TOPIC_1} and writes {@code SINK_TOPIC_2}.
     *
     * @return the assembled topology
     */
    private Topology setupTopologyWithTwoSubtopologies() {
        String sourceName1 = "source-1";
        String sourceName2 = "source-2";
        Topology chained = new Topology();

        // First pair: SOURCE_TOPIC_1 -> SINK_TOPIC_1.
        chained.addSource(sourceName1, SOURCE_TOPIC_1);
        chained.addSink("sink-1", SINK_TOPIC_1, sourceName1);

        // Second pair consumes what the first one produced.
        chained.addSource(sourceName2, SINK_TOPIC_1);
        chained.addSink("sink-2", SINK_TOPIC_2, sourceName2);
        return chained;
    }

    /**
     * Builds the single-processor topology without any punctuation: delegates
     * to the three-argument overload with a non-positive interval and no
     * punctuation type or callback.
     *
     * @return the assembled topology
     */
    private Topology setupSingleProcessorTopology() {
        long noPunctuationInterval = -1L;
        return setupSingleProcessorTopology(noPunctuationInterval, null, null);
    }

    /**
     * Builds a topology with one source on {@code SOURCE_TOPIC_1} feeding one
     * mock processor.
     *
     * @param punctuationIntervalMs punctuation interval; a punctuation is only handed to the
     *                              supplier when this is greater than zero
     * @param punctuationType       punctuation type, or {@code null} for no punctuation
     * @param callback              punctuator to register, or {@code null} for no punctuation
     * @return the assembled topology
     */
    private Topology setupSingleProcessorTopology(long punctuationIntervalMs, PunctuationType punctuationType, Punctuator callback) {
        boolean punctuationRequested = punctuationIntervalMs > 0L && punctuationType != null && callback != null;

        // All three arguments must be usable, otherwise the supplier gets an empty set.
        Set<Object> punctuations;
        if (punctuationRequested) {
            punctuations = Collections.<Object>singleton(new Punctuation(punctuationIntervalMs, punctuationType, callback));
        } else {
            punctuations = Collections.<Object>emptySet();
        }

        String sourceName = "source";
        Topology topology = new Topology();
        topology.addSource(sourceName, SOURCE_TOPIC_1);
        topology.addProcessor("processor", (ProcessorSupplier)new MockProcessorSupplier(punctuations), sourceName);
        return topology;
    }

    /**
     * Builds a topology with one source and one mock processor per given topic,
     * all processors feeding a single sink that writes {@code SINK_TOPIC_1}.
     * Node names are derived from the topic name ("&lt;topic&gt;-source",
     * "&lt;topic&gt;-processor").
     *
     * @param sourceTopicNames topics to read from
     * @return the assembled topology
     */
    private Topology setupMultipleSourceTopology(String ... sourceTopicNames) {
        Topology topology = new Topology();
        String[] processorNames = new String[sourceTopicNames.length];
        for (int index = 0; index < sourceTopicNames.length; index++) {
            String topicName = sourceTopicNames[index];
            String sourceName = topicName + "-source";
            String processorName = topicName + "-processor";
            processorNames[index] = processorName;
            topology.addSource(sourceName, topicName);
            topology.addProcessor(processorName, (ProcessorSupplier)new MockProcessorSupplier(), sourceName);
        }
        // Every processor is a parent of the one shared sink.
        topology.addSink("sink-topic", SINK_TOPIC_1, processorNames);
        return topology;
    }

    /**
     * Builds a topology that registers one in-memory global key-value store per
     * given topic. The store is named "&lt;topic&gt;-globalStore", the source
     * node reuses the topic name, and the updating processor is named
     * "&lt;topic&gt;-processor". Logging is disabled on each store and no serdes
     * or deserializers are supplied.
     *
     * @param sourceTopicNames topics backing the global stores; must not be empty
     * @return the assembled topology
     * @throws IllegalArgumentException if no topic name is given
     */
    private Topology setupGlobalStoreTopology(String ... sourceTopicNames) {
        if (sourceTopicNames.length == 0) {
            throw new IllegalArgumentException("sourceTopicNames cannot be empty");
        }
        Topology topology = new Topology();
        for (int index = 0; index < sourceTopicNames.length; index++) {
            String topicName = sourceTopicNames[index];
            String storeName = topicName + "-globalStore";
            String processorName = topicName + "-processor";
            topology.addGlobalStore(
                Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore(storeName), null, null).withLoggingDisabled(),
                topicName,
                null,
                null,
                topicName,
                processorName,
                (ProcessorSupplier)new MockProcessorSupplier());
        }
        return topology;
    }

    /**
     * Creates a driver over the single-processor topology and checks that the
     * first entry of {@code mockProcessors} has its {@code initialized} flag set.
     */
    @Test
    public void shouldInitProcessor() {
        Topology topology = setupSingleProcessorTopology();
        testDriver = new TopologyTestDriver(topology, config);

        MockProcessor processor = mockProcessors.get(0);
        Assert.assertTrue(processor.initialized);
    }

    /**
     * Creates and immediately closes a driver over the single-processor
     * topology, then checks that the first entry of {@code mockProcessors} has
     * its {@code closed} flag set.
     */
    @Test
    public void shouldCloseProcessor() {
        Topology topology = setupSingleProcessorTopology();
        testDriver = new TopologyTestDriver(topology, config);

        testDriver.close();

        MockProcessor processor = mockProcessors.get(0);
        Assert.assertTrue(processor.closed);
        // The driver was closed above; clear the field so tearDown() does not close it again.
        testDriver = null;
    }

    /**
     * Pipes a record for a topic that the (empty) topology does not know and
     * expects an {@link IllegalArgumentException} whose message names that topic.
     */
    @Test
    public void shouldThrowForUnknownTopic() {
        String unknownTopic = "unknownTopic";
        ConsumerRecordFactory consumerRecordFactory = new ConsumerRecordFactory(unknownTopic, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        this.testDriver = new TopologyTestDriver(new Topology(), this.config);
        try {
            // The (Object) cast selects the single-value create() overload for the null payload.
            this.testDriver.pipeInput(consumerRecordFactory.create((Object)null));
            Assert.fail("Should have thrown IllegalArgumentException");
        }
        catch (IllegalArgumentException exception) {
            Assert.assertEquals((Object)("Unknown topic: " + unknownTopic), (Object)exception.getMessage());
        }
    }

    /**
     * Pipes {@code consumerRecord1} through the source-to-sink topology and
     * checks that the record read from {@code SINK_TOPIC_1} carries the same
     * key and value objects and the sink topic name.
     */
    @Test
    public void shouldProcessRecordForTopic() {
        testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
        testDriver.pipeInput(consumerRecord1);

        ProducerRecord forwarded = testDriver.readOutput(SINK_TOPIC_1);

        // assertEquals on byte[] falls back to Object.equals, i.e. an identity check.
        Assert.assertEquals((Object)key1, forwarded.key());
        Assert.assertEquals((Object)value1, forwarded.value());
        Assert.assertEquals((Object)SINK_TOPIC_1, (Object)forwarded.topic());
    }

    /**
     * Pipes {@code consumerRecord1} into the single-processor topology and
     * checks that the mock processor recorded exactly one entry equal to a
     * {@code Record} built from the input with offset 0.
     */
    @Test
    public void shouldSetRecordMetadata() {
        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);
        testDriver.pipeInput(consumerRecord1);

        List seenByProcessor = mockProcessors.get(0).processedRecords;
        Assert.assertEquals(1L, seenByProcessor.size());

        Record actual = (Record)seenByProcessor.get(0);
        Record expected = new Record(consumerRecord1);
        expected.offset = 0L;
        Assert.assertThat(actual, CoreMatchers.equalTo(expected));
    }

    /**
     * With one processor per source topic, checks that a record for
     * {@code SOURCE_TOPIC_1} only reaches the first processor and a record for
     * {@code SOURCE_TOPIC_2} only reaches the second, each recorded with offset 0.
     */
    @Test
    public void shouldSendRecordViaCorrectSourceTopic() {
        testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);

        // The lists are fetched once up front and re-inspected after each pipeInput call.
        List seenByFirst = mockProcessors.get(0).processedRecords;
        List seenBySecond = mockProcessors.get(1).processedRecords;

        testDriver.pipeInput(consumerRecord1);
        Assert.assertEquals(1L, seenByFirst.size());
        Assert.assertEquals(0L, seenBySecond.size());
        Record firstActual = (Record)seenByFirst.get(0);
        Record firstExpected = new Record(consumerRecord1);
        firstExpected.offset = 0L;
        Assert.assertThat(firstActual, CoreMatchers.equalTo(firstExpected));

        testDriver.pipeInput(consumerRecord2);
        Assert.assertEquals(1L, seenByFirst.size());
        Assert.assertEquals(1L, seenBySecond.size());
        Record secondActual = (Record)seenBySecond.get(0);
        Record secondExpected = new Record(consumerRecord2);
        secondExpected.offset = 0L;
        Assert.assertThat(secondActual, CoreMatchers.equalTo(secondExpected));
    }

    /**
     * Wires two sources with different key/value deserializers (Long/String and
     * Integer/Double) into one processor and one sink, then checks that a
     * record piped through each source can be read back from the sink with the
     * matching deserializers.
     */
    @Test
    public void shouldUseSourceSpecificDeserializers() {
        String sourceName1 = "source-1";
        String sourceName2 = "source-2";
        String processor = "processor";

        // The sink receives both type pairs, so its serializers dispatch on the runtime type.
        Serializer sinkKeySerializer = new Serializer(){

            @Override
            public void configure(Map configs, boolean isKey) {
            }

            @Override
            public byte[] serialize(String topic, Object data) {
                return data instanceof Long
                    ? Serdes.Long().serializer().serialize(topic, (Long)data)
                    : Serdes.Integer().serializer().serialize(topic, (Integer)data);
            }

            @Override
            public void close() {
            }
        };
        Serializer sinkValueSerializer = new Serializer(){

            @Override
            public void configure(Map configs, boolean isKey) {
            }

            @Override
            public byte[] serialize(String topic, Object data) {
                return data instanceof String
                    ? Serdes.String().serializer().serialize(topic, (String)data)
                    : Serdes.Double().serializer().serialize(topic, (Double)data);
            }

            @Override
            public void close() {
            }
        };

        Topology topology = new Topology();
        topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), new String[]{SOURCE_TOPIC_1});
        topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), new String[]{SOURCE_TOPIC_2});
        topology.addProcessor(processor, (ProcessorSupplier)new MockProcessorSupplier(), new String[]{sourceName1, sourceName2});
        topology.addSink("sink", SINK_TOPIC_1, sinkKeySerializer, sinkValueSerializer, new String[]{processor});
        testDriver = new TopologyTestDriver(topology, config);

        Long source1Key = 42L;
        String source1Value = "anyString";
        Integer source2Key = 73;
        Double source2Value = 3.14;

        ConsumerRecordFactory source1Factory = new ConsumerRecordFactory(SOURCE_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer());
        ConsumerRecordFactory source2Factory = new ConsumerRecordFactory(SOURCE_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer());
        ConsumerRecord longStringRecord = source1Factory.create((Object)source1Key, (Object)source1Value);
        ConsumerRecord integerDoubleRecord = source2Factory.create((Object)source2Key, (Object)source2Value);

        testDriver.pipeInput(longStringRecord);
        OutputVerifier.compareKeyValue((ProducerRecord)testDriver.readOutput(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer()), (Object)source1Key, (Object)source1Value);

        testDriver.pipeInput(integerDoubleRecord);
        OutputVerifier.compareKeyValue((ProducerRecord)testDriver.readOutput(SINK_TOPIC_1, Serdes.Integer().deserializer(), Serdes.Double().deserializer()), (Object)source2Key, (Object)source2Value);
    }

    /**
     * Wires two independent source-to-sink pairs, one typed Long/String and one
     * typed Integer/Double, and checks that a record piped through each pair
     * can be read back from its own sink topic with the matching deserializers.
     */
    @Test
    public void shouldUseSinkSpecificSerializers() {
        String sourceName1 = "source-1";
        String sourceName2 = "source-2";

        Topology topology = new Topology();
        // Pair one: Long keys, String values, SOURCE_TOPIC_1 -> SINK_TOPIC_1.
        topology.addSource(sourceName1, Serdes.Long().deserializer(), Serdes.String().deserializer(), new String[]{SOURCE_TOPIC_1});
        topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer(), new String[]{sourceName1});
        // Pair two: Integer keys, Double values, SOURCE_TOPIC_2 -> SINK_TOPIC_2.
        topology.addSource(sourceName2, Serdes.Integer().deserializer(), Serdes.Double().deserializer(), new String[]{SOURCE_TOPIC_2});
        topology.addSink("sink-2", SINK_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer(), new String[]{sourceName2});
        testDriver = new TopologyTestDriver(topology, config);

        Long source1Key = 42L;
        String source1Value = "anyString";
        Integer source2Key = 73;
        Double source2Value = 3.14;

        ConsumerRecordFactory source1Factory = new ConsumerRecordFactory(SOURCE_TOPIC_1, Serdes.Long().serializer(), Serdes.String().serializer());
        ConsumerRecordFactory source2Factory = new ConsumerRecordFactory(SOURCE_TOPIC_2, Serdes.Integer().serializer(), Serdes.Double().serializer());
        ConsumerRecord longStringRecord = source1Factory.create((Object)source1Key, (Object)source1Value);
        ConsumerRecord integerDoubleRecord = source2Factory.create((Object)source2Key, (Object)source2Value);

        testDriver.pipeInput(longStringRecord);
        OutputVerifier.compareKeyValue((ProducerRecord)testDriver.readOutput(SINK_TOPIC_1, Serdes.Long().deserializer(), Serdes.String().deserializer()), (Object)source1Key, (Object)source1Value);

        testDriver.pipeInput(integerDoubleRecord);
        OutputVerifier.compareKeyValue((ProducerRecord)testDriver.readOutput(SINK_TOPIC_2, Serdes.Integer().deserializer(), Serdes.Double().deserializer()), (Object)source2Key, (Object)source2Value);
    }

    /**
     * Pipes both fixture records in a single list call and checks that each of
     * the two processors recorded exactly its own record with offset 0.
     */
    @Test
    public void shouldProcessConsumerRecordList() {
        testDriver = new TopologyTestDriver(setupMultipleSourceTopology(SOURCE_TOPIC_1, SOURCE_TOPIC_2), config);
        List seenByFirst = mockProcessors.get(0).processedRecords;
        List seenBySecond = mockProcessors.get(1).processedRecords;

        List<ConsumerRecord<byte[], byte[]>> batch = new ArrayList<ConsumerRecord<byte[], byte[]>>(2);
        batch.add(consumerRecord1);
        batch.add(consumerRecord2);
        testDriver.pipeInput(batch);

        Assert.assertEquals(1L, seenByFirst.size());
        Assert.assertEquals(1L, seenBySecond.size());

        Record firstActual = (Record)seenByFirst.get(0);
        Record firstExpected = new Record(consumerRecord1);
        firstExpected.offset = 0L;
        Assert.assertThat(firstActual, CoreMatchers.equalTo(firstExpected));

        Record secondActual = (Record)seenBySecond.get(0);
        Record secondExpected = new Record(consumerRecord2);
        secondExpected.offset = 0L;
        Assert.assertThat(secondActual, CoreMatchers.equalTo(secondExpected));
    }

    /**
     * Pipes {@code consumerRecord1} into the two-stage topology and checks that
     * the same key and value show up first on {@code SINK_TOPIC_1} and then on
     * {@code SINK_TOPIC_2}.
     */
    @Test
    public void shouldForwardRecordsFromSubtopologyToSubtopology() {
        testDriver = new TopologyTestDriver(setupTopologyWithTwoSubtopologies(), config);
        testDriver.pipeInput(consumerRecord1);

        ProducerRecord firstHop = testDriver.readOutput(SINK_TOPIC_1);
        Assert.assertEquals((Object)key1, firstHop.key());
        Assert.assertEquals((Object)value1, firstHop.value());
        Assert.assertEquals((Object)SINK_TOPIC_1, (Object)firstHop.topic());

        ProducerRecord secondHop = testDriver.readOutput(SINK_TOPIC_2);
        Assert.assertEquals((Object)key1, secondHop.key());
        Assert.assertEquals((Object)value1, secondHop.value());
        Assert.assertEquals((Object)SINK_TOPIC_2, (Object)secondHop.topic());
    }

    /**
     * Builds a global-store topology on {@code SOURCE_TOPIC_1}, checks that the
     * store is reachable both by name and through the map of all state stores,
     * then pipes {@code consumerRecord1} and checks that the global processor
     * recorded it with offset 0.
     */
    @Test
    public void shouldPopulateGlobalStore() {
        String storeName = SOURCE_TOPIC_1 + "-globalStore";
        testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config);

        KeyValueStore globalStore = testDriver.getKeyValueStore(storeName);
        Assert.assertNotNull((Object)globalStore);
        Assert.assertNotNull(testDriver.getAllStateStores().get(storeName));

        testDriver.pipeInput(consumerRecord1);

        List seenByProcessor = mockProcessors.get(0).processedRecords;
        Assert.assertEquals(1L, seenByProcessor.size());
        Record actual = (Record)seenByProcessor.get(0);
        Record expected = new Record(consumerRecord1);
        expected.offset = 0L;
        Assert.assertThat(actual, CoreMatchers.equalTo(expected));
    }

    /**
     * Schedules a stream-time punctuation with a 10 ms interval and pipes a
     * sequence of records with increasing timestamps. After every record the
     * list of punctuation timestamps recorded by the mock punctuator must equal
     * the expected list built so far.
     */
    @Test
    public void shouldPunctuateOnStreamsTime() {
        MockPunctuator punctuator = new MockPunctuator();
        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(10L, PunctuationType.STREAM_TIME, punctuator), config);

        // Record timestamps in piping order, and whether each one is expected to
        // add a punctuation at that same timestamp.
        long[] recordTimestamps = {42L, 42L, 51L, 52L, 61L, 65L, 71L, 72L, 95L, 101L, 102L};
        boolean[] expectsPunctuation = {true, false, true, false, true, false, true, false, true, true, false};

        LinkedList<Long> expectedPunctuations = new LinkedList<Long>();
        for (int step = 0; step < recordTimestamps.length; step++) {
            long timestamp = recordTimestamps[step];
            if (expectsPunctuation[step]) {
                expectedPunctuations.add(timestamp);
            }
            testDriver.pipeInput(consumerRecordFactory.create(SOURCE_TOPIC_1, key1, value1, timestamp));
            Assert.assertThat((Object)punctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        }
    }

    /**
     * Schedules a wall-clock punctuation with a 10 ms interval on a driver
     * created with the extra constructor argument {@code 0L}, then advances the
     * wall clock in several steps. After every step the list of punctuation
     * timestamps recorded by the mock punctuator must equal the expected list
     * built so far.
     */
    @Test
    public void shouldPunctuateOnWallClockTime() {
        MockPunctuator punctuator = new MockPunctuator();
        testDriver = new TopologyTestDriver(setupSingleProcessorTopology(10L, PunctuationType.WALL_CLOCK_TIME, punctuator), config, 0L);

        // Each row: {milliseconds to advance, expected punctuation timestamp or -1 for none}.
        long[][] steps = {
            {5L, -1L},
            {9L, 14L},
            {1L, -1L},
            {20L, 35L},
            {5L, 40L},
        };

        LinkedList<Long> expectedPunctuations = new LinkedList<Long>();
        for (long[] step : steps) {
            long advanceMs = step[0];
            long expectedAt = step[1];
            if (expectedAt >= 0L) {
                expectedPunctuations.add(expectedAt);
            }
            testDriver.advanceWallClockTime(advanceMs);
            Assert.assertThat((Object)punctuator.punctuatedAt, (Matcher)CoreMatchers.equalTo(expectedPunctuations));
        }
    }

    /**
     * Adds one regular and one global in-memory store to the source-to-sink
     * topology and checks that {@code getAllStateStores()} exposes exactly the
     * names "store" and "globalStore".
     */
    @Test
    public void shouldReturnAllStores() {
        Topology topology = setupSourceSinkTopology();

        KeyValueStoreBuilder localStoreBuilder = new KeyValueStoreBuilder(Stores.inMemoryKeyValueStore((String)"store"), Serdes.ByteArray(), Serdes.ByteArray(), (Time)new SystemTime());
        topology.addStateStore(localStoreBuilder.withLoggingDisabled(), new String[0]);

        KeyValueStoreBuilder globalStoreBuilder = new KeyValueStoreBuilder(Stores.inMemoryKeyValueStore((String)"globalStore"), Serdes.ByteArray(), Serdes.ByteArray(), (Time)new SystemTime());
        // The supplier hands out no processor at all; this test only looks at store names.
        ProcessorSupplier noProcessor = new ProcessorSupplier(){

            public Processor get() {
                return null;
            }
        };
        topology.addGlobalStore(globalStoreBuilder.withLoggingDisabled(), "sourceProcessorName", Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer(), "globalTopicName", "globalProcessorName", noProcessor);

        testDriver = new TopologyTestDriver(topology, config);

        HashSet<String> expectedStoreNames = new HashSet<String>(Arrays.asList("store", "globalStore"));
        Assert.assertThat(testDriver.getAllStateStores().keySet(), (Matcher)CoreMatchers.equalTo(expectedStoreNames));
    }

    /**
     * Prepares the aggregation fixture shared by several tests: a topology
     * "input-topic" -> aggregator (backed by the in-memory "aggStore") ->
     * "result-topic", with String/Long default serdes. Creates the driver,
     * stores the "aggStore" handle in {@code store} and seeds it with
     * {@code "a" -> 21}.
     */
    private void setup() {
        String sourceNode = "sourceProcessor";
        String aggregatorNode = "aggregator";
        String storeName = "aggStore";

        config.setProperty("default.key.serde", Serdes.String().getClass().getName());
        config.setProperty("default.value.serde", Serdes.Long().getClass().getName());

        Topology topology = new Topology();
        topology.addSource(sourceNode, new String[]{"input-topic"});
        topology.addProcessor(aggregatorNode, (ProcessorSupplier)new CustomMaxAggregatorSupplier(), new String[]{sourceNode});
        topology.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore(storeName), (Serde)Serdes.String(), (Serde)Serdes.Long()).withLoggingDisabled(), new String[]{aggregatorNode});
        topology.addSink("sinkProcessor", "result-topic", new String[]{aggregatorNode});

        testDriver = new TopologyTestDriver(topology, config);
        store = testDriver.getKeyValueStore(storeName);
        store.put("a", 21L);
    }

    /**
     * After {@code setup()} seeded the store with {@code a -> 21}, pipes
     * {@code (a, 1)} at timestamp 9999 and expects exactly one output record
     * {@code (a, 21)} on "result-topic".
     */
    @Test
    public void shouldFlushStoreForFirstInput() {
        setup();
        String resultTopic = "result-topic";

        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));

        ProducerRecord<String, Long> firstOutput = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        OutputVerifier.compareKeyValue(firstOutput, "a", 21L);
        ProducerRecord<String, Long> nothingMore = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        Assert.assertNull((Object)nothingMore);
    }

    /**
     * Pipes {@code (a, 1)} while the store already holds {@code a -> 21} and
     * checks that the stored value stays 21 and that exactly one output record
     * {@code (a, 21)} is produced.
     */
    @Test
    public void shouldNotUpdateStoreForSmallerValue() {
        setup();
        String resultTopic = "result-topic";

        testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));

        Assert.assertThat(store.get("a"), CoreMatchers.equalTo(21L));
        ProducerRecord<String, Long> firstOutput = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        OutputVerifier.compareKeyValue(firstOutput, "a", 21L);
        ProducerRecord<String, Long> nothingMore = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        Assert.assertNull((Object)nothingMore);
    }

    /**
     * Pipes {@code (a, 42)} while the store holds {@code a -> 21} and checks
     * that the stored value becomes 42 and that exactly one output record
     * {@code (a, 42)} is produced.
     *
     * NOTE(review): despite the "shouldNot" in the name, the assertions expect
     * the store to be updated; the name is kept unchanged here.
     */
    @Test
    public void shouldNotUpdateStoreForLargerValue() {
        setup();
        String resultTopic = "result-topic";

        testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));

        Assert.assertThat(store.get("a"), CoreMatchers.equalTo(42L));
        ProducerRecord<String, Long> firstOutput = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        OutputVerifier.compareKeyValue(firstOutput, "a", 42L);
        ProducerRecord<String, Long> nothingMore = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        Assert.assertNull((Object)nothingMore);
    }

    /**
     * Pipes {@code (b, 21)} for a key that is not in the store yet and checks
     * that the store now holds {@code b -> 21} and that the output consists of
     * {@code (a, 21)} followed by {@code (b, 21)} and nothing else.
     */
    @Test
    public void shouldUpdateStoreForNewKey() {
        setup();
        String resultTopic = "result-topic";

        testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));

        Assert.assertThat(store.get("b"), CoreMatchers.equalTo(21L));
        ProducerRecord<String, Long> firstOutput = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        OutputVerifier.compareKeyValue(firstOutput, "a", 21L);
        ProducerRecord<String, Long> secondOutput = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        OutputVerifier.compareKeyValue(secondOutput, "b", 21L);
        ProducerRecord<String, Long> nothingMore = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        Assert.assertNull((Object)nothingMore);
    }

    /**
     * Pipes three records for key "a": the first at timestamp 9999 yields
     * {@code (a, 21)}, a second at the same timestamp yields no output, and a
     * third at timestamp 10000 yields {@code (a, 21)} once more and nothing else.
     *
     * NOTE(review): "Even" in the method name is presumably a typo for "Event";
     * the name is kept unchanged here.
     */
    @Test
    public void shouldPunctuateIfEvenTimeAdvances() {
        setup();
        String inputTopic = "input-topic";
        String resultTopic = "result-topic";

        // First record at 9999: one output.
        testDriver.pipeInput(recordFactory.create(inputTopic, "a", 1L, 9999L));
        ProducerRecord<String, Long> afterFirst = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        OutputVerifier.compareKeyValue(afterFirst, "a", 21L);

        // Same timestamp again: no output.
        testDriver.pipeInput(recordFactory.create(inputTopic, "a", 1L, 9999L));
        ProducerRecord<String, Long> afterSecond = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        Assert.assertNull((Object)afterSecond);

        // Timestamp moves to 10000: one more output, then none.
        testDriver.pipeInput(recordFactory.create(inputTopic, "a", 1L, 10000L));
        ProducerRecord<String, Long> afterThird = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        OutputVerifier.compareKeyValue(afterThird, "a", 21L);
        ProducerRecord<String, Long> nothingMore = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        Assert.assertNull((Object)nothingMore);
    }

    /**
     * Advances the wall clock by 60000 ms without piping any input and expects
     * exactly one output record {@code (a, 21)} on "result-topic".
     */
    @Test
    public void shouldPunctuateIfWallClockTimeAdvances() {
        setup();
        String resultTopic = "result-topic";

        testDriver.advanceWallClockTime(60000L);

        ProducerRecord<String, Long> firstOutput = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        OutputVerifier.compareKeyValue(firstOutput, "a", 21L);
        ProducerRecord<String, Long> nothingMore = testDriver.readOutput(resultTopic, stringDeserializer, longDeserializer);
        Assert.assertNull((Object)nothingMore);
    }

    /**
     * Checks that a value written to a persistent key-value store through one
     * driver is no longer present in a second driver created with the same
     * topology and configuration after the first one was closed.
     *
     * Both drivers are opened with try-with-resources so that each one is
     * closed even when an assertion fails; previously the second driver was
     * never closed.
     */
    @Test
    public void shouldCleanUpPersistentStateStoresOnClose() {
        Topology topology = new Topology();
        topology.addSource("sourceProcessor", new String[]{"input-topic"});
        topology.addProcessor("storeProcessor", new ProcessorSupplier(){

            public Processor get() {
                // Minimal processor that writes every incoming key/value into the store.
                return new Processor<String, Long>(){
                    private KeyValueStore<String, Long> store;

                    public void init(ProcessorContext context) {
                        this.store = (KeyValueStore)context.getStateStore("storeProcessorStore");
                    }

                    public void process(String key, Long value) {
                        this.store.put((Object)key, (Object)value);
                    }

                    public void punctuate(long timestamp) {
                    }

                    public void close() {
                    }
                };
            }
        }, new String[]{"sourceProcessor"});
        topology.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"storeProcessorStore"), (Serde)Serdes.String(), (Serde)Serdes.Long()), new String[]{"storeProcessor"});

        // Dedicated configuration (own application id and state directory) shared by both drivers.
        Properties config = new Properties();
        config.put("application.id", "test-TopologyTestDriver-cleanup");
        config.put("bootstrap.servers", "dummy:1234");
        config.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        config.put("default.key.serde", Serdes.String().getClass().getName());
        config.put("default.value.serde", Serdes.Long().getClass().getName());

        // First driver: write "a" -> 1 into the store; it is closed at the end of the block.
        try (TopologyTestDriver firstDriver = new TopologyTestDriver(topology, config)) {
            Assert.assertNull((Object)firstDriver.getKeyValueStore("storeProcessorStore").get((Object)"a"));
            firstDriver.pipeInput(this.recordFactory.create("input-topic", (Object)"a", (Object)1L));
            Assert.assertEquals((Object)1L, (Object)firstDriver.getKeyValueStore("storeProcessorStore").get((Object)"a"));
        }

        // Second driver over the same state directory must not see the earlier value.
        try (TopologyTestDriver secondDriver = new TopologyTestDriver(topology, config)) {
            Assert.assertNull((String)"Closing the prior test driver should have cleaned up this store and value.", (Object)secondDriver.getKeyValueStore("storeProcessorStore").get((Object)"a"));
        }
    }

    /**
     * Builds a topology containing only a global table over "topic", materialized
     * as "globalStore", and checks that the store is exposed by the driver and that
     * a record piped into "topic" becomes readable from it.
     */
    @Test
    public void shouldFeedStoreFromGlobalKTable() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()), Materialized.as((String)"globalStore"));
        Topology topology = builder.build();

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.config)) {
            // The store must be reachable both directly and through the all-stores map.
            KeyValueStore<String, String> store = driver.getKeyValueStore("globalStore");
            Assert.assertNotNull((Object)store);
            Assert.assertNotNull(driver.getAllStateStores().get("globalStore"));

            // String/String factory local to this test, independent of the class-level one.
            ConsumerRecordFactory<String, String> stringRecords =
                new ConsumerRecordFactory<String, String>(new StringSerializer(), new StringSerializer());
            driver.pipeInput(stringRecords.create("topic", "k1", "value1"));

            Assert.assertEquals((Object)"value1", (Object)store.get("k1"));
        }
    }

    private class CustomMaxAggregator
    implements Processor<String, Long> {
        ProcessorContext context;
        private KeyValueStore<String, Long> store;

        private CustomMaxAggregator() {
        }

        public void init(ProcessorContext context) {
            this.context = context;
            context.schedule(60000L, PunctuationType.WALL_CLOCK_TIME, new Punctuator(){

                public void punctuate(long timestamp) {
                    CustomMaxAggregator.this.flushStore();
                }
            });
            context.schedule(10000L, PunctuationType.STREAM_TIME, new Punctuator(){

                public void punctuate(long timestamp) {
                    CustomMaxAggregator.this.flushStore();
                }
            });
            this.store = (KeyValueStore)context.getStateStore("aggStore");
        }

        public void process(String key, Long value) {
            Long oldValue = (Long)this.store.get((Object)key);
            if (oldValue == null || value > oldValue) {
                this.store.put((Object)key, (Object)value);
            }
        }

        private void flushStore() {
            KeyValueIterator it = this.store.all();
            while (it.hasNext()) {
                KeyValue next = (KeyValue)it.next();
                this.context.forward(next.key, next.value);
            }
        }

        public void punctuate(long timestamp) {
        }

        public void close() {
        }
    }

    private class CustomMaxAggregatorSupplier
    implements ProcessorSupplier<String, Long> {
        private CustomMaxAggregatorSupplier() {
        }

        public Processor<String, Long> get() {
            return new CustomMaxAggregator();
        }
    }

    private final class MockProcessorSupplier
    implements ProcessorSupplier {
        private final Collection<Punctuation> punctuations;

        private MockProcessorSupplier() {
            this(Collections.emptySet());
        }

        private MockProcessorSupplier(Collection<Punctuation> punctuations) {
            this.punctuations = punctuations;
        }

        public Processor get() {
            MockProcessor mockProcessor = new MockProcessor(this.punctuations);
            TopologyTestDriverTest.this.mockProcessors.add(mockProcessor);
            return mockProcessor;
        }
    }

    private final class MockProcessor
    implements Processor {
        private final Collection<Punctuation> punctuations;
        private ProcessorContext context;
        private boolean initialized = false;
        private boolean closed = false;
        private final List<Record> processedRecords = new ArrayList<Record>();

        MockProcessor() {
            this(Collections.emptySet());
        }

        MockProcessor(Collection<Punctuation> punctuations) {
            this.punctuations = punctuations;
        }

        public void init(ProcessorContext context) {
            this.initialized = true;
            this.context = context;
            for (Punctuation punctuation : this.punctuations) {
                this.context.schedule(punctuation.intervalMs, punctuation.punctuationType, punctuation.callback);
            }
        }

        public void process(Object key, Object value) {
            this.processedRecords.add(new Record(key, value, this.context.timestamp(), this.context.offset(), this.context.topic()));
            this.context.forward(key, value);
        }

        public void punctuate(long timestamp) {
        }

        public void close() {
            this.closed = true;
        }
    }

    private final class MockPunctuator
    implements Punctuator {
        private final List<Long> punctuatedAt = new LinkedList<Long>();

        private MockPunctuator() {
        }

        public void punctuate(long timestamp) {
            this.punctuatedAt.add(timestamp);
        }
    }

    /**
     * Immutable description of one punctuation to schedule: the interval, the
     * punctuation type and the callback to invoke.
     */
    private static final class Punctuation {
        /** Interval in milliseconds, passed to {@code ProcessorContext.schedule}. */
        private final long intervalMs;
        /** Time notion passed to {@code ProcessorContext.schedule}. */
        private final PunctuationType punctuationType;
        /** Callback passed to {@code ProcessorContext.schedule}. */
        private final Punctuator callback;

        /**
         * @param intervalMs      interval in milliseconds
         * @param punctuationType punctuation type to schedule with
         * @param callback        punctuator to register
         */
        Punctuation(long intervalMs, PunctuationType punctuationType, Punctuator callback) {
            this.callback = callback;
            this.punctuationType = punctuationType;
            this.intervalMs = intervalMs;
        }
    }

    /**
     * Value holder for a record's key, value, timestamp, offset and topic, with
     * value-based {@code equals}/{@code hashCode} so expected and actual records
     * can be compared in assertions.
     */
    private static final class Record {
        private Object key;
        private Object value;
        private long timestamp;
        private long offset;
        private String topic;

        /**
         * Copies key, value, timestamp, offset and topic from a consumer record.
         *
         * @param consumerRecord record to copy the five attributes from
         */
        Record(ConsumerRecord consumerRecord) {
            this(consumerRecord.key(), consumerRecord.value(), consumerRecord.timestamp(), consumerRecord.offset(), consumerRecord.topic());
        }

        /**
         * @param key       record key
         * @param value     record value
         * @param timestamp record timestamp
         * @param offset    record offset
         * @param topic     topic name
         */
        Record(Object key, Object value, long timestamp, long offset, String topic) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.offset = offset;
            this.topic = topic;
        }

        /** Renders all five attributes as a single "name: value" list. */
        @Override
        public String toString() {
            return new StringBuilder("key: ").append(this.key)
                .append(", value: ").append(this.value)
                .append(", timestamp: ").append(this.timestamp)
                .append(", offset: ").append(this.offset)
                .append(", topic: ").append(this.topic)
                .toString();
        }

        /**
         * Two records are equal when they are of exactly the same class and all
         * five attributes match.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (o.getClass() != this.getClass()) {
                return false;
            }
            Record other = (Record)o;
            // Compare the primitive fields first, then the object fields.
            if (this.timestamp != other.timestamp || this.offset != other.offset) {
                return false;
            }
            return Objects.equals(this.key, other.key)
                && Objects.equals(this.value, other.value)
                && Objects.equals(this.topic, other.topic);
        }

        /** Hash over the same five attributes that {@code equals} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(this.key, this.value, this.timestamp, this.offset, this.topic);
        }
    }
}

