/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
import com.google.api.services.dataflow.model.WorkerPool;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.beam.runners.dataflow.DataflowJobAlreadyExistsException;
import org.apache.beam.runners.dataflow.DataflowJobAlreadyUpdatedException;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(value=JUnit4.class)
public class DataflowPipelineRunnerTest {
    // Project id set on the test options and matched by the stubbed Dataflow job requests.
    private static final String PROJECT_ID = "some-project";
    // Supplies scratch files to the tests that need paths on the local file system.
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    // Lets a test declare the exception (type, message, cause) it expects to be thrown.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Asserts the shape of a job as it is handed to the create request: it carries neither an id
     * nor a current state, and its name matches the lower-case job-name pattern checked below.
     *
     * @param job the job to inspect
     */
    private static void assertValidJob(Job job) {
        Assert.assertNull(job.getId());
        Assert.assertNull(job.getCurrentState());
        boolean nameIsWellFormed =
            Pattern.compile("[a-z]([-a-z0-9]*[a-z0-9])?").matcher(job.getName()).matches();
        Assert.assertTrue(nameIsWellFormed);
    }

    /**
     * Creates a small read-then-write text pipeline on the given options.
     *
     * <p>The options are mutated: stable unique names are set to ERROR and the runner is set to
     * {@link DataflowPipelineRunner}.
     *
     * @param options options the pipeline is created from
     * @return the constructed pipeline
     */
    private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
        options.setStableUniqueNames(PipelineOptions.CheckEnabled.ERROR);
        options.setRunner(DataflowPipelineRunner.class);

        Pipeline pipeline = Pipeline.create(options);
        PCollection<String> lines =
            pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"));
        lines.apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
        return pipeline;
    }

    /**
     * Builds a mocked {@link Dataflow} client for {@code PROJECT_ID}.
     *
     * <p>Listing jobs returns a single running job named {@code oldjobname} with id
     * {@code oldJobId}; creating a job captures the submitted {@link Job} in {@code jobCaptor}
     * and returns a job whose id is {@code newid}.
     *
     * @param jobCaptor captor that records the job passed to the create request
     * @return the mocked client
     * @throws IOException declared by the stubbed {@code execute()} calls
     */
    private static Dataflow buildMockDataflow(ArgumentCaptor<Job> jobCaptor) throws IOException {
        Dataflow dataflow = Mockito.mock(Dataflow.class);
        Dataflow.Projects projects = Mockito.mock(Dataflow.Projects.class);
        Dataflow.Projects.Jobs jobs = Mockito.mock(Dataflow.Projects.Jobs.class);
        Dataflow.Projects.Jobs.Create createRequest = Mockito.mock(Dataflow.Projects.Jobs.Create.class);
        Dataflow.Projects.Jobs.List listRequest = Mockito.mock(Dataflow.Projects.Jobs.List.class);

        // Navigation from the client down to the jobs collection.
        Mockito.when(dataflow.projects()).thenReturn(projects);
        Mockito.when(projects.jobs()).thenReturn(jobs);

        // Job creation: capture the submitted job and answer with a job that has an id.
        Mockito.when(jobs.create(Matchers.eq(PROJECT_ID), jobCaptor.capture())).thenReturn(createRequest);

        // Job listing: one already-running job, returned for any page token.
        Job runningJob = new Job().setName("oldjobname").setId("oldJobId").setCurrentState("JOB_STATE_RUNNING");
        ListJobsResponse listResponse = new ListJobsResponse().setJobs(Arrays.asList(runningJob));
        Mockito.when(jobs.list(Matchers.eq(PROJECT_ID))).thenReturn(listRequest);
        Mockito.when(listRequest.setPageToken(Matchers.anyString())).thenReturn(listRequest);
        Mockito.when(listRequest.execute()).thenReturn(listResponse);

        Job createdJob = new Job();
        createdJob.setId("newid");
        Mockito.when(createRequest.execute()).thenReturn(createdJob);
        return dataflow;
    }

    /**
     * Builds a mocked {@link GcsUtil}.
     *
     * <ul>
     *   <li>{@code create} answers with a channel onto a fresh local temp file that is deleted
     *       when the channel is closed;
     *   <li>{@code isGcsPatternSupported} always answers {@code true};
     *   <li>{@code expand} answers with a single-element list holding the path it was given;
     *   <li>{@code bucketExists} answers with {@code bucketExists}.
     * </ul>
     *
     * @param bucketExists value the mock reports for every bucket-existence check
     * @return the mocked utility
     * @throws IOException declared by the stubbed methods
     */
    private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException {
        GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
        Mockito.when(mockGcsUtil.create(Matchers.any(GcsPath.class), Matchers.anyString())).then(new Answer<SeekableByteChannel>() {

            @Override
            public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
                // WRITE is required: without it the channel is opened read-only and any attempt
                // to write the "created" object fails with NonWritableChannelException.
                return FileChannel.open(
                    Files.createTempFile("channel-", ".tmp"),
                    StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE,
                    StandardOpenOption.DELETE_ON_CLOSE);
            }
        });
        Mockito.when(mockGcsUtil.isGcsPatternSupported(Matchers.anyString())).thenReturn(true);
        Mockito.when(mockGcsUtil.expand(Matchers.any(GcsPath.class))).then(new Answer<List<GcsPath>>() {

            @Override
            public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
                // Echo the requested path back as the only match.
                return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
            }
        });
        Mockito.when(mockGcsUtil.bucketExists(Matchers.any(GcsPath.class))).thenReturn(bucketExists);
        return mockGcsUtil;
    }

    /**
     * Builds the standard test options with a throw-away job captor, for tests that do not
     * inspect the submitted job.
     *
     * @return options wired to mocked Dataflow and GCS collaborators
     * @throws IOException if building the mocks fails
     */
    private DataflowPipelineOptions buildPipelineOptions() throws IOException {
        return buildPipelineOptions(ArgumentCaptor.forClass(Job.class));
    }

    /**
     * Builds options for {@link DataflowPipelineRunner} targeting {@code PROJECT_ID}, with a GCS
     * temp location, an empty list of files to stage, a mocked Dataflow client, a mocked
     * {@link GcsUtil} that reports every bucket as existing, and a test credential.
     *
     * @param jobCaptor captor that records the job submitted through the mocked client
     * @return the configured options
     * @throws IOException if building the mocks fails
     */
    private DataflowPipelineOptions buildPipelineOptions(ArgumentCaptor<Job> jobCaptor) throws IOException {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setProject(PROJECT_ID);
        opts.setTempLocation("gs://somebucket/some/path");
        opts.setFilesToStage(new LinkedList<String>());

        // Collaborators are mocked so no request leaves the test.
        Dataflow dataflowClient = buildMockDataflow(jobCaptor);
        opts.setDataflowClient(dataflowClient);
        GcsUtil gcsUtil = buildMockGcsUtil(true);
        opts.setGcsUtil(gcsUtil);
        opts.setGcpCredential(new TestCredential());
        return opts;
    }

    /** A mixed-case job name is rewritten to lower case on the options by {@code fromOptions}. */
    @Test
    public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception {
        String mixedCase = "ThisJobNameHasMixedCase";
        DataflowPipelineOptions options = buildPipelineOptions(ArgumentCaptor.forClass(Job.class));
        options.setJobName(mixedCase);

        // Only the side effect on the options matters; the returned runner is not needed.
        DataflowPipelineRunner.fromOptions(options);

        String expectedName = mixedCase.toLowerCase();
        Assert.assertThat(options.getJobName(), org.hamcrest.Matchers.equalTo(expectedName));
    }

    /** Running the pipeline returns the id from the mocked service and submits a valid job. */
    @Test
    public void testRun() throws IOException {
        ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
        Pipeline pipeline = buildDataflowPipeline(buildPipelineOptions(jobCaptor));

        DataflowPipelineJob submitted = (DataflowPipelineJob) pipeline.run();

        Assert.assertEquals("newid", submitted.getJobId());
        assertValidJob(jobCaptor.getValue());
    }

    /**
     * When the service answers a create request with a job carrying a different client request
     * id, {@code run()} throws {@link DataflowJobAlreadyExistsException} that exposes the
     * returned job and tells the user to pick a different {@code --jobName}.
     */
    @Test
    public void testRunReturnDifferentRequestId() throws IOException {
        DataflowPipelineOptions options = this.buildPipelineOptions();
        Dataflow mockDataflowClient = options.getDataflowClient();
        Dataflow.Projects.Jobs.Create mockRequest = Mockito.mock(Dataflow.Projects.Jobs.Create.class);
        // Re-stub job creation so the response carries a request id the runner did not send.
        Mockito.when(mockDataflowClient.projects().jobs().create(Matchers.eq(PROJECT_ID), Matchers.any(Job.class))).thenReturn(mockRequest);
        Job resultJob = new Job();
        resultJob.setId("newid");
        resultJob.setClientRequestId("different_request_id");
        Mockito.when(mockRequest.execute()).thenReturn(resultJob);
        Pipeline p = this.buildDataflowPipeline(options);
        try {
            p.run();
            Assert.fail("Expected DataflowJobAlreadyExistsException");
        }
        catch (DataflowJobAlreadyExistsException expected) {
            Assert.assertThat(expected.getMessage(), org.hamcrest.Matchers.containsString("If you want to submit a second job, try again by setting a different name using --jobName."));
            // assertEquals takes (expected, actual): the stubbed id is the expectation and the
            // id carried by the exception is the value under test.
            Assert.assertEquals(resultJob.getId(), expected.getJob().getJobId());
        }
    }

    /** Updating the job named {@code oldJobName} returns the new id and submits a valid job. */
    @Test
    public void testUpdate() throws IOException {
        ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
        DataflowPipelineOptions updateOptions = buildPipelineOptions(jobCaptor);
        updateOptions.setUpdate(true);
        updateOptions.setJobName("oldJobName");

        Pipeline pipeline = buildDataflowPipeline(updateOptions);
        DataflowPipelineJob updated = (DataflowPipelineJob) pipeline.run();

        Assert.assertEquals("newid", updated.getJobId());
        assertValidJob(jobCaptor.getValue());
    }

    /** Updating a job name that the mocked job listing does not contain is rejected. */
    @Test
    public void testUpdateNonExistentPipeline() throws IOException {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("Could not find running job named badjobname");

        DataflowPipelineOptions updateOptions = buildPipelineOptions();
        updateOptions.setUpdate(true);
        updateOptions.setJobName("badJobName");

        buildDataflowPipeline(updateOptions).run();
    }

    /**
     * When an update request is answered with a job carrying a different client request id,
     * {@code run()} throws {@link DataflowJobAlreadyUpdatedException} that exposes the returned
     * job and explains that the old job was already updated.
     */
    @Test
    public void testUpdateAlreadyUpdatedPipeline() throws IOException {
        DataflowPipelineOptions updateOptions = buildPipelineOptions();
        updateOptions.setUpdate(true);
        updateOptions.setJobName("oldJobName");

        // Re-stub job creation so the response carries a request id the runner did not send.
        Dataflow dataflowClient = updateOptions.getDataflowClient();
        Dataflow.Projects.Jobs.Create createRequest = Mockito.mock(Dataflow.Projects.Jobs.Create.class);
        Mockito.when(dataflowClient.projects().jobs().create(Matchers.eq(PROJECT_ID), Matchers.any(Job.class))).thenReturn(createRequest);
        final Job resultJob = new Job();
        resultJob.setId("newid");
        resultJob.setClientRequestId("different_request_id");
        Mockito.when(createRequest.execute()).thenReturn(resultJob);

        Pipeline pipeline = buildDataflowPipeline(updateOptions);

        thrown.expect(DataflowJobAlreadyUpdatedException.class);
        thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() {

            @Override
            public void describeTo(Description description) {
                description.appendText("Expected job ID: " + resultJob.getId());
            }

            @Override
            protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) {
                String actualId = item.getJob().getJobId();
                return resultJob.getId().equals(actualId);
            }
        });
        thrown.expectMessage("The job named oldjobname with id: oldJobId has already been updated into job id: newid and cannot be updated again.");
        pipeline.run();
    }

    /**
     * Runs a pipeline with two files to stage, one of them under an overriding package name, and
     * checks the packages, temp storage prefix, dataset and user agent of the submitted job.
     */
    @Test
    public void testRunWithFiles() throws IOException {
        final String gcsStaging = "gs://somebucket/some/path";
        final String gcsTemp = "gs://somebucket/some/temp/path";
        final String cloudDataflowDataset = "somedataset";
        final String overridePackageName = "alias.txt";

        GcsUtil gcsUtil = buildMockGcsUtil(true);
        File plainFile = File.createTempFile("DataflowPipelineRunnerTest", "txt");
        plainFile.deleteOnExit();
        File aliasedFile = File.createTempFile("DataflowPipelineRunnerTest2", "txt");
        aliasedFile.deleteOnExit();

        ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        // The second entry uses the "name=path" form to stage the file under an alias.
        String aliasedEntry = overridePackageName + "=" + aliasedFile.getAbsolutePath();
        options.setFilesToStage(ImmutableList.of(plainFile.getAbsolutePath(), aliasedEntry));
        options.setStagingLocation(gcsStaging);
        options.setTempLocation(gcsTemp);
        options.setTempDatasetId(cloudDataflowDataset);
        options.setProject(PROJECT_ID);
        options.setJobName("job");
        options.setDataflowClient(buildMockDataflow(jobCaptor));
        options.setGcsUtil(gcsUtil);
        options.setGcpCredential(new TestCredential());

        Pipeline pipeline = buildDataflowPipeline(options);
        DataflowPipelineJob submitted = (DataflowPipelineJob) pipeline.run();
        Assert.assertEquals("newid", submitted.getJobId());

        Job workflowJob = jobCaptor.getValue();
        assertValidJob(workflowJob);

        List<DataflowPackage> packages = workflowJob.getEnvironment().getWorkerPools().get(0).getPackages();
        Assert.assertEquals(2L, packages.size());
        DataflowPackage firstPackage = packages.get(0);
        Assert.assertThat(firstPackage.getName(), org.hamcrest.Matchers.startsWith(plainFile.getName()));
        DataflowPackage secondPackage = packages.get(1);
        Assert.assertEquals(overridePackageName, secondPackage.getName());

        Assert.assertEquals("storage.googleapis.com/somebucket/some/temp/path", workflowJob.getEnvironment().getTempStoragePrefix());
        Assert.assertEquals(cloudDataflowDataset, workflowJob.getEnvironment().getDataset());
        Assert.assertEquals(ReleaseInfo.getReleaseInfo().getName(), workflowJob.getEnvironment().getUserAgent().get("name"));
        Assert.assertEquals(ReleaseInfo.getReleaseInfo().getVersion(), workflowJob.getEnvironment().getUserAgent().get("version"));
    }

    /** With no files to stage configured, {@code fromOptions} fills in a non-empty list. */
    @Test
    public void runWithDefaultFilesToStage() throws Exception {
        DataflowPipelineOptions options = buildPipelineOptions();
        options.setFilesToStage(null);

        DataflowPipelineRunner.fromOptions(options);

        Assert.assertFalse(options.getFilesToStage().isEmpty());
    }

    /**
     * File URLs on a {@link URLClassLoader} are detected as the absolute paths of those files,
     * in class-loader order.
     */
    @Test
    public void detectClassPathResourceWithFileResources() throws Exception {
        File file = this.tmpFolder.newFile("file");
        File file2 = this.tmpFolder.newFile("file2");
        // URLClassLoader is Closeable; close it so the test does not leak the loader.
        try (URLClassLoader classLoader = new URLClassLoader(new URL[]{file.toURI().toURL(), file2.toURI().toURL()})) {
            Assert.assertEquals(
                ImmutableList.of(file.getAbsolutePath(), file2.getAbsolutePath()),
                DataflowPipelineRunner.detectClassPathResourcesToStage(classLoader));
        }
    }

    /** A mocked plain {@link ClassLoader} is rejected by classpath detection. */
    @Test
    public void detectClassPathResourcesWithUnsupportedClassLoader() {
        ClassLoader unsupportedLoader = Mockito.mock(ClassLoader.class);

        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("Unable to use ClassLoader to detect classpath elements.");

        DataflowPipelineRunner.detectClassPathResourcesToStage(unsupportedLoader);
    }

    /** A non-file (http) URL on the class loader cannot be converted to a file and is rejected. */
    @Test
    public void detectClassPathResourceWithNonFileResources() throws Exception {
        String url = "http://www.google.com/all-the-secrets.jar";
        // URLClassLoader is Closeable; close it even though the call below is expected to throw.
        try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new URL(url)})) {
            this.thrown.expect(IllegalArgumentException.class);
            this.thrown.expectMessage("Unable to convert url (" + url + ") to file.");
            DataflowPipelineRunner.detectClassPathResourcesToStage(classLoader);
        }
    }

    /** With only a temp location set, {@code fromOptions} leaves a non-null staging location. */
    @Test
    public void testGcsStagingLocationInitialization() throws Exception {
        String gcsTemp = "gs://somebucket/some/temp/path";

        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setTempLocation(gcsTemp);
        opts.setProject(PROJECT_ID);
        opts.setGcpCredential(new TestCredential());
        opts.setGcsUtil(buildMockGcsUtil(true));

        DataflowPipelineRunner.fromOptions(opts);

        Assert.assertNotNull(opts.getStagingLocation());
    }

    /**
     * Reading from a local (non-GCS) path makes {@code run()} fail with an exception whose cause
     * is an {@link IllegalArgumentException} asking for a {@code gs://} path.
     *
     * <p>No assertion follows {@code run()}: when it throws as expected, nothing after it
     * executes.
     */
    @Test
    public void testNonGcsFilePathInReadFailure() throws IOException {
        Pipeline p = this.buildDataflowPipeline(this.buildPipelineOptions());
        p.apply((PTransform)TextIO.Read.named((String)"ReadMyNonGcsFile").from(this.tmpFolder.newFile().getPath()));
        this.thrown.expectCause(org.hamcrest.Matchers.allOf((Matcher)org.hamcrest.Matchers.instanceOf(IllegalArgumentException.class), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)org.hamcrest.Matchers.containsString((String)"expected a valid 'gs://' path but was given"))));
        p.run();
    }

    /** Applying a text write to a local (non-GCS) path is rejected when the transform is applied. */
    @Test
    public void testNonGcsFilePathInWriteFailure() throws IOException {
        Pipeline pipeline = buildDataflowPipeline(buildPipelineOptions());
        PCollection<String> lines = pipeline.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));

        // Expectations are armed only after the valid read has been applied.
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage(org.hamcrest.Matchers.containsString("expected a valid 'gs://' path but was given"));

        lines.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
    }

    /**
     * Reading from a GCS path with consecutive slashes makes {@code run()} fail with an exception
     * whose cause is an {@link IllegalArgumentException} mentioning "consecutive slashes".
     *
     * <p>No assertion follows {@code run()}: when it throws as expected, nothing after it
     * executes.
     */
    @Test
    public void testMultiSlashGcsFileReadPath() throws IOException {
        Pipeline p = this.buildDataflowPipeline(this.buildPipelineOptions());
        p.apply((PTransform)TextIO.Read.named((String)"ReadInvalidGcsFile").from("gs://bucket/tmp//file"));
        this.thrown.expectCause(org.hamcrest.Matchers.allOf((Matcher)org.hamcrest.Matchers.instanceOf(IllegalArgumentException.class), (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)org.hamcrest.Matchers.containsString((String)"consecutive slashes"))));
        p.run();
    }

    /** Applying a text write to a GCS path with consecutive slashes is rejected on apply. */
    @Test
    public void testMultiSlashGcsFileWritePath() throws IOException {
        Pipeline pipeline = buildDataflowPipeline(buildPipelineOptions());
        PCollection<String> lines = pipeline.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));

        // Expectations are armed only after the valid read has been applied.
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("consecutive slashes");

        lines.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
    }

    /**
     * A {@code file://} temp location is rejected by {@code fromOptions}.
     *
     * <p>No assertion follows {@code fromOptions}: when it throws as expected, nothing after it
     * executes.
     */
    @Test
    public void testInvalidTempLocation() throws IOException {
        DataflowPipelineOptions options = this.buildPipelineOptions();
        options.setTempLocation("file://temp/location");
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage(org.hamcrest.Matchers.containsString("expected a valid 'gs://' path but was given"));
        DataflowPipelineRunner.fromOptions(options);
    }

    /**
     * Staging locations that are not {@code gs://} paths — a {@code file://} URI and a bare
     * relative path — are each rejected by {@code fromOptions}.
     */
    @Test
    public void testInvalidStagingLocation() throws IOException {
        DataflowPipelineOptions options = buildPipelineOptions();
        String[] invalidLocations = {"file://my/staging/location", "my/staging/location"};

        for (String location : invalidLocations) {
            options.setStagingLocation(location);
            try {
                DataflowPipelineRunner.fromOptions(options);
                Assert.fail("fromOptions should have failed");
            } catch (IllegalArgumentException e) {
                Assert.assertThat(e.getMessage(), org.hamcrest.Matchers.containsString("expected a valid 'gs://' path but was given"));
            }
        }
    }

    /**
     * A temp location in a bucket the mocked {@link GcsUtil} reports as missing is rejected by
     * {@code fromOptions}.
     *
     * <p>No assertion follows {@code fromOptions}: when it throws as expected, nothing after it
     * executes.
     */
    @Test
    public void testNonExistentTempLocation() throws IOException {
        GcsUtil mockGcsUtil = this.buildMockGcsUtil(false);
        DataflowPipelineOptions options = this.buildPipelineOptions();
        options.setGcsUtil(mockGcsUtil);
        options.setTempLocation("gs://non-existent-bucket/location");
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage(org.hamcrest.Matchers.containsString("Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
        DataflowPipelineRunner.fromOptions(options);
    }

    /**
     * A staging location in a bucket the mocked {@link GcsUtil} reports as missing is rejected by
     * {@code fromOptions}.
     *
     * <p>No assertion follows {@code fromOptions}: when it throws as expected, nothing after it
     * executes.
     */
    @Test
    public void testNonExistentStagingLocation() throws IOException {
        GcsUtil mockGcsUtil = this.buildMockGcsUtil(false);
        DataflowPipelineOptions options = this.buildPipelineOptions();
        options.setGcsUtil(mockGcsUtil);
        options.setStagingLocation("gs://non-existent-bucket/location");
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage(org.hamcrest.Matchers.containsString("Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
        DataflowPipelineRunner.fromOptions(options);
    }

    /** A null project id is rejected by {@code fromOptions}. */
    @Test
    public void testNoProjectFails() {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setProject(null);

        thrown.expect(IllegalArgumentException.class);
        // Both fragments must appear in the exception message.
        thrown.expectMessage("Project id");
        thrown.expectMessage("when running a Dataflow in the cloud");

        DataflowPipelineRunner.fromOptions(opts);
    }

    /** A plain project id such as {@code foo-12345} is accepted by {@code fromOptions}. */
    @Test
    public void testProjectId() throws IOException {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setProject("foo-12345");
        opts.setStagingLocation("gs://spam/ham/eggs");
        opts.setGcsUtil(buildMockGcsUtil(true));
        opts.setGcpCredential(new TestCredential());

        // Passing means no exception is thrown.
        DataflowPipelineRunner.fromOptions(opts);
    }

    /** A domain-prefixed project id ({@code google.com:...}) is accepted by {@code fromOptions}. */
    @Test
    public void testProjectPrefix() throws IOException {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setProject("google.com:some-project-12345");
        opts.setStagingLocation("gs://spam/ham/eggs");
        opts.setGcsUtil(buildMockGcsUtil(true));
        opts.setGcpCredential(new TestCredential());

        // Passing means no exception is thrown.
        DataflowPipelineRunner.fromOptions(opts);
    }

    /** An all-digit project value is rejected with a message mentioning "project number". */
    @Test
    public void testProjectNumber() throws IOException {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setProject("12345");
        opts.setStagingLocation("gs://spam/ham/eggs");
        opts.setGcsUtil(buildMockGcsUtil(true));

        thrown.expect(IllegalArgumentException.class);
        // Both fragments must appear in the exception message.
        thrown.expectMessage("Project ID");
        thrown.expectMessage("project number");

        DataflowPipelineRunner.fromOptions(opts);
    }

    /** A project value containing a space is rejected with a message mentioning "project description". */
    @Test
    public void testProjectDescription() throws IOException {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setProject("some project");
        opts.setStagingLocation("gs://spam/ham/eggs");
        opts.setGcsUtil(buildMockGcsUtil(true));

        thrown.expect(IllegalArgumentException.class);
        // Both fragments must appear in the exception message.
        thrown.expectMessage("Project ID");
        thrown.expectMessage("project description");

        DataflowPipelineRunner.fromOptions(opts);
    }

    /** A negative number of worker harness threads is rejected by {@code fromOptions}. */
    @Test
    public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setProject("foo-12345");
        opts.setStagingLocation("gs://spam/ham/eggs");
        opts.setGcsUtil(buildMockGcsUtil(true));

        DataflowPipelineDebugOptions debugOpts = opts.as(DataflowPipelineDebugOptions.class);
        debugOpts.setNumberOfWorkerHarnessThreads(-1);

        thrown.expect(IllegalArgumentException.class);
        // Both fragments must appear in the exception message.
        thrown.expectMessage("Number of worker harness threads");
        thrown.expectMessage("Please make sure the value is non-negative.");

        DataflowPipelineRunner.fromOptions(opts);
    }

    /** With neither a temp nor a staging location set, {@code fromOptions} is rejected. */
    @Test
    public void testNoStagingLocationAndNoTempLocationFails() {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setProject("foo-project");

        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("Missing required value: at least one of tempLocation or stagingLocation must be set.");

        DataflowPipelineRunner.fromOptions(opts);
    }

    /** A staging location alone, with no temp location, is enough for {@code fromOptions}. */
    @Test
    public void testStagingLocationAndNoTempLocationSucceeds() throws Exception {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setGcpCredential(new TestCredential());
        opts.setProject("foo-project");
        opts.setStagingLocation("gs://spam/ham/eggs");
        opts.setGcsUtil(buildMockGcsUtil(true));

        // Passing means no exception is thrown.
        DataflowPipelineRunner.fromOptions(opts);
    }

    /** A temp location alone, with no staging location, is enough for {@code fromOptions}. */
    @Test
    public void testTempLocationAndNoStagingLocationSucceeds() throws Exception {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setRunner(DataflowPipelineRunner.class);
        opts.setGcpCredential(new TestCredential());
        opts.setProject("foo-project");
        opts.setTempLocation("gs://spam/ham/eggs");
        opts.setGcsUtil(buildMockGcsUtil(true));

        // Passing means no exception is thrown.
        DataflowPipelineRunner.fromOptions(opts);
    }

    /**
     * Job names with an underscore, a leading digit, or a trailing dash are each rejected by
     * {@code fromOptions} with a message containing "JobName invalid".
     */
    @Test
    public void testInvalidJobName() throws IOException {
        // Every invalid name is expected to produce the same reason.
        String expectedReason = "JobName invalid";

        for (String invalidName : Arrays.asList("invalid_name", "0invalid", "invalid-")) {
            DataflowPipelineOptions options = buildPipelineOptions();
            options.setJobName(invalidName);
            try {
                DataflowPipelineRunner.fromOptions(options);
                Assert.fail("Expected IllegalArgumentException for jobName " + options.getJobName());
            } catch (IllegalArgumentException e) {
                Assert.assertThat(e.getMessage(), org.hamcrest.Matchers.containsString(expectedReason));
            }
        }
    }

    /**
     * Checks that a runner is produced (non-null) for each of a handful of acceptable job
     * names, including mixed case, hyphens, digits and a fairly long name.
     *
     * @throws IOException declared because {@code buildPipelineOptions()} is used to create the options
     */
    @Test
    public void testValidJobName() throws IOException {
        String[] acceptedNames = {"ok", "Ok", "A-Ok", "ok-123", "this-one-is-fairly-long-01234567890123456789"};
        for (String jobName : acceptedNames) {
            DataflowPipelineOptions options = this.buildPipelineOptions();
            options.setJobName(jobName);
            Assert.assertNotNull(DataflowPipelineRunner.fromOptions(options));
        }
    }

    /**
     * Applies a {@code TestTransform} to a small created collection and expects translation
     * to fail with an {@link IllegalStateException} mentioning "no translator registered".
     *
     * <p>The trailing {@code assertValidJob} call only executes if {@code translate} returns
     * normally.
     */
    @Test
    public void testTransformTranslatorMissing() throws IOException {
        ArgumentCaptor<Job> capturedJob = ArgumentCaptor.forClass(Job.class);
        DataflowPipelineOptions options = this.buildPipelineOptions(capturedJob);
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        PCollection created = (PCollection)pipeline.apply((PTransform)Create.of(Arrays.asList(1, 2, 3)));
        created.apply((PTransform)new TestTransform());

        // Register the expected failure before triggering translation.
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage(org.hamcrest.Matchers.containsString("no translator registered"));

        DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
        translator.translate(pipeline, (DataflowPipelineRunner)pipeline.getRunner(), Collections.emptyList());
        DataflowPipelineRunnerTest.assertValidJob(capturedJob.getValue());
    }

    /**
     * Registers a translator for {@code TestTransform}, translates a pipeline containing one
     * instance of it, and checks that the registered translator ran by inspecting the
     * transform's {@code translated} flag.
     */
    @Test
    public void testTransformTranslator() throws IOException {
        DataflowPipelineOptions options = this.buildPipelineOptions();
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        TestTransform testTransform = new TestTransform();

        PCollection integers = (PCollection)pipeline.apply((PTransform)Create.of(Arrays.asList(1, 2, 3)).withCoder((Coder)BigEndianIntegerCoder.of()));
        integers.apply((PTransform)testTransform);

        DataflowPipelineTranslator translator = DataflowPipelineRunner.fromOptions((PipelineOptions)options).getTranslator();
        DataflowPipelineTranslator.registerTransformTranslator(TestTransform.class, (DataflowPipelineTranslator.TransformTranslator)new DataflowPipelineTranslator.TransformTranslator<TestTransform>(){

            public void translate(TestTransform toTranslate, DataflowPipelineTranslator.TranslationContext context) {
                // Flip the flag so the test can observe that this translator was invoked.
                toTranslate.translated = true;
                context.addStep((PTransform)toTranslate, "TestTranslate");
                context.addOutput("output", (PValue)context.getOutput((PTransform)toTranslate));
            }
        });

        translator.translate(pipeline, (DataflowPipelineRunner)pipeline.getRunner(), Collections.emptyList());
        Assert.assertTrue(testTransform.translated);
    }

    /**
     * Applies a {@code Create.timestamped} transform, walks the pipeline with a
     * {@code CompositeTransformRecorder}, and checks that the recorded composites include
     * both the applied transform itself and an instance of {@code Create.Values}.
     */
    @Test
    public void testApplyIsScopedToExactClass() throws IOException {
        DataflowPipelineOptions options = this.buildPipelineOptions();
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        Create.TimestampedValues timestamped = Create.timestamped(Arrays.asList(TimestampedValue.of((Object)"TestString", (Instant)Instant.now())));
        pipeline.apply((PTransform)timestamped);

        CompositeTransformRecorder visitor = new CompositeTransformRecorder();
        pipeline.traverseTopologically((Pipeline.PipelineVisitor)visitor);

        List<PTransform<?, ?>> seenComposites = visitor.getCompositeTransforms();
        Assert.assertThat(
            "Expected to have seen CreateTimestamped composite transform.",
            seenComposites,
            (Matcher)org.hamcrest.Matchers.hasItem((Object)timestamped));
        Assert.assertThat(
            "Expected to have two composites, CreateTimestamped and Create.Values",
            seenComposites,
            (Matcher)org.hamcrest.Matchers.hasItem((Matcher)org.hamcrest.Matchers.isA(Create.Values.class)));
    }

    /**
     * Checks that the runner's {@code toString()} for a job named "TestJobName" is
     * "DataflowPipelineRunner#testjobname".
     */
    @Test
    public void testToString() {
        DataflowPipelineOptions opts = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        opts.setJobName("TestJobName");
        opts.setProject("test-project");
        opts.setTempLocation("gs://test/temp/location");
        opts.setGcpCredential(new TestCredential());
        opts.setPathValidatorClass(NoopPathValidator.class);

        String description = DataflowPipelineRunner.fromOptions(opts).toString();
        Assert.assertEquals((Object)"DataflowPipelineRunner#testjobname", (Object)description);
    }

    /**
     * Creates Dataflow pipeline options for the unsupported-source/sink tests.
     *
     * @param streaming value passed to {@code setStreaming} on the returned options
     * @return options using {@code DataflowPipelineRunner}, a test credential and a no-op path validator
     */
    private static PipelineOptions makeOptions(boolean streaming) {
        DataflowPipelineOptions result = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        result.setRunner(DataflowPipelineRunner.class);
        result.setStreaming(streaming);
        result.setJobName("TestJobName");
        result.setProject("test-project");
        result.setTempLocation("gs://test/temp/location");
        result.setGcpCredential(new TestCredential());
        result.setPathValidatorClass(NoopPathValidator.class);
        return result;
    }

    /**
     * Runs a pipeline that applies {@code source} and expects an
     * {@link UnsupportedOperationException} whose message names the mode and the transform.
     *
     * @param source transform applied directly to the pipeline
     * @param name transform name expected at the end of the error message
     * @param streaming whether the pipeline options are created in streaming mode; also selects
     *     the word "streaming" or "batch" in the expected message
     */
    private void testUnsupportedSource(PTransform<PInput, ?> source, String name, boolean streaming) throws Exception {
        String mode;
        if (streaming) {
            mode = "streaming";
        } else {
            mode = "batch";
        }
        String expectedMessage = "The DataflowPipelineRunner in " + mode + " mode does not support " + name;

        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage(expectedMessage);

        Pipeline pipeline = Pipeline.create(DataflowPipelineRunnerTest.makeOptions(streaming));
        pipeline.apply(source);
        pipeline.run();
    }

    /**
     * Expects the transform from {@code AvroSource.readFromFileWithClass} to be reported as
     * unsupported "Read.Bounded" when the pipeline is in streaming mode.
     */
    @Test
    public void testBoundedSourceUnsupportedInStreaming() throws Exception {
        PTransform<PInput, ?> avroRead = (PTransform<PInput, ?>)AvroSource.readFromFileWithClass((String)"foo", String.class);
        this.testUnsupportedSource(avroRead, "Read.Bounded", true);
    }

    /**
     * Expects a {@code BigQueryIO.Read} (validation disabled) to be reported as unsupported
     * "BigQueryIO.Read" when the pipeline is in streaming mode.
     */
    @Test
    public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
        PTransform<PInput, ?> bigQueryRead = (PTransform<PInput, ?>)BigQueryIO.Read.from((String)"project:bar.baz").withoutValidation();
        this.testUnsupportedSource(bigQueryRead, "BigQueryIO.Read", true);
    }

    /**
     * Expects {@code AvroIO.Read} to be reported as unsupported "AvroIO.Read" when the
     * pipeline is in streaming mode.
     */
    @Test
    public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
        PTransform<PInput, ?> avroIoRead = (PTransform<PInput, ?>)AvroIO.Read.from((String)"foo");
        this.testUnsupportedSource(avroIoRead, "AvroIO.Read", true);
    }

    /**
     * Expects {@code TextIO.Read} to be reported as unsupported "TextIO.Read" when the
     * pipeline is in streaming mode.
     */
    @Test
    public void testTextIOSourceUnsupportedInStreaming() throws Exception {
        PTransform<PInput, ?> textRead = (PTransform<PInput, ?>)TextIO.Read.from((String)"foo");
        this.testUnsupportedSource(textRead, "TextIO.Read", true);
    }

    /**
     * Expects {@code Read.from} over a bounded {@code AvroSource} to be reported as
     * unsupported "Read.Bounded" when the pipeline is in streaming mode.
     */
    @Test
    public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
        PTransform<PInput, ?> boundedRead = (PTransform<PInput, ?>)Read.from((BoundedSource)AvroSource.from((String)"/tmp/test"));
        this.testUnsupportedSource(boundedRead, "Read.Bounded", true);
    }

    /**
     * Expects {@code Read.from} over an unbounded {@code TestCountingSource} to be reported
     * as unsupported "Read.Unbounded" when the pipeline is in batch (non-streaming) mode.
     */
    @Test
    public void testReadUnboundedUnsupportedInBatch() throws Exception {
        PTransform<PInput, ?> unboundedRead = (PTransform<PInput, ?>)Read.from((UnboundedSource)new TestCountingSource(1));
        this.testUnsupportedSource(unboundedRead, "Read.Unbounded", false);
    }

    /**
     * Runs a pipeline that writes a single-element collection to {@code sink} and expects an
     * {@link UnsupportedOperationException} whose message names the mode and the transform.
     *
     * <p>The mode word in the expected message is derived from {@code streaming}, mirroring
     * {@code testUnsupportedSource}. Previously it was hard-coded to "streaming" even though
     * the flag also selects the pipeline mode, so a call with {@code false} would have
     * expected a message for the wrong mode. Existing callers all pass {@code true} and are
     * unaffected.
     *
     * @param sink transform applied to a created {@code PCollection<String>}
     * @param name transform name expected at the end of the error message
     * @param streaming whether the pipeline options are created in streaming mode; also selects
     *     the word "streaming" or "batch" in the expected message
     */
    private void testUnsupportedSink(PTransform<PCollection<String>, PDone> sink, String name, boolean streaming) throws Exception {
        String mode = streaming ? "streaming" : "batch";
        this.thrown.expect(UnsupportedOperationException.class);
        this.thrown.expectMessage("The DataflowPipelineRunner in " + mode + " mode does not support " + name);
        Pipeline p = Pipeline.create((PipelineOptions)DataflowPipelineRunnerTest.makeOptions(streaming));
        ((PCollection)p.apply((PTransform)Create.of((Object[])new String[]{"foo"}))).apply(sink);
        p.run();
    }

    /**
     * Expects {@code AvroIO.Write} (with a {@code String} schema) to be reported as
     * unsupported "AvroIO.Write" when the pipeline is in streaming mode.
     */
    @Test
    public void testAvroIOSinkUnsupportedInStreaming() throws Exception {
        PTransform<PCollection<String>, PDone> avroWrite = (PTransform<PCollection<String>, PDone>)AvroIO.Write.to((String)"foo").withSchema(String.class);
        this.testUnsupportedSink(avroWrite, "AvroIO.Write", true);
    }

    /**
     * Expects {@code TextIO.Write} to be reported as unsupported "TextIO.Write" when the
     * pipeline is in streaming mode.
     */
    @Test
    public void testTextIOSinkUnsupportedInStreaming() throws Exception {
        PTransform<PCollection<String>, PDone> textWrite = (PTransform<PCollection<String>, PDone>)TextIO.Write.to((String)"foo");
        this.testUnsupportedSink(textWrite, "TextIO.Write", true);
    }

    /**
     * Feeds {@code IsmRecordForSingularValuePerWindowDoFn} one group holding a single
     * global-window value "a" and expects exactly one ISM record keyed by the global window
     * carrying that value.
     */
    @Test
    public void testBatchViewAsSingletonToIsmRecord() throws Exception {
        DoFnTester tester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn((Coder)GlobalWindow.Coder.INSTANCE));

        ImmutableList valuesInWindow = ImmutableList.of(
            (Object)KV.of((Object)GlobalWindow.INSTANCE, (Object)WindowedValue.valueInGlobalWindow((Object)"a")));
        ImmutableList input = ImmutableList.of((Object)KV.of((Object)0, (Object)valuesInWindow));
        List actual = tester.processBatch((Iterable)input);

        IsmFormat.IsmRecord[] expected = new IsmFormat.IsmRecord[]{
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)GlobalWindow.INSTANCE), (Object)WindowedValue.valueInGlobalWindow((Object)"a"))
        };
        Assert.assertThat((Object)actual, (Matcher)IsIterableContainingInOrder.contains((Object[])expected));
    }

    /**
     * Feeds {@code IsmRecordForSingularValuePerWindowDoFn} two values ("a" and "b") for the
     * same global window and expects a {@code UserCodeException} caused by an
     * {@link IllegalStateException} mentioning "found for singleton within window".
     */
    @Test
    public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException() throws Exception {
        DoFnTester tester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn((Coder)GlobalWindow.Coder.INSTANCE));

        ImmutableList twoValuesInWindow = ImmutableList.of(
            (Object)KV.of((Object)GlobalWindow.INSTANCE, (Object)WindowedValue.valueInGlobalWindow((Object)"a")),
            (Object)KV.of((Object)GlobalWindow.INSTANCE, (Object)WindowedValue.valueInGlobalWindow((Object)"b")));
        ImmutableList input = ImmutableList.of((Object)KV.of((Object)0, (Object)twoValuesInWindow));

        try {
            tester.processBatch((Iterable)input);
            Assert.fail("Expected UserCodeException");
        } catch (UserCodeException wrapped) {
            Throwable cause = wrapped.getCause();
            Assert.assertTrue(cause instanceof IllegalStateException);
            String causeMessage = ((IllegalStateException)cause).getMessage();
            Assert.assertThat((Object)causeMessage, (Matcher)org.hamcrest.Matchers.containsString((String)"found for singleton within window"));
        }
    }

    /**
     * Feeds {@code ToIsmRecordForGlobalWindowDoFn} the elements "a", "b", "c" and expects
     * three ISM records keyed by (global window, index) with indices 0, 1 and 2, in order.
     */
    @Test
    public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception {
        DoFnTester tester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsList.ToIsmRecordForGlobalWindowDoFn());

        ImmutableList input = ImmutableList.of((Object)"a", (Object)"b", (Object)"c");
        List actual = tester.processBatch((Iterable)input);

        IsmFormat.IsmRecord[] expected = new IsmFormat.IsmRecord[]{
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)GlobalWindow.INSTANCE, (Object)0L), (Object)WindowedValue.valueInGlobalWindow((Object)"a")),
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)GlobalWindow.INSTANCE, (Object)1L), (Object)WindowedValue.valueInGlobalWindow((Object)"b")),
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)GlobalWindow.INSTANCE, (Object)2L), (Object)WindowedValue.valueInGlobalWindow((Object)"c"))
        };
        Assert.assertThat((Object)actual, (Matcher)IsIterableContainingInOrder.contains((Object[])expected));
    }

    /**
     * Feeds {@code ToIsmRecordForNonGlobalWindowDoFn} two groups of window-tagged values
     * spanning three interval windows and expects one ISM record per value, keyed by
     * (window, index). In the expected data the index restarts at 0 for each window.
     */
    @Test
    public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception {
        DoFnTester tester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn(IntervalWindow.getCoder()));
        IntervalWindow windowA = new IntervalWindow(new Instant(0L), new Instant(10L));
        IntervalWindow windowB = new IntervalWindow(new Instant(10L), new Instant(20L));
        IntervalWindow windowC = new IntervalWindow(new Instant(20L), new Instant(30L));

        // First group: three values in windowA followed by two in windowB.
        ImmutableList firstGroup = ImmutableList.of(
            (Object)KV.of((Object)windowA, (Object)WindowedValue.of((Object)110L, (Instant)new Instant(1L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowA, (Object)WindowedValue.of((Object)111L, (Instant)new Instant(3L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowA, (Object)WindowedValue.of((Object)112L, (Instant)new Instant(4L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowB, (Object)WindowedValue.of((Object)120L, (Instant)new Instant(12L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowB, (Object)WindowedValue.of((Object)121L, (Instant)new Instant(14L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)));
        // Second group: a single value in windowC.
        ImmutableList secondGroup = ImmutableList.of(
            (Object)KV.of((Object)windowC, (Object)WindowedValue.of((Object)210L, (Instant)new Instant(25L), (BoundedWindow)windowC, (PaneInfo)PaneInfo.NO_FIRING)));
        ImmutableList input = ImmutableList.of(
            (Object)KV.of((Object)1, (Object)firstGroup),
            (Object)KV.of((Object)2, (Object)secondGroup));
        List actual = tester.processBatch((Iterable)input);

        IsmFormat.IsmRecord[] expected = new IsmFormat.IsmRecord[]{
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)windowA, (Object)0L), (Object)WindowedValue.of((Object)110L, (Instant)new Instant(1L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)windowA, (Object)1L), (Object)WindowedValue.of((Object)111L, (Instant)new Instant(3L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)windowA, (Object)2L), (Object)WindowedValue.of((Object)112L, (Instant)new Instant(4L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)windowB, (Object)0L), (Object)WindowedValue.of((Object)120L, (Instant)new Instant(12L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)),
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)windowB, (Object)1L), (Object)WindowedValue.of((Object)121L, (Instant)new Instant(14L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)),
            IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)windowC, (Object)0L), (Object)WindowedValue.of((Object)210L, (Instant)new Instant(25L), (BoundedWindow)windowC, (PaneInfo)PaneInfo.NO_FIRING))
        };
        Assert.assertThat((Object)actual, (Matcher)IsIterableContainingInOrder.contains((Object[])expected));
    }

    /**
     * Exercises {@code ToIsmRecordForMapLikeDoFn} (constructed with its last argument set to
     * {@code false}) and checks its main output and both side outputs.
     *
     * <p>Input is two groups of values, each value tagged with a (user key, window) pair.
     * According to the expected data below:
     * <ul>
     *   <li>the main output holds one ISM record per value keyed by (user key, window, index),
     *       where the index restarts at 0 for every distinct (user key, window) pair;</li>
     *   <li>the size side output holds one entry per window with the values 2, 2 and 1 for
     *       windows A, B and C;</li>
     *   <li>the entry-set side output holds one entry per distinct (window, user key) pair.</li>
     * </ul>
     * Side-output entries are keyed by {@code ismCoder.hash} of (metadata key, window).
     */
    @Test
    public void testToIsmRecordForMapLikeDoFn() throws Exception {
        TupleTag outputForSizeTag = new TupleTag();
        TupleTag outputForEntrySetTag = new TupleTag();
        VarLongCoder keyCoder = VarLongCoder.of();
        Coder windowCoder = IntervalWindow.getCoder();
        // Record coder over (metadata-aware user key, window, big-endian long) key components.
        IsmFormat.IsmRecordCoder ismCoder = IsmFormat.IsmRecordCoder.of((int)1, (int)2, (List)ImmutableList.of((Object)IsmFormat.MetadataKeyCoder.of((Coder)keyCoder), (Object)IntervalWindow.getCoder(), (Object)BigEndianLongCoder.of()), (Coder)WindowedValue.FullWindowedValueCoder.of((Coder)VarLongCoder.of(), (Coder)windowCoder));
        DoFnTester doFnTester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn(outputForSizeTag, outputForEntrySetTag, windowCoder, (Coder)keyCoder, ismCoder, false));
        // Both side-output tags must be registered with the tester before processing.
        doFnTester.setSideOutputTags(TupleTagList.of((List)ImmutableList.of((Object)outputForSizeTag, (Object)outputForEntrySetTag)));
        IntervalWindow windowA = new IntervalWindow(new Instant(0L), new Instant(10L));
        IntervalWindow windowB = new IntervalWindow(new Instant(10L), new Instant(20L));
        IntervalWindow windowC = new IntervalWindow(new Instant(20L), new Instant(30L));
        // Group 1: key 1 twice in windowA, key 2 in windowA, key 2 in windowB, key 3 in windowB.
        // Group 2: key 4 in windowC.
        ImmutableList inputElements = ImmutableList.of((Object)KV.of((Object)1, (Object)ImmutableList.of((Object)KV.of((Object)KV.of((Object)1L, (Object)windowA), (Object)WindowedValue.of((Object)110L, (Instant)new Instant(1L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)), (Object)KV.of((Object)KV.of((Object)1L, (Object)windowA), (Object)WindowedValue.of((Object)111L, (Instant)new Instant(2L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)), (Object)KV.of((Object)KV.of((Object)2L, (Object)windowA), (Object)WindowedValue.of((Object)120L, (Instant)new Instant(3L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)), (Object)KV.of((Object)KV.of((Object)2L, (Object)windowB), (Object)WindowedValue.of((Object)210L, (Instant)new Instant(11L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)), (Object)KV.of((Object)KV.of((Object)3L, (Object)windowB), (Object)WindowedValue.of((Object)220L, (Instant)new Instant(12L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)))), (Object)KV.of((Object)2, (Object)ImmutableList.of((Object)KV.of((Object)KV.of((Object)4L, (Object)windowC), (Object)WindowedValue.of((Object)330L, (Instant)new Instant(21L), (BoundedWindow)windowC, (PaneInfo)PaneInfo.NO_FIRING)))));
        // Main output: one record per input value, keyed by (user key, window, index).
        Assert.assertThat((Object)doFnTester.processBatch((Iterable)inputElements), (Matcher)IsIterableContainingInOrder.contains((Object[])new IsmFormat.IsmRecord[]{IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)1L, (Object)windowA, (Object)0L), (Object)WindowedValue.of((Object)110L, (Instant)new Instant(1L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)), IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)1L, (Object)windowA, (Object)1L), (Object)WindowedValue.of((Object)111L, (Instant)new Instant(2L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)), IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)2L, (Object)windowA, (Object)0L), (Object)WindowedValue.of((Object)120L, (Instant)new Instant(3L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)), IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)2L, (Object)windowB, (Object)0L), (Object)WindowedValue.of((Object)210L, (Instant)new Instant(11L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)), IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)3L, (Object)windowB, (Object)0L), (Object)WindowedValue.of((Object)220L, (Instant)new Instant(12L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)), IsmFormat.IsmRecord.of((List)ImmutableList.of((Object)4L, (Object)windowC, (Object)0L), (Object)WindowedValue.of((Object)330L, (Instant)new Instant(21L), (BoundedWindow)windowC, (PaneInfo)PaneInfo.NO_FIRING))}));
        // Size side output: (windowA, 2), (windowB, 2), (windowC, 1).
        Assert.assertThat((Object)doFnTester.takeSideOutputElements(outputForSizeTag), (Matcher)IsIterableContainingInOrder.contains((Object[])new KV[]{KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowA)), (Object)KV.of((Object)windowA, (Object)2L)), KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowB)), (Object)KV.of((Object)windowB, (Object)2L)), KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowC)), (Object)KV.of((Object)windowC, (Object)1L))}));
        // Entry-set side output: (windowA, 1), (windowA, 2), (windowB, 2), (windowB, 3), (windowC, 4).
        Assert.assertThat((Object)doFnTester.takeSideOutputElements(outputForEntrySetTag), (Matcher)IsIterableContainingInOrder.contains((Object[])new KV[]{KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowA)), (Object)KV.of((Object)windowA, (Object)1L)), KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowA)), (Object)KV.of((Object)windowA, (Object)2L)), KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowB)), (Object)KV.of((Object)windowB, (Object)2L)), KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowB)), (Object)KV.of((Object)windowB, (Object)3L)), KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowC)), (Object)KV.of((Object)windowC, (Object)4L))}));
    }

    /**
     * Constructs {@code ToIsmRecordForMapLikeDoFn} with its last argument set to
     * {@code true}, feeds it two values sharing the same (key, window) pair, and expects a
     * {@code UserCodeException} caused by an {@link IllegalStateException} mentioning
     * "Unique keys are expected but found key".
     */
    @Test
    public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception {
        TupleTag sizeTag = new TupleTag();
        TupleTag entrySetTag = new TupleTag();
        VarLongCoder keyCoder = VarLongCoder.of();
        Coder windowCoder = IntervalWindow.getCoder();
        IsmFormat.IsmRecordCoder ismCoder = IsmFormat.IsmRecordCoder.of(
            1,
            2,
            (List)ImmutableList.of((Object)IsmFormat.MetadataKeyCoder.of((Coder)keyCoder), (Object)IntervalWindow.getCoder(), (Object)BigEndianLongCoder.of()),
            (Coder)WindowedValue.FullWindowedValueCoder.of((Coder)VarLongCoder.of(), (Coder)windowCoder));
        DoFnTester tester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn(sizeTag, entrySetTag, windowCoder, (Coder)keyCoder, ismCoder, true));
        tester.setSideOutputTags(TupleTagList.of((List)ImmutableList.of((Object)sizeTag, (Object)entrySetTag)));

        IntervalWindow windowA = new IntervalWindow(new Instant(0L), new Instant(10L));
        // Two values that share key 1L in the same window.
        ImmutableList duplicateKeyValues = ImmutableList.of(
            (Object)KV.of((Object)KV.of((Object)1L, (Object)windowA), (Object)WindowedValue.of((Object)110L, (Instant)new Instant(1L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)KV.of((Object)1L, (Object)windowA), (Object)WindowedValue.of((Object)111L, (Instant)new Instant(2L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)));
        ImmutableList input = ImmutableList.of((Object)KV.of((Object)1, (Object)duplicateKeyValues));

        try {
            tester.processBatch((Iterable)input);
            Assert.fail("Expected UserCodeException");
        } catch (UserCodeException wrapped) {
            Throwable cause = wrapped.getCause();
            Assert.assertTrue(cause instanceof IllegalStateException);
            String causeMessage = ((IllegalStateException)cause).getMessage();
            Assert.assertThat((Object)causeMessage, (Matcher)org.hamcrest.Matchers.containsString((String)"Unique keys are expected but found key"));
        }
    }

    /**
     * Exercises {@code ToIsmMetadataRecordForSizeDoFn} with per-window counts and checks the
     * emitted metadata records.
     *
     * <p>Input is two groups of (window, count) pairs. According to the expected data, the
     * output has one metadata record per window keyed by (metadata key, window, 0L) whose
     * payload is a {@code VarLongCoder}-encoded total: 5 for windowA (inputs 2 and 3), 7 for
     * windowB and 9 for windowC.
     */
    @Test
    public void testToIsmMetadataRecordForSizeDoFn() throws Exception {
        TupleTag outputForSizeTag = new TupleTag();
        TupleTag outputForEntrySetTag = new TupleTag();
        VarLongCoder keyCoder = VarLongCoder.of();
        Coder windowCoder = IntervalWindow.getCoder();
        // The record coder is used below only to compute the hash key of the second input group.
        IsmFormat.IsmRecordCoder ismCoder = IsmFormat.IsmRecordCoder.of((int)1, (int)2, (List)ImmutableList.of((Object)IsmFormat.MetadataKeyCoder.of((Coder)keyCoder), (Object)IntervalWindow.getCoder(), (Object)BigEndianLongCoder.of()), (Coder)WindowedValue.FullWindowedValueCoder.of((Coder)VarLongCoder.of(), (Coder)windowCoder));
        DoFnTester doFnTester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn(windowCoder));
        doFnTester.setSideOutputTags(TupleTagList.of((List)ImmutableList.of((Object)outputForSizeTag, (Object)outputForEntrySetTag)));
        IntervalWindow windowA = new IntervalWindow(new Instant(0L), new Instant(10L));
        IntervalWindow windowB = new IntervalWindow(new Instant(10L), new Instant(20L));
        IntervalWindow windowC = new IntervalWindow(new Instant(20L), new Instant(30L));
        // Group 1: (windowA, 2), (windowA, 3), (windowB, 7). Group 2: (windowC, 9).
        ImmutableList inputElements = ImmutableList.of((Object)KV.of((Object)1, (Object)ImmutableList.of((Object)KV.of((Object)windowA, (Object)2L), (Object)KV.of((Object)windowA, (Object)3L), (Object)KV.of((Object)windowB, (Object)7L))), (Object)KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowB)), (Object)ImmutableList.of((Object)KV.of((Object)windowC, (Object)9L))));
        // Expected payloads per window: windowA -> 5, windowB -> 7, windowC -> 9.
        Assert.assertThat((Object)doFnTester.processBatch((Iterable)inputElements), (Matcher)IsIterableContainingInOrder.contains((Object[])new IsmFormat.IsmRecord[]{IsmFormat.IsmRecord.meta((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowA, (Object)0L), (byte[])CoderUtils.encodeToByteArray((Coder)VarLongCoder.of(), (Object)5L)), IsmFormat.IsmRecord.meta((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowB, (Object)0L), (byte[])CoderUtils.encodeToByteArray((Coder)VarLongCoder.of(), (Object)7L)), IsmFormat.IsmRecord.meta((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowC, (Object)0L), (byte[])CoderUtils.encodeToByteArray((Coder)VarLongCoder.of(), (Object)9L))}));
    }

    /**
     * Exercises {@code ToIsmMetadataRecordForKeyDoFn} with per-window keys and checks the
     * emitted metadata records.
     *
     * <p>Input is two groups of (window, key) pairs. According to the expected data, the
     * output has one metadata record per input pair keyed by (metadata key, window, index),
     * where the index starts at 1L within each window, and whose payload is the
     * {@code VarLongCoder}-encoded key.
     */
    @Test
    public void testToIsmMetadataRecordForKeyDoFn() throws Exception {
        TupleTag outputForSizeTag = new TupleTag();
        TupleTag outputForEntrySetTag = new TupleTag();
        VarLongCoder keyCoder = VarLongCoder.of();
        Coder windowCoder = IntervalWindow.getCoder();
        // The record coder is used below only to compute the hash key of the second input group.
        IsmFormat.IsmRecordCoder ismCoder = IsmFormat.IsmRecordCoder.of((int)1, (int)2, (List)ImmutableList.of((Object)IsmFormat.MetadataKeyCoder.of((Coder)keyCoder), (Object)IntervalWindow.getCoder(), (Object)BigEndianLongCoder.of()), (Coder)WindowedValue.FullWindowedValueCoder.of((Coder)VarLongCoder.of(), (Coder)windowCoder));
        DoFnTester doFnTester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn((Coder)keyCoder, windowCoder));
        doFnTester.setSideOutputTags(TupleTagList.of((List)ImmutableList.of((Object)outputForSizeTag, (Object)outputForEntrySetTag)));
        IntervalWindow windowA = new IntervalWindow(new Instant(0L), new Instant(10L));
        IntervalWindow windowB = new IntervalWindow(new Instant(10L), new Instant(20L));
        IntervalWindow windowC = new IntervalWindow(new Instant(20L), new Instant(30L));
        // Group 1: (windowA, 2), (windowA, 3), (windowB, 3). Group 2: (windowC, 3).
        ImmutableList inputElements = ImmutableList.of((Object)KV.of((Object)1, (Object)ImmutableList.of((Object)KV.of((Object)windowA, (Object)2L), (Object)KV.of((Object)windowA, (Object)3L), (Object)KV.of((Object)windowB, (Object)3L))), (Object)KV.of((Object)ismCoder.hash((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowB)), (Object)ImmutableList.of((Object)KV.of((Object)windowC, (Object)3L))));
        // Expected: (windowA, 1) -> 2, (windowA, 2) -> 3, (windowB, 1) -> 3, (windowC, 1) -> 3.
        Assert.assertThat((Object)doFnTester.processBatch((Iterable)inputElements), (Matcher)IsIterableContainingInOrder.contains((Object[])new IsmFormat.IsmRecord[]{IsmFormat.IsmRecord.meta((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowA, (Object)1L), (byte[])CoderUtils.encodeToByteArray((Coder)VarLongCoder.of(), (Object)2L)), IsmFormat.IsmRecord.meta((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowA, (Object)2L), (byte[])CoderUtils.encodeToByteArray((Coder)VarLongCoder.of(), (Object)3L)), IsmFormat.IsmRecord.meta((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowB, (Object)1L), (byte[])CoderUtils.encodeToByteArray((Coder)VarLongCoder.of(), (Object)3L)), IsmFormat.IsmRecord.meta((List)ImmutableList.of((Object)IsmFormat.getMetadataKey(), (Object)windowC, (Object)1L), (byte[])CoderUtils.encodeToByteArray((Coder)VarLongCoder.of(), (Object)3L))}));
    }

    /**
     * Feeds {@code ToMapDoFn} window-tagged key/value pairs across three interval windows
     * and expects three output records. Each record's value is a map holding exactly the
     * key/value pairs that were supplied for one window, in window order A, B, C.
     */
    @Test
    public void testToMapDoFn() throws Exception {
        Coder windowCoder = IntervalWindow.getCoder();
        DoFnTester tester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsMap.ToMapDoFn(windowCoder));
        IntervalWindow windowA = new IntervalWindow(new Instant(0L), new Instant(10L));
        IntervalWindow windowB = new IntervalWindow(new Instant(10L), new Instant(20L));
        IntervalWindow windowC = new IntervalWindow(new Instant(20L), new Instant(30L));

        // First group: two entries in windowA followed by two in windowB.
        ImmutableList firstGroup = ImmutableList.of(
            (Object)KV.of((Object)windowA, (Object)WindowedValue.of((Object)KV.of((Object)1L, (Object)11L), (Instant)new Instant(3L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowA, (Object)WindowedValue.of((Object)KV.of((Object)2L, (Object)21L), (Instant)new Instant(7L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowB, (Object)WindowedValue.of((Object)KV.of((Object)2L, (Object)21L), (Instant)new Instant(13L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowB, (Object)WindowedValue.of((Object)KV.of((Object)3L, (Object)31L), (Instant)new Instant(15L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)));
        // Second group: a single entry in windowC.
        ImmutableList secondGroup = ImmutableList.of(
            (Object)KV.of((Object)windowC, (Object)WindowedValue.of((Object)KV.of((Object)4L, (Object)41L), (Instant)new Instant(25L), (BoundedWindow)windowC, (PaneInfo)PaneInfo.NO_FIRING)));
        ImmutableList input = ImmutableList.of(
            (Object)KV.of((Object)1, (Object)firstGroup),
            (Object)KV.of((Object)2, (Object)secondGroup));

        List records = tester.processBatch((Iterable)input);
        Assert.assertEquals(3L, (long)records.size());

        Map firstMap = (Map)((WindowedValue)((IsmFormat.IsmRecord)records.get(0)).getValue()).getValue();
        Assert.assertEquals(2L, (long)firstMap.size());
        Assert.assertEquals((Object)ImmutableMap.of((Object)1L, (Object)11L, (Object)2L, (Object)21L), (Object)firstMap);

        Map secondMap = (Map)((WindowedValue)((IsmFormat.IsmRecord)records.get(1)).getValue()).getValue();
        Assert.assertEquals(2L, (long)secondMap.size());
        Assert.assertEquals((Object)ImmutableMap.of((Object)2L, (Object)21L, (Object)3L, (Object)31L), (Object)secondMap);

        Map thirdMap = (Map)((WindowedValue)((IsmFormat.IsmRecord)records.get(2)).getValue()).getValue();
        Assert.assertEquals(1L, (long)thirdMap.size());
        Assert.assertEquals((Object)ImmutableMap.of((Object)4L, (Object)41L), (Object)thirdMap);
    }

    /**
     * Feeds {@code ToMultimapDoFn} window-tagged key/value pairs across three interval
     * windows (key 1L appears twice in windowA) and expects three output records. Each
     * record's value is a map from key to the values supplied for that key in one window,
     * compared without regard to order.
     */
    @Test
    public void testToMultimapDoFn() throws Exception {
        Coder windowCoder = IntervalWindow.getCoder();
        DoFnTester tester = DoFnTester.of((DoFn)new DataflowPipelineRunner.BatchViewAsMultimap.ToMultimapDoFn(windowCoder));
        IntervalWindow windowA = new IntervalWindow(new Instant(0L), new Instant(10L));
        IntervalWindow windowB = new IntervalWindow(new Instant(10L), new Instant(20L));
        IntervalWindow windowC = new IntervalWindow(new Instant(20L), new Instant(30L));

        // First group: three entries in windowA (two for key 1L) followed by two in windowB.
        ImmutableList firstGroup = ImmutableList.of(
            (Object)KV.of((Object)windowA, (Object)WindowedValue.of((Object)KV.of((Object)1L, (Object)11L), (Instant)new Instant(3L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowA, (Object)WindowedValue.of((Object)KV.of((Object)1L, (Object)12L), (Instant)new Instant(5L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowA, (Object)WindowedValue.of((Object)KV.of((Object)2L, (Object)21L), (Instant)new Instant(7L), (BoundedWindow)windowA, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowB, (Object)WindowedValue.of((Object)KV.of((Object)2L, (Object)21L), (Instant)new Instant(13L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)),
            (Object)KV.of((Object)windowB, (Object)WindowedValue.of((Object)KV.of((Object)3L, (Object)31L), (Instant)new Instant(15L), (BoundedWindow)windowB, (PaneInfo)PaneInfo.NO_FIRING)));
        // Second group: a single entry in windowC.
        ImmutableList secondGroup = ImmutableList.of(
            (Object)KV.of((Object)windowC, (Object)WindowedValue.of((Object)KV.of((Object)4L, (Object)41L), (Instant)new Instant(25L), (BoundedWindow)windowC, (PaneInfo)PaneInfo.NO_FIRING)));
        ImmutableList input = ImmutableList.of(
            (Object)KV.of((Object)1, (Object)firstGroup),
            (Object)KV.of((Object)2, (Object)secondGroup));

        List records = tester.processBatch((Iterable)input);
        Assert.assertEquals(3L, (long)records.size());

        Map firstMap = (Map)((WindowedValue)((IsmFormat.IsmRecord)records.get(0)).getValue()).getValue();
        Assert.assertEquals(2L, (long)firstMap.size());
        Assert.assertThat(firstMap.get(1L), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{11L, 12L}));
        Assert.assertThat(firstMap.get(2L), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{21L}));

        Map secondMap = (Map)((WindowedValue)((IsmFormat.IsmRecord)records.get(1)).getValue()).getValue();
        Assert.assertEquals(2L, (long)secondMap.size());
        Assert.assertThat(secondMap.get(2L), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{21L}));
        Assert.assertThat(secondMap.get(3L), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{31L}));

        Map thirdMap = (Map)((WindowedValue)((IsmFormat.IsmRecord)records.get(2)).getValue()).getValue();
        Assert.assertEquals(1L, (long)thirdMap.size());
        Assert.assertThat(thirdMap.get(4L), (Matcher)org.hamcrest.Matchers.containsInAnyOrder((Object[])new Long[]{41L}));
    }

    /**
     * Pipeline visitor that remembers the transform of every composite node it enters,
     * skipping nodes whose transform is {@code null}.
     */
    private static class CompositeTransformRecorder
    extends Pipeline.PipelineVisitor.Defaults {
        /** Transforms of the composite nodes entered so far, in visit order. */
        private final List<PTransform<?, ?>> transforms = new ArrayList<>();

        private CompositeTransformRecorder() {
        }

        /**
         * Records the node's transform when it has one and always asks the traversal to
         * descend into the composite.
         */
        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
            PTransform<?, ?> entered = node.getTransform();
            if (entered != null) {
                this.transforms.add(entered);
            }
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }

        /** Returns the live list of recorded composite transforms (not a copy). */
        public List<PTransform<?, ?>> getCompositeTransforms() {
            return this.transforms;
        }
    }

    /**
     * Minimal integer-to-integer transform used by the translator tests. It produces a
     * primitive output collection and exposes a flag that a translator can set.
     */
    public static class TestTransform
    extends PTransform<PCollection<Integer>, PCollection<Integer>> {
        /** Set to {@code true} by the test translator once it has handled this transform. */
        public boolean translated = false;

        /**
         * Creates a primitive output collection on the input's pipeline, using the global
         * default windowing strategy and the input's boundedness.
         */
        public PCollection<Integer> apply(PCollection<Integer> input) {
            Pipeline owningPipeline = input.getPipeline();
            WindowingStrategy windowing = WindowingStrategy.globalDefault();
            PCollection.IsBounded boundedness = input.isBounded();
            return PCollection.createPrimitiveOutputInternal(owningPipeline, windowing, boundedness);
        }

        /** Reuses the input collection's coder for the output. */
        protected Coder<?> getDefaultOutputCoder(PCollection<Integer> input) {
            Coder<?> inputCoder = input.getCoder();
            return inputCoder;
        }
    }
}

