/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow;

import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricStructuredName;
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSetMultimap;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=JUnit4.class)
public class DataflowPipelineJobTest {
    private static final String PROJECT_ID = "someProject";
    private static final String JOB_ID = "1234";
    @Mock
    private Dataflow mockWorkflowClient;
    @Mock
    private Dataflow.Projects mockProjects;
    @Mock
    private Dataflow.Projects.Jobs mockJobs;
    @Rule
    public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setup() {
        MockitoAnnotations.initMocks((Object)this);
        Mockito.when((Object)this.mockWorkflowClient.projects()).thenReturn((Object)this.mockProjects);
        Mockito.when((Object)this.mockProjects.jobs()).thenReturn((Object)this.mockJobs);
    }

    void checkValidInterval(long pollingIntervalMillis, int attempts, long timeSleptMillis) {
        long highSum = 0L;
        long lowSum = 0L;
        for (int i = 1; i < attempts; ++i) {
            double currentInterval = (double)pollingIntervalMillis * Math.pow(1.5, i - 1);
            double offset = 0.5 * currentInterval;
            highSum += Math.round(currentInterval + offset);
            lowSum += Math.round(currentInterval - offset);
        }
        MatcherAssert.assertThat((Object)timeSleptMillis, (Matcher)org.hamcrest.Matchers.allOf((Matcher)org.hamcrest.Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(lowSum)), (Matcher)org.hamcrest.Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(highSum))));
    }

    @Test
    public void testWaitToFinishMessagesFail() throws Exception {
        Dataflow.Projects.Jobs.Get statusRequest = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Job statusResponse = new Job();
        String string = String.valueOf(PipelineResult.State.DONE.name());
        statusResponse.setCurrentState(string.length() != 0 ? "JOB_STATE_".concat(string) : new String("JOB_STATE_"));
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)statusRequest.execute()).thenReturn((Object)statusResponse);
        MonitoringUtil.JobMessagesHandler jobHandler = (MonitoringUtil.JobMessagesHandler)Mockito.mock(MonitoringUtil.JobMessagesHandler.class);
        Dataflow.Projects.Jobs.Messages mockMessages = (Dataflow.Projects.Jobs.Messages)Mockito.mock(Dataflow.Projects.Jobs.Messages.class);
        Dataflow.Projects.Jobs.Messages.List listRequest = (Dataflow.Projects.Jobs.Messages.List)Mockito.mock(Dataflow.Projects.Jobs.Messages.List.class);
        Mockito.when((Object)this.mockJobs.messages()).thenReturn((Object)mockMessages);
        Mockito.when((Object)mockMessages.list((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)listRequest);
        Mockito.when((Object)listRequest.execute()).thenThrow(new Class[]{SocketTimeoutException.class});
        DataflowAggregatorTransforms dataflowAggregatorTransforms = (DataflowAggregatorTransforms)Mockito.mock(DataflowAggregatorTransforms.class);
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, dataflowAggregatorTransforms);
        PipelineResult.State state = job.waitToFinish(5L, TimeUnit.MINUTES, jobHandler, (Sleeper)this.fastClock, (NanoClock)this.fastClock);
        Assert.assertEquals(null, (Object)state);
    }

    public PipelineResult.State mockWaitToFinishInState(PipelineResult.State state) throws Exception {
        Dataflow.Projects.Jobs.Get statusRequest = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Job statusResponse = new Job();
        String string = String.valueOf(state.name());
        statusResponse.setCurrentState(string.length() != 0 ? "JOB_STATE_".concat(string) : new String("JOB_STATE_"));
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)statusRequest.execute()).thenReturn((Object)statusResponse);
        DataflowAggregatorTransforms dataflowAggregatorTransforms = (DataflowAggregatorTransforms)Mockito.mock(DataflowAggregatorTransforms.class);
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, dataflowAggregatorTransforms);
        return job.waitToFinish(1L, TimeUnit.MINUTES, null, (Sleeper)this.fastClock, (NanoClock)this.fastClock);
    }

    @Test
    public void testWaitToFinishDone() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)this.mockWaitToFinishInState(PipelineResult.State.DONE));
    }

    @Test
    public void testWaitToFinishFailed() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.FAILED, (Object)this.mockWaitToFinishInState(PipelineResult.State.FAILED));
    }

    @Test
    public void testWaitToFinishCancelled() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.CANCELLED, (Object)this.mockWaitToFinishInState(PipelineResult.State.CANCELLED));
    }

    @Test
    public void testWaitToFinishUpdated() throws Exception {
        Assert.assertEquals((Object)PipelineResult.State.UPDATED, (Object)this.mockWaitToFinishInState(PipelineResult.State.UPDATED));
    }

    @Test
    public void testWaitToFinishFail() throws Exception {
        Dataflow.Projects.Jobs.Get statusRequest = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)statusRequest.execute()).thenThrow(new Class[]{IOException.class});
        DataflowAggregatorTransforms dataflowAggregatorTransforms = (DataflowAggregatorTransforms)Mockito.mock(DataflowAggregatorTransforms.class);
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, dataflowAggregatorTransforms);
        long startTime = this.fastClock.nanoTime();
        PipelineResult.State state = job.waitToFinish(5L, TimeUnit.MINUTES, null, (Sleeper)this.fastClock, (NanoClock)this.fastClock);
        Assert.assertEquals(null, (Object)state);
        long timeDiff = TimeUnit.NANOSECONDS.toMillis(this.fastClock.nanoTime() - startTime);
        this.checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL, 10, timeDiff);
    }

    @Test
    public void testWaitToFinishTimeFail() throws Exception {
        Dataflow.Projects.Jobs.Get statusRequest = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)statusRequest.execute()).thenThrow(new Class[]{IOException.class});
        DataflowAggregatorTransforms dataflowAggregatorTransforms = (DataflowAggregatorTransforms)Mockito.mock(DataflowAggregatorTransforms.class);
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, dataflowAggregatorTransforms);
        long startTime = this.fastClock.nanoTime();
        PipelineResult.State state = job.waitToFinish(4L, TimeUnit.MILLISECONDS, null, (Sleeper)this.fastClock, (NanoClock)this.fastClock);
        Assert.assertEquals(null, (Object)state);
        long timeDiff = TimeUnit.NANOSECONDS.toMillis(this.fastClock.nanoTime() - startTime);
        Assert.assertEquals((long)timeDiff, (long)4L);
    }

    @Test
    public void testGetStateReturnsServiceState() throws Exception {
        Dataflow.Projects.Jobs.Get statusRequest = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Job statusResponse = new Job();
        String string = String.valueOf(PipelineResult.State.RUNNING.name());
        statusResponse.setCurrentState(string.length() != 0 ? "JOB_STATE_".concat(string) : new String("JOB_STATE_"));
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)statusRequest.execute()).thenReturn((Object)statusResponse);
        DataflowAggregatorTransforms dataflowAggregatorTransforms = (DataflowAggregatorTransforms)Mockito.mock(DataflowAggregatorTransforms.class);
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, dataflowAggregatorTransforms);
        Assert.assertEquals((Object)PipelineResult.State.RUNNING, (Object)job.getStateWithRetries(5, (Sleeper)this.fastClock));
    }

    @Test
    public void testGetStateWithExceptionReturnsUnknown() throws Exception {
        Dataflow.Projects.Jobs.Get statusRequest = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get((String)Matchers.eq((Object)PROJECT_ID), (String)Matchers.eq((Object)JOB_ID))).thenReturn((Object)statusRequest);
        Mockito.when((Object)statusRequest.execute()).thenThrow(new Class[]{IOException.class});
        DataflowAggregatorTransforms dataflowAggregatorTransforms = (DataflowAggregatorTransforms)Mockito.mock(DataflowAggregatorTransforms.class);
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, dataflowAggregatorTransforms);
        long startTime = this.fastClock.nanoTime();
        Assert.assertEquals((Object)PipelineResult.State.UNKNOWN, (Object)job.getStateWithRetries(5, (Sleeper)this.fastClock));
        long timeDiff = TimeUnit.NANOSECONDS.toMillis(this.fastClock.nanoTime() - startTime);
        this.checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL, 5, timeDiff);
    }

    @Test
    public void testGetAggregatorValuesWithNoMetricUpdatesReturnsEmptyValue() throws IOException, AggregatorRetrievalException {
        Aggregator aggregator = (Aggregator)Mockito.mock(Aggregator.class);
        PTransform pTransform = (PTransform)Mockito.mock(PTransform.class);
        String stepName = "s1";
        String fullName = "Foo/Bar/Baz";
        AppliedPTransform<?, ?, ?> appliedTransform = this.appliedPTransform(fullName, (PTransform<PInput, POutput>)pTransform);
        DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms((Map)ImmutableSetMultimap.of((Object)aggregator, (Object)pTransform).asMap(), (Map)ImmutableMap.of(appliedTransform, (Object)stepName));
        Dataflow.Projects.Jobs.GetMetrics getMetrics = (Dataflow.Projects.Jobs.GetMetrics)Mockito.mock(Dataflow.Projects.Jobs.GetMetrics.class);
        Mockito.when((Object)this.mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn((Object)getMetrics);
        JobMetrics jobMetrics = new JobMetrics();
        Mockito.when((Object)getMetrics.execute()).thenReturn((Object)jobMetrics);
        jobMetrics.setMetrics((List)ImmutableList.of());
        Dataflow.Projects.Jobs.Get getState = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn((Object)getState);
        Job modelJob = new Job();
        Mockito.when((Object)getState.execute()).thenReturn((Object)modelJob);
        modelJob.setCurrentState(PipelineResult.State.RUNNING.toString());
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, aggregatorTransforms);
        AggregatorValues values = job.getAggregatorValues(aggregator);
        MatcherAssert.assertThat((Object)values.getValues(), (Matcher)org.hamcrest.Matchers.empty());
    }

    @Test
    public void testGetAggregatorValuesWithNullMetricUpdatesReturnsEmptyValue() throws IOException, AggregatorRetrievalException {
        Aggregator aggregator = (Aggregator)Mockito.mock(Aggregator.class);
        PTransform pTransform = (PTransform)Mockito.mock(PTransform.class);
        String stepName = "s1";
        String fullName = "Foo/Bar/Baz";
        AppliedPTransform<?, ?, ?> appliedTransform = this.appliedPTransform(fullName, (PTransform<PInput, POutput>)pTransform);
        DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms((Map)ImmutableSetMultimap.of((Object)aggregator, (Object)pTransform).asMap(), (Map)ImmutableMap.of(appliedTransform, (Object)stepName));
        Dataflow.Projects.Jobs.GetMetrics getMetrics = (Dataflow.Projects.Jobs.GetMetrics)Mockito.mock(Dataflow.Projects.Jobs.GetMetrics.class);
        Mockito.when((Object)this.mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn((Object)getMetrics);
        JobMetrics jobMetrics = new JobMetrics();
        Mockito.when((Object)getMetrics.execute()).thenReturn((Object)jobMetrics);
        jobMetrics.setMetrics(null);
        Dataflow.Projects.Jobs.Get getState = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn((Object)getState);
        Job modelJob = new Job();
        Mockito.when((Object)getState.execute()).thenReturn((Object)modelJob);
        modelJob.setCurrentState(PipelineResult.State.RUNNING.toString());
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, aggregatorTransforms);
        AggregatorValues values = job.getAggregatorValues(aggregator);
        MatcherAssert.assertThat((Object)values.getValues(), (Matcher)org.hamcrest.Matchers.empty());
    }

    @Test
    public void testGetAggregatorValuesWithSingleMetricUpdateReturnsSingletonCollection() throws IOException, AggregatorRetrievalException {
        Sum.SumLongFn combineFn = new Sum.SumLongFn();
        String aggregatorName = "agg";
        TestAggregator aggregator = new TestAggregator(combineFn, aggregatorName);
        PTransform pTransform = (PTransform)Mockito.mock(PTransform.class);
        String stepName = "s1";
        String fullName = "Foo/Bar/Baz";
        AppliedPTransform<?, ?, ?> appliedTransform = this.appliedPTransform(fullName, (PTransform<PInput, POutput>)pTransform);
        DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms((Map)ImmutableSetMultimap.of(aggregator, (Object)pTransform).asMap(), (Map)ImmutableMap.of(appliedTransform, (Object)stepName));
        Dataflow.Projects.Jobs.GetMetrics getMetrics = (Dataflow.Projects.Jobs.GetMetrics)Mockito.mock(Dataflow.Projects.Jobs.GetMetrics.class);
        Mockito.when((Object)this.mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn((Object)getMetrics);
        JobMetrics jobMetrics = new JobMetrics();
        Mockito.when((Object)getMetrics.execute()).thenReturn((Object)jobMetrics);
        MetricUpdate update = new MetricUpdate();
        long stepValue = 1234L;
        update.setScalar((Object)new BigDecimal(stepValue));
        MetricStructuredName structuredName = new MetricStructuredName();
        structuredName.setName(aggregatorName);
        structuredName.setContext((Map)ImmutableMap.of((Object)"step", (Object)stepName));
        update.setName(structuredName);
        jobMetrics.setMetrics((List)ImmutableList.of((Object)update));
        Dataflow.Projects.Jobs.Get getState = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn((Object)getState);
        Job modelJob = new Job();
        Mockito.when((Object)getState.execute()).thenReturn((Object)modelJob);
        modelJob.setCurrentState(PipelineResult.State.RUNNING.toString());
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, aggregatorTransforms);
        AggregatorValues values = job.getAggregatorValues(aggregator);
        MatcherAssert.assertThat((Object)values.getValuesAtSteps(), (Matcher)org.hamcrest.Matchers.hasEntry((Object)fullName, (Object)stepValue));
        MatcherAssert.assertThat((Object)values.getValuesAtSteps().size(), (Matcher)org.hamcrest.Matchers.equalTo((Object)1));
        MatcherAssert.assertThat((Object)values.getValues(), (Matcher)org.hamcrest.Matchers.contains((Object[])new Long[]{stepValue}));
        MatcherAssert.assertThat((Object)values.getTotalValue((Combine.CombineFn)combineFn), (Matcher)org.hamcrest.Matchers.equalTo((Object)stepValue));
    }

    @Test
    public void testGetAggregatorValuesWithMultipleMetricUpdatesReturnsCollection() throws IOException, AggregatorRetrievalException {
        Sum.SumLongFn combineFn = new Sum.SumLongFn();
        String aggregatorName = "agg";
        TestAggregator aggregator = new TestAggregator(combineFn, aggregatorName);
        PTransform pTransform = (PTransform)Mockito.mock(PTransform.class);
        String stepName = "s1";
        String fullName = "Foo/Bar/Baz";
        AppliedPTransform<?, ?, ?> appliedTransform = this.appliedPTransform(fullName, (PTransform<PInput, POutput>)pTransform);
        PTransform otherTransform = (PTransform)Mockito.mock(PTransform.class);
        String otherStepName = "s88";
        String otherFullName = "Spam/Ham/Eggs";
        AppliedPTransform<?, ?, ?> otherAppliedTransform = this.appliedPTransform(otherFullName, (PTransform<PInput, POutput>)otherTransform);
        DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms((Map)ImmutableSetMultimap.of(aggregator, (Object)pTransform, aggregator, (Object)otherTransform).asMap(), (Map)ImmutableMap.of(appliedTransform, (Object)stepName, otherAppliedTransform, (Object)otherStepName));
        Dataflow.Projects.Jobs.GetMetrics getMetrics = (Dataflow.Projects.Jobs.GetMetrics)Mockito.mock(Dataflow.Projects.Jobs.GetMetrics.class);
        Mockito.when((Object)this.mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn((Object)getMetrics);
        JobMetrics jobMetrics = new JobMetrics();
        Mockito.when((Object)getMetrics.execute()).thenReturn((Object)jobMetrics);
        MetricUpdate updateOne = new MetricUpdate();
        long stepValue = 1234L;
        updateOne.setScalar((Object)new BigDecimal(stepValue));
        MetricStructuredName structuredNameOne = new MetricStructuredName();
        structuredNameOne.setName(aggregatorName);
        structuredNameOne.setContext((Map)ImmutableMap.of((Object)"step", (Object)stepName));
        updateOne.setName(structuredNameOne);
        MetricUpdate updateTwo = new MetricUpdate();
        long stepValueTwo = 1024L;
        updateTwo.setScalar((Object)new BigDecimal(stepValueTwo));
        MetricStructuredName structuredNameTwo = new MetricStructuredName();
        structuredNameTwo.setName(aggregatorName);
        structuredNameTwo.setContext((Map)ImmutableMap.of((Object)"step", (Object)otherStepName));
        updateTwo.setName(structuredNameTwo);
        jobMetrics.setMetrics((List)ImmutableList.of((Object)updateOne, (Object)updateTwo));
        Dataflow.Projects.Jobs.Get getState = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn((Object)getState);
        Job modelJob = new Job();
        Mockito.when((Object)getState.execute()).thenReturn((Object)modelJob);
        modelJob.setCurrentState(PipelineResult.State.RUNNING.toString());
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, aggregatorTransforms);
        AggregatorValues values = job.getAggregatorValues(aggregator);
        MatcherAssert.assertThat((Object)values.getValuesAtSteps(), (Matcher)org.hamcrest.Matchers.hasEntry((Object)fullName, (Object)stepValue));
        MatcherAssert.assertThat((Object)values.getValuesAtSteps(), (Matcher)org.hamcrest.Matchers.hasEntry((Object)otherFullName, (Object)stepValueTwo));
        MatcherAssert.assertThat((Object)values.getValuesAtSteps().size(), (Matcher)org.hamcrest.Matchers.equalTo((Object)2));
        MatcherAssert.assertThat((Object)values.getValues(), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{stepValue, stepValueTwo}));
        MatcherAssert.assertThat((Object)values.getTotalValue((Combine.CombineFn)combineFn), (Matcher)org.hamcrest.Matchers.equalTo((Object)(stepValue + stepValueTwo)));
    }

    @Test
    public void testGetAggregatorValuesWithUnrelatedMetricUpdateIgnoresUpdate() throws IOException, AggregatorRetrievalException {
        Sum.SumLongFn combineFn = new Sum.SumLongFn();
        String aggregatorName = "agg";
        TestAggregator aggregator = new TestAggregator(combineFn, aggregatorName);
        PTransform pTransform = (PTransform)Mockito.mock(PTransform.class);
        String stepName = "s1";
        String fullName = "Foo/Bar/Baz";
        AppliedPTransform<?, ?, ?> appliedTransform = this.appliedPTransform(fullName, (PTransform<PInput, POutput>)pTransform);
        DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms((Map)ImmutableSetMultimap.of(aggregator, (Object)pTransform).asMap(), (Map)ImmutableMap.of(appliedTransform, (Object)stepName));
        Dataflow.Projects.Jobs.GetMetrics getMetrics = (Dataflow.Projects.Jobs.GetMetrics)Mockito.mock(Dataflow.Projects.Jobs.GetMetrics.class);
        Mockito.when((Object)this.mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn((Object)getMetrics);
        JobMetrics jobMetrics = new JobMetrics();
        Mockito.when((Object)getMetrics.execute()).thenReturn((Object)jobMetrics);
        MetricUpdate ignoredUpdate = new MetricUpdate();
        ignoredUpdate.setScalar(null);
        MetricStructuredName ignoredName = new MetricStructuredName();
        ignoredName.setName("ignoredAggregator.elementCount.out0");
        ignoredName.setContext(null);
        ignoredUpdate.setName(ignoredName);
        jobMetrics.setMetrics((List)ImmutableList.of((Object)ignoredUpdate));
        Dataflow.Projects.Jobs.Get getState = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn((Object)getState);
        Job modelJob = new Job();
        Mockito.when((Object)getState.execute()).thenReturn((Object)modelJob);
        modelJob.setCurrentState(PipelineResult.State.RUNNING.toString());
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, aggregatorTransforms);
        AggregatorValues values = job.getAggregatorValues(aggregator);
        MatcherAssert.assertThat(values.getValuesAtSteps().entrySet(), (Matcher)org.hamcrest.Matchers.empty());
        MatcherAssert.assertThat((Object)values.getValues(), (Matcher)org.hamcrest.Matchers.empty());
    }

    @Test
    public void testGetAggregatorValuesWithUnusedAggregatorThrowsException() throws AggregatorRetrievalException {
        Aggregator aggregator = (Aggregator)Mockito.mock(Aggregator.class);
        DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms((Map)ImmutableSetMultimap.of().asMap(), (Map)ImmutableMap.of());
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, aggregatorTransforms);
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage("not used in this pipeline");
        job.getAggregatorValues(aggregator);
    }

    @Test
    public void testGetAggregatorValuesWhenClientThrowsExceptionThrowsAggregatorRetrievalException() throws IOException, AggregatorRetrievalException {
        Sum.SumLongFn combineFn = new Sum.SumLongFn();
        String aggregatorName = "agg";
        TestAggregator aggregator = new TestAggregator(combineFn, aggregatorName);
        PTransform pTransform = (PTransform)Mockito.mock(PTransform.class);
        String stepName = "s1";
        String fullName = "Foo/Bar/Baz";
        AppliedPTransform<?, ?, ?> appliedTransform = this.appliedPTransform(fullName, (PTransform<PInput, POutput>)pTransform);
        DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms((Map)ImmutableSetMultimap.of(aggregator, (Object)pTransform).asMap(), (Map)ImmutableMap.of(appliedTransform, (Object)stepName));
        Dataflow.Projects.Jobs.GetMetrics getMetrics = (Dataflow.Projects.Jobs.GetMetrics)Mockito.mock(Dataflow.Projects.Jobs.GetMetrics.class);
        Mockito.when((Object)this.mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn((Object)getMetrics);
        IOException cause = new IOException();
        Mockito.when((Object)getMetrics.execute()).thenThrow(new Throwable[]{cause});
        Dataflow.Projects.Jobs.Get getState = (Dataflow.Projects.Jobs.Get)Mockito.mock(Dataflow.Projects.Jobs.Get.class);
        Mockito.when((Object)this.mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn((Object)getState);
        Job modelJob = new Job();
        Mockito.when((Object)getState.execute()).thenReturn((Object)modelJob);
        modelJob.setCurrentState(PipelineResult.State.RUNNING.toString());
        DataflowPipelineJob job = new DataflowPipelineJob(PROJECT_ID, JOB_ID, this.mockWorkflowClient, aggregatorTransforms);
        this.thrown.expect(AggregatorRetrievalException.class);
        this.thrown.expectCause(org.hamcrest.Matchers.is((Object)cause));
        this.thrown.expectMessage(aggregator.toString());
        this.thrown.expectMessage("when retrieving Aggregator values for");
        job.getAggregatorValues(aggregator);
    }

    private AppliedPTransform<?, ?, ?> appliedPTransform(String fullName, PTransform<PInput, POutput> transform) {
        return AppliedPTransform.of((String)fullName, (PInput)((PInput)Mockito.mock(PInput.class)), (POutput)((POutput)Mockito.mock(POutput.class)), transform);
    }

    private static class TestAggregator<InT, OutT>
    implements Aggregator<InT, OutT> {
        private final Combine.CombineFn<InT, ?, OutT> combineFn;
        private final String name;

        public TestAggregator(Combine.CombineFn<InT, ?, OutT> combineFn, String name) {
            this.combineFn = combineFn;
            this.name = name;
        }

        public void addValue(InT value) {
            throw new AssertionError();
        }

        public String getName() {
            return this.name;
        }

        public Combine.CombineFn<InT, ?, OutT> getCombineFn() {
            return this.combineFn;
        }
    }
}

