# Oban Thinking
Paradigm shifts for Oban job processing. These insights prevent common bugs and guide proper patterns.
## Part 1: Oban (Non-Pro)

### The Iron Law: JSON Serialization

**JOB ARGS ARE JSON. ATOMS BECOME STRINGS.**
This single fact causes most Oban debugging headaches.
```elixir
# Creating - atom keys are fine
MyWorker.new(%{user_id: 123})

# Processing - must use string keys (JSON converted atoms to strings)
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
  ...
end
```
### Error Handling: Let It Crash
Don't catch errors in Oban jobs. Let them bubble up to Oban for proper handling.
Why?

- **Automatic logging:** Oban logs the full error with its stacktrace
- **Automatic retries:** Jobs retry with exponential backoff
- **Visibility:** Failed jobs appear in the Oban Web dashboard
- **Consistency:** Error states are tracked in the database
#### Anti-Pattern

```elixir
# Bad: swallowing errors
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} ->
      {:ok, result}

    {:error, reason} ->
      Logger.error("Failed: #{reason}")
      {:ok, :failed} # Silently marks the job as complete!
  end
end
```
#### Correct Pattern

```elixir
# Good: let errors propagate
def perform(%Oban.Job{} = job) do
  result = do_work!(job.args) # Raises on failure
  {:ok, result}
end
```
```elixir
# Or return an error tuple - Oban treats it as a failure
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, reason} -> {:error, reason} # Oban will retry
  end
end
```
#### When to Catch Errors
Only catch errors when you need custom retry logic or want to mark a job as permanently failed:
```elixir
def perform(%Oban.Job{} = job) do
  case external_api_call(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, :not_found} -> {:cancel, :resource_not_found} # Don't retry
    {:error, :rate_limited} -> {:snooze, 60} # Retry in 60 seconds
    {:error, _} -> {:error, :will_retry} # Normal retry
  end
end
```
### Snoozing for Polling

Use `{:snooze, seconds}` for polling external state instead of manual retry logic:
```elixir
def perform(%Oban.Job{} = job) do
  if external_thing_finished?(job.args) do
    {:ok, :done}
  else
    {:snooze, 5} # Check again in 5 seconds
  end
end
```
### Simple Job Chaining
For simple sequential chains (JobA → JobB → JobC), have each job enqueue the next:
```elixir
def perform(%Oban.Job{} = job) do
  result = do_work(job.args)

  # Enqueue the next job on success
  NextWorker.new(%{data: result}) |> Oban.insert()

  {:ok, result}
end
```
Don't reach for Oban Pro Workflows for linear chains.
### Unique Jobs

Prevent duplicate jobs with the `unique` option:
```elixir
use Oban.Worker,
  queue: :default,
  unique: [period: 60] # Only one job with the same args per 60 seconds

# Or scope uniqueness to specific fields
unique: [period: 300, keys: [:user_id]]
```
**Gotcha:** Uniqueness is checked on insert, not execution. Two identical jobs inserted 61 seconds apart will both run.
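A comment-only sketch of that window, using the hypothetical `MyWorker` and the 60-second `period` from above:

```elixir
# t = 0s   Oban.insert(MyWorker.new(%{user_id: 1}))  # inserted
# t = 30s  Oban.insert(MyWorker.new(%{user_id: 1}))  # deduplicated - inside the 60s window
# t = 61s  Oban.insert(MyWorker.new(%{user_id: 1}))  # inserted again - both jobs run
```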
### High Throughput: Chunking
For millions of records, chunk work into batches rather than one job per item:
```elixir
# Bad: one job per contact (millions of jobs = database strain)
Enum.each(contacts, fn contact ->
  ContactWorker.new(%{id: contact.id}) |> Oban.insert()
end)

# Good: chunk into batches
contacts
|> Enum.chunk_every(100)
|> Enum.each(fn chunk ->
  BatchWorker.new(%{contact_ids: Enum.map(chunk, & &1.id)}) |> Oban.insert()
end)
```
For maximum throughput, use bulk inserts (`Oban.insert_all/2`) and skip uniqueness constraints.
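A minimal sketch of the bulk path, reusing the hypothetical `BatchWorker` from above:

```elixir
contacts
|> Enum.chunk_every(100)
|> Enum.map(fn chunk ->
  BatchWorker.new(%{contact_ids: Enum.map(chunk, & &1.id)})
end)
|> Oban.insert_all()
```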
## Part 2: Oban Pro
### Cascade Context: Erlang Term Serialization
Unlike regular job args, cascade context preserves atoms:
```elixir
# Creating - atom keys
Workflow.put_context(%{score_run_id: id})

# Processing - atom keys still work!
def my_cascade(%{score_run_id: id}) do
  ...
end

# Dot notation works too
def later_step(context) do
  context.score_run_id
  context.previous_result
end
```
### Serialization Summary

|                 | Creating | Processing   |
| --------------- | -------- | ------------ |
| Regular jobs    | atoms ok | strings only |
| Cascade context | atoms ok | atoms ok     |
### When to Use Workflows

Reserve Workflows for:

- Complex dependency graphs (not just linear chains)
- Fan-out/fan-in patterns
- When you need recorded values across steps
- Conditional branching based on runtime state
Don't use Workflows for simple A → B → C chains.
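For contrast, a hedged sketch of a fan-out/fan-in graph that does warrant a Workflow (all worker names are hypothetical):

```elixir
Workflow.new()
|> Workflow.add(:fetch, FetchWorker.new(%{video_id: 123}))
|> Workflow.add(:resize, ResizeWorker.new(%{}), deps: [:fetch])
|> Workflow.add(:transcribe, TranscribeWorker.new(%{}), deps: [:fetch])
|> Workflow.add(:publish, PublishWorker.new(%{}), deps: [:resize, :transcribe])
|> Oban.insert_all()
```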
### Workflow Composition with Graft

When you need a parent workflow to wait for a sub-workflow to complete before continuing, use `add_graft` instead of `add_workflow`.
#### Key Differences

| Method         | Sub-workflow completes before deps run? | Output accessible?       |
| -------------- | --------------------------------------- | ------------------------ |
| `add_workflow` | No - just inserts jobs                   | No                       |
| `add_graft`    | Yes - waits for all jobs                 | Yes, via recorded values |
#### Pattern: Composing Independent Concerns

Don't couple unrelated concerns (e.g., notifications) to domain-specific workflows (e.g., scoring). Instead, create a higher-level orchestrator:

```elixir
# Bad: notification logic buried in AggregateScores
defmodule AggregateScores do
  def workflow(score_run_id) do
    Workflow.new()
    |> Workflow.add(:aggregate, AggregateJob.new(...))
    |> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate) # Wrong place!
  end
end
```
```elixir
# Good: a higher-level workflow composes scoring + notification
defmodule FullRunWithNotifications do
  def workflow(site_url, opts) do
    notification_opts = build_notification_opts(opts)

    Workflow.new()
    # site_url and opts must go into context so the graft builder can read them
    |> Workflow.put_context(%{site_url: site_url, opts: opts, notification_opts: notification_opts})
    |> Workflow.add_graft(:scoring, &graft_full_run/1)
    |> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
  end

  defp graft_full_run(context) do
    # The sub-workflow doesn't know about notifications
    FullRun.workflow(context.site_url, context.opts)
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end
end
```
### Recording Values for Dependent Steps

For a grafted workflow's output to be available to dependent steps, the final job must use `recorded: true`:
```elixir
defmodule FinalJob do
  use Oban.Pro.Worker, queue: :default, recorded: true

  # Pro workers implement process/1 (Oban.Pro.Worker defines perform/1 itself)
  @impl Oban.Pro.Worker
  def process(%Oban.Job{} = job) do
    # ...do the scoring work...
    # The return value is recorded and becomes available in context
    {:ok, %{score_run_id: score_run_id, composite_score: score}}
  end
end
```
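A hedged sketch of the consuming side, assuming the grafted step's recorded return surfaces as `context.previous_result` (shown under Cascade Context above); `Notifier` is a hypothetical module:

```elixir
defp send_notification(context) do
  # Assumption: previous_result carries FinalJob's recorded return value
  %{score_run_id: id, composite_score: score} = context.previous_result

  Notifier.deliver(context.notification_opts, id, score)
end
```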
### Dynamic Workflow Appending

Add jobs to a running workflow with `Workflow.append/2`:
```elixir
def perform(%Oban.Job{} = job) do
  if needs_extra_step?(job.args) do
    job
    |> Workflow.append()
    |> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
    |> Oban.insert_all()
  end

  {:ok, :done}
end
```
**Caveat:** You cannot override context or add dependencies on already-running jobs. For complex dynamic scenarios, have the job check external state itself, as sketched below.
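A minimal sketch of that fallback, reusing the snooze pattern from Part 1 (`Imports.finished?/1` and `finalize_import/1` are hypothetical helpers):

```elixir
def perform(%Oban.Job{args: %{"import_id" => import_id}}) do
  if Imports.finished?(import_id) do
    finalize_import(import_id) # do the follow-up work inline
  else
    {:snooze, 10} # poll external state instead of mutating the running workflow
  end
end
```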
### Fan-Out/Fan-In with Batches

To run a final job after multiple paginated workflows complete, use Batch callbacks:
```elixir
# Wrap workflows in a shared batch
batch_id = "import-#{import_id}"

pages
|> Enum.each(fn page ->
  PageWorkflow.workflow(page)
  |> Batch.from_workflow(batch_id: batch_id)
  |> Oban.insert_all()
end)

# Add a completion callback
Batch.new(batch_id: batch_id)
|> Batch.add_callback(:completed, CompletionWorker)
|> Oban.insert()
```
**Tip:** Include pagination workers in the batch to prevent premature completion.
### Testing Workflows

Don't use inline testing mode - workflows need database interaction.

```elixir
# Use run_workflow/1 for integration tests
assert %{completed: 3} =
         Workflow.new()
         |> Workflow.add(:a, WorkerA.new(%{}))
         |> Workflow.add(:b, WorkerB.new(%{}), deps: [:a])
         |> Workflow.add(:c, WorkerC.new(%{}), deps: [:b])
         |> run_workflow()
```
For testing recorded values between workers, insert predecessor jobs with pre-filled metadata.
### Red Flags - STOP and Reconsider

Non-Pro:

- Pattern matching on atom keys in `perform/1`
- Catching all errors and returning `{:ok, _}`
- Wrapping job logic in `try/rescue`
- Creating one job per item when processing millions of records

Pro:

- Using `add_workflow` when you need to wait for completion
- Coupling notifications/emails to domain workflows
- Not using `recorded: true` when you need output from grafted workflows
- Using Workflows for simple linear job chains
- Testing workflows with inline mode

Any of these? Go back and re-read the relevant rule above.