Status Update
Comments
je...@google.com <je...@google.com> #2
Hello,
To troubleshoot the issue further, I have created a private ticket to provide some information about the issue (for which you should have received a notification). Please provide the requested information there. Please don't put any personal information, including project identifiers, in this public ticket.
je...@google.com <je...@google.com> #3
Hello,
Thank you for reaching out to us with your request.
We have duly noted your feedback and will validate it thoroughly. While we cannot provide an estimated time of implementation or guarantee that the request will be fulfilled, please be assured that your input is highly valued. Your feedback enables us to enhance our products and services.
We appreciate your continued trust and support in improving our Google Cloud Platform products. If you want to report a new issue, please do not hesitate to create a new issue on the issue tracker.
Once again, we sincerely appreciate your valuable feedback. Thank you for your understanding and collaboration.
ar...@getcruise.com <ar...@getcruise.com> #4
Thank you for the update. Can somebody from the team please either (1) suggest any alternative workarounds or (2) confirm that the desired optimal behavior is not possible in Dataflow?
pp...@google.com <pp...@google.com> #5
(For context, you’ve met Arwin a few times over GVC & he also co-presented with Sayat at the BEAM Summit earlier this year.)
re...@google.com <re...@google.com> #6
Adding others.
ar...@getcruise.com <ar...@getcruise.com> #8
Hi, thank you for the response.
I tried this and I don't think it worked. While beam.Reshuffle prevents fusion, it does not prevent bundling. ParDos that happen after a Reshuffle are still subject to bundling, which is causing the parallelism issues. Could you suggest a way to control the bundle size?
Thanks,
Arwin
xq...@google.com <xq...@google.com> #9
One idea that we can try is to use BatchElements:
More details are in this doc:
ar...@getcruise.com <ar...@getcruise.com> #10
To clarify, you're suggesting to use BatchElements with max_batch_size=1, right?
I read the design doc and it seems to suggest that there are two types of batching: batching within bundles and batching across bundles.
I don't think that batching within bundles would help - as I understand it the total parallelism of the job is based on the number of bundles. Since batching within bundles does not change the number or size of bundles, and bundles are the fundamental unit of parallelism in Beam/Dataflow, parallelism across the cluster will still be limited.
Batching across bundles won't do anything with max_batch_size=1, right? If we made the batch size bigger, that would be even worse, because we would be "combining" bundles, which means fewer, bigger bundles and even less parallelism.
To reiterate, what I'm trying to accomplish is to have smaller but more numerous bundles in my pipeline in order to increase parallelism. Please let me know if I misunderstood and there is a way to accomplish this.
Thanks!
Arwin
xq...@google.com <xq...@google.com> #11
BatchElements with max_batch_size=1 within bundles might be helpful for multithreading since there is no easy way to control the bundle size. This needs to be tested.
I checked your code again. Two questions:
- This doesn't directly solve the bundle issue, but if you add a Reshuffle after beam.Create, you should be able to see the job scale up to more than 1 VM.
- What is the reason for not using our built-in file IOs to read your files? The reason I ask is that our file IOs actually use ReadAllFiles (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py#L402). It has min_bundle_size, which is the minimum size of the data ranges that should be generated when splitting a file into data ranges, as well as desired_bundle_size. Both help split a file further.
Description
As I understand it, "bundles" are the unit of parallelism in Dataflow / Apache Beam. The bundle size is automagically determined by the runner (Dataflow), and users don't have a way to manually fine-tune this bundle size parameter. Most of the time this works great.
But we have a pipeline which does not run properly, due to inappropriate bundle size (I think):
The pipeline is https://console.cloud.google.com/dataflow/jobs/us-central1/2024-12-08_03_44_45-7558016061365232360 (job ID: 2024-12-08_03_44_45-7558016061365232360).
We have a DoFn with "fanout," where a single input element produces many outputs after performing expensive processing. Dataflow groups these input elements into large bundles, which means fewer bundles are created overall. This limits parallelism because the processing of each large bundle cannot be distributed across multiple workers, slowing down the pipeline.
Using a fusion breaker like GroupByKey doesn’t help because the outputs of the GroupByKey are also subject to large bundling, which continues to limit parallelism. Additionally, the overhead of the GroupByKey operation can further degrade performance without addressing the core issue.
To be more specific here is what the pipeline is essentially doing:
The pipeline processes a list of file paths, downloading each file and processing it in chunks using a sliding window algorithm. Beam groups the file paths into large bundles for processing, but as I mentioned this limits parallelism. For this workload, the optimal bundle size is likely 1, as each file must be processed independently to maintain file and time locality for the sliding window.
The file contents cannot be split across the pipeline, as splitting would break the sliding window’s requirement for contiguous chunks. We would need to regroup them using GroupByKey, which would add expensive shuffle bandwidth and serde costs, as the file contents are large.
I think the best way to do this is to hardcode the bundle size of our beam.ParDo(MyFn()) to 1 (but not beam.WriteToBigQuery!). Can Dataflow implement this? Or is there an alternative architecture that the team can suggest?