diff --git a/CLAUDE.md b/CLAUDE.md index 61be1c3..8a9c885 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -114,6 +114,7 @@ children = [ - `:queue_enabled` - Enable queue processing (default: `true`) - `:stale_lock_timeout` - Seconds before lock is stale (default: `300`) - `:heartbeat_interval` - Worker heartbeat interval in ms (default: `30_000`) +- `:log_level` - Log level for internal queries, or `false` to disable (default: `false`) ## Testing diff --git a/lib/durable/config.ex b/lib/durable/config.ex index 333b1e6..a14aff1 100644 --- a/lib/durable/config.ex +++ b/lib/durable/config.ex @@ -14,6 +14,7 @@ defmodule Durable.Config do * `:queue_enabled` - Enable/disable queue processing (default: `true`) * `:stale_lock_timeout` - Seconds before a lock is considered stale (default: `300`) * `:heartbeat_interval` - Milliseconds between worker heartbeats (default: `30_000`) + * `:log_level` - Log level for internal Durable queries, or `false` to disable (default: `false`) ## Examples @@ -43,7 +44,8 @@ defmodule Durable.Config do stale_lock_timeout: pos_integer(), heartbeat_interval: pos_integer(), scheduled_modules: [module()], - scheduler_interval: pos_integer() + scheduler_interval: pos_integer(), + log_level: Logger.level() | false } defstruct [ @@ -55,7 +57,8 @@ defmodule Durable.Config do :stale_lock_timeout, :heartbeat_interval, :scheduled_modules, - :scheduler_interval + :scheduler_interval, + :log_level ] @schema [ @@ -103,6 +106,12 @@ defmodule Durable.Config do type: :pos_integer, default: 60_000, doc: "Milliseconds between scheduler polls for due schedules" + ], + log_level: [ + type: + {:in, [false, :emergency, :alert, :critical, :error, :warning, :notice, :info, :debug]}, + default: false, + doc: "Log level for internal Durable queries, or false to disable logging" ] ] @@ -229,6 +238,16 @@ defmodule Durable.Config do get(name).heartbeat_interval end + @doc """ + Gets the log level for internal Durable queries. + + Returns `false` to disable logging, or a Logger level atom. + """ + @spec log_level(atom()) :: Logger.level() | false + def log_level(name \\ Durable) do + get(name).log_level + end + @doc """ Returns the Task.Supervisor name for a Durable instance. diff --git a/lib/durable/queue/adapters/postgres.ex b/lib/durable/queue/adapters/postgres.ex index 8ebaf5d..0e6db93 100644 --- a/lib/durable/queue/adapters/postgres.ex +++ b/lib/durable/queue/adapters/postgres.ex @@ -39,7 +39,7 @@ defmodule Durable.Queue.Adapters.Postgres do RETURNING id, workflow_module, workflow_name, queue, priority, input, context, scheduled_at, current_step; """ - case SQL.query(repo, sql, [queue, limit, node_id]) do + case SQL.query(repo, sql, [queue, limit, node_id], log: config.log_level) do {:ok, %{rows: rows, columns: columns}} -> rows |> Enum.map(&parse_row(&1, columns)) @@ -54,15 +54,16 @@ defmodule Durable.Queue.Adapters.Postgres do @impl true def ack(%Config{} = config, job_id) when is_binary(job_id) do repo = config.repo + log_opts = [log: config.log_level] - case repo.get(WorkflowExecution, job_id) do + case repo.get(WorkflowExecution, job_id, log_opts) do nil -> {:error, :not_found} execution -> execution |> WorkflowExecution.unlock_changeset() - |> repo.update() + |> repo.update(log_opts) :ok end @@ -71,8 +72,9 @@ defmodule Durable.Queue.Adapters.Postgres do @impl true def nack(%Config{} = config, job_id, reason) when is_binary(job_id) do repo = config.repo + log_opts = [log: config.log_level] - case repo.get(WorkflowExecution, job_id) do + case repo.get(WorkflowExecution, job_id, log_opts) do nil -> {:error, :not_found} @@ -87,7 +89,7 @@ defmodule Durable.Queue.Adapters.Postgres do locked_by: nil, locked_at: nil ) - |> repo.update() + |> repo.update(log_opts) :ok end @@ -96,8 +98,9 @@ defmodule Durable.Queue.Adapters.Postgres do @impl true def reschedule(%Config{} = config, job_id, run_at) when is_binary(job_id) do repo = config.repo + log_opts = [log: config.log_level] - case repo.get(WorkflowExecution, job_id) do + case repo.get(WorkflowExecution, job_id, log_opts) do nil -> {:error, :not_found} @@ -109,7 +112,7 @@ defmodule Durable.Queue.Adapters.Postgres do locked_by: nil, locked_at: nil ) - |> repo.update() + |> repo.update(log_opts) :ok end @@ -119,6 +122,7 @@ defmodule Durable.Queue.Adapters.Postgres do def recover_stale_locks(%Config{} = config, timeout_seconds) when timeout_seconds > 0 do repo = config.repo cutoff = DateTime.add(DateTime.utc_now(), -timeout_seconds, :second) + log_opts = [log: config.log_level] {count, _} = from(w in WorkflowExecution, @@ -127,11 +131,14 @@ defmodule Durable.Queue.Adapters.Postgres do where: w.locked_at < ^cutoff ) |> repo.update_all( - set: [ - status: :pending, - locked_by: nil, - locked_at: nil - ] + [ + set: [ + status: :pending, + locked_by: nil, + locked_at: nil + ] + ], + log_opts ) {:ok, count} @@ -143,13 +150,14 @@ defmodule Durable.Queue.Adapters.Postgres do def heartbeat(%Config{} = config, job_id) when is_binary(job_id) do repo = config.repo now = DateTime.utc_now() + log_opts = [log: config.log_level] {count, _} = from(w in WorkflowExecution, where: w.id == ^job_id, where: w.status == :running ) - |> repo.update_all(set: [locked_at: now]) + |> repo.update_all([set: [locked_at: now]], log_opts) if count == 1 do :ok @@ -161,27 +169,28 @@ defmodule Durable.Queue.Adapters.Postgres do @impl true def get_stats(%Config{} = config, queue) when is_binary(queue) do repo = config.repo + log_opts = [log: config.log_level] base_query = from(w in WorkflowExecution, where: w.queue == ^queue) pending = from(w in base_query, where: w.status == :pending) - |> repo.aggregate(:count) + |> repo.aggregate(:count, log_opts) running = from(w in base_query, where: w.status == :running) - |> repo.aggregate(:count) + |> repo.aggregate(:count, log_opts) completed = from(w in base_query, where: w.status == :completed) - |> repo.aggregate(:count) + |> repo.aggregate(:count, log_opts) failed = from(w in base_query, where: w.status == :failed) - |> repo.aggregate(:count) + |> repo.aggregate(:count, log_opts) waiting = from(w in base_query, where: w.status == :waiting) - |> repo.aggregate(:count) + |> repo.aggregate(:count, log_opts) scheduled = from(w in base_query, @@ -189,7 +198,7 @@ defmodule Durable.Queue.Adapters.Postgres do where: not is_nil(w.scheduled_at), where: w.scheduled_at > ^DateTime.utc_now() ) - |> repo.aggregate(:count) + |> repo.aggregate(:count, log_opts) %{ queue: queue, diff --git a/lib/durable/scheduler/api.ex b/lib/durable/scheduler/api.ex index 565cb1e..c4a6b74 100644 --- a/lib/durable/scheduler/api.ex +++ b/lib/durable/scheduler/api.ex @@ -348,24 +348,26 @@ defmodule Durable.Scheduler.API do def get_due_schedules(config) do repo = config.repo now = DateTime.utc_now() + log_opts = [log: config.log_level] from(s in ScheduledWorkflow, where: s.enabled == true and s.next_run_at <= ^now, lock: "FOR UPDATE SKIP LOCKED" ) - |> repo.all() + |> repo.all(log_opts) end @doc false def mark_run(schedule, config) do repo = config.repo now = DateTime.utc_now() + log_opts = [log: config.log_level] {:ok, next_run} = compute_next_run(schedule.cron_expression, schedule.timezone) schedule |> ScheduledWorkflow.run_changeset(now, next_run) - |> repo.update() + |> repo.update(log_opts) end # ============================================================================ diff --git a/lib/durable/wait/timeout_worker.ex b/lib/durable/wait/timeout_worker.ex index d9dc035..f7028a8 100644 --- a/lib/durable/wait/timeout_worker.ex +++ b/lib/durable/wait/timeout_worker.ex @@ -95,6 +95,7 @@ defmodule Durable.Wait.TimeoutWorker do defp process_timed_out_inputs(config, now) do repo = config.repo + log_opts = [log: config.log_level] # Find pending inputs that have timed out query = @@ -106,19 +107,22 @@ defmodule Durable.Wait.TimeoutWorker do preload: [:workflow] ) - timed_out = repo.all(query) + timed_out = repo.all(query, log_opts) Enum.each(timed_out, fn pending_input -> - handle_input_timeout(repo, pending_input, config) + handle_input_timeout(pending_input, config) end) end - defp handle_input_timeout(repo, pending_input, config) do + defp handle_input_timeout(pending_input, config) do + repo = config.repo + log_opts = [log: config.log_level] + # Mark as timed out {:ok, _} = pending_input |> PendingInput.timeout_changeset() - |> repo.update() + |> repo.update(log_opts) # Determine how to handle on_timeout = pending_input.on_timeout || :resume @@ -153,6 +157,7 @@ defmodule Durable.Wait.TimeoutWorker do defp process_timed_out_events(config, now) do repo = config.repo + log_opts = [log: config.log_level] # Find pending events that have timed out (only single events, not in groups) query = @@ -166,19 +171,22 @@ defmodule Durable.Wait.TimeoutWorker do preload: [:workflow] ) - timed_out = repo.all(query) + timed_out = repo.all(query, log_opts) Enum.each(timed_out, fn pending_event -> - handle_event_timeout(repo, pending_event, config) + handle_event_timeout(pending_event, config) end) end - defp handle_event_timeout(repo, pending_event, config) do + defp handle_event_timeout(pending_event, config) do + repo = config.repo + log_opts = [log: config.log_level] + # Mark as timed out {:ok, _} = pending_event |> PendingEvent.timeout_changeset() - |> repo.update() + |> repo.update(log_opts) # Resume workflow with timeout value timeout_value = deserialize_timeout_value(pending_event.timeout_value) @@ -202,6 +210,7 @@ defmodule Durable.Wait.TimeoutWorker do defp process_timed_out_wait_groups(config, now) do repo = config.repo + log_opts = [log: config.log_level] # Find wait groups that have timed out query = @@ -213,25 +222,28 @@ defmodule Durable.Wait.TimeoutWorker do preload: [:workflow] ) - timed_out = repo.all(query) + timed_out = repo.all(query, log_opts) Enum.each(timed_out, fn wait_group -> - handle_wait_group_timeout(repo, wait_group, config) + handle_wait_group_timeout(wait_group, config) end) end - defp handle_wait_group_timeout(repo, wait_group, config) do + defp handle_wait_group_timeout(wait_group, config) do + repo = config.repo + log_opts = [log: config.log_level] + # Mark wait group as timed out {:ok, _} = wait_group |> WaitGroup.timeout_changeset() - |> repo.update() + |> repo.update(log_opts) # Mark all related pending events as timed out from(p in PendingEvent, where: p.wait_group_id == ^wait_group.id and p.status == :pending ) - |> repo.update_all(set: [status: :timeout, completed_at: DateTime.utc_now()]) + |> repo.update_all([set: [status: :timeout, completed_at: DateTime.utc_now()]], log_opts) # Resume workflow with timeout value and partial results timeout_value = deserialize_timeout_value(wait_group.timeout_value)