Skip to content

Conversation

@next-n
Copy link

@next-n next-n commented Jan 30, 2026

Description
Fixes #673
Fix a bug where a fan-in node in a DAG workflow could be executed more than once when multiple predecessor nodes enqueue it concurrently.
This change ensures that only a single designated predecessor is allowed to trigger fan-in execution once all dependencies are complete.

Type of Change

Bug fix (non-breaking change which fixes an issue)

Testing

I have added tests that prove my fix is effective or that my feature works

I have run the existing tests and they pass

I have run cargo fmt and cargo clippy

Checklist

My code follows the code style of this project

I have performed a self-review of my own code

I have commented my code, particularly in hard-to-understand areas

My changes generate no new warnings

I have added tests that prove my fix is effective or that my feature works

New and existing unit tests pass locally with my changes

Additional Notes

This fix addresses duplicate enqueue caused specifically by concurrent fan-in scheduling.

@next-n next-n requested a review from geofmureithi as a code owner January 30, 2026 13:31
Copy link
Member

@geofmureithi geofmureithi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some thoughts

.await?;
// TODO(bug): The check of done is not a good one as it can be called more than once if the jobs a too quickly executed
if results.iter().all(|s| matches!(s.status, Status::Done)) {
// ===== FIX START =====
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line

Suggested change
// ===== FIX START =====

pending_dependencies: dependency_task_ids,
});
}
// ===== FIX END =====
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line too

Suggested change
// ===== FIX END =====

}

#[cfg(test)]
mod tests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately these tests dot actually test the actual functionality that is changing.
I meant the test would:

  1. Build a basic Dag with a node with multiple predecessors
  2. Start a test worker
  3. Push a task
  4. Ensure only one call was made in the node with multiple predecessors.

This may need some extra work and I can understand if you need extra direction.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors() {
let counter = Arc::new(AtomicUsize::new(0));
let barrier = Arc::new(Barrier::new(2));

    let dag = DagFlow::new("fan-in-single-exec-workflow");

    let a = {
        let barrier = barrier.clone();
        dag.add_node("a", task_fn(move |task: u32| {
            let barrier = barrier.clone();
            async move {
                barrier.wait().await;
                task
            }
        }))
    };

    let b = {
        let barrier = barrier.clone();
        dag.add_node("b", task_fn(move |task: u32| {
            let barrier = barrier.clone();
            async move {
                barrier.wait().await;
                task
            }
        }))
    };

    let counter_clone = Arc::clone(&counter);
    let _fan_in = dag
        .add_node(
            "fan_in",
            task_fn(move |task: (u32, u32), worker: WorkerContext| {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    worker.stop().unwrap();
                    task.0 + task.1
                }
            }),
        )
        .depends_on((&a, &b));

    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();

    let worker = WorkerBuilder::new("rango-tango")
        .backend(backend)
        .build(dag);

    worker.run().await.unwrap();

    assert_eq!(counter.load(Ordering::SeqCst), 1);
}

Does this test coverage look sufficient to you, or would you prefer a different structure?
I did remove Fix start and fix end comment, should I also remove TODO comment?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DAG fan‑in node can execute more than once when dependencies finish close together

2 participants