Comments
sa...@google.com <sa...@google.com> #2
I've created this PIT (public issue tracker entry) for customer visibility.
je...@google.com <je...@google.com> #3
Hello,
This issue report has been forwarded to the Cloud Dataflow product team for investigation; there is currently no ETA for a resolution. Future updates regarding this issue will be provided here.
sa...@google.com <sa...@google.com> #4
We were able to get this working on the DirectRunner with a somewhat hacky fix: when wrapping the DoFn in _TimeoutDoFn, we capture the state tracker (a thread-local variable set on the main thread) and install it in the child thread that runs the user's process method, so that metric updates made there are recorded against the correct tracker instead of being dropped.
# Requires: import concurrent.futures
def process(self, *args, **kwargs):
    if self._pool is None:
        self._pool = concurrent.futures.ThreadPoolExecutor(10)
    # The state tracker lives in a thread-local, so the pool's worker
    # threads do not inherit it; capture it here on the main thread.
    from apache_beam.runners.worker.statesampler import get_current_tracker, set_current_tracker
    current_tracker = get_current_tracker()

    def wrapped_process():
        # Install the main thread's tracker in this worker thread so that
        # counters incremented by the wrapped DoFn are still recorded.
        set_current_tracker(current_tracker)
        return list(self._fn.process(*args, **kwargs))

    # Ensure we iterate over the entire output list in the given amount of time.
    try:
        return self._pool.submit(wrapped_process).result(self._timeout)
    except concurrent.futures.TimeoutError:  # alias of builtin TimeoutError on 3.11+
        self._pool.shutdown(wait=False)
        self._pool = None
        raise
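
One way to wire this in is a monkeypatch (a hedged sketch, not an officially supported approach: _TimeoutDoFn is a private class in apache_beam.transforms.core, so this may break across Beam versions):

import apache_beam.transforms.core as core

# Replace the private _TimeoutDoFn.process with the patched function above.
core._TimeoutDoFn.process = process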
Description
Problem you have encountered:
Custom counters defined for a Dataflow job are only visible in the Dataflow console for the DoFn without exception handling; counters from the DoFns that use exception handling are not exported to the dashboard.
When running on Dataflow, the customer observed that using exception handling results in metrics recorded by the Beam DoFn not being logged to the dashboard. The customer provided a minimal example in which the exact same DoFn is used under three conditions:
- no exception handling
- timeout
- use_subprocess

The DoFn simply increments a counter and yields. Only the counter for the DoFn without exception handling is logged; a sketch of this setup follows.
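
A minimal sketch of the reproduction described above, assuming Beam's ParDo.with_exception_handling API; CountingDoFn, the 'repro' namespace, and the pipeline wiring are illustrative, not the customer's original code:

import apache_beam as beam
from apache_beam.metrics import Metrics

class CountingDoFn(beam.DoFn):
    """Increments a counter and yields the element unchanged."""
    def __init__(self, name):
        super().__init__()
        self._counter = Metrics.counter('repro', name)

    def process(self, element):
        self._counter.inc()
        yield element

with beam.Pipeline() as p:
    data = p | beam.Create(list(range(10)))
    # 1. No exception handling: this counter shows up in the console.
    data | 'plain' >> beam.ParDo(CountingDoFn('plain'))
    # 2. Exception handling with a timeout: counter is not exported.
    _ = data | 'timeout' >> beam.ParDo(
        CountingDoFn('timeout')).with_exception_handling(timeout=60)
    # 3. Exception handling in a subprocess: counter is not exported.
    _ = data | 'subprocess' >> beam.ParDo(
        CountingDoFn('subprocess')).with_exception_handling(use_subprocess=True)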
What you expected to happen:
Metrics should be logged for all DoFns, including those using exception handling (timeout or use_subprocess), not only for the DoFn without it.
Steps to reproduce:
Run the minimal reproducible example in your environment with:
--runner=DataflowRunner
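
For example, passing the runner via pipeline options (a hedged sketch; PROJECT and BUCKET are placeholders, not values from the original report):

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; PROJECT and BUCKET are not from the original report.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=PROJECT',
    '--region=us-central1',
    '--temp_location=gs://BUCKET/tmp',
])
# Pass these to the pipeline in the sketch above: beam.Pipeline(options=options)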
Other information (workarounds you have tried, documentation consulted, etc):