/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.TestSamzaRunner;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.translation.ConfigBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Assert;
import org.junit.Test;

public class SamzaStoreStateInternalsTest
implements Serializable {
    public final transient TestPipeline pipeline = TestPipeline.create();

    /**
     * Checks that {@link SamzaMapState#readIterator()} returns the entries written to map state.
     *
     * <p>Four elements under the single key {@code "hello"} are fed through a stateful
     * {@link DoFn}; one inner pair ({@code b=42}) is a duplicate. Each inner pair is put into
     * the map state and a combining counter is incremented. When the counter reaches 4 the
     * state is read back through the iterator, every entry is emitted, and the collected
     * entries are compared against {@code a=97, b=42, c=12} in exactly that order.
     */
    @Test
    public void testMapStateIterator() {
        // Constant variables, so they can be used as annotation values below.
        final String stateId = "foo";
        final String countStateId = "count";

        DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>> fn =
            new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {
                @DoFn.StateId(stateId)
                private final StateSpec<MapState<String, Integer>> mapState =
                    StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());

                @DoFn.StateId(countStateId)
                private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
                    StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(
                        ProcessContext c,
                        @DoFn.StateId(stateId) MapState<String, Integer> mapState,
                        @DoFn.StateId(countStateId)
                                CombiningState<Integer, int[], Integer> count) {
                    // The test needs the Samza-specific readIterator(), hence the downcast.
                    SamzaMapState<String, Integer> state =
                        (SamzaMapState<String, Integer>) mapState;
                    KV<String, Integer> value = c.element().getValue();
                    state.put(value.getKey(), value.getValue());
                    count.add(1);

                    // Only inspect the state once all four inputs have been seen.
                    if (count.read() >= 4) {
                        List<KV<String, Integer>> content = new ArrayList<>();
                        Iterator<Map.Entry<String, Integer>> iterator =
                            state.readIterator().read();
                        while (iterator.hasNext()) {
                            Map.Entry<String, Integer> entry = iterator.next();
                            content.add(KV.of(entry.getKey(), entry.getValue()));
                            c.output(KV.of(entry.getKey(), entry.getValue()));
                        }
                        // List equality: this also pins the iteration order a, b, c.
                        Assert.assertEquals(
                            ImmutableList.of(KV.of("a", 97), KV.of("b", 42), KV.of("c", 12)),
                            content);
                    }
                }
            };

        PCollection<KV<String, Integer>> output =
            this.pipeline
                .apply(
                    Create.of(
                        KV.of("hello", KV.of("a", 97)),
                        KV.of("hello", KV.of("b", 42)),
                        KV.of("hello", KV.of("b", 42)),
                        KV.of("hello", KV.of("c", 12))))
                .apply(ParDo.of(fn));

        PAssert.that(output).containsInAnyOrder(KV.of("a", 97), KV.of("b", 42), KV.of("c", 12));

        TestSamzaRunner.fromOptions(
                PipelineOptionsFactory.fromArgs(
                        "--runner=org.apache.beam.runners.samza.TestSamzaRunner")
                    .create())
            .run(this.pipeline);
    }

    /**
     * Checks that {@link SamzaSetState#readIterator()} returns the distinct values added to set
     * state.
     *
     * <p>Four elements under the single key {@code "hello"} are processed; the value {@code 42}
     * appears twice. Each value is added to the set state and a combining counter is
     * incremented. When the counter reaches 4 the state is drained through the iterator into a
     * {@link HashSet}, which is emitted and compared against {@code {97, 42, 12}}.
     *
     * <p>The {@code isEmpty()} handle is obtained <em>before</em> the add but read
     * <em>after</em> it. On the first element the assertion therefore only passes if
     * {@code read()} reflects the state at read time rather than at the time the handle was
     * created.
     */
    @Test
    public void testSetStateIterator() {
        // Constant variables, so they can be used as annotation values below.
        final String stateId = "foo";
        final String countStateId = "count";

        DoFn<KV<String, Integer>, Set<Integer>> fn =
            new DoFn<KV<String, Integer>, Set<Integer>>() {
                @DoFn.StateId(stateId)
                private final StateSpec<SetState<Integer>> setState =
                    StateSpecs.set(VarIntCoder.of());

                @DoFn.StateId(countStateId)
                private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
                    StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers());

                @DoFn.ProcessElement
                public void processElement(
                        ProcessContext c,
                        @DoFn.StateId(stateId) SetState<Integer> setState,
                        @DoFn.StateId(countStateId)
                                CombiningState<Integer, int[], Integer> count) {
                    // The test needs the Samza-specific readIterator(), hence the downcast.
                    SamzaSetState<Integer> state = (SamzaSetState<Integer>) setState;

                    // Deliberately created before the add and read after it (see Javadoc).
                    ReadableState<Boolean> isEmpty = state.isEmpty();
                    state.add(c.element().getValue());
                    Assert.assertFalse(isEmpty.read());

                    count.add(1);
                    // Only inspect the state once all four inputs have been seen.
                    if (count.read() >= 4) {
                        Set<Integer> content = new HashSet<>();
                        Iterator<Integer> iterator = state.readIterator().read();
                        while (iterator.hasNext()) {
                            content.add(iterator.next());
                        }
                        c.output(content);
                        Assert.assertEquals(Sets.newHashSet(97, 42, 12), content);
                    }
                }
            };

        PCollection<Set<Integer>> output =
            this.pipeline
                .apply(
                    Create.of(
                        KV.of("hello", 97),
                        KV.of("hello", 42),
                        KV.of("hello", 42),
                        KV.of("hello", 12)))
                .apply(ParDo.of(fn));

        // Exactly one output element is expected: the whole set, emitted once.
        Set<Integer> expected = Sets.newHashSet(97, 42, 12);
        PAssert.that(output).containsInAnyOrder(expected);

        TestSamzaRunner.fromOptions(
                PipelineOptionsFactory.fromArgs(
                        "--runner=org.apache.beam.runners.samza.TestSamzaRunner")
                    .create())
            .run(this.pipeline);
    }

    /**
     * Checks that every store iterator opened while reading set state ends up closed.
     *
     * <p>The pipeline is run with the store factory for {@code "foo"} overridden to
     * {@link TestStorageEngine}, so each {@code range(...)} call on the store is recorded in
     * {@link TestStore#iterators}. After the run, the test asserts that 8 iterators were
     * recorded and that each one has {@code closed == true}.
     *
     * <p>The {@link DoFn} itself requests 7 state iterators: one per element for the size
     * check (4 elements), plus one more whenever the set holds more than one value (sizes are
     * 1, 2, 2, 3, so 3 times). NOTE(review): the 8th iterator is not opened by code in this
     * file; presumably the runner opens it internally (e.g. during state cleanup) — confirm
     * against the state internals implementation.
     */
    @Test
    public void testIteratorClosed() {
        // Constant variable, so it can be used as an annotation value below.
        final String stateId = "foo";

        DoFn<KV<String, Integer>, Set<Integer>> fn =
            new DoFn<KV<String, Integer>, Set<Integer>>() {
                @DoFn.StateId(stateId)
                private final StateSpec<SetState<Integer>> setState =
                    StateSpecs.set(VarIntCoder.of());

                @DoFn.ProcessElement
                public void processElement(
                        ProcessContext c, @DoFn.StateId(stateId) SetState<Integer> setState) {
                    SamzaSetState<Integer> state = (SamzaSetState<Integer>) setState;
                    state.add(c.element().getValue());

                    // Fully consumed here; this iterator is expected to be closed afterwards.
                    int size = Iterators.size(state.readIterator().read());

                    if (size > 1) {
                        // Intentionally left partially consumed: it must still get closed.
                        Iterator<Integer> iterator = state.readIterator().read();
                        Assert.assertTrue(iterator.hasNext());
                        iterator.next();
                    }
                }
            };

        this.pipeline
            .apply(
                Create.of(
                    KV.of("hello", 97),
                    KV.of("hello", 42),
                    KV.of("hello", 42),
                    KV.of("hello", 12)))
            .apply(ParDo.of(fn));

        SamzaPipelineOptions options =
            PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
        options.setRunner(TestSamzaRunner.class);
        Map<String, String> configs = new HashMap<>(ConfigBuilder.localRunConfig());
        // Evaluates to "stores.foo.factory"; presumably the store is named after the state id.
        configs.put("stores." + stateId + ".factory", TestStorageEngine.class.getName());
        options.setConfigOverride(configs);

        // The recording list is static: drop anything left over from an earlier run in this
        // JVM so the exact-count assertion below only sees this pipeline's iterators.
        TestStore.iterators.clear();

        TestSamzaRunner.fromOptions(options).run(this.pipeline).waitUntilFinish();

        Assert.assertEquals(8L, (long) TestStore.iterators.size());
        TestStore.iterators.forEach(
            iter -> Assert.assertTrue("store iterator was not closed", iter.closed));
    }

    public static class TestStore
    extends InMemoryKeyValueStore {
        static List<TestKeyValueIteraor> iterators = Collections.synchronizedList(new ArrayList());
        private final KeyValueStoreMetrics metrics;

        public TestStore(KeyValueStoreMetrics metrics) {
            super(metrics);
            this.metrics = metrics;
        }

        public KeyValueStoreMetrics metrics() {
            return this.metrics;
        }

        public KeyValueIterator<byte[], byte[]> range(byte[] from, byte[] to) {
            TestKeyValueIteraor iter = new TestKeyValueIteraor((KeyValueIterator<byte[], byte[]>)super.range(from, to));
            iterators.add(iter);
            return iter;
        }

        static class TestKeyValueIteraor
        implements KeyValueIterator<byte[], byte[]> {
            private final KeyValueIterator<byte[], byte[]> iter;
            boolean closed = false;

            TestKeyValueIteraor(KeyValueIterator<byte[], byte[]> iter) {
                this.iter = iter;
            }

            public void close() {
                this.iter.close();
                this.closed = true;
            }

            public boolean hasNext() {
                return this.iter.hasNext();
            }

            public Entry<byte[], byte[]> next() {
                return (Entry)this.iter.next();
            }
        }
    }

    public static class TestStorageEngine
    extends InMemoryKeyValueStorageEngineFactory {
        public KeyValueStore<byte[], byte[]> getKVStore(String storeName, File storeDir, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition, JobContext jobContext, ContainerContext containerContext, StorageEngineFactory.StoreMode readWrite) {
            KeyValueStoreMetrics metrics = new KeyValueStoreMetrics(storeName, registry);
            return new TestStore(metrics);
        }
    }
}

