# Declarative Workflow Design (DAGs)

Hatchet workflows are structured as a **Directed Acyclic Graph (DAG)**: each task is a node in the graph, and the dependencies between tasks are the edges. This structure keeps workflows organized, predictable, and free from circular dependencies.


## How DAG Workflows Work


### You declare the graph

Define tasks and their dependencies upfront. Hatchet knows the full shape of work before execution begins.

### Hatchet executes in order

Tasks run as soon as their parents complete. Independent tasks run in parallel automatically. A worker slot is only assigned when a task is ready to execute, so tasks waiting on parents consume no resources. Each task has configurable [retry policies](/v1/retry-policies) and [timeouts](/v1/timeouts).
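The parents-first ordering can be sketched with a tiny in-process scheduler. This is purely illustrative (not how Hatchet itself is implemented): tasks whose parents have all finished form a "wave" that an orchestrator could run in parallel.

```python
# Illustrative only: compute execution "waves" for a diamond-shaped DAG.
# Tasks in the same wave have no dependencies on each other, so an
# orchestrator like Hatchet can run them in parallel.
parents = {
    "step1": [],
    "step2": [],
    "step3": ["step1", "step2"],  # fan-in: waits on both parents
}

def execution_waves(parents):
    done, waves = set(), []
    while len(done) < len(parents):
        ready = [t for t, ps in parents.items()
                 if t not in done and all(p in done for p in ps)]
        if not ready:
            raise ValueError("cycle detected: no task is ready to run")
        waves.append(sorted(ready))
        done.update(ready)
    return waves

print(execution_waves(parents))  # → [['step1', 'step2'], ['step3']]
```

`step1` and `step2` share a wave because neither depends on the other; `step3` waits for both, mirroring the fan-in examples later in this page.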

### Results flow downstream

Task outputs are cached and passed to child tasks. If a failure occurs mid-workflow, completed tasks don't re-run.
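A minimal sketch of this resume behavior, in plain Python rather than Hatchet's actual machinery (`run_with_cache` is a hypothetical helper, not part of any SDK): outputs of completed tasks are cached, so a retry after a mid-run failure re-executes only the tasks that never finished.

```python
# Illustrative only: completed task outputs are cached across attempts, so a
# retried run skips tasks that already succeeded. `run_with_cache` is a
# hypothetical helper, not part of the Hatchet SDK.
cache = {}
executions = []  # tracks which tasks actually ran, across all attempts

def run_with_cache(name, fn):
    if name not in cache:          # only run tasks with no cached output
        executions.append(name)
        cache[name] = fn()
    return cache[name]

def attempt(load_fails):
    run_with_cache("extract", lambda: {"rows": 3})
    run_with_cache("transform", lambda: {"rows": 3, "clean": True})
    if load_fails:
        raise RuntimeError("load failed mid-workflow")
    return run_with_cache("load", lambda: {"loaded": True})

try:
    attempt(load_fails=True)       # first attempt: extract + transform run, then failure
except RuntimeError:
    pass

attempt(load_fails=False)          # retry: only "load" actually executes
print(executions)  # → ['extract', 'transform', 'load']
```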

### Everything is observable

Every task execution is tracked in the dashboard — inputs, outputs, durations, and errors. You can see exactly where a workflow succeeded or failed.


## Defining a Workflow

Start by declaring a workflow with a name. The workflow also accepts additional workflow-level configuration options, which we'll cover later.

The returned object is an instance of the `Workflow` class, which is the primary interface for interacting with the workflow (e.g. [running](/v1/running-your-task#run-and-wait), [enqueuing](/v1/running-your-task#fire-and-forget), or [scheduling](/v1/scheduled-runs) it).

#### Python

```python
dag_workflow = hatchet.workflow(name="DAGWorkflow")
```

#### Typescript

```typescript
// First, we declare the workflow
export const dag = hatchet.workflow({
  name: 'simple',
});
```

#### Go

```go
workflow := client.NewWorkflow("dag-workflow")
```

#### Ruby

```ruby
DAG_WORKFLOW = HATCHET.workflow(name: "DAGWorkflow")
```


> **Info:** The returned `Workflow` object can be interacted with in the same way as a [task](/v1/tasks); however, it accepts only a subset of the options that apply at the task level.


## Defining a Task

Now that we have a workflow, we can define a task to be executed as part of the workflow. Tasks are defined by calling the `task` method on the workflow object.

The `task` method takes a name and a function that defines the task's behavior. The function will receive the workflow's input and return the task's output. Tasks also accept a number of other configuration options, which are covered elsewhere in our documentation.

#### Python

In Python, the `task` method is a decorator that wraps the task function:

```python
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
def step1(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))
```

The function takes two arguments: `input`, which is a Pydantic model, and `ctx`, which is the Hatchet `Context` object. We'll discuss both of these more later.

> **Info:** In the internals of Hatchet, the task is called using _positional arguments_, meaning that you can name `input` and `ctx` whatever you like.
>
> For instance, `def task_1(foo: EmptyModel, bar: Context) -> None:` is perfectly valid.
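To see why the parameter names don't matter, here's the same idea in plain Python (no Hatchet required): the runner invokes the function positionally, so the first parameter always receives the input and the second the context, regardless of what they're called.

```python
# Plain-Python illustration of the note above: because the task function is
# called positionally, the parameter names are entirely up to you.
def task_1(foo, bar):
    return {"received_input": foo, "received_ctx": bar}

# A runner would call it like task_fn(workflow_input, context):
out = task_1({"message": "hi"}, {"run_id": "example-run-id"})
print(out["received_input"])  # → {'message': 'hi'}
```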

#### Typescript

```typescript
// Next, we declare the tasks bound to the workflow
const toLower = dag.task({
  name: 'to-lower',
  fn: (input) => {
    return {
      TransformedMessage: input.Message.toLowerCase(),
    };
  },
});
```

The `fn` argument is a function that takes the workflow's input and a
context object; the context object contains information about the workflow
run (e.g. the run ID and the workflow's input). The function itself can be
synchronous or asynchronous.

#### Go

```go
step1 := workflow.NewTask("step-1", func(ctx hatchet.Context, input Input) (StepOutput, error) {
	return StepOutput{
		Step:   1,
		Result: input.Value * 2,
	}, nil
})
```

#### Ruby

```ruby
STEP1 = DAG_WORKFLOW.task(:step1, execution_timeout: 5) do |input, ctx|
  { "random_number" => rand(1..100) }
end

STEP2 = DAG_WORKFLOW.task(:step2, execution_timeout: 5) do |input, ctx|
  { "random_number" => rand(1..100) }
end
```

## Building a DAG with Task Dependencies

The power of Hatchet's workflow design comes from connecting tasks into a DAG structure. Tasks can specify dependencies (parents) which must complete successfully before the task can start.

#### Python

```python
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
async def step2(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))


@dag_workflow.task(parents=[step1, step2])
async def step3(input: EmptyModel, ctx: Context) -> RandomSum:
    one = ctx.task_output(step1).random_number
    two = ctx.task_output(step2).random_number

    return RandomSum(sum=one + two)
```

#### Typescript

```typescript
dag.task({
  name: 'reverse',
  parents: [toLower],
  fn: async (input, ctx) => {
    const lower = await ctx.parentOutput(toLower);
    return {
      Original: input.Message,
      Transformed: lower.TransformedMessage.split('').reverse().join(''),
    };
  },
});
```

#### Go

```go
step2 := workflow.NewTask("step-2", func(ctx hatchet.Context, input Input) (StepOutput, error) {
	// Get output from step 1
	var step1Output StepOutput
	if err := ctx.ParentOutput(step1, &step1Output); err != nil {
		return StepOutput{}, err
	}

	return StepOutput{
		Step:   2,
		Result: step1Output.Result + 10,
	}, nil
}, hatchet.WithParents(step1))
```

#### Ruby

```ruby
DAG_WORKFLOW.task(:step3, parents: [STEP1, STEP2]) do |input, ctx|
  one = ctx.task_output(STEP1)["random_number"]
  two = ctx.task_output(STEP2)["random_number"]

  { "sum" => one + two }
end

DAG_WORKFLOW.task(:step4, parents: [STEP1, :step3]) do |input, ctx|
  puts(
    "executed step4",
    Time.now.strftime("%H:%M:%S"),
    input.inspect,
    ctx.task_output(STEP1).inspect,
    ctx.task_output(:step3).inspect
  )

  { "step4" => "step4" }
end
```

## Accessing Parent Task Outputs

As shown in the examples above, tasks can access outputs from their parent tasks using the context object:

#### Python

```python
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
async def step2(input: EmptyModel, ctx: Context) -> StepOutput:
    return StepOutput(random_number=random.randint(1, 100))


@dag_workflow.task(parents=[step1, step2])
async def step3(input: EmptyModel, ctx: Context) -> RandomSum:
    one = ctx.task_output(step1).random_number
    two = ctx.task_output(step2).random_number

    return RandomSum(sum=one + two)
```

#### Typescript

```typescript
dag.task({
  name: 'task-with-parent-output',
  parents: [toLower],
  fn: async (input, ctx) => {
    const lower = await ctx.parentOutput(toLower);
    return {
      Original: input.Message,
      Transformed: lower.TransformedMessage.split('').reverse().join(''),
    };
  },
});
```

#### Go

```go
// Inside a task with parent dependencies
var parentOutput ParentOutputType
err := ctx.ParentOutput(parentTask, &parentOutput)
if err != nil {
    return nil, err
}
```

#### Ruby

```ruby
DAG_WORKFLOW.task(:step3, parents: [STEP1, STEP2]) do |input, ctx|
  one = ctx.task_output(STEP1)["random_number"]
  two = ctx.task_output(STEP2)["random_number"]

  { "sum" => one + two }
end

DAG_WORKFLOW.task(:step4, parents: [STEP1, :step3]) do |input, ctx|
  puts(
    "executed step4",
    Time.now.strftime("%H:%M:%S"),
    input.inspect,
    ctx.task_output(STEP1).inspect,
    ctx.task_output(:step3).inspect
  )

  { "step4" => "step4" }
end
```

## Running a Workflow

You can run workflows directly or enqueue them for asynchronous execution. All the same methods for running a task are available for workflows!

#### Python

```python
result = dag_workflow.run()
```

#### Typescript

```typescript
const input = { Message: 'Hello, World!' };

// Run workflow and wait for the result
const result = await simple.run(input);

// Enqueue workflow to be executed asynchronously
const runReference = await simple.runNoWait(input);
```

#### Go

```go
// Run workflow and wait for the result
result, err := simple.Run(ctx, input)

// Enqueue workflow to be executed asynchronously
runID, err := simple.RunNoWait(ctx, input)
```

#### Ruby

```ruby
result = DAG_WORKFLOW.run
puts result
```

## Pre-Determined Pipelines

DAGs naturally model fixed multi-stage pipelines where the sequence of tasks and their dependencies are known before execution. ETL workflows, document processing pipelines, and CI/CD workflows all follow this pattern: each stage depends on the previous, and the overall structure is visible and predictable in the dashboard.
