# Conditions & Branching

Workflows often need to branch: run different paths depending on data, skip steps when conditions aren't met, or wait for a combination of signals before proceeding. Both durable tasks and DAGs support conditional logic, but the approach differs.


#### Durable Tasks

## Procedural Branching

Durable tasks use standard language control flow (`if`/`else`, `match`, loops) to branch at runtime. Because the task is a single long-running function, you can make decisions based on any data available during execution: inputs, intermediate results, API responses, or child task outputs.

```python
@workflow.durable_task()
async def process(input: ProcessInput, ctx: DurableContext):
    result = await ctx.run_child(analyze_task, input)

    if result["score"] > 0.8:
        await ctx.run_child(fast_path_task, result)
    else:
        await ctx.run_child(slow_path_task, result)
        await ctx.run_child(review_task, result)
```

This is one of the key advantages of durable tasks: branching logic is expressed directly in code, making it easy to handle complex, dynamic flows. Each branch can spawn different children, sleep for different durations, or wait for different events.

> **Warning:** Branching logic must be **deterministic** between checkpoints. If the task is
>   evicted and replayed, the same branches must execute in the same order. Base
>   decisions on checkpoint outputs (child results, event payloads) rather than
>   wall-clock time or external state that may change between replays. See [Best
>   Practices](/v1/patterns/mixing-patterns#determinism-in-durable-tasks) for
>   details.

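To illustrate why decisions should depend on checkpoint outputs, here is a minimal plain-Python sketch of replay. It does not use the Hatchet SDK: `Journal`, `checkpoint`, `rewind`, and `choose_path` are hypothetical names invented for this illustration, and the real checkpointing mechanism may differ.

```python
from typing import Callable


class Journal:
    """Hypothetical stand-in for a durable task's checkpoint log."""

    def __init__(self) -> None:
        self.entries: list[float] = []
        self.cursor = 0

    def checkpoint(self, compute: Callable[[], float]) -> float:
        # On replay, return the recorded value instead of recomputing it.
        if self.cursor < len(self.entries):
            value = self.entries[self.cursor]
        else:
            value = compute()
            self.entries.append(value)
        self.cursor += 1
        return value

    def rewind(self) -> None:
        # Simulate eviction: the next run starts again from the first checkpoint.
        self.cursor = 0


def choose_path(journal: Journal, score_source: Callable[[], float]) -> str:
    # The branch depends only on a checkpointed value, so a replay sees
    # the same score and takes the same path.
    score = journal.checkpoint(score_source)
    return "fast" if score > 0.8 else "slow"


journal = Journal()
scores = iter([0.9, 0.1])  # the external source would answer differently on a second call

first = choose_path(journal, lambda: next(scores))
journal.rewind()
replayed = choose_path(journal, lambda: next(scores))

print(first, replayed)  # fast fast
```

Had `choose_path` called `score_source()` directly instead of going through the journal, the replay would have read `0.1` and taken the other branch.
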
## Or Groups

Durable tasks can combine multiple wait conditions using or groups. An or group evaluates to `True` if **at least one** of its conditions is satisfied, letting you express "proceed on timeout or event, whichever comes first."

#### Python

```python
@durable_workflow.durable_task()
async def wait_for_or_group_1(
    _i: EmptyModel, ctx: DurableContext
) -> dict[str, str | int | float]:
    start = time.time()
    wait_result = await ctx.aio_wait_for(
        uuid4().hex,
        or_(
            SleepCondition(timedelta(seconds=SLEEP_TIME)),
            UserEventCondition(event_key=EVENT_KEY),
        ),
    )

    key = list(wait_result.keys())[0]
    event_id = list(wait_result[key].keys())[0]

    return {
        "runtime": time.time() - start,
        "key": key,
        "event_id": event_id,
    }
```

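`or_()` wraps a `SleepCondition` and a `UserEventCondition` into a single or group. The task will resume as soon as either the sleep expires or the event arrives. The TypeScript, Go, and Ruby snippets in this section show the simpler case of waiting on a single event.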

#### Typescript

```typescript
export const durableEvent = hatchet.durableTask({
  name: 'durable-event',
  executionTimeout: '10m',
  fn: async (_, ctx) => {
    const res = await ctx.waitForEvent(EVENT_KEY);

    console.log('res', res);

    return {
      Value: 'done',
    };
  },
});
```

#### Go

```go
task := client.NewStandaloneDurableTask("long-running-task", func(ctx hatchet.DurableContext, input DurableInput) (DurableOutput, error) {
	log.Printf("Starting task, will sleep for %d seconds", input.Delay)

	if _, err := ctx.WaitForEvent("user:updated", ""); err != nil {
		return DurableOutput{}, err
	}

	log.Printf("Finished waiting for event, processing message: %s", input.Message)

	return DurableOutput{
		ProcessedAt: time.Now().Format(time.RFC3339),
		Message:     "Processed: " + input.Message,
	}, nil
})
```

#### Ruby

```ruby
DURABLE_EVENT_TASK = HATCHET.durable_task(name: "DurableEventTask") do |input, ctx|
  res = ctx.wait_for(
    "event",
    Hatchet::UserEventCondition.new(event_key: "user:update")
  )

  puts "got event #{res}"
end
```

#### DAGs

## Parent Conditions

Parent conditions let a DAG task decide whether to run based on the output of a parent task. This enables branching logic within a DAG: different paths can execute depending on runtime data, while the overall graph structure remains fixed and visible in the dashboard.

Parent conditions can be used with two operators:

- **`skip_if`** — skip the task if the parent output matches the condition.
- **`cancel_if`** — cancel the task (and its downstream dependents) if the parent output matches the condition.

> **Warning:** A task cancelled by `cancel_if` behaves like any other cancellation in Hatchet
>   — downstream tasks will be cancelled as well.

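The difference between the two operators can be sketched with a toy status model in plain Python. This is an illustration of the semantics described above, not Hatchet's scheduler: `resolve`, the status strings, and the three-task graph are all hypothetical.

```python
def resolve(parents: dict[str, list[str]], decided: dict[str, str]) -> dict[str, str]:
    """Toy status propagation. `parents` must list tasks in topological order."""
    status: dict[str, str] = {}
    for task, deps in parents.items():
        if task in decided:
            # The task's own condition matched (skip_if or cancel_if).
            status[task] = decided[task]
        elif any(status[dep] == "cancelled" for dep in deps):
            # Cancellation propagates to downstream tasks.
            status[task] = "cancelled"
        else:
            # A skipped parent does not stop its dependents from running.
            status[task] = "ran"
    return status


graph = {"start": [], "branch": ["start"], "report": ["branch"]}

skipped = resolve(graph, {"branch": "skipped"})
cancelled = resolve(graph, {"branch": "cancelled"})

print(skipped)    # {'start': 'ran', 'branch': 'skipped', 'report': 'ran'}
print(cancelled)  # {'start': 'ran', 'branch': 'cancelled', 'report': 'cancelled'}
```

In this model, `report` still runs after a skip, which is why downstream tasks need a way to check whether a parent produced output.
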
### Branching example

A common pattern is to create two sibling tasks with complementary parent conditions. For example, one task runs when a value is greater than 50 and the other runs when it is less than or equal to 50. Only one branch executes per run.

First, declare a base task that returns a value:

#### Python

```python
@task_condition_workflow.task()
def start(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))
```

#### Typescript

```typescript
const start = taskConditionWorkflow.task({
  name: 'start',
  fn: () => {
    return {
      randomNumber: Math.floor(Math.random() * 100) + 1,
    };
  },
});
```

#### Go

```go
start := workflow.NewTask("start", func(ctx hatchet.Context, _ any) (StepOutput, error) {
	return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
})
```

#### Ruby

```ruby
COND_START = TASK_CONDITION_WORKFLOW.task(:start) do |input, ctx|
  { "random_number" => rand(1..100) }
end
```

Then add two branches that use `ParentCondition` with `skip_if`:

#### Python

```python
@task_condition_workflow.task(
    parents=[wait_for_sleep],
    skip_if=[
        ParentCondition(
            parent=wait_for_sleep,
            expression="output.random_number > 50",
        )
    ],
)
def left_branch(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))


@task_condition_workflow.task(
    parents=[wait_for_sleep],
    skip_if=[
        ParentCondition(
            parent=wait_for_sleep,
            expression="output.random_number <= 50",
        )
    ],
)
def right_branch(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))
```

#### Typescript

```typescript
const leftBranch = taskConditionWorkflow.task({
  name: 'leftBranch',
  parents: [waitForSleep],
  skipIf: [new ParentCondition(waitForSleep, 'output.randomNumber > 50')],
  fn: () => {
    return {
      randomNumber: Math.floor(Math.random() * 100) + 1,
    };
  },
});

const rightBranch = taskConditionWorkflow.task({
  name: 'rightBranch',
  parents: [waitForSleep],
  skipIf: [new ParentCondition(waitForSleep, 'output.randomNumber <= 50')],
  fn: () => {
    return {
      randomNumber: Math.floor(Math.random() * 100) + 1,
    };
  },
});
```

#### Go

```go
leftBranch := workflow.NewTask("left-branch", func(ctx hatchet.Context, _ any) (StepOutput, error) {
	return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
	hatchet.WithParents(waitForSleep),
	hatchet.WithSkipIf(hatchet.ParentCondition(waitForSleep, "output.random_number > 50")),
)

rightBranch := workflow.NewTask("right-branch", func(ctx hatchet.Context, _ any) (StepOutput, error) {
	return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
	hatchet.WithParents(waitForSleep),
	hatchet.WithSkipIf(hatchet.ParentCondition(waitForSleep, "output.random_number <= 50")),
)
```

#### Ruby

```ruby
LEFT_BRANCH = TASK_CONDITION_WORKFLOW.task(
  :left_branch,
  parents: [WAIT_FOR_SLEEP],
  skip_if: [
    Hatchet::ParentCondition.new(
      parent: WAIT_FOR_SLEEP,
      expression: "output.random_number > 50"
    )
  ]
) do |input, ctx|
  { "random_number" => rand(1..100) }
end

RIGHT_BRANCH = TASK_CONDITION_WORKFLOW.task(
  :right_branch,
  parents: [WAIT_FOR_SLEEP],
  skip_if: [
    Hatchet::ParentCondition.new(
      parent: WAIT_FOR_SLEEP,
      expression: "output.random_number <= 50"
    )
  ]
) do |input, ctx|
  { "random_number" => rand(1..100) }
end
```

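Because `skip_if` skips a task when its expression is true, `left_branch` runs only when the parent's `random_number` is less than or equal to `50`, and `right_branch` runs only when it is greater than `50`. The two expressions are complementary, so exactly one of the two runs per workflow execution. Note that in these snippets the branches read the output of `wait_for_sleep`, another task in the same workflow that also returns a `random_number`, rather than the output of `start` directly.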
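
As a quick sanity check on the two expressions, the following plain-Python sketch evaluates them for every possible value and confirms that exactly one branch survives. The `SKIP_RULES` table and `branches_that_run` helper are hypothetical and only mirror the `skip_if` expressions shown above; they are not part of the SDK.

```python
# Each rule returns True when the branch should be skipped.
SKIP_RULES = {
    "left_branch": lambda n: n > 50,
    "right_branch": lambda n: n <= 50,
}


def branches_that_run(random_number: int) -> list[str]:
    # A branch runs only if its skip expression evaluates to False.
    return [name for name, skip in SKIP_RULES.items() if not skip(random_number)]


# The expressions are complementary: exactly one branch runs for each value.
assert all(len(branches_that_run(n)) == 1 for n in range(1, 101))

print(branches_that_run(50), branches_that_run(51))  # ['left_branch'] ['right_branch']
```

If the two expressions overlapped or left a gap (for example `> 50` and `< 50`), some values would run both branches or neither.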

### Checking if a task was skipped

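Downstream tasks can check whether a parent was skipped using `ctx.was_skipped` (`ctx.WasSkipped` in Go, `ctx.was_skipped?` in Ruby) before reading its output. The TypeScript example instead falls back to `0` when a parent's output is missing: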

#### Python

```python
@task_condition_workflow.task(
    parents=[
        start,
        wait_for_sleep,
        wait_for_event,
        skip_on_event,
        left_branch,
        right_branch,
    ],
)
def sum(input: EmptyModel, ctx: Context) -> RandomSum:
    one = ctx.task_output(start).random_number
    two = ctx.task_output(wait_for_event).random_number
    three = ctx.task_output(wait_for_sleep).random_number
    four = (
        ctx.task_output(skip_on_event).random_number
        if not ctx.was_skipped(skip_on_event)
        else 0
    )

    five = (
        ctx.task_output(left_branch).random_number
        if not ctx.was_skipped(left_branch)
        else 0
    )
    six = (
        ctx.task_output(right_branch).random_number
        if not ctx.was_skipped(right_branch)
        else 0
    )

    return RandomSum(sum=one + two + three + four + five + six)
```

#### Typescript

```typescript
taskConditionWorkflow.task({
  name: 'sum',
  parents: [start, waitForSleep, waitForEvent, skipOnEvent, leftBranch, rightBranch],
  fn: async (_, ctx: Context<any, any>) => {
    const one = (await ctx.parentOutput(start)).randomNumber;
    const two = (await ctx.parentOutput(waitForEvent)).randomNumber;
    const three = (await ctx.parentOutput(waitForSleep)).randomNumber;
    const four = (await ctx.parentOutput(skipOnEvent))?.randomNumber || 0;
    const five = (await ctx.parentOutput(leftBranch))?.randomNumber || 0;
    const six = (await ctx.parentOutput(rightBranch))?.randomNumber || 0;

    return {
      sum: one + two + three + four + five + six,
    };
  },
});
```

#### Go

```go
_ = workflow.NewTask("sum", func(ctx hatchet.Context, _ any) (RandomSum, error) {
	var startOut StepOutput
	err := ctx.ParentOutput(start, &startOut)
	if err != nil {
		return RandomSum{}, err
	}

	var waitForEventOut StepOutput
	err = ctx.ParentOutput(waitForEvent, &waitForEventOut)
	if err != nil {
		return RandomSum{}, err
	}

	var waitForSleepOut StepOutput
	err = ctx.ParentOutput(waitForSleep, &waitForSleepOut)
	if err != nil {
		return RandomSum{}, err
	}

	total := startOut.RandomNumber + waitForEventOut.RandomNumber + waitForSleepOut.RandomNumber

	if !ctx.WasSkipped(skipOnEvent) {
		var out StepOutput
		err = ctx.ParentOutput(skipOnEvent, &out)
		if err == nil {
			total += out.RandomNumber
		}
	}

	if !ctx.WasSkipped(leftBranch) {
		var out StepOutput
		err = ctx.ParentOutput(leftBranch, &out)
		if err == nil {
			total += out.RandomNumber
		}
	}

	if !ctx.WasSkipped(rightBranch) {
		var out StepOutput
		err = ctx.ParentOutput(rightBranch, &out)
		if err == nil {
			total += out.RandomNumber
		}
	}

	return RandomSum{Sum: total}, nil
}, hatchet.WithParents(
	start,
	waitForSleep,
	waitForEvent,
	skipOnEvent,
	leftBranch,
	rightBranch,
))
```

#### Ruby

```ruby
TASK_CONDITION_WORKFLOW.task(
  :sum,
  parents: [COND_START, WAIT_FOR_SLEEP, WAIT_FOR_EVENT, SKIP_ON_EVENT, LEFT_BRANCH, RIGHT_BRANCH]
) do |input, ctx|
  one = ctx.task_output(COND_START)["random_number"]
  two = ctx.task_output(WAIT_FOR_EVENT)["random_number"]
  three = ctx.task_output(WAIT_FOR_SLEEP)["random_number"]
  four = ctx.was_skipped?(SKIP_ON_EVENT) ? 0 : ctx.task_output(SKIP_ON_EVENT)["random_number"]
  five = ctx.was_skipped?(LEFT_BRANCH) ? 0 : ctx.task_output(LEFT_BRANCH)["random_number"]
  six = ctx.was_skipped?(RIGHT_BRANCH) ? 0 : ctx.task_output(RIGHT_BRANCH)["random_number"]

  { "sum" => one + two + three + four + five + six }
end
```

## Or Groups

DAG tasks can declare multiple conditions that work together to control when and whether a task runs. Conditions of different types (parent conditions, [event conditions](/v1/events), and [sleep conditions](/v1/sleep)) can be mixed on a single task using **or groups**.

An **or group** is a set of conditions combined with an `Or` operator. The group evaluates to `True` if **at least one** of its conditions is satisfied. Multiple or groups on the same task are combined with `AND`, so every group must have at least one satisfied condition for the task to proceed.

This lets you express arbitrarily complex sets of conditions in [conjunctive normal form](https://en.wikipedia.org/wiki/Conjunctive_normal_form) (CNF).

### Sleep + Event example

The most common combination is a sleep condition with an event condition: proceed when an external signal arrives _or_ after a timeout (whichever comes first). This is ideal for human-in-the-loop workflows where you want a deadline.
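
The "whichever comes first" behavior can be sketched with plain `asyncio`, independent of the Hatchet SDK. The `wait_for_any` helper and the `approval` event are hypothetical names used only for this illustration; Hatchet evaluates these conditions durably on the server rather than in process memory.

```python
import asyncio


async def wait_for_any(event: asyncio.Event, timeout_seconds: float) -> str:
    # Race a sleep against an event and report which one finished first.
    sleeper = asyncio.create_task(asyncio.sleep(timeout_seconds))
    listener = asyncio.create_task(event.wait())
    done, pending = await asyncio.wait(
        {sleeper, listener}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # the losing condition is no longer needed
    return "event" if listener in done else "sleep"


async def demo() -> tuple[str, str]:
    # Case 1: the approval arrives after 10 ms, well before the 1 s deadline.
    approval = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, approval.set)
    first = await wait_for_any(approval, timeout_seconds=1.0)

    # Case 2: nobody approves, so the 10 ms deadline wins.
    never_set = asyncio.Event()
    second = await wait_for_any(never_set, timeout_seconds=0.01)
    return first, second


results = asyncio.run(demo())
print(results)  # ('event', 'sleep')
```

Either outcome lets the workflow continue; the task body can then decide what a timeout means, for example escalating when no human responded.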

#### Python

```python
@task_condition_workflow.task(
    parents=[start],
    wait_for=[
        or_(
            SleepCondition(duration=timedelta(minutes=1)),
            UserEventCondition(event_key="wait_for_event:start"),
        )
    ],
)
def wait_for_event(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))
```

`or_()` wraps a `SleepCondition` and a `UserEventCondition` into a single or group. The task will start as soon as either the sleep expires or the event arrives.

#### Typescript

```typescript
const waitForEvent = taskConditionWorkflow.task({
  name: 'waitForEvent',
  parents: [start],
  waitFor: [Or(new SleepCondition('1m'), new UserEventCondition('wait_for_event:start', 'true'))],
  fn: () => {
    return {
      randomNumber: Math.floor(Math.random() * 100) + 1,
    };
  },
});
```

`Or()` wraps a `SleepCondition` and a `UserEventCondition` into a single or group. The task will start as soon as either the sleep expires or the event arrives.

#### Go

```go
waitForEvent := workflow.NewTask("wait-for-event", func(ctx hatchet.Context, _ any) (StepOutput, error) {
	return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
	hatchet.WithParents(start),
	hatchet.WithWaitFor(hatchet.OrCondition(
		hatchet.SleepCondition(1*time.Minute),
		hatchet.UserEventCondition("wait_for_event:start", ""),
	)),
)
```

`hatchet.OrCondition()` wraps a `SleepCondition` and a `UserEventCondition` into a single or group, and `hatchet.WithWaitFor` attaches it to the task. The task will start as soon as either the sleep expires or the event arrives.

#### Ruby

```ruby
WAIT_FOR_EVENT = TASK_CONDITION_WORKFLOW.task(
  :wait_for_event,
  parents: [COND_START],
  wait_for: [
    Hatchet.or_(
      Hatchet::SleepCondition.new(60),
      Hatchet::UserEventCondition.new(event_key: "wait_for_event:start")
    )
  ]
) do |input, ctx|
  { "random_number" => rand(1..100) }
end
```

`Hatchet.or_()` wraps a `SleepCondition` and a `UserEventCondition` into a single or group. The task will start as soon as either the sleep expires or the event arrives.

### Multiple or groups

For more complex logic, you can declare multiple or groups on a single task. Consider three conditions:

- **Condition A**: Parent output is greater than 50
- **Condition B**: Sleep for 30 seconds
- **Condition C**: Receive the `payment:processed` event

To proceed if (A _or_ B) **and** (A _or_ C), declare two or groups:

1. Group 1: `A or B`
2. Group 2: `A or C`

The task will run once both groups are satisfied. If A is true, both groups pass immediately. If A is false, the task needs both B (sleep expires) and C (event arrives).
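
The claim above can be checked with a short truth table. This plain-Python sketch models each or group as a list of booleans and combines groups with `AND`; the `gate_open` helper is a hypothetical name for this illustration, not an SDK function.

```python
from itertools import product


def gate_open(or_groups: list[list[bool]]) -> bool:
    # Conjunctive normal form: every group needs at least one true condition.
    return all(any(group) for group in or_groups)


rows = []
for a, b, c in product([False, True], repeat=3):
    result = gate_open([[a, b], [a, c]])  # (A or B) and (A or C)
    # Equivalent reading: A alone is enough, otherwise both B and C are needed.
    assert result == (a or (b and c))
    rows.append((a, b, c, result))

for a, b, c, result in rows:
    print(f"A={a!s:5} B={b!s:5} C={c!s:5} -> {result}")
```

Five of the eight combinations open the gate: the four where A is true, plus the one where A is false and both B and C are true.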

### Common combinations

| Combination | Use case |
| --- | --- |
| Sleep + Event | Proceed after a timeout _or_ when an external signal arrives (whichever comes first) |
| Parent + Event | Proceed if a parent output meets a threshold _or_ a manual override event arrives |
| Parent + Sleep | Proceed if a parent indicates readiness _or_ after a maximum wait time |
| All three | Complex gates combining data-driven, time-based, and event-driven conditions |
