feat(workflow_engine): Setup the Task for process_workflow_updates #93553
Conversation
Codecov Report
✅ All tests successful. No failed tests found.
Additional details and impacted files

@@ Coverage Diff @@
##           master   #93553      +/-   ##
==========================================
+ Coverage   88.00%   88.04%   +0.03%
==========================================
  Files       10322    10340      +18
  Lines      595043   596685    +1642
  Branches    23141    23141
==========================================
+ Hits       523691   525355    +1664
+ Misses      70859    70837      -22
  Partials      493      493
Force-pushed from 5001848 to 8b6fffc
## Description

Starting implementation for: https://www.notion.so/sentry/Workflow-Status-Changes-1fa8b10e4b5d80a48acddb95d160da1f?source=copy_link#1fa8b10e4b5d80e6bb1aef39cab2c6dc

This adds a registry of handlers that are invoked when an activity is created; we'll use this in the [workflow engine to kick off a task](#93553) that executes `process_workflows`. It also adds a way to pass the detector_id through the StatusMessageData all the way to a hook we can invoke, so it can be read directly from the StatusMessageData later. The `activity` is also passed along so `workflow_engine` can trigger `.send_notification` (if it evaluates that it should).
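The "registry of handlers" pattern described above can be sketched as follows. This is an illustrative, self-contained example; the names (`register_activity_handler`, `on_activity_created`) are hypothetical and not the PR's actual code.

```python
# Hypothetical sketch of a handler registry invoked on activity creation.
_activity_handlers = []

def register_activity_handler(fn):
    """Register a callback to run whenever an activity is created."""
    _activity_handlers.append(fn)
    return fn

def on_activity_created(activity):
    """Invoke every registered handler with the new activity."""
    for handler in _activity_handlers:
        handler(activity)
```

In this PR's context, one such handler would be the hook that reads the detector_id from the StatusMessageData and kicks off the async task.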
- Create a task for handling activity updates
Force-pushed from 8814284 to 62fc6a1
src/sentry/workflow_engine/tasks.py
Outdated
    max_retries=5,
    soft_time_limit=50,
    time_limit=60,
    silo_mode=SiloMode.REGION,
    taskworker_config=config.TaskworkerConfig(
        namespace=namespaces.workflow_engine_tasks,
        processing_deadline_duration=60,
        retry=retry.Retry(
            times=3,
            delay=5,
Anyone have thoughts on what these retry / limit values should be? I'm just using what we've been using.
There isn't any right/wrong answer here. However, if you are going to define `Retry`, you should also define which exceptions retries should be performed on. Without that, retries will only be performed automatically for `TimeoutError` or an explicit `RetryError`.
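The policy the reviewer describes — retry only when the raised exception is one of the configured types, within the attempt budget — can be sketched in a self-contained way. This mirrors the idea, not the actual `sentry.taskworker.retry` API; the `on` parameter and `should_retry` method here are illustrative names.

```python
# Illustrative retry policy: retry only for configured exception types.
class Retry:
    def __init__(self, times, delay, on=()):
        self.times = times   # maximum number of attempts
        self.delay = delay   # seconds between attempts
        self.on = tuple(on)  # exception types that trigger a retry

    def should_retry(self, exc, attempt):
        # Retry only if the exception type is configured and
        # we still have attempts left in the budget.
        return attempt < self.times and isinstance(exc, self.on)

policy = Retry(times=3, delay=5, on=(TimeoutError,))
```

Without an `on` list, an unexpected `ValueError` would simply fail the task rather than being retried, which matches the behavior the reviewer warns about.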
👍 Thanks for the additional info. Seems like a number we can adjust in the future, then, and we can use the same configuration as issue alert processing (that's where I pulled these initial numbers from).
When would we not want to go with `acks_late=True` in cases where we'd rather double-process a task than drop one?
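The trade-off behind `acks_late` can be sketched with a toy queue. This is illustrative only, not the Celery/taskworker implementation: with late acks, a message is acknowledged only after the handler succeeds, so a crash re-delivers it (at-least-once, possible double processing); with early acks, a crash mid-task drops the message (at-most-once).

```python
def deliver(queue, handler, acks_late):
    """Pop one message, run the handler, return the remaining queue."""
    msg = queue.pop(0)  # with early acks, the broker forgets the message here
    try:
        handler(msg)
    except Exception:
        if acks_late:
            # Unacked message is re-delivered for another attempt.
            queue.append(msg)
        # With early acks the message is already gone: the task is dropped.
    return queue
```

For workflow notifications, double-sending on a rare worker crash is generally preferable to silently dropping an alert, which is the rationale for `acks_late=True` here.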
    The task will get the Activity from the database, create a WorkflowEventData object,
    and then process the data in `process_workflows`.
    """
Any idea what kind of throughput the tasks in the workflow_tasks namespace will have?
Hmm, not sure about throughput. I think it will vary pretty heavily depending on configurations / products; for example, a metric issue with anomaly detection will take more compute time than evaluating whether an event is new.
In the end, the throughput should be similar to issue alerts / metric alerts / crons. All three of those product verticals will execute here, but at fairly different rates.
Ok. Knowing that it will be similar to alerts is helpful.
We can add this to the alerts taskworker pools when we get the US region deployed. In other regions, throughput will be small enough that it isn't a concern.
- Add queue to server config
- Improved task name / queue
- Added acks_late=True
src/sentry/workflow_engine/tasks.py
Outdated
""" | ||
Process a workflow task identified by the given Activity ID and Detector ID. | ||
|
||
This task will retry up to 3 times with a delay of 5 seconds between attempts. |
I think we can leave this to the annotation to specify and remove this.
…93553)

## Description

- Create a celery task namespace for the workflow engine tasks
- Create a task for processing an activity in the workflow_engine -- this will allow us to create a celery task to async execute process workflows when the issue platform creates an activity

In the code I mention there being a follow-up PR: #93580 is the initial PR to support the `Activity` / `Group` models.