Fix lost state updates in Set state #39175

Open
shunping wants to merge 6 commits into apache:master from shunping:fix-flaky-state-test

@shunping shunping commented Jun 30, 2026

Collaborator

This PR fixes a race condition in SynchronousSetRuntimeState where asynchronous state requests triggered by state compaction or early clears were not being awaited.

When elements are added to SynchronousSetRuntimeState, the add can occasionally trigger data compaction (_compact_data), which launches asynchronous clear and extend requests to the State Handler. Previously, the futures for these requests were not tracked. The commit() method blocked only on the last futures, those generated during the commit phase, so outstanding compaction futures could complete out of order or after the bundle processor finished, resulting in lost state updates.
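The untracked-future problem can be reproduced in miniature. The sketch below is not Beam code: `SlowStateHandler`, `clear_async`, `extend_async`, and `finish` are hypothetical names, and a `threading.Event` stands in for request latency. It only illustrates that a caller which discards its futures can observe an empty store at the point where the bundle would finish.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SlowStateHandler:
    """Hypothetical runner-side store whose extend requests can be held back."""

    def __init__(self):
        self.stored = set()
        self.release = threading.Event()  # set() lets delayed extends proceed
        self._pool = ThreadPoolExecutor(max_workers=1)

    def clear_async(self):
        return self._pool.submit(self.stored.clear)

    def extend_async(self, values):
        values = set(values)

        def apply():
            self.release.wait()  # simulate a slow state request
            self.stored.update(values)

        return self._pool.submit(apply)

    def finish(self):
        self.release.set()
        self._pool.shutdown(wait=True)


handler = SlowStateHandler()

# "Compaction": both requests are fired, but the returned futures are dropped.
handler.clear_async()
handler.extend_async({1, 2, 3})

# The bundle "finishes" here without waiting on anything.
seen_at_bundle_end = set(handler.stored)

# Only later do the outstanding requests land.
handler.finish()
seen_after_requests_land = set(handler.stored)
```

In this sketch `seen_at_bundle_end` is empty while `seen_after_requests_land` holds the values, which mirrors the window in which a following bundle could read stale state.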

An example failed test:
https://github.com/apache/beam/actions/runs/28001498086/job/82874623596

Traceback:

_______ StatefulDoFnOnDirectRunnerTest.test_stateful_set_state_portably ________
[gw2] linux -- Python 3.10.20 /runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/bin/python

self = <apache_beam.transforms.userstate_test.StatefulDoFnOnDirectRunnerTest testMethod=test_stateful_set_state_portably>

    def test_stateful_set_state_portably(self):
      class SetStatefulDoFn(beam.DoFn):
    
        SET_STATE = SetStateSpec('buffer', VarIntCoder())
    
        def process(self, element, set_state=beam.DoFn.StateParam(SET_STATE)):
          _, value = element
          aggregated_value = 0
          set_state.add(value)
          for saved_value in set_state.read():
            aggregated_value += saved_value
          yield aggregated_value
    
>     with TestPipeline() as p:

apache_beam/transforms/userstate_test.py:715: 
...
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline job-045 failed in state FAILED: bundle inst010 stage-009 failed:Traceback (most recent call last):
E         File "apache_beam/runners/common.py", line 1499, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 913, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           self._invoke_process_per_window(
E         File "apache_beam/runners/common.py", line 1058, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           self.process_method(*args_for_process, **kwargs_for_process),
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2126, in <lambda>
E           wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310/py310/lib/python3.10/site-packages/apache_beam/testing/util.py", line 202, in _equal
E           raise BeamAssertException(msg)
E       apache_beam.testing.util.BeamAssertException: Failed assert: [1, 3, 6, 10, 10] == [1, 4, 6, 3, 7], unexpected elements [4, 7], missing elements [10, 10]
E       

In this failing run, processing the first bundle [1, 3, 2] produces [1, 4, 6]. However, a compaction occurs before the commit. Although the clear and extend requests are dispatched to the state handler, the commit() at the bundle boundary does not await them. As a result, the extend request has not been applied when the bundle finishes, and the runner state remains empty. When [3, 4] subsequently arrives, it reads an empty state from the runner and yields [3, 7].
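The arithmetic of that scenario can be checked with a small model. This is a plain-Python sketch, not the Beam runner: `run_bundle` and its `writes_reach_runner` flag are hypothetical, and the bundle split ([1, 3, 2] then [3, 4]) is taken from the description above.

```python
def run_bundle(values, runner_state, writes_reach_runner):
    """Mimic the DoFn: add each value to the set, emit the sum of the set.

    Returns the emitted sums and the state the runner holds afterwards.
    If the writes never reach the runner, it is left empty (the clear
    landed, the extend did not).
    """
    local = set(runner_state)
    outputs = []
    for value in values:
        local.add(value)
        outputs.append(sum(local))
    return outputs, (local if writes_reach_runner else set())


# Lost update: the first bundle's state never reaches the runner.
first_out, state_after_first = run_bundle([1, 3, 2], set(), False)
second_out_lost, _ = run_bundle([3, 4], state_after_first, True)

# For contrast only: the same split when the first bundle's state persists.
_, persisted_state = run_bundle([1, 3, 2], set(), True)
second_out_persisted, _ = run_bundle([3, 4], persisted_state, True)

print(first_out + second_out_lost)
print(second_out_persisted)
```

The lost-update run emits the same values as the actual side of the failed assertion in the traceback.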

@shunping shunping marked this pull request as ready for review June 30, 2026 22:16
@gemini-code-assist

Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a race condition in SynchronousSetRuntimeState where asynchronous state requests were not being properly tracked or awaited. By ensuring all outstanding state operations are completed during the commit phase, the change prevents data loss caused by compaction futures finishing out of order or after the bundle processor has already finalized.

Highlights

  • Asynchronous State Tracking: Introduced a _futures list in SynchronousSetRuntimeState to track all asynchronous state requests triggered by compaction and early clears.
  • Commit Synchronization: Updated the commit() method to aggregate and await all outstanding asynchronous futures, ensuring that state updates are fully processed before the bundle completes.
  • Regression Testing: Added a new test case test_stateful_set_state_compaction_race_portably that simulates latency in state requests to verify the fix for the identified race condition.
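The tracking-and-awaiting pattern described in the highlights can be sketched as follows. This is a simplified illustration, not the code in the PR: `LatentHandler`, `TrackedSetState`, and the `COMPACT_AT` threshold are hypothetical, and compaction here only rewrites locally buffered values for a fresh state. The attribute name `_futures` follows the summary above.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


class LatentHandler:
    """Hypothetical state handler that applies requests after a delay."""

    def __init__(self, latency=0.05):
        self.stored = set()
        self._latency = latency
        # A single worker keeps requests in submission order.
        self._pool = ThreadPoolExecutor(max_workers=1)

    def _apply(self, action):
        time.sleep(self._latency)  # simulated request latency
        action()

    def clear_async(self):
        return self._pool.submit(self._apply, self.stored.clear)

    def extend_async(self, values):
        values = set(values)
        return self._pool.submit(
            self._apply, lambda: self.stored.update(values))


class TrackedSetState:
    COMPACT_AT = 3  # hypothetical compaction threshold

    def __init__(self, handler):
        self._handler = handler
        self._added = set()
        self._futures = []  # every outstanding async request

    def add(self, value):
        self._added.add(value)
        if len(self._added) >= self.COMPACT_AT:
            self._compact()

    def _compact(self):
        # Track the futures instead of discarding them.
        self._futures.append(self._handler.clear_async())
        self._futures.append(self._handler.extend_async(self._added))
        self._added = set()

    def commit(self):
        if self._added:
            self._futures.append(self._handler.extend_async(self._added))
            self._added = set()
        pending, self._futures = self._futures, []
        wait(pending)
        for future in pending:
            future.result()  # re-raise any request failure


handler = LatentHandler()
state = TrackedSetState(handler)
for v in [1, 2, 3, 4]:
    state.add(v)  # the third add triggers the sketch's compaction
state.commit()
stored_after_commit = set(handler.stored)
```

Because `commit()` waits on the compaction futures as well as its own, the store is complete by the time it returns, regardless of the simulated latency.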

@github-actions

Contributor

Assigning reviewers:

R: @jrmccluskey for label python.

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces tracking of outstanding asynchronous state requests in bundle_processor.py to ensure they are properly awaited and committed during the commit phase, and adds a corresponding unit test to verify stateful set state compaction under race conditions. The reviewer suggested a critical improvement to the commit logic: using a while loop to repeatedly drain self._futures rather than a single snapshot, ensuring that any new futures appended while awaiting previous ones are also fully resolved before the commit returns.
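The difference between a single snapshot and a draining loop can be illustrated with a minimal sketch. The names here (`drain`, `first`, `follow_up`) are hypothetical and this is not the code under review. It also assumes CPython, where individual list operations are atomic; real code sharing a list across threads may need a lock.

```python
from concurrent.futures import ThreadPoolExecutor


def drain(futures):
    """Wait until the list is empty, including futures appended meanwhile."""
    resolved = 0
    while futures:
        batch = list(futures)
        del futures[:len(batch)]  # new futures are appended after the batch
        for future in batch:
            future.result()  # blocks, and re-raises any failure
            resolved += 1
    return resolved


pool = ThreadPoolExecutor(max_workers=2)
futures = []
log = []


def follow_up():
    log.append("follow-up")


def first():
    log.append("first")
    # A request that schedules another request while it is being awaited.
    futures.append(pool.submit(follow_up))


futures.append(pool.submit(first))
resolved = drain(futures)
pool.shutdown()
```

A single snapshot of `futures` taken before waiting may contain only the first future and miss the follow-up, whereas the loop keeps going until nothing is left.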

Comment thread sdks/python/apache_beam/runners/worker/bundle_processor.py