diff --git a/CHANGES.md b/CHANGES.md index b7bf40f8d8d0..6852a18b3f3f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -96,11 +96,11 @@ ## Bugfixes * Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)). -* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)) * Prism * Fixed an edge case where Bundle Finalization might not become enabled. ([#33493](https://github.com/apache/beam/issues/33493)). * Fixed session window aggregation, which wasn't being performed per-key. ([#33542](https://github.com/apache/beam/issues/33542)).) +* Fixed a Dataflow template creation issue that ignores template file creation errors (Java) ([#33636](https://github.com/apache/beam/issues/33636)) ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 0e91bb2d8a52..833089b5fcc3 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -128,7 +128,7 @@ dependencies { testImplementation project(path: ":sdks:java:extensions:python", configuration: "testRuntimeMigration") testImplementation library.java.google_cloud_dataflow_java_proto_library_all testImplementation library.java.jackson_dataformat_yaml - testImplementation library.java.mockito_core + testImplementation library.java.mockito_inline validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") validatesRunner project(path: project.path, configuration: "testRuntimeMigration") validatesRunner library.java.hamcrest diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9ca6e95ed95a..7de6484cba58 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -39,12 +39,10 @@ import com.google.api.services.dataflow.model.WorkerPool; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; -import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; -import java.io.PrintWriter; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; @@ -1524,15 +1522,9 @@ public DataflowPipelineJob run(Pipeline pipeline) { fileLocation.startsWith("/") || fileLocation.startsWith("gs://"), "Location must be local or on Cloud Storage, got %s.", fileLocation); - ResourceId fileResource = FileSystems.matchNewResource(fileLocation, false /* isDirectory */); - String workSpecJson = DataflowPipelineTranslator.jobToString(newJob); - try (PrintWriter printWriter = - new PrintWriter( - new BufferedWriter( - new OutputStreamWriter( - Channels.newOutputStream(FileSystems.create(fileResource, MimeTypes.TEXT)), - UTF_8)))) { - printWriter.print(workSpecJson); + + try { + printWorkSpecJsonToFile(fileLocation, newJob); LOG.info("Printed job specification to {}", fileLocation); } catch (IOException ex) { String error = String.format("Cannot create output file at %s", fileLocation); @@ -1542,6 +1534,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { LOG.warn(error, ex); } } + if (isTemplate) { LOG.info("Template successfully created."); return new DataflowTemplateJob(); @@ -1629,6 +1622,18 @@ public DataflowPipelineJob run(Pipeline pipeline) { return dataflowPipelineJob; } + private static void printWorkSpecJsonToFile(String fileLocation, Job job) throws IOException { + String workSpecJson = DataflowPipelineTranslator.jobToString(job); + ResourceId fileResource = FileSystems.matchNewResource(fileLocation, false /* isDirectory */); + try (OutputStreamWriter writer = + new OutputStreamWriter( + Channels.newOutputStream(FileSystems.create(fileResource, MimeTypes.TEXT)), UTF_8)) { + // Not using PrintWriter as it swallows IOException. + // Not using BufferedWriter as this invokes write() only once. + writer.write(workSpecJson); + } + } + private static EnvironmentInfo getEnvironmentInfoFromEnvironmentId( String environmentId, RunnerApi.Pipeline pipelineProto) { RunnerApi.Environment environment = diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 106b15de6e4d..b3a5f5f705d5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -41,19 +41,18 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeFalse; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyListOf; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mockStatic; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; @@ -77,8 +76,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; -import java.io.Writer; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.StandardOpenOption; @@ -153,6 +153,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.construction.BeamUrns; import org.apache.beam.sdk.util.construction.Environments; @@ -191,10 +192,8 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; /** * Tests for the {@link DataflowRunner}. @@ -282,6 +281,7 @@ private static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs mockJ mock(Dataflow.Projects.Locations.Jobs.List.class); when(mockDataflowClient.projects()).thenReturn(mockProjects); + when(mockDataflowClient.getBaseUrl()).thenReturn("dataflow.googleapis.com"); when(mockProjects.locations()).thenReturn(mockLocations); when(mockLocations.jobs()).thenReturn(mockJobs); when(mockJobs.create(eq(PROJECT_ID), eq(REGION_ID), isA(Job.class))).thenReturn(mockRequest); @@ -337,7 +337,7 @@ private static GcsUtil buildMockGcsUtil() throws IOException { .verifyBucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET)); // Let every valid path be matched - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenAnswer( invocationOnMock -> { List gcsPaths = (List) invocationOnMock.getArguments()[0]; @@ -473,37 +473,32 @@ public void testSettingOfSdkPipelineOptions() throws IOException { assertThat(sdkPipelineOptions, hasKey("options")); Map optionsMap = (Map) sdkPipelineOptions.get("options"); - assertThat(optionsMap, hasEntry("appName", (Object) options.getAppName())); - assertThat(optionsMap, hasEntry("project", (Object) options.getProject())); + assertThat(optionsMap, hasEntry("appName", options.getAppName())); + assertThat(optionsMap, hasEntry("project", options.getProject())); assertThat( - optionsMap, - hasEntry("pathValidatorClass", (Object) options.getPathValidatorClass().getName())); - assertThat(optionsMap, hasEntry("runner", (Object) options.getRunner().getName())); - assertThat(optionsMap, hasEntry("jobName", (Object) options.getJobName())); - assertThat(optionsMap, hasEntry("tempLocation", (Object) options.getTempLocation())); - assertThat(optionsMap, hasEntry("stagingLocation", (Object) options.getStagingLocation())); + optionsMap, hasEntry("pathValidatorClass", options.getPathValidatorClass().getName())); + assertThat(optionsMap, hasEntry("runner", options.getRunner().getName())); + assertThat(optionsMap, hasEntry("jobName", options.getJobName())); + assertThat(optionsMap, hasEntry("tempLocation", options.getTempLocation())); + assertThat(optionsMap, hasEntry("stagingLocation", options.getStagingLocation())); assertThat( - optionsMap, - hasEntry("stableUniqueNames", (Object) options.getStableUniqueNames().toString())); - assertThat(optionsMap, hasEntry("streaming", (Object) options.isStreaming())); + optionsMap, hasEntry("stableUniqueNames", options.getStableUniqueNames().toString())); + assertThat(optionsMap, hasEntry("streaming", options.isStreaming())); assertThat( optionsMap, - hasEntry( - "numberOfWorkerHarnessThreads", (Object) options.getNumberOfWorkerHarnessThreads())); - assertThat(optionsMap, hasEntry("region", (Object) options.getRegion())); + hasEntry("numberOfWorkerHarnessThreads", options.getNumberOfWorkerHarnessThreads())); + assertThat(optionsMap, hasEntry("region", options.getRegion())); } /** * Test that the region is set in the generated JSON pipeline options even when a default value is * grabbed from the environment. */ - @RunWith(PowerMockRunner.class) - @PrepareForTest(DefaultGcpRegionFactory.class) - public static class DefaultRegionTest { - @Test - public void testDefaultRegionSet() throws Exception { - mockStatic(DefaultGcpRegionFactory.class); - PowerMockito.when(DefaultGcpRegionFactory.getRegionFromEnvironment()).thenReturn(REGION_ID); + @Test + public void testDefaultRegionSet() throws Exception { + try (MockedStatic mocked = + Mockito.mockStatic(DefaultGcpRegionFactory.class)) { + mocked.when(DefaultGcpRegionFactory::getRegionFromEnvironment).thenReturn(REGION_ID); Dataflow.Projects.Locations.Jobs mockJobs = mock(Dataflow.Projects.Locations.Jobs.class); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); @@ -526,7 +521,7 @@ public void testDefaultRegionSet() throws Exception { assertThat(sdkPipelineOptions, hasKey("options")); Map optionsMap = (Map) sdkPipelineOptions.get("options"); - assertThat(optionsMap, hasEntry("region", (Object) options.getRegion())); + assertThat(optionsMap, hasEntry("region", options.getRegion())); } } @@ -585,8 +580,7 @@ public static class JacksonIncompatibleDeserializer @Override public JacksonIncompatible deserialize( - JsonParser jsonParser, DeserializationContext deserializationContext) - throws IOException, JsonProcessingException { + JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { return new JacksonIncompatible(jsonParser.readValueAs(String.class)); } } @@ -599,7 +593,7 @@ public void serialize( JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) - throws IOException, JsonProcessingException { + throws IOException { jsonGenerator.writeString(jacksonIncompatible.value); } } @@ -621,7 +615,7 @@ public void testSettingOfPipelineOptionsWithCustomUserType() throws IOException jobCaptor.getValue().getEnvironment().getSdkPipelineOptions(); assertThat(sdkPipelineOptions, hasKey("options")); Map optionsMap = (Map) sdkPipelineOptions.get("options"); - assertThat(optionsMap, hasEntry("jacksonIncompatible", (Object) "userCustomTypeTest")); + assertThat(optionsMap, hasEntry("jacksonIncompatible", "userCustomTypeTest")); } @Test @@ -936,7 +930,7 @@ public void testRunWithFiles() throws IOException { String overridePackageName = "alias.txt"; - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( GcsUtil.StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -1006,7 +1000,7 @@ public void testRunWithMissingFiles() throws IOException { String overridePackageName = "alias.txt"; - when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + when(mockGcsUtil.getObjects(anyList())) .thenReturn( ImmutableList.of( GcsUtil.StorageObjectOrIOException.create(new FileNotFoundException("some/path")))); @@ -1045,7 +1039,7 @@ public void runWithDefaultFilesToStage() throws Exception { } @Test - public void testGcsStagingLocationInitialization() throws Exception { + public void testGcsStagingLocationInitialization() { // Set temp location (required), and check that staging location is set. DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setTempLocation(VALID_TEMP_BUCKET); @@ -1564,7 +1558,7 @@ public void testResolveArtifacts() throws IOException { } @Test - public void testGcpTempAndNoTempLocationSucceeds() throws Exception { + public void testGcpTempAndNoTempLocationSucceeds() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setGcpCredential(new TestCredential()); @@ -1577,7 +1571,7 @@ public void testGcpTempAndNoTempLocationSucceeds() throws Exception { } @Test - public void testTempLocationAndNoGcpTempLocationSucceeds() throws Exception { + public void testTempLocationAndNoGcpTempLocationSucceeds() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setGcpCredential(new TestCredential()); @@ -1697,8 +1691,7 @@ public String getUrn() { @Override public RunnerApi.FunctionSpec translate( - AppliedPTransform application, SdkComponents components) - throws IOException { + AppliedPTransform application, SdkComponents components) { return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build(); } } @@ -1774,7 +1767,7 @@ public void testTransformTranslator() throws IOException { assertTrue(transform.translated); } - private void verifySdkHarnessConfiguration(DataflowPipelineOptions options) throws IOException { + private void verifySdkHarnessConfiguration(DataflowPipelineOptions options) { Pipeline p = Pipeline.create(options); p.apply(Create.of(Arrays.asList(1, 2, 3))); @@ -1908,7 +1901,7 @@ public void testSettingConflictingEnableAndDisableExperimentsThrowsException() t /** Records all the composite transforms visited within the Pipeline. */ private static class CompositeTransformRecorder extends PipelineVisitor.Defaults { - private List> transforms = new ArrayList<>(); + private final List> transforms = new ArrayList<>(); @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { @@ -1944,7 +1937,7 @@ public void testApplyIsScopedToExactClass() throws IOException { assertThat( "Expected to have two composites, CreateTimestamped and Create.Values", recorder.getCompositeTransforms(), - hasItem(Matchers.>isA((Class) Create.Values.class))); + hasItem(Matchers.>isA(Create.Values.class))); } @Test @@ -2012,26 +2005,127 @@ public void testTemplateRunnerWithUploadGraph() throws Exception { /** * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate - * exception when an output file is not writable. + * exception when an output file is not creatable. */ @Test - public void testTemplateRunnerLoggedErrorForFile() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setJobName("TestJobName"); - options.setRunner(DataflowRunner.class); + public void testTemplateRunnerLoggedErrorForFileNotCreatable() throws Exception { + + DataflowPipelineOptions options = buildPipelineOptions(); options.setTemplateLocation("//bad/path"); - options.setProject("test-project"); - options.setRegion(REGION_ID); - options.setTempLocation(tmpFolder.getRoot().getPath()); - options.setGcpCredential(new TestCredential()); - options.setPathValidatorClass(NoopPathValidator.class); Pipeline p = Pipeline.create(options); thrown.expectMessage("Cannot create output file at"); thrown.expect(RuntimeException.class); + thrown.expectCause( + hasProperty("message", containsString("Unable to create parent directories for"))); + p.run(); } + private static WritableByteChannel createWritableByteChannelThrowsIOExceptionAtClose( + String errorMessage) { + return new WritableByteChannel() { + @Override + public int write(ByteBuffer src) { + int remaining = src.remaining(); + src.get(new byte[remaining]); + return remaining; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() throws IOException { + throw new IOException(errorMessage); + } + }; + } + + /** + * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate + * exception when an output file throws IOException at close. + */ + @Test + public void testTemplateRunnerLoggedErrorForFileCloseError() throws Exception { + File templateLocation = tmpFolder.newFile(); + String closeErrorMessage = "Unable to close"; + + try (MockedStatic mocked = + Mockito.mockStatic(FileSystems.class, CALLS_REAL_METHODS)) { + mocked + .when( + () -> + FileSystems.create( + FileSystems.matchNewResource(templateLocation.getPath(), false), + MimeTypes.TEXT)) + .thenReturn(createWritableByteChannelThrowsIOExceptionAtClose(closeErrorMessage)); + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setTemplateLocation(templateLocation.getPath()); + Pipeline p = Pipeline.create(options); + + thrown.expectMessage("Cannot create output file at"); + thrown.expect(RuntimeException.class); + thrown.expectCause(Matchers.isA(IOException.class)); + thrown.expectCause(hasProperty("message", is(closeErrorMessage))); + + p.run(); + } + } + + private static WritableByteChannel createWritableByteChannelThrowsIOExceptionAtWrite( + String errorMessage) { + return new WritableByteChannel() { + @Override + public int write(ByteBuffer src) throws IOException { + throw new IOException(errorMessage); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() {} + }; + } + + /** + * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate + * exception when an output file throws IOException at close. + */ + @Test + public void testTemplateRunnerLoggedErrorForFileWriteError() throws Exception { + File templateLocation = tmpFolder.newFile(); + String closeErrorMessage = "Unable to write"; + + try (MockedStatic mocked = + Mockito.mockStatic(FileSystems.class, CALLS_REAL_METHODS)) { + mocked + .when( + () -> + FileSystems.create( + FileSystems.matchNewResource(templateLocation.getPath(), false), + MimeTypes.TEXT)) + .thenReturn(createWritableByteChannelThrowsIOExceptionAtWrite(closeErrorMessage)); + + thrown.expectMessage("Cannot create output file at"); + thrown.expect(RuntimeException.class); + thrown.expectCause(Matchers.isA(IOException.class)); + thrown.expectCause(hasProperty("message", is(closeErrorMessage))); + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setTemplateLocation(templateLocation.getPath()); + Pipeline p = Pipeline.create(options); + + p.run(); + } + } + @Test public void testGetContainerImageForJobFromOption() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); @@ -2130,8 +2224,8 @@ public void testStreamingWriteWithShardingReturnsSameTransform() { WriteFilesResult originalResult = objs.apply(original); WriteFilesResult replacementResult = objs.apply(replacement); - assertTrue(replacement.getNumShardsProvider() == null); - assertTrue(replacement.getComputeNumShards() == null); + assertNull(replacement.getNumShardsProvider()); + assertNull(replacement.getComputeNumShards()); assertTrue(replacement.getWithAutoSharding()); } @@ -2171,7 +2265,7 @@ public void testMergingStatefulRejectedInBatch() throws Exception { } private void verifyGroupIntoBatchesOverrideCount( - Pipeline p, Boolean withShardedKey, Boolean expectOverriden) { + Pipeline p, Boolean withShardedKey, Boolean expectOverridden) { final int batchSize = 2; List> testValues = Arrays.asList(KV.of("A", 1), KV.of("B", 0), KV.of("A", 2), KV.of("A", 4), KV.of("A", 8)); @@ -2241,7 +2335,7 @@ public CompositeBehavior enterCompositeTransform(Node node) { return CompositeBehavior.ENTER_TRANSFORM; } }); - if (expectOverriden) { + if (expectOverridden) { assertTrue(sawGroupIntoBatchesOverride.get()); } else { assertFalse(sawGroupIntoBatchesOverride.get()); @@ -2249,7 +2343,7 @@ public CompositeBehavior enterCompositeTransform(Node node) { } private void verifyGroupIntoBatchesOverrideBytes( - Pipeline p, Boolean withShardedKey, Boolean expectOverriden) { + Pipeline p, Boolean withShardedKey, Boolean expectOverridden) { final long batchSizeBytes = 2; List> testValues = Arrays.asList( @@ -2315,7 +2409,7 @@ public CompositeBehavior enterCompositeTransform(Node node) { return CompositeBehavior.ENTER_TRANSFORM; } }); - if (expectOverriden) { + if (expectOverridden) { assertTrue(sawGroupIntoBatchesOverride.get()); } else { assertFalse(sawGroupIntoBatchesOverride.get()); @@ -2491,13 +2585,13 @@ public void testBigQueryDLQWarning(BigQueryIO.Write.Method method, boolean proce .getFailedInserts() .apply( MapElements.into(TypeDescriptors.voids()) - .via(SerializableFunctions.constant((Void) null))); + .via(SerializableFunctions.constant(null))); } else { result .getFailedStorageApiInserts() .apply( MapElements.into(TypeDescriptors.voids()) - .via(SerializableFunctions.constant((Void) null))); + .via(SerializableFunctions.constant(null))); } } p.run(); @@ -2582,14 +2676,14 @@ public ExpansionApi.DiscoverSchemaTransformResponse discover( } @Override - public void close() throws Exception { + public void close() { // do nothing } }; } @Override - public void close() throws Exception { + public void close() { // do nothing } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index 25c939fce6be..b381396f2bcc 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -22,8 +22,9 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; -import static org.powermock.api.mockito.PowerMockito.mockStatic; -import static org.powermock.api.mockito.PowerMockito.when; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mockStatic; import java.io.IOException; import java.io.InputStream; @@ -42,10 +43,7 @@ import org.junit.rules.TestRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Answers; -import org.mockito.ArgumentMatchers; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.MockedStatic; /** Tests for {@link DataflowPipelineOptions}. */ @RunWith(JUnit4.class) @@ -211,66 +209,66 @@ public void testDefaultStagingLocationUnset() { options.getStagingLocation(); } - @RunWith(PowerMockRunner.class) - @PrepareForTest(DefaultGcpRegionFactory.class) - public static class DefaultGcpRegionFactoryTest { - @Test - public void testDefaultGcpRegionUnset() - throws IOException, InterruptedException, TimeoutException { - mockStatic(DefaultGcpRegionFactory.class); - when(DefaultGcpRegionFactory.getRegionFromEnvironment()).thenReturn(null); - when(DefaultGcpRegionFactory.getRegionFromGcloudCli(ArgumentMatchers.anyLong())) - .thenReturn(""); + @Test + public void testDefaultGcpRegionUnset() { + try (MockedStatic mocked = mockStatic(DefaultGcpRegionFactory.class)) { + mocked.when(DefaultGcpRegionFactory::getRegionFromEnvironment).thenReturn(null); + mocked.when(() -> DefaultGcpRegionFactory.getRegionFromGcloudCli(anyLong())).thenReturn(""); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); assertEquals("", options.getRegion()); } + } - @Test - public void testDefaultGcpRegionUnsetIgnoresGcloudException() - throws IOException, InterruptedException, TimeoutException { - mockStatic(DefaultGcpRegionFactory.class); - when(DefaultGcpRegionFactory.getRegionFromEnvironment()).thenReturn(null); - when(DefaultGcpRegionFactory.getRegionFromGcloudCli(ArgumentMatchers.anyLong())) + @Test + public void testDefaultGcpRegionUnsetIgnoresGcloudException() { + try (MockedStatic mocked = mockStatic(DefaultGcpRegionFactory.class)) { + mocked.when(DefaultGcpRegionFactory::getRegionFromEnvironment).thenReturn(null); + mocked + .when(() -> DefaultGcpRegionFactory.getRegionFromGcloudCli(anyLong())) .thenThrow(new IOException()); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); assertEquals("", options.getRegion()); } + } - @Test - public void testDefaultGcpRegionFromEnvironment() { - mockStatic(DefaultGcpRegionFactory.class); - when(DefaultGcpRegionFactory.getRegionFromEnvironment()).thenReturn("us-west1"); + @Test + public void testDefaultGcpRegionFromEnvironment() { + try (MockedStatic mocked = mockStatic(DefaultGcpRegionFactory.class)) { + mocked.when(DefaultGcpRegionFactory::getRegionFromEnvironment).thenReturn("us-west1"); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); assertEquals("us-west1", options.getRegion()); } + } - @Test - public void testDefaultGcpRegionFromGcloud() - throws IOException, InterruptedException, TimeoutException { - mockStatic(DefaultGcpRegionFactory.class); - when(DefaultGcpRegionFactory.getRegionFromEnvironment()).thenReturn(null); - when(DefaultGcpRegionFactory.getRegionFromGcloudCli(ArgumentMatchers.anyLong())) + @Test + public void testDefaultGcpRegionFromGcloud() { + try (MockedStatic mocked = mockStatic(DefaultGcpRegionFactory.class)) { + mocked.when(DefaultGcpRegionFactory::getRegionFromEnvironment).thenReturn(null); + mocked + .when(() -> DefaultGcpRegionFactory.getRegionFromGcloudCli(anyLong())) .thenReturn("us-west1"); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); assertEquals("us-west1", options.getRegion()); } + } - /** - * If gcloud gets stuck, test that {@link DefaultGcpRegionFactory#getRegionFromGcloudCli(long)} - * times out instead of blocking forever. - */ - @Test(timeout = 10000L) - public void testGetRegionFromGcloudCliTimeout() - throws IOException, InterruptedException, TimeoutException { - mockStatic(DefaultGcpRegionFactory.class, Answers.CALLS_REAL_METHODS); - when(DefaultGcpRegionFactory.startGcloud()) + /** + * If gcloud gets stuck, test that {@link DefaultGcpRegionFactory#getRegionFromGcloudCli(long)} + * times out instead of blocking forever. + */ + @Test(timeout = 10000L) + public void testGetRegionFromGcloudCliTimeout() { + try (MockedStatic mocked = + mockStatic(DefaultGcpRegionFactory.class, CALLS_REAL_METHODS)) { + mocked + .when(DefaultGcpRegionFactory::startGcloud) .thenReturn( new Process() { @Override public OutputStream getOutputStream() { return new OutputStream() { @Override - public void write(int b) throws IOException { + public void write(int b) { // Do nothing. } }; @@ -280,7 +278,7 @@ public void write(int b) throws IOException { public InputStream getInputStream() { return new InputStream() { @Override - public int read() throws IOException { + public int read() { // Return EOF immediately. return -1; } @@ -291,7 +289,7 @@ public int read() throws IOException { public InputStream getErrorStream() { return new InputStream() { @Override - public int read() throws IOException { + public int read() { // Never return EOF to create an infinite loop. try { // Sleep so we don't buffer too many bytes and run out of memory. @@ -305,7 +303,7 @@ public int read() throws IOException { } @Override - public int waitFor() throws InterruptedException { + public int waitFor() { // Unused. return 0; }