I’ve written a test that reproduces an issue with GroupIntoBatches when it’s placed between two aggregations.
The test inserts GroupIntoBatches between two aggregations, each with its own 5-minute FixedWindows configuration. The two configurations are identical except for the accumulation mode: the first uses accumulatingFiredPanes() and the second uses discardingFiredPanes().
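To make the accumulation-mode difference concrete, here is a minimal plain-Java model (no Beam dependency) of what a single window emits per pane in each mode. The class and method names are hypothetical. The split of the test's elements into two firings reflects my reading of the TestStream in the test, not output captured from Beam.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model (no Beam dependency) of what one window emits per pane
 * under the two accumulation modes used in the test. Each inner list holds the elements
 * that arrived between two consecutive firings.
 */
class AccumulationModeModel {

    /** accumulatingFiredPanes(): every pane re-emits everything seen so far. */
    static List<List<String>> accumulating(List<List<String>> arrivalsPerFiring) {
        List<List<String>> panes = new ArrayList<>();
        List<String> soFar = new ArrayList<>();
        for (List<String> arrivals : arrivalsPerFiring) {
            soFar.addAll(arrivals);
            panes.add(new ArrayList<>(soFar)); // copy so later arrivals do not leak into this pane
        }
        return panes;
    }

    /** discardingFiredPanes(): every pane emits only what arrived since the previous firing. */
    static List<List<String>> discarding(List<List<String>> arrivalsPerFiring) {
        List<List<String>> panes = new ArrayList<>();
        for (List<String> arrivals : arrivalsPerFiring) {
            panes.add(new ArrayList<>(arrivals));
        }
        return panes;
    }

    public static void main(String[] args) {
        // Assumed firings of the first window: on-time pane, then closing pane.
        List<List<String>> arrivals = List.of(List.of("one", "two"), List.of("three", "four"));
        System.out.println(accumulating(arrivals)); // [[one, two], [one, two, three, four]]
        System.out.println(discarding(arrivals));   // [[one, two], [three, four]]
    }
}
```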
Here’s what I’ve observed:
Without GroupIntoBatches, the pane fired by the first aggregation propagates immediately to the second aggregation.
With GroupIntoBatches in place, the second aggregation produces output only after the first aggregation fires its second pane.
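For reference, GroupIntoBatches is documented to buffer elements per key and emit a batch once batchSize elements are buffered, or earlier when maxBufferingDuration (processing time) elapses. With ofSize(1), as in the test, each element should form its own batch as soon as it arrives, so batch size alone would not be expected to hold elements back. The sketch below is a hypothetical plain-Java model of the size-based flush only; it ignores timers, watermarks, end-of-window flushes, and sharded keys, so it says nothing about how the real transform holds or advances the watermark.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model (no Beam dependency) of size-based batching for a single key:
 * elements are buffered and a batch is emitted as soon as batchSize elements are buffered.
 * Not modelled: maxBufferingDuration flushes, end-of-window flushes, sharded keys, timestamps.
 */
class SizeBatcherModel {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> emitted = new ArrayList<>();

    SizeBatcherModel(int batchSize) {
        if (batchSize < 1) throw new IllegalArgumentException("batchSize must be >= 1");
        this.batchSize = batchSize;
    }

    /** Buffers one element and emits a batch if the buffer is now full. */
    void add(String element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            emitted.add(new ArrayList<>(buffer)); // emit a copy, then start a fresh batch
            buffer.clear();
        }
    }

    List<List<String>> emitted() { return emitted; }

    List<String> buffered() { return buffer; }

    public static void main(String[] args) {
        SizeBatcherModel ofSizeOne = new SizeBatcherModel(1); // mirrors GroupIntoBatches.ofSize(1)
        for (String e : List.of("one", "two")) {
            ofSizeOne.add(e);
        }
        System.out.println(ofSizeOne.emitted());  // [[one], [two]]
        System.out.println(ofSizeOne.buffered()); // []
    }
}
```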
Is there a known workaround for this behavior, or am I missing some configuration or tuning that would address it?
Apache Beam version is 2.61.
Thank you!
@Test
void whenGroupIntoBatchesBetweenAggregates_thenSecondAggregateOutputWithAdditionalDelayOfWindowDuration() {
    DataEntry one = DataEntry.builder().entryId("counter").qualifier("one").build();
    DataEntry two = DataEntry.builder().entryId("counter").qualifier("two").build();
    DataEntry three = DataEntry.builder().entryId("counter").qualifier("three").build();
    DataEntry four = DataEntry.builder().entryId("counter").qualifier("four").build();
    DataEntry five = DataEntry.builder().entryId("counter").qualifier("five").build();

    TestStream<DataEntry> createEvents = TestStream.create(SerializableCoder.of(DataEntry.class))
        .addElements(TimestampedValue.of(one, EPOCH))
        .advanceWatermarkTo(EPOCH.plus(standardMinutes(1L)))
        .addElements(TimestampedValue.of(two, EPOCH.plus(standardSeconds(20L))))
        .advanceWatermarkTo(EPOCH.plus(standardMinutes(5L)))
        .addElements(TimestampedValue.of(three, EPOCH.plus(standardSeconds(40L))))
        .advanceWatermarkTo(EPOCH.plus(standardMinutes(8L)))
        .addElements(TimestampedValue.of(four, EPOCH.plus(standardSeconds(42L))))
        .advanceWatermarkTo(EPOCH.plus(standardMinutes(10L)))
        // arrives after the allowed lateness, so it is dropped
        .addElements(TimestampedValue.of(five, EPOCH.plus(standardSeconds(42L))))
        .advanceWatermarkToInfinity();

    PCollection<KV<String, Integer>> pCollection = pipeline
        .apply("values", createEvents)
        .apply(
            Window.<DataEntry>into(FixedWindows.of(Duration.standardMinutes(5)))
                .triggering(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
                .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                .withAllowedLateness(Duration.standardMinutes(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
                .accumulatingFiredPanes() // the only difference from the next window
        )
        .apply(ParDo.of(new LoggerDoFn("Input")))
        // aggregate using the first window
        .apply(WithKeys.of(DataEntry::getEntryId).withKeyType(strings()))
        .apply(GroupByKey.create())
        .apply(Values.create())
        .apply(Flatten.iterables())
        .apply(ParDo.of(new LoggerDoFn("After Accum Aggregate")))
        // batch elements
        .apply("DummyKey", WithKeys.of(""))
        .apply(GroupIntoBatches.<String, DataEntry>ofSize(1)
            .withMaxBufferingDuration(Duration.standardSeconds(1))
            .withShardedKey())
        .apply(Values.create())
        .apply(Flatten.iterables())
        .apply(
            Window.<DataEntry>into(FixedWindows.of(Duration.standardMinutes(5)))
                .triggering(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()))
                .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                .withAllowedLateness(Duration.standardMinutes(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
                .discardingFiredPanes() // the only difference from the previous window
        )
        .apply(ParDo.of(new LoggerDoFn("Discarding Grouping Window")))
        // aggregate using the second window
        .apply(WithKeys.of(DataEntry::getEntryId).withKeyType(strings()))
        .apply(Combine.perKey(new CountCombiner()))
        .apply(ParDo.of(new KvLoggerDoFn("Final Combine")));

    // expected
    PAssert.that("On Time Pane", pCollection)
        .inOnTimePane(boundedWindow)
        .containsInAnyOrder(List.of(KV.of("counter", 2))); // but actually contains only the last KV.of("counter", 4)
    PAssert.that("Late Pane", pCollection)
        .inLatePane(boundedWindow)
        .containsInAnyOrder(List.of(KV.of("counter", 4)));
    PAssert.that("Full Window", pCollection)
        .inWindow(boundedWindow)
        .containsInAnyOrder(List.of(KV.of("counter", 2), KV.of("counter", 4)));

    pipeline.run().waitUntilFinish();
}
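The expected counts in the PAssert assertions follow from the TestStream timeline. Below is a hypothetical plain-Java sketch (no Beam dependency) that classifies each element by the watermark at the moment it is added. It assumes a single key and the single window [EPOCH, EPOCH + 5 min) with 5 minutes of allowed lateness, and it treats the on-time pane as fired once the watermark reaches the window's max timestamp. It yields first-aggregation pane sizes of 2 and 4, which are the counts the test expects the discarding second aggregation to emit in its on-time and late panes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model of the test's TestStream timeline for the window
 * [EPOCH, EPOCH + 5 min) with 5 minutes of allowed lateness. Simplified: one key, one window.
 */
class ExpectedPanesModel {
    static final long MINUTE = 60_000L;
    static final long WINDOW_MAX_TS = 5 * MINUTE - 1; // FixedWindows end is exclusive
    static final long ALLOWED_LATENESS = 5 * MINUTE;

    enum Pane { ON_TIME, CLOSING, DROPPED }

    /** Classifies an element by the input watermark at the moment it arrived. */
    static Pane classify(long watermarkAtArrival) {
        if (watermarkAtArrival > WINDOW_MAX_TS + ALLOWED_LATENESS) {
            return Pane.DROPPED; // window already expired
        }
        if (watermarkAtArrival >= WINDOW_MAX_TS) {
            return Pane.CLOSING; // on-time pane already fired; late firings are Never.ever()
        }
        return Pane.ON_TIME;
    }

    /** Element counts of the two panes of the first (accumulating) aggregation. */
    static List<Integer> accumulatingPaneSizes(Map<String, Long> watermarkAtArrival) {
        int onTime = 0;
        int closing = 0;
        for (long wm : watermarkAtArrival.values()) {
            switch (classify(wm)) {
                case ON_TIME: onTime++; break;
                case CLOSING: closing++; break;
                default: break; // dropped elements never reach a pane
            }
        }
        List<Integer> sizes = new ArrayList<>();
        sizes.add(onTime);           // on-time pane
        sizes.add(onTime + closing); // closing pane re-emits everything (accumulating mode)
        return sizes;
    }

    public static void main(String[] args) {
        Map<String, Long> arrivals = new LinkedHashMap<>();
        arrivals.put("one", Long.MIN_VALUE); // added before the first advanceWatermarkTo
        arrivals.put("two", 1 * MINUTE);
        arrivals.put("three", 5 * MINUTE);
        arrivals.put("four", 8 * MINUTE);
        arrivals.put("five", 10 * MINUTE);
        System.out.println(accumulatingPaneSizes(arrivals)); // [2, 4]
    }
}
```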
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)