-
-
Notifications
You must be signed in to change notification settings - Fork 83
workflow: avoid duplicate fan-in execution from concurrent predecessors #679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
geofmureithi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some thoughts
| .await?; | ||
| // TODO(bug): The check of done is not a good one as it can be called more than once if the jobs a too quickly executed | ||
| if results.iter().all(|s| matches!(s.status, Status::Done)) { | ||
| // ===== FIX START ===== |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this line
| // ===== FIX START ===== |
| pending_dependencies: dependency_task_ids, | ||
| }); | ||
| } | ||
| // ===== FIX END ===== |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this line too
| // ===== FIX END ===== |
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately these tests dot actually test the actual functionality that is changing.
I meant the test would:
- Build a basic Dag with a node with multiple predecessors
- Start a test worker
- Push a task
- Ensure only one call was made in the node with multiple predecessors.
This may need some extra work and I can understand if you need extra direction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors() {
let counter = Arc::new(AtomicUsize::new(0));
let barrier = Arc::new(Barrier::new(2));
let dag = DagFlow::new("fan-in-single-exec-workflow");
let a = {
let barrier = barrier.clone();
dag.add_node("a", task_fn(move |task: u32| {
let barrier = barrier.clone();
async move {
barrier.wait().await;
task
}
}))
};
let b = {
let barrier = barrier.clone();
dag.add_node("b", task_fn(move |task: u32| {
let barrier = barrier.clone();
async move {
barrier.wait().await;
task
}
}))
};
let counter_clone = Arc::clone(&counter);
let _fan_in = dag
.add_node(
"fan_in",
task_fn(move |task: (u32, u32), worker: WorkerContext| {
let counter = Arc::clone(&counter_clone);
async move {
counter.fetch_add(1, Ordering::SeqCst);
worker.stop().unwrap();
task.0 + task.1
}
}),
)
.depends_on((&a, &b));
let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
backend.start_fan_out((1u32, 2u32)).await.unwrap();
let worker = WorkerBuilder::new("rango-tango")
.backend(backend)
.build(dag);
worker.run().await.unwrap();
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
Does this test coverage look sufficient to you, or would you prefer a different structure?
I did remove Fix start and fix end comment, should I also remove TODO comment?
Description
Fixes #673
Fix a bug where a fan-in node in a DAG workflow could be executed more than once when multiple predecessor nodes enqueue it concurrently.
This change ensures that only a single designated predecessor is allowed to trigger fan-in execution once all dependencies are complete.
Type of Change
Bug fix (non-breaking change which fixes an issue)
Testing
I have added tests that prove my fix is effective or that my feature works
I have run the existing tests and they pass
I have run cargo fmt and cargo clippy
Checklist
My code follows the code style of this project
I have performed a self-review of my own code
I have commented my code, particularly in hard-to-understand areas
My changes generate no new warnings
I have added tests that prove my fix is effective or that my feature works
New and existing unit tests pass locally with my changes
Additional Notes
This fix addresses duplicate enqueue caused specifically by concurrent fan-in scheduling.