# Error Handling

When a task fails, you need a way to run cleanup logic, send notifications, or trigger compensating actions. Both durable tasks and DAGs support error handling, but the mechanism differs: durable tasks use standard try/catch blocks, while DAGs declare a special on-failure task.

#### Durable Tasks

## Try/Catch in Durable Tasks

Durable tasks are regular functions, so you handle errors with your language's native error handling: `try`/`except` in Python, `try`/`catch` in TypeScript, `begin`/`rescue` in Ruby, and returned `error` values in Go. This gives you full control over what happens when a child task or operation fails.

### Handling child task errors

When spawning child tasks, wrap the call in your language's error-handling construct so the parent can handle failures gracefully:

#### Python

```python
try:
    child_wf.run(
        ChildInput(a="b"),
    )
except Exception as e:
    print(f"Child workflow failed: {e}")
```

#### TypeScript

```typescript
export const withErrorHandling = hatchet.task({
  name: 'parent-error-handling',
  fn: async () => {
    try {
      const childRes = await child.run({ N: 1 });

      return {
        Result: childRes.Value,
      };
    } catch (error) {
      // decide how to proceed here
      return {
        Result: -1,
      };
    }
  },
});
```

#### Go

```go
result, err := childWorkflow.Run(hCtx, ChildInput{Value: 1})
if err != nil {
	// Handle error from child workflow
	fmt.Printf("Child workflow failed: %v\n", err)
	// Decide how to proceed - retry, skip, or fail the parent
}
```

#### Ruby

```ruby
begin
  FANOUT_CHILD_WF.run({ "a" => "b" })
rescue StandardError => e
  puts "Child workflow failed: #{e.message}"
end
```

### Common patterns

- **Retry with backoff** — Catch the error, sleep, and retry the child task.
- **Fallback logic** — If a primary path fails, spawn a different child task as a fallback.
- **Partial failure handling** — In a fan-out, collect results from successful children and handle failures individually rather than failing the entire workflow.
- **Cleanup** — Release resources, cancel in-progress work, or notify external systems.
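
The first three patterns above can be sketched in plain Python. This is a minimal illustration, not SDK code: `ChildCall` is a hypothetical stand-in for whatever call spawns a child task and awaits its result, and the helper names (`run_with_retry`, `run_with_fallback`, `fan_out`) are made up for this example. Inside a real durable task you would likely replace `asyncio.sleep` with the SDK's own durable sleep.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-in for a child task's run call; in a real durable task
# this would be the SDK call that spawns the child and awaits its result.
ChildCall = Callable[[], Awaitable[int]]


async def run_with_retry(call: ChildCall, attempts: int = 3, base_delay: float = 0.01) -> int:
    """Retry `call` with exponential backoff, re-raising the last error."""
    for attempt in range(attempts):
        try:
            return await call()
        except Exception:
            if attempt == attempts - 1:
                raise
            # Delay doubles on each attempt: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay * 2**attempt)
    raise RuntimeError("attempts must be at least 1")


async def run_with_fallback(primary: ChildCall, fallback: ChildCall) -> int:
    """Try the primary child first; if it fails, run the fallback instead."""
    try:
        return await primary()
    except Exception:
        return await fallback()


async def fan_out(calls: list[ChildCall]) -> tuple[list[int], list[Exception]]:
    """Run all children concurrently and split successes from failures."""
    outcomes = await asyncio.gather(*(c() for c in calls), return_exceptions=True)
    results = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [o for o in outcomes if isinstance(o, Exception)]
    return results, errors
```

Because `fan_out` passes `return_exceptions=True`, one failing child does not cancel the others; the parent can then decide whether the collected errors are tolerable or whether it should fail.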

#### DAGs

## On-Failure Tasks

The on-failure task is a special task that runs when any task in the workflow fails. It lets you handle errors, perform cleanup, or trigger notifications declaratively as part of the workflow definition.

### Defining an on-failure task

You define an on-failure task on your workflow the same way you'd define any other task:

#### Python

```python
# This workflow fails because step1 always raises an exception.
# We define an on-failure task to handle that case.

on_failure_wf = hatchet.workflow(name="OnFailureWorkflow")


@on_failure_wf.task(execution_timeout=timedelta(seconds=1))
def step1(input: EmptyModel, ctx: Context) -> None:
    # 👀 this step will always raise an exception
    raise Exception(ERROR_TEXT)


# 👀 After the workflow fails, this special step will run
@on_failure_wf.on_failure_task()
def on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:
    # 👀 we can do things like perform cleanup logic
    # or notify a user here

    # 👀 Fetch the errors from upstream step runs from the context
    print(ctx.task_run_errors)

    return {"status": "success"}
```

Note: Only one on-failure task can be defined per workflow.


#### TypeScript

```typescript
// This workflow will fail because `step1` throws. We define an `onFailure` handler to run cleanup.
export const failureWorkflow = hatchet.workflow({
  name: 'on-failure-workflow',
});

failureWorkflow.task({
  name: 'step1',
  executionTimeout: '1s',
  fn: async () => {
    throw new Error(ERROR_TEXT);
  },
});

// 👀 After the workflow fails, this special step will run
failureWorkflow.onFailure({
  name: 'on_failure',
  fn: async (_input, ctx) => {
    console.log('onFailure for run:', ctx.workflowRunId());
    console.log('upstream errors:', ctx.errors());

    return {
      status: 'success',
    };
  },
});
```

#### Go

```go
multiStepWorkflow.OnFailure(func(ctx hatchet.Context, input FailureInput) (FailureHandlerOutput, error) {
	log.Printf("Multi-step failure handler called for input: %s", input.Message)

	stepErrors := ctx.StepRunErrors()
	var errorDetails string
	for stepName, errorMsg := range stepErrors {
		log.Printf("Multi-step: Step '%s' failed with error: %s", stepName, errorMsg)
		errorDetails += stepName + ": " + errorMsg + "; "
	}

	// Access successful step outputs for cleanup
	var step1Output TaskOutput
	if err := ctx.StepOutput("first-step", &step1Output); err == nil {
		log.Printf("First step completed successfully with: %s", step1Output.Message)
	}

	return FailureHandlerOutput{
		FailureHandled: true,
		ErrorDetails:   "Multi-step workflow failed: " + errorDetails,
		OriginalInput:  input.Message,
	}, nil
})
```

#### Ruby

```ruby
# This workflow fails because step1 always raises an exception.
# We define an on-failure task to handle that case.

ON_FAILURE_WF = HATCHET.workflow(name: "OnFailureWorkflow")

ON_FAILURE_WF.task(:step1, execution_timeout: 1) do |input, ctx|
  # This step will always raise an exception
  raise ERROR_TEXT
end

# After the workflow fails, this special step will run
ON_FAILURE_WF.on_failure_task do |input, ctx|
  # We can do things like perform cleanup logic
  # or notify a user here

  # Fetch the errors from upstream step runs from the context
  puts ctx.task_run_errors.inspect

  { "status" => "success" }
end
```

The on-failure task runs only if at least one of the main tasks in the workflow fails.

### Use cases

- Performing cleanup tasks after a task failure in a workflow
- Sending notifications or alerts about the failure
- Logging additional information for debugging purposes
- Triggering a compensating action or a fallback task
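
As a rough sketch of the notification use case, an on-failure task could condense the upstream errors into a single alert line before sending it anywhere. The helper below is hypothetical (`summarize_errors` is not part of any SDK) and assumes the errors arrive as a mapping from task name to error message, which is how the Go example above iterates over them.

```python
def summarize_errors(workflow_name: str, errors: dict[str, str]) -> str:
    """Build a one-line alert from a mapping of task name to error message."""
    if not errors:
        return f"{workflow_name}: failed with no task errors reported"
    # Sort by task name so the message is stable across runs.
    details = "; ".join(f"{task}: {msg}" for task, msg in sorted(errors.items()))
    return f"{workflow_name}: {len(errors)} task(s) failed ({details})"
```

Inside the Python on-failure task shown earlier, this could be called as `summarize_errors("OnFailureWorkflow", ctx.task_run_errors)`, assuming `ctx.task_run_errors` has that mapping shape, with the result passed to whatever alerting channel you use.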
