Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ children = [
- `:queue_enabled` - Enable queue processing (default: `true`)
- `:stale_lock_timeout` - Seconds before lock is stale (default: `300`)
- `:heartbeat_interval` - Worker heartbeat interval in ms (default: `30_000`)
- `:log_level` - Log level for internal queries, or `false` to disable (default: `false`)

## Testing

Expand Down
23 changes: 21 additions & 2 deletions lib/durable/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Durable.Config do
* `:queue_enabled` - Enable/disable queue processing (default: `true`)
* `:stale_lock_timeout` - Seconds before a lock is considered stale (default: `300`)
* `:heartbeat_interval` - Milliseconds between worker heartbeats (default: `30_000`)
* `:log_level` - Log level for internal Durable queries, or `false` to disable (default: `false`)

## Examples

Expand Down Expand Up @@ -43,7 +44,8 @@ defmodule Durable.Config do
stale_lock_timeout: pos_integer(),
heartbeat_interval: pos_integer(),
scheduled_modules: [module()],
scheduler_interval: pos_integer()
scheduler_interval: pos_integer(),
log_level: Logger.level() | false
}

defstruct [
Expand All @@ -55,7 +57,8 @@ defmodule Durable.Config do
:stale_lock_timeout,
:heartbeat_interval,
:scheduled_modules,
:scheduler_interval
:scheduler_interval,
:log_level
]

@schema [
Expand Down Expand Up @@ -103,6 +106,12 @@ defmodule Durable.Config do
type: :pos_integer,
default: 60_000,
doc: "Milliseconds between scheduler polls for due schedules"
],
log_level: [
type:
{:in, [false, :emergency, :alert, :critical, :error, :warning, :notice, :info, :debug]},
default: false,
doc: "Log level for internal Durable queries, or false to disable logging"
]
]

Expand Down Expand Up @@ -229,6 +238,16 @@ defmodule Durable.Config do
get(name).heartbeat_interval
end

@doc """
Gets the log level for internal Durable queries.

Returns `false` to disable logging, or a Logger level atom.
"""
@spec log_level(atom()) :: Logger.level() | false
def log_level(name \\ Durable) do
get(name).log_level
end

@doc """
Returns the Task.Supervisor name for a Durable instance.

Expand Down
47 changes: 28 additions & 19 deletions lib/durable/queue/adapters/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Durable.Queue.Adapters.Postgres do
RETURNING id, workflow_module, workflow_name, queue, priority, input, context, scheduled_at, current_step;
"""

case SQL.query(repo, sql, [queue, limit, node_id]) do
case SQL.query(repo, sql, [queue, limit, node_id], log: config.log_level) do
{:ok, %{rows: rows, columns: columns}} ->
rows
|> Enum.map(&parse_row(&1, columns))
Expand All @@ -54,15 +54,16 @@ defmodule Durable.Queue.Adapters.Postgres do
@impl true
def ack(%Config{} = config, job_id) when is_binary(job_id) do
repo = config.repo
log_opts = [log: config.log_level]

case repo.get(WorkflowExecution, job_id) do
case repo.get(WorkflowExecution, job_id, log_opts) do
nil ->
{:error, :not_found}

execution ->
execution
|> WorkflowExecution.unlock_changeset()
|> repo.update()
|> repo.update(log_opts)

:ok
end
Expand All @@ -71,8 +72,9 @@ defmodule Durable.Queue.Adapters.Postgres do
@impl true
def nack(%Config{} = config, job_id, reason) when is_binary(job_id) do
repo = config.repo
log_opts = [log: config.log_level]

case repo.get(WorkflowExecution, job_id) do
case repo.get(WorkflowExecution, job_id, log_opts) do
nil ->
{:error, :not_found}

Expand All @@ -87,7 +89,7 @@ defmodule Durable.Queue.Adapters.Postgres do
locked_by: nil,
locked_at: nil
)
|> repo.update()
|> repo.update(log_opts)

:ok
end
Expand All @@ -96,8 +98,9 @@ defmodule Durable.Queue.Adapters.Postgres do
@impl true
def reschedule(%Config{} = config, job_id, run_at) when is_binary(job_id) do
repo = config.repo
log_opts = [log: config.log_level]

case repo.get(WorkflowExecution, job_id) do
case repo.get(WorkflowExecution, job_id, log_opts) do
nil ->
{:error, :not_found}

Expand All @@ -109,7 +112,7 @@ defmodule Durable.Queue.Adapters.Postgres do
locked_by: nil,
locked_at: nil
)
|> repo.update()
|> repo.update(log_opts)

:ok
end
Expand All @@ -119,6 +122,7 @@ defmodule Durable.Queue.Adapters.Postgres do
def recover_stale_locks(%Config{} = config, timeout_seconds) when timeout_seconds > 0 do
repo = config.repo
cutoff = DateTime.add(DateTime.utc_now(), -timeout_seconds, :second)
log_opts = [log: config.log_level]

{count, _} =
from(w in WorkflowExecution,
Expand All @@ -127,11 +131,14 @@ defmodule Durable.Queue.Adapters.Postgres do
where: w.locked_at < ^cutoff
)
|> repo.update_all(
set: [
status: :pending,
locked_by: nil,
locked_at: nil
]
[
set: [
status: :pending,
locked_by: nil,
locked_at: nil
]
],
log_opts
)

{:ok, count}
Expand All @@ -143,13 +150,14 @@ defmodule Durable.Queue.Adapters.Postgres do
def heartbeat(%Config{} = config, job_id) when is_binary(job_id) do
repo = config.repo
now = DateTime.utc_now()
log_opts = [log: config.log_level]

{count, _} =
from(w in WorkflowExecution,
where: w.id == ^job_id,
where: w.status == :running
)
|> repo.update_all(set: [locked_at: now])
|> repo.update_all([set: [locked_at: now]], log_opts)

if count == 1 do
:ok
Expand All @@ -161,35 +169,36 @@ defmodule Durable.Queue.Adapters.Postgres do
@impl true
def get_stats(%Config{} = config, queue) when is_binary(queue) do
repo = config.repo
log_opts = [log: config.log_level]
base_query = from(w in WorkflowExecution, where: w.queue == ^queue)

pending =
from(w in base_query, where: w.status == :pending)
|> repo.aggregate(:count)
|> repo.aggregate(:count, log_opts)

running =
from(w in base_query, where: w.status == :running)
|> repo.aggregate(:count)
|> repo.aggregate(:count, log_opts)

completed =
from(w in base_query, where: w.status == :completed)
|> repo.aggregate(:count)
|> repo.aggregate(:count, log_opts)

failed =
from(w in base_query, where: w.status == :failed)
|> repo.aggregate(:count)
|> repo.aggregate(:count, log_opts)

waiting =
from(w in base_query, where: w.status == :waiting)
|> repo.aggregate(:count)
|> repo.aggregate(:count, log_opts)

scheduled =
from(w in base_query,
where: w.status == :pending,
where: not is_nil(w.scheduled_at),
where: w.scheduled_at > ^DateTime.utc_now()
)
|> repo.aggregate(:count)
|> repo.aggregate(:count, log_opts)

%{
queue: queue,
Expand Down
6 changes: 4 additions & 2 deletions lib/durable/scheduler/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -348,24 +348,26 @@ defmodule Durable.Scheduler.API do
def get_due_schedules(config) do
repo = config.repo
now = DateTime.utc_now()
log_opts = [log: config.log_level]

from(s in ScheduledWorkflow,
where: s.enabled == true and s.next_run_at <= ^now,
lock: "FOR UPDATE SKIP LOCKED"
)
|> repo.all()
|> repo.all(log_opts)
end

@doc false
def mark_run(schedule, config) do
repo = config.repo
now = DateTime.utc_now()
log_opts = [log: config.log_level]

{:ok, next_run} = compute_next_run(schedule.cron_expression, schedule.timezone)

schedule
|> ScheduledWorkflow.run_changeset(now, next_run)
|> repo.update()
|> repo.update(log_opts)
end

# ============================================================================
Expand Down
38 changes: 25 additions & 13 deletions lib/durable/wait/timeout_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ defmodule Durable.Wait.TimeoutWorker do

defp process_timed_out_inputs(config, now) do
repo = config.repo
log_opts = [log: config.log_level]

# Find pending inputs that have timed out
query =
Expand All @@ -106,19 +107,22 @@ defmodule Durable.Wait.TimeoutWorker do
preload: [:workflow]
)

timed_out = repo.all(query)
timed_out = repo.all(query, log_opts)

Enum.each(timed_out, fn pending_input ->
handle_input_timeout(repo, pending_input, config)
handle_input_timeout(pending_input, config)
end)
end

defp handle_input_timeout(repo, pending_input, config) do
defp handle_input_timeout(pending_input, config) do
repo = config.repo
log_opts = [log: config.log_level]

# Mark as timed out
{:ok, _} =
pending_input
|> PendingInput.timeout_changeset()
|> repo.update()
|> repo.update(log_opts)

# Determine how to handle
on_timeout = pending_input.on_timeout || :resume
Expand Down Expand Up @@ -153,6 +157,7 @@ defmodule Durable.Wait.TimeoutWorker do

defp process_timed_out_events(config, now) do
repo = config.repo
log_opts = [log: config.log_level]

# Find pending events that have timed out (only single events, not in groups)
query =
Expand All @@ -166,19 +171,22 @@ defmodule Durable.Wait.TimeoutWorker do
preload: [:workflow]
)

timed_out = repo.all(query)
timed_out = repo.all(query, log_opts)

Enum.each(timed_out, fn pending_event ->
handle_event_timeout(repo, pending_event, config)
handle_event_timeout(pending_event, config)
end)
end

defp handle_event_timeout(repo, pending_event, config) do
defp handle_event_timeout(pending_event, config) do
repo = config.repo
log_opts = [log: config.log_level]

# Mark as timed out
{:ok, _} =
pending_event
|> PendingEvent.timeout_changeset()
|> repo.update()
|> repo.update(log_opts)

# Resume workflow with timeout value
timeout_value = deserialize_timeout_value(pending_event.timeout_value)
Expand All @@ -202,6 +210,7 @@ defmodule Durable.Wait.TimeoutWorker do

defp process_timed_out_wait_groups(config, now) do
repo = config.repo
log_opts = [log: config.log_level]

# Find wait groups that have timed out
query =
Expand All @@ -213,25 +222,28 @@ defmodule Durable.Wait.TimeoutWorker do
preload: [:workflow]
)

timed_out = repo.all(query)
timed_out = repo.all(query, log_opts)

Enum.each(timed_out, fn wait_group ->
handle_wait_group_timeout(repo, wait_group, config)
handle_wait_group_timeout(wait_group, config)
end)
end

defp handle_wait_group_timeout(repo, wait_group, config) do
defp handle_wait_group_timeout(wait_group, config) do
repo = config.repo
log_opts = [log: config.log_level]

# Mark wait group as timed out
{:ok, _} =
wait_group
|> WaitGroup.timeout_changeset()
|> repo.update()
|> repo.update(log_opts)

# Mark all related pending events as timed out
from(p in PendingEvent,
where: p.wait_group_id == ^wait_group.id and p.status == :pending
)
|> repo.update_all(set: [status: :timeout, completed_at: DateTime.utc_now()])
|> repo.update_all([set: [status: :timeout, completed_at: DateTime.utc_now()]], log_opts)

# Resume workflow with timeout value and partial results
timeout_value = deserialize_timeout_value(wait_group.timeout_value)
Expand Down