# Pydantic Support

The V1 Hatchet SDK leans heavily on [Pydantic](https://docs.pydantic.dev/latest/) (both internally and externally) for handling validation of workflow inputs and outputs, method inputs, and more.

### Usage

To enable Pydantic for validation, you'll need to:

1. Provide an `input_validator` as a parameter to your `workflow`.
2. Add return type hints for your `tasks`.

### Default Behavior

By default, if no `input_validator` is provided, the `EmptyModel` is used, which is a Pydantic model that accepts any input. For example:

```python
from hatchet_sdk import Context, DurableContext, EmptyModel, Hatchet

hatchet = Hatchet()


@hatchet.task()
def simple(input: EmptyModel, ctx: Context) -> dict[str, str]:
    return {"result": "Hello, world!"}


@hatchet.durable_task()
async def simple_durable(input: EmptyModel, ctx: DurableContext) -> dict[str, str]:
    # durable tasks should be async
    return {"result": "Hello, world!"}


def main() -> None:
    worker = hatchet.worker(
        "test-worker",
        workflows=[simple, simple_durable],
    )
    worker.start()
```

In this simple example, each task accepts an `input` argument of type `EmptyModel`, which Hatchet injects at runtime. `EmptyModel` can be imported directly from `hatchet_sdk`, and is an alias for:

```python
from pydantic import BaseModel, ConfigDict

class EmptyModel(BaseModel):
    model_config = ConfigDict(extra="allow")
```

Note that since `extra="allow"` is set, workflows will not fail with validation errors if an extra field is provided.
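You can see this behavior with pure Pydantic (no Hatchet required), using the alias definition above:

```python
from pydantic import BaseModel, ConfigDict


class EmptyModel(BaseModel):
    model_config = ConfigDict(extra="allow")


# Extra fields are stored on the instance instead of raising a ValidationError
m = EmptyModel(foo="bar", n=42)
print(m.foo)  # → bar
```

This is convenient for getting started, but it also means typos in input fields go unnoticed, which is one reason to define an explicit `input_validator`.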

### Example Usage

We highly recommend creating Pydantic models to represent your workflow inputs and outputs. This will help you catch errors early and ensure that your workflows are well-typed. For example, consider a fanout workflow like this:

```python
class ParentInput(BaseModel):
    n: int = 100


class ChildInput(BaseModel):
    a: str


parent_wf = hatchet.workflow(name="FanoutParent", input_validator=ParentInput)
child_wf = hatchet.workflow(name="FanoutChild", input_validator=ChildInput)


@parent_wf.task(execution_timeout=timedelta(minutes=5))
async def spawn(input: ParentInput, ctx: Context) -> dict[str, Any]:
    print("spawning child")

    result = await child_wf.aio_run_many(
        [
            child_wf.create_bulk_run_item(
                input=ChildInput(a=str(i)),
                additional_metadata={"hello": "earth"},
                key=f"child{i}",
            )
            for i in range(input.n)
        ],
    )

    print(f"results {result}")

    return {"results": result}
```

In this case, we've defined two workflows: a parent and a child. They both have their inputs typed, and the parent spawns the child. Note that `child_wf.create_bulk_run_item` is typed, so the type checker (and your IDE) know the type of the input to the child workflow.
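Since `ParentInput` declares a default for `n`, callers can omit it entirely; a quick pure-Pydantic check of the model above:

```python
from pydantic import BaseModel


class ParentInput(BaseModel):
    n: int = 100


# Omitting `n` falls back to the declared default
print(ParentInput().n)  # → 100

# Pydantic coerces compatible values in lax mode
print(ParentInput(n="5").n)  # → 5
```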

Then, the child tasks are defined as follows:

```python
@child_wf.task()
async def process(input: ChildInput, ctx: Context) -> dict[str, str]:
    print(f"child process {input.a}")
    return {"status": input.a}


@child_wf.task(parents=[process])
async def process2(input: ChildInput, ctx: Context) -> dict[str, str]:
    process_output = ctx.task_output(process)
    a = process_output["status"]

    return {"status2": a + "2"}
```

In the child tasks, the inputs are validated by Pydantic, so you can access their attributes directly rather than casting types or parsing a raw dictionary.
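The flip side is that invalid inputs fail fast with a `ValidationError` rather than surfacing as confusing errors deep inside a task. With the `ChildInput` model from above (pure Pydantic, no Hatchet required):

```python
from pydantic import BaseModel, ValidationError


class ChildInput(BaseModel):
    a: str


# A valid payload parses into a typed model instance
child = ChildInput(a="42")
print(child.a)  # → 42

# A payload missing the required field raises a ValidationError
try:
    ChildInput()  # type: ignore[call-arg]
except ValidationError as e:
    print(f"validation failed: {e.error_count()} error(s)")
```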
