/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow.testing;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricStructuredName;
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineRunner;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.repackaged.com.google.common.base.Optional;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

@RunWith(value=JUnit4.class)
public class TestDataflowPipelineRunnerTest {
    // JUnit rule for declaring expected exceptions. No test visible in this class references it.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    // Mocked HTTP transport; setUp() stubs buildRequest(...) to hand back `request`.
    @Mock
    private MockHttpTransport transport;
    // Mocked low-level request; individual tests stub execute() with a canned metrics response.
    @Mock
    private MockLowLevelHttpRequest request;
    // Mocked GcsUtil. Not referenced by any method visible in this class.
    @Mock
    private GcsUtil mockGcsUtil;
    // Pipeline options rebuilt in setUp() before every test.
    private TestDataflowPipelineOptions options;
    // Dataflow client constructed in setUp() on top of the mocked transport.
    private Dataflow service;

    /**
     * Initializes the Mockito-annotated fields, routes every HTTP request built by the mocked
     * transport to the mocked request, and builds fresh pipeline options that point at the
     * resulting Dataflow client and at {@link TestDataflowPipelineRunner}.
     */
    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);

        // Any (method, url) pair yields the single mocked request.
        Mockito.when(transport.buildRequest(Matchers.anyString(), Matchers.anyString()))
            .thenReturn(request);
        // Keep the real implementation of getContentAsString() on the otherwise mocked request.
        Mockito.doCallRealMethod().when(request).getContentAsString();

        service = new Dataflow(transport, Transport.getJsonFactory(), null);

        TestDataflowPipelineOptions freshOptions =
            PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
        freshOptions.setAppName("TestAppName");
        freshOptions.setProject("test-project");
        freshOptions.setTempLocation("gs://test/temp/location");
        freshOptions.setTempRoot("gs://test");
        freshOptions.setGcpCredential(new TestCredential());
        freshOptions.setDataflowClient(service);
        freshOptions.setRunner(TestDataflowPipelineRunner.class);
        freshOptions.setPathValidatorClass(NoopPathValidator.class);
        options = freshOptions;
    }

    /** Checks that the runner's string form is "TestDataflowPipelineRunner#" plus the app name. */
    @Test
    public void testToString() {
        String actual = new TestDataflowPipelineRunner(options).toString();
        Assert.assertEquals("TestDataflowPipelineRunner#TestAppName", actual);
    }

    /**
     * Batch run: the mocked job reports DONE and the metrics response carries a tentative
     * PAssertSuccess metric. The runner is expected to return the job it got from the
     * delegate runner.
     */
    @Test
    public void testRunBatchJobThatSucceeds() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, true));

        Assert.assertEquals(job, runner.run(pipeline, delegate));
    }

    /**
     * Batch run where the mocked job reports FAILED: running through the test runner must
     * raise an {@link AssertionError}.
     */
    @Test
    public void testRunBatchJobThatFails() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.FAILED);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();

        boolean sawAssertionError = false;
        try {
            runner.run(pipeline, delegate);
        } catch (AssertionError expected) {
            sawAssertionError = true;
        }
        // Checked outside the try so this failure cannot be caught by the handler above.
        Assert.assertTrue("AssertionError expected", sawAssertionError);
    }

    /**
     * Batch run where waitToFinish(...) feeds a JOB_MESSAGE_ERROR message ("FooException") to
     * the supplied messages handler and then reports CANCELLED. The run must fail with an
     * {@link AssertionError} whose message mentions "FooException", and the job must have been
     * cancelled at least once.
     */
    @Test
    public void testBatchPipelineFailsIfException() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.RUNNING);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(
                job.waitToFinish(
                    Matchers.any(Long.class),
                    Matchers.any(TimeUnit.class),
                    Matchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenAnswer(new Answer<PipelineResult.State>() {
                @Override
                public PipelineResult.State answer(InvocationOnMock invocation) {
                    JobMessage errorMessage = new JobMessage();
                    errorMessage.setMessageText("FooException");
                    errorMessage.setTime(TimeUtil.toCloudTime(Instant.now()));
                    errorMessage.setMessageImportance("JOB_MESSAGE_ERROR");
                    // Third argument of waitToFinish is the messages handler.
                    MonitoringUtil.JobMessagesHandler handler =
                        (MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2];
                    handler.process(Arrays.asList(errorMessage));
                    return PipelineResult.State.CANCELLED;
                }
            });

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(false, true));

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        try {
            runner.run(pipeline, delegate);
        } catch (AssertionError expected) {
            Assert.assertThat(
                expected.getMessage(), org.hamcrest.Matchers.containsString("FooException"));
            Mockito.verify(job, Mockito.atLeastOnce()).cancel();
            return;
        }
        Assert.fail("AssertionError expected");
    }

    /**
     * Streaming run: the mocked job stays RUNNING and the metrics response carries a tentative
     * PAssertSuccess metric. The test only requires that run(...) completes without throwing.
     */
    @Test
    public void testRunStreamingJobThatSucceeds() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.RUNNING);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, true));

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        runner.run(pipeline, delegate);
    }

    /**
     * Streaming run where the metrics response carries a tentative PAssertFailure metric:
     * running through the test runner must raise an {@link AssertionError}.
     */
    @Test
    public void testRunStreamingJobThatFails() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.RUNNING);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(false, true));

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();

        boolean sawAssertionError = false;
        try {
            runner.run(pipeline, delegate);
        } catch (AssertionError expected) {
            sawAssertionError = true;
        }
        // Checked outside the try so this failure cannot be caught by the handler above.
        Assert.assertTrue("AssertionError expected", sawAssertionError);
    }

    /**
     * With a DONE job and a tentative PAssertSuccess metric, checkForSuccess(...) must return
     * {@code Optional.of(true)}.
     */
    @Test
    public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
        DataflowPipelineJob spiedJob =
            Mockito.spy(new DataflowPipelineJob("test-project", "test-job", service, null));

        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, true));
        // doReturn(..).when(..) form avoids calling the real getState() on the spy while stubbing.
        Mockito.doReturn(PipelineResult.State.DONE).when(spiedJob).getState();

        Assert.assertEquals(Optional.of(true), runner.checkForSuccess(spiedJob));
    }

    /**
     * With a DONE job and a tentative PAssertFailure metric, checkForSuccess(...) must return
     * {@code Optional.of(false)}.
     */
    @Test
    public void testCheckingForSuccessWhenPAssertFails() throws Exception {
        DataflowPipelineJob spiedJob =
            Mockito.spy(new DataflowPipelineJob("test-project", "test-job", service, null));

        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(false, true));
        // doReturn(..).when(..) form avoids calling the real getState() on the spy while stubbing.
        Mockito.doReturn(PipelineResult.State.DONE).when(spiedJob).getState();

        Assert.assertEquals(Optional.of(false), runner.checkForSuccess(spiedJob));
    }

    /**
     * With a RUNNING job and a PAssertSuccess metric that lacks the "tentative" context entry,
     * checkForSuccess(...) must return {@code Optional.absent()}.
     */
    @Test
    public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
        DataflowPipelineJob spiedJob =
            Mockito.spy(new DataflowPipelineJob("test-project", "test-job", service, null));

        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, false));
        // doReturn(..).when(..) form avoids calling the real getState() on the spy while stubbing.
        Mockito.doReturn(PipelineResult.State.RUNNING).when(spiedJob).getState();

        Assert.assertEquals(Optional.absent(), runner.checkForSuccess(spiedJob));
    }

    /**
     * Builds a canned JSON HTTP response holding a {@link JobMetrics} with exactly one metric
     * whose scalar value is {@code BigDecimal.ONE}.
     *
     * @param success when true the metric is named "PAssertSuccess", otherwise "PAssertFailure"
     * @param tentative when true the metric context maps "tentative" to the empty string;
     *     otherwise the context is an empty map
     * @return a mock response with content type "application/json; charset=UTF-8" whose body is
     *     the pretty-printed JobMetrics
     */
    private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative) throws Exception {
        final Map<String, String> context;
        if (tentative) {
            context = ImmutableMap.of("tentative", "");
        } else {
            context = ImmutableMap.<String, String>of();
        }

        MetricStructuredName structuredName = new MetricStructuredName();
        structuredName.setName(success ? "PAssertSuccess" : "PAssertFailure");
        structuredName.setContext(context);

        MetricUpdate update = new MetricUpdate();
        update.setName(structuredName);
        update.setScalar(BigDecimal.ONE);

        JobMetrics metrics = new JobMetrics();
        metrics.setMetrics(Lists.newArrayList(update));
        // A JSON factory must be attached before toPrettyString() is called below.
        metrics.setFactory(Transport.getJsonFactory());

        MockLowLevelHttpResponse httpResponse = new MockLowLevelHttpResponse();
        httpResponse.setContentType("application/json; charset=UTF-8");
        httpResponse.setContent(metrics.toPrettyString());
        return httpResponse;
    }

    /**
     * With a FAILED job, checkForSuccess(...) must return {@code Optional.of(false)} even though
     * the (non-tentative) metric in the response is named PAssertSuccess.
     */
    @Test
    public void testStreamingPipelineFailsIfServiceFails() throws Exception {
        DataflowPipelineJob spiedJob =
            Mockito.spy(new DataflowPipelineJob("test-project", "test-job", service, null));

        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, false));
        // doReturn(..).when(..) form avoids calling the real getState() on the spy while stubbing.
        Mockito.doReturn(PipelineResult.State.FAILED).when(spiedJob).getState();

        Assert.assertEquals(Optional.of(false), runner.checkForSuccess(spiedJob));
    }

    /**
     * Streaming counterpart of the batch exception test: waitToFinish(...) feeds a
     * JOB_MESSAGE_ERROR message ("FooException") to the supplied messages handler and then
     * reports CANCELLED. The run must fail with an {@link AssertionError} whose message mentions
     * "FooException", and the job must have been cancelled at least once.
     */
    @Test
    public void testStreamingPipelineFailsIfException() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.RUNNING);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");
        Mockito.when(
                job.waitToFinish(
                    Matchers.any(Long.class),
                    Matchers.any(TimeUnit.class),
                    Matchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenAnswer(new Answer<PipelineResult.State>() {
                @Override
                public PipelineResult.State answer(InvocationOnMock invocation) {
                    JobMessage errorMessage = new JobMessage();
                    errorMessage.setMessageText("FooException");
                    errorMessage.setTime(TimeUtil.toCloudTime(Instant.now()));
                    errorMessage.setMessageImportance("JOB_MESSAGE_ERROR");
                    // Third argument of waitToFinish is the messages handler.
                    MonitoringUtil.JobMessagesHandler handler =
                        (MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2];
                    handler.process(Arrays.asList(errorMessage));
                    return PipelineResult.State.CANCELLED;
                }
            });

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(false, true));

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        try {
            runner.run(pipeline, delegate);
        } catch (AssertionError expected) {
            Assert.assertThat(
                expected.getMessage(), org.hamcrest.Matchers.containsString("FooException"));
            Mockito.verify(job, Mockito.atLeastOnce()).cancel();
            return;
        }
        Assert.fail("AssertionError expected");
    }

    /**
     * Batch run with an on-create matcher installed. The matcher is a
     * {@link TestSuccessMatcher} expecting zero waitToFinish(...) calls on the job at the moment
     * it is evaluated.
     */
    @Test
    public void testBatchOnCreateMatcher() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        TestPipelineOptions testOptions = pipeline.getOptions().as(TestPipelineOptions.class);
        testOptions.setOnCreateMatcher(new TestSuccessMatcher(job, 0));

        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, true));
        runner.run(pipeline, delegate);
    }

    /**
     * Streaming run with an on-create matcher installed. The matcher is a
     * {@link TestSuccessMatcher} expecting zero waitToFinish(...) calls on the job at the moment
     * it is evaluated; waitToFinish(...) itself is stubbed to report DONE.
     */
    @Test
    public void testStreamingOnCreateMatcher() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        TestPipelineOptions testOptions = pipeline.getOptions().as(TestPipelineOptions.class);
        testOptions.setOnCreateMatcher(new TestSuccessMatcher(job, 0));

        Mockito.when(
                job.waitToFinish(
                    Matchers.any(Long.class),
                    Matchers.any(TimeUnit.class),
                    Matchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenReturn(PipelineResult.State.DONE);
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, true));
        runner.run(pipeline, delegate);
    }

    /**
     * Batch run that succeeds with an on-success matcher installed. The matcher is a
     * {@link TestSuccessMatcher} expecting exactly one waitToFinish(...) call on the job at the
     * moment it is evaluated.
     */
    @Test
    public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        TestPipelineOptions testOptions = pipeline.getOptions().as(TestPipelineOptions.class);
        testOptions.setOnSuccessMatcher(new TestSuccessMatcher(job, 1));

        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, true));
        runner.run(pipeline, delegate);
    }

    /**
     * Streaming run that succeeds with an on-success matcher installed. The matcher is a
     * {@link TestSuccessMatcher} expecting exactly one waitToFinish(...) call on the job at the
     * moment it is evaluated; waitToFinish(...) itself is stubbed to report DONE.
     */
    @Test
    public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
        options.setStreaming(true);
        Pipeline pipeline = TestPipeline.create(options);
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob job = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(job.getDataflowClient()).thenReturn(service);
        Mockito.when(job.getState()).thenReturn(PipelineResult.State.DONE);
        Mockito.when(job.getProjectId()).thenReturn("test-project");
        Mockito.when(job.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner delegate = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(delegate.run(Matchers.any(Pipeline.class))).thenReturn(job);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) pipeline.getRunner();
        TestPipelineOptions testOptions = pipeline.getOptions().as(TestPipelineOptions.class);
        testOptions.setOnSuccessMatcher(new TestSuccessMatcher(job, 1));

        Mockito.when(
                job.waitToFinish(
                    Matchers.any(Long.class),
                    Matchers.any(TimeUnit.class),
                    Matchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenReturn(PipelineResult.State.DONE);
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(true, true));
        runner.run(pipeline, delegate);
    }

    /**
     * Batch run that fails (job state FAILED, tentative PAssertFailure metric) with a
     * {@link TestFailureMatcher} installed as the on-success matcher. The run must raise an
     * {@link AssertionError}, waitToFinish(...) must have been called exactly once, and the
     * on-success matcher must not have been evaluated.
     *
     * <p>{@code TestFailureMatcher.matches} signals misuse through {@code Assert.fail}, which
     * also throws {@link AssertionError}. The catch block therefore rethrows any error that
     * carries the matcher's message, so a wrongly evaluated matcher fails this test instead of
     * being mistaken for the expected pipeline failure.
     */
    @Test
    public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
        Pipeline p = TestPipeline.create(options);
        PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
        PAssert.that(pc).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(mockJob.getDataflowClient()).thenReturn(service);
        Mockito.when(mockJob.getState()).thenReturn(PipelineResult.State.FAILED);
        Mockito.when(mockJob.getProjectId()).thenReturn("test-project");
        Mockito.when(mockJob.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(mockRunner.run(Matchers.any(Pipeline.class))).thenReturn(mockJob);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
        p.getOptions().as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());

        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(false, true));
        try {
            runner.run(p, mockRunner);
        } catch (AssertionError expected) {
            // Do not let the matcher's own Assert.fail(...) masquerade as the expected failure.
            String message = expected.getMessage();
            if (message != null && message.contains("OnSuccessMatcher should not be called")) {
                throw expected;
            }
            Mockito.verify(mockJob, Mockito.times(1)).waitToFinish(
                Matchers.any(Long.class),
                Matchers.any(TimeUnit.class),
                Matchers.any(MonitoringUtil.JobMessagesHandler.class));
            return;
        }
        Assert.fail("Expected an exception on pipeline failure.");
    }

    /**
     * Streaming run that fails (job state FAILED, waitToFinish(...) stubbed to FAILED, tentative
     * PAssertFailure metric) with a {@link TestFailureMatcher} installed as the on-success
     * matcher. The run must raise an {@link AssertionError}, waitToFinish(...) must have been
     * called exactly once, and the on-success matcher must not have been evaluated.
     *
     * <p>{@code TestFailureMatcher.matches} signals misuse through {@code Assert.fail}, which
     * also throws {@link AssertionError}. The catch block therefore rethrows any error that
     * carries the matcher's message, so a wrongly evaluated matcher fails this test instead of
     * being mistaken for the expected pipeline failure.
     */
    @Test
    public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
        options.setStreaming(true);
        Pipeline p = TestPipeline.create(options);
        PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
        PAssert.that(pc).containsInAnyOrder(1, 2, 3);

        DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
        Mockito.when(mockJob.getDataflowClient()).thenReturn(service);
        Mockito.when(mockJob.getState()).thenReturn(PipelineResult.State.FAILED);
        Mockito.when(mockJob.getProjectId()).thenReturn("test-project");
        Mockito.when(mockJob.getJobId()).thenReturn("test-job");

        DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
        Mockito.when(mockRunner.run(Matchers.any(Pipeline.class))).thenReturn(mockJob);

        TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
        p.getOptions().as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());

        Mockito.when(
                mockJob.waitToFinish(
                    Matchers.any(Long.class),
                    Matchers.any(TimeUnit.class),
                    Matchers.any(MonitoringUtil.JobMessagesHandler.class)))
            .thenReturn(PipelineResult.State.FAILED);
        Mockito.when(request.execute()).thenReturn(generateMockMetricResponse(false, true));
        try {
            runner.run(p, mockRunner);
        } catch (AssertionError expected) {
            // Do not let the matcher's own Assert.fail(...) masquerade as the expected failure.
            String message = expected.getMessage();
            if (message != null && message.contains("OnSuccessMatcher should not be called")) {
                throw expected;
            }
            Mockito.verify(mockJob, Mockito.times(1)).waitToFinish(
                Matchers.any(Long.class),
                Matchers.any(TimeUnit.class),
                Matchers.any(MonitoringUtil.JobMessagesHandler.class));
            return;
        }
        Assert.fail("Expected an exception on pipeline failure.");
    }

    static class TestFailureMatcher
    extends BaseMatcher<PipelineResult>
    implements SerializableMatcher<PipelineResult> {
        TestFailureMatcher() {
        }

        public boolean matches(Object o) {
            Assert.fail((String)"OnSuccessMatcher should not be called on pipeline failure.");
            return false;
        }

        public void describeTo(Description description) {
        }
    }

    static class TestSuccessMatcher
    extends BaseMatcher<PipelineResult>
    implements SerializableMatcher<PipelineResult> {
        private final DataflowPipelineJob mockJob;
        private final int called;

        public TestSuccessMatcher(DataflowPipelineJob job, int times) {
            this.mockJob = job;
            this.called = times;
        }

        public boolean matches(Object o) {
            if (!(o instanceof PipelineResult)) {
                Assert.fail((String)String.format("Expected PipelineResult but received %s", o));
            }
            try {
                ((DataflowPipelineJob)Mockito.verify((Object)this.mockJob, (VerificationMode)Mockito.times((int)this.called))).waitToFinish(((Long)Matchers.any(Long.class)).longValue(), (TimeUnit)((Object)Matchers.any(TimeUnit.class)), (MonitoringUtil.JobMessagesHandler)Matchers.any(MonitoringUtil.JobMessagesHandler.class));
            }
            catch (IOException | InterruptedException e) {
                throw new AssertionError((Object)e);
            }
            Assert.assertSame((Object)this.mockJob, (Object)o);
            return true;
        }

        public void describeTo(Description description) {
        }
    }
}

