/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.KeyedTimerData;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.TaskContext;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
import org.apache.samza.storage.kv.RocksDbKeyValueStore;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.WriteOptions;

public class SamzaTimerInternalsFactoryTest {
    private static RocksDbKeyValueStore createStore(String name) {
        Options options = new Options();
        options.setCreateIfMissing(true);
        return new RocksDbKeyValueStore(new File(System.getProperty("java.io.tmpdir") + "/" + name), options, (Config)new MapConfig(), false, "beamStore", new WriteOptions(), new FlushOptions(), new KeyValueStoreMetrics("beamStore", (MetricsRegistry)new MetricsRegistryMap()));
    }

    private static SamzaStoreStateInternals.Factory<?> createNonKeyedStateInternalsFactory(SamzaPipelineOptions pipelineOptions, RocksDbKeyValueStore store) {
        TaskContext context = (TaskContext)Mockito.mock(TaskContext.class);
        Mockito.when((Object)context.getStore(Matchers.anyString())).thenReturn((Object)store);
        TupleTag mainOutputTag = new TupleTag("output");
        return SamzaStoreStateInternals.createStateInternalFactory((String)"42", null, (TaskContext)context, (SamzaPipelineOptions)pipelineOptions, null);
    }

    private static SamzaTimerInternalsFactory<String> createTimerInternalsFactory(Scheduler<KeyedTimerData<String>> timerRegistry, String timerStateId, SamzaPipelineOptions pipelineOptions, RocksDbKeyValueStore store) {
        SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory = SamzaTimerInternalsFactoryTest.createNonKeyedStateInternalsFactory(pipelineOptions, store);
        return SamzaTimerInternalsFactory.createTimerInternalFactory((Coder)StringUtf8Coder.of(), timerRegistry, (String)timerStateId, nonKeyedStateInternalsFactory, (WindowingStrategy)WindowingStrategy.globalDefault(), (PCollection.IsBounded)PCollection.IsBounded.BOUNDED, (SamzaPipelineOptions)pipelineOptions);
    }

    @Test
    public void testEventTimeTimers() {
        SamzaPipelineOptions pipelineOptions = (SamzaPipelineOptions)PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        pipelineOptions.setTimerBufferSize(1);
        RocksDbKeyValueStore store = SamzaTimerInternalsFactoryTest.createStore("store1");
        SamzaTimerInternalsFactory<String> timerInternalsFactory = SamzaTimerInternalsFactoryTest.createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey((Object)"testKey");
        TimerInternals.TimerData timer1 = TimerInternals.TimerData.of((String)"timer1", (StateNamespace)nameSpace, (Instant)new Instant(10L), (TimeDomain)TimeDomain.EVENT_TIME);
        timerInternals.setTimer(timer1);
        TimerInternals.TimerData timer2 = TimerInternals.TimerData.of((String)"timer2", (StateNamespace)nameSpace, (Instant)new Instant(100L), (TimeDomain)TimeDomain.EVENT_TIME);
        timerInternals.setTimer(timer2);
        timerInternalsFactory.setInputWatermark(new Instant(5L));
        Collection readyTimers = timerInternalsFactory.removeReadyTimers();
        Assert.assertTrue((boolean)readyTimers.isEmpty());
        timerInternalsFactory.setInputWatermark(new Instant(20L));
        readyTimers = timerInternalsFactory.removeReadyTimers();
        Assert.assertEquals((long)1L, (long)readyTimers.size());
        Assert.assertEquals((Object)timer1, (Object)((KeyedTimerData)readyTimers.iterator().next()).getTimerData());
        timerInternalsFactory.setInputWatermark(new Instant(150L));
        readyTimers = timerInternalsFactory.removeReadyTimers();
        Assert.assertEquals((long)1L, (long)readyTimers.size());
        Assert.assertEquals((Object)timer2, (Object)((KeyedTimerData)readyTimers.iterator().next()).getTimerData());
        store.close();
    }

    @Test
    public void testRestore() {
        SamzaPipelineOptions pipelineOptions = (SamzaPipelineOptions)PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        pipelineOptions.setTimerBufferSize(1);
        RocksDbKeyValueStore store = SamzaTimerInternalsFactoryTest.createStore("store2");
        SamzaTimerInternalsFactory<String> timerInternalsFactory = SamzaTimerInternalsFactoryTest.createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey((Object)"testKey");
        TimerInternals.TimerData timer1 = TimerInternals.TimerData.of((String)"timer1", (StateNamespace)nameSpace, (Instant)new Instant(10L), (TimeDomain)TimeDomain.EVENT_TIME);
        timerInternals.setTimer(timer1);
        TimerInternals.TimerData timer2 = TimerInternals.TimerData.of((String)"timer2", (StateNamespace)nameSpace, (Instant)new Instant(100L), (TimeDomain)TimeDomain.EVENT_TIME);
        timerInternals.setTimer(timer2);
        store.close();
        store = SamzaTimerInternalsFactoryTest.createStore("store2");
        SamzaTimerInternalsFactory<String> restoredFactory = SamzaTimerInternalsFactoryTest.createTimerInternalsFactory(null, "timer", pipelineOptions, store);
        restoredFactory.setInputWatermark(new Instant(150L));
        Collection readyTimers = restoredFactory.removeReadyTimers();
        Assert.assertEquals((long)2L, (long)readyTimers.size());
        store.close();
    }

    @Test
    public void testProcessingTimeTimers() throws IOException {
        SamzaPipelineOptions pipelineOptions = (SamzaPipelineOptions)PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        RocksDbKeyValueStore store = SamzaTimerInternalsFactoryTest.createStore("store3");
        TestTimerRegistry timerRegistry = new TestTimerRegistry();
        SamzaTimerInternalsFactory<String> timerInternalsFactory = SamzaTimerInternalsFactoryTest.createTimerInternalsFactory(timerRegistry, "timer", pipelineOptions, store);
        StateNamespace nameSpace = StateNamespaces.global();
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey((Object)"testKey");
        TimerInternals.TimerData timer1 = TimerInternals.TimerData.of((String)"timer1", (StateNamespace)nameSpace, (Instant)new Instant(10L), (TimeDomain)TimeDomain.PROCESSING_TIME);
        timerInternals.setTimer(timer1);
        TimerInternals.TimerData timer2 = TimerInternals.TimerData.of((String)"timer2", (StateNamespace)nameSpace, (Instant)new Instant(100L), (TimeDomain)TimeDomain.PROCESSING_TIME);
        timerInternals.setTimer(timer2);
        Assert.assertEquals((long)2L, (long)timerRegistry.timers.size());
        store.close();
        store = SamzaTimerInternalsFactoryTest.createStore("store3");
        TestTimerRegistry restoredRegistry = new TestTimerRegistry();
        SamzaTimerInternalsFactory<String> restoredFactory = SamzaTimerInternalsFactoryTest.createTimerInternalsFactory(restoredRegistry, "timer", pipelineOptions, store);
        Assert.assertEquals((long)2L, (long)restoredRegistry.timers.size());
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        StringUtf8Coder.of().encode("testKey", (OutputStream)baos);
        byte[] keyBytes = baos.toByteArray();
        restoredFactory.removeProcessingTimer(new KeyedTimerData(keyBytes, (Object)"testKey", timer1));
        restoredFactory.removeProcessingTimer(new KeyedTimerData(keyBytes, (Object)"testKey", timer2));
        store.close();
    }

    private static class TestTimerRegistry
    implements Scheduler<KeyedTimerData<String>> {
        private final List<KeyedTimerData<String>> timers = new ArrayList<KeyedTimerData<String>>();

        private TestTimerRegistry() {
        }

        public void schedule(KeyedTimerData<String> key, long timestamp) {
            this.timers.add(key);
        }

        public void delete(KeyedTimerData<String> key) {
            this.timers.remove(key);
        }
    }
}

