# Middleware & Dependency Injection

Middleware lets you run logic **before** and **after** every task on a client, without touching individual task definitions. Common uses include injecting request IDs, enriching inputs with shared data, encrypting/decrypting payloads, and normalizing or augmenting outputs.


  This feature is experimental, and middleware hook signatures may change in
  future releases.


#### Python

Hatchet's Python SDK uses FastAPI-style dependency injection to run logic
before tasks and inject the results as parameters. Dependencies are declared
as functions and wired into tasks with `Depends`.

#### Typescript

Middleware hooks are registered on the client with `withMiddleware` and are
fully type-safe — TypeScript sees the union of fields from the task input
type and any values returned by `before` hooks, and similarly for task
outputs and `after` hooks.

#### Go

> **Info:** Middleware support for the Go SDK is coming soon. Join our
>       [Discord](https://hatchet.run/discord) to stay up to date.

#### Ruby

In Ruby, this pattern uses callable objects (lambdas/procs) passed as `deps`
when defining tasks. Dependencies are evaluated before each task run and
made available via `ctx.deps`.

## Defining Middleware

#### Python

Define your dependency functions — they receive the workflow input and context, and their return values are injected into the task as parameters.

```python
async def async_dep(input: EmptyModel, ctx: Context) -> str:
    return ASYNC_DEPENDENCY_VALUE


def sync_dep(input: EmptyModel, ctx: Context) -> str:
    return SYNC_DEPENDENCY_VALUE


@asynccontextmanager
async def async_cm_dep(
    input: EmptyModel, ctx: Context, async_dep: Annotated[str, Depends(async_dep)]
) -> AsyncGenerator[str, None]:
    try:
        yield ASYNC_CM_DEPENDENCY_VALUE + "_" + async_dep
    finally:
        pass


@contextmanager
def sync_cm_dep(
    input: EmptyModel, ctx: Context, sync_dep: Annotated[str, Depends(sync_dep)]
) -> Generator[str, None, None]:
    try:
        yield SYNC_CM_DEPENDENCY_VALUE + "_" + sync_dep
    finally:
        pass


@contextmanager
def base_cm_dep(input: EmptyModel, ctx: Context) -> Generator[str, None, None]:
    try:
        yield CHAINED_CM_VALUE
    finally:
        pass


def chained_dep(
    input: EmptyModel, ctx: Context, base_cm: Annotated[str, Depends(base_cm_dep)]
) -> str:
    return "chained_" + base_cm


@asynccontextmanager
async def base_async_cm_dep(
    input: EmptyModel, ctx: Context
) -> AsyncGenerator[str, None]:
    try:
        yield CHAINED_ASYNC_CM_VALUE
    finally:
        pass


async def chained_async_dep(
    input: EmptyModel,
    ctx: Context,
    base_async_cm: Annotated[str, Depends(base_async_cm_dep)],
) -> str:
    return "chained_" + base_async_cm
```

#### Typescript

Create a client and attach middleware with `before` and `after` hooks.

- **`before(input, ctx)`** runs before the task. Its return value **replaces** the task input.
- **`after(output, ctx, input)`** runs after the task. Its return value **replaces** the task output.

```typescript
import { HatchetClient, HatchetMiddleware } from '@hatchet/v1';

export type GlobalInputType = {
  first: number;
  second: number;
};

export type GlobalOutputType = {
  extra: number;
};

const myMiddleware = {
  before: (input, ctx) => {
    console.log('before', input.first);
    return { ...input, dependency: 'abc-123' };
  },
  after: (output, ctx, input) => {
    return { ...output, additionalData: 2 };
  },
} satisfies HatchetMiddleware;

export const hatchetWithMiddleware = HatchetClient.init<
  GlobalInputType,
  GlobalOutputType
>().withMiddleware(myMiddleware);
```


  **Spread the original value if you want to keep it.** The return value of each hook **replaces** the input (or output) entirely — it does not shallow-merge. If you omit `...input` in a `before` hook, the original fields are lost. The same applies to `...output` in an `after` hook.

  ```typescript
      // ✅ Keeps original fields and adds `requestId`
      before: (input) => ({ ...input, requestId: crypto.randomUUID() })

      // ❌ Replaces input entirely — task only receives { requestId }
      before: (input) => ({ requestId: crypto.randomUUID() })
  ```


### Chaining Middleware

You can chain multiple `.withMiddleware()` calls to run hooks in sequence. Each `before` hook receives the return value of the previous `before` hook (or the original input for the first hook), and each `after` hook receives the return value of the previous `after` hook.

```typescript
const firstMiddleware = {
  before: (input, ctx) => {
    console.log('before', input.first);
    return { ...input, dependency: 'abc-123' };
  },
  after: (output, ctx, input) => {
    return { ...output, firstExtra: 3 };
  },
} satisfies HatchetMiddleware;

const secondMiddleware = {
  before: (input, ctx) => {
    console.log('before', input.dependency); // available from previous middleware
    return { ...input, anotherDep: true };
  },
  after: (output, ctx, input) => {
    return { ...output, secondExtra: 4 };
  },
} satisfies HatchetMiddleware;

export const hatchetWithMiddlewareChaining = HatchetClient.init()
  .withMiddleware(firstMiddleware)
  .withMiddleware(secondMiddleware);
```

#### Go

> **Info:** Middleware support for the Go SDK is coming soon. Join our [Discord](https://hatchet.run/discord) to stay up to date.

#### Ruby

Define your dependencies as callable objects (lambdas). They receive the input, context, and optionally a hash of previously resolved dependencies for chaining.

```ruby
sync_dep = ->(_input, _ctx) { SYNC_DEPENDENCY_VALUE }
async_dep = ->(_input, _ctx) { ASYNC_DEPENDENCY_VALUE }

sync_cm_dep = lambda { |_input, _ctx, deps|
  "#{SYNC_CM_DEPENDENCY_VALUE}_#{deps[:sync_dep]}"
}

async_cm_dep = lambda { |_input, _ctx, deps|
  "#{ASYNC_CM_DEPENDENCY_VALUE}_#{deps[:async_dep]}"
}

chained_dep = ->(_input, _ctx, deps) { "chained_#{CHAINED_CM_VALUE}" }
chained_async_dep = ->(_input, _ctx, deps) { "chained_#{CHAINED_ASYNC_CM_VALUE}" }
```

## How Middleware Executes

#### Python

Dependencies are resolved before each task execution. Each dependency function receives the original workflow input and the task context, and its return value is injected as a named parameter to the task function.

#### Typescript

When a task runs, the worker applies middleware hooks in this order:


### Before hooks run in registration order

Each `before` hook receives the current input and the task `Context`. Its return value **replaces** the input for the next hook (or the task itself). Returning `undefined` (or `void`) skips replacement and passes the input through unchanged.

### The task function executes

The task receives the final input after all `before` hooks have run.

### After hooks run in registration order

Each `after` hook receives the current output, the task `Context`, and the final input. Its return value **replaces** the output for the next hook (or the final result). Returning `undefined` skips replacement.


Both `before` and `after` hooks can be **async** — return a `Promise` and it will be awaited before proceeding.

> **Info:** If a middleware hook throws an error, the task run fails with that error. There is no built-in error recovery within middleware — use try/catch inside your hooks if you need graceful fallback.

### The `ctx` Parameter

The second parameter of both `before` and `after` hooks is the task `Context` object. This gives middleware access to:
- `ctx.workflowRunId` — the ID of the current workflow run
- `ctx.stepRunId` — the ID of the current step run
- `ctx.log()` — emit structured logs visible in the Hatchet dashboard
- `ctx.cancel()` — cancel the current run from within middleware

### Global Types vs Middleware Types

There are two ways extra fields end up on a task's input:

Mechanism, Set via, Required at call site?, Available at runtime?

**Global input type**, `HatchetClient.init()`, Yes — callers must provide these fields, Yes
**Middleware before hook**, `.withMiddleware({ before })`, No — injected automatically by the worker, Yes

Global input types (`T` in `init()`) represent fields that **callers must supply** when triggering a task. This is useful when you know every task must always receive certain parameters — for example, a `userId` for authentication or a `tenantId` for multi-tenant routing. By declaring these as the global type, TypeScript enforces that every caller provides them.

Middleware `before` hooks, on the other hand, inject fields that are **computed at runtime** (e.g. request IDs, decrypted secrets, fetched config) and are **not** required from callers.

```typescript
    type RequiredContext = { userId: string; orgId: string };

    const client = HatchetClient.init()
      .withMiddleware({
        before: (input) => ({
          ...input,
          resolvedAt: Date.now(),        // injected, not required from caller
          permissions: lookupPerms(input.userId), // derived from global type
        }),
      });

    // Callers MUST provide userId and orgId — TypeScript enforces this
    await myTask.run({ userId: 'usr_123', orgId: 'org_456', /* ...task fields */ });
```

#### Go

> **Info:** Middleware support for the Go SDK is coming soon. Join our [Discord](https://hatchet.run/discord) to stay up to date.

#### Ruby

Dependencies are resolved in the order they are declared in the `deps` hash. Each dependency function can optionally receive already-resolved dependencies as its third argument, enabling chaining.

## Using Middleware in Tasks

#### Python

Inject dependencies into your tasks using `Depends` and type annotations. The dependency results are passed directly as function parameters.

```python
@hatchet.task()
async def async_task_with_dependencies(
    _i: EmptyModel,
    ctx: Context,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
    async_cm_dep: Annotated[str, Depends(async_cm_dep)],
    sync_cm_dep: Annotated[str, Depends(sync_cm_dep)],
    chained_dep: Annotated[str, Depends(chained_dep)],
    chained_async_dep: Annotated[str, Depends(chained_async_dep)],
) -> Output:
    return Output(
        sync_dep=sync_dep,
        async_dep=async_dep,
        async_cm_dep=async_cm_dep,
        sync_cm_dep=sync_cm_dep,
        chained_dep=chained_dep,
        chained_async_dep=chained_async_dep,
    )
```


  Your dependency functions must take two positional arguments: the workflow input and the `Context` (the same as any other task).


#### Typescript

Tasks created from a middleware-enabled client automatically receive the merged input and output types. There is no extra configuration needed on the task itself.

```typescript
import { hatchetWithMiddleware } from './client';

type TaskInput = {
  message: string;
};

type TaskOutput = {
  message: string;
};

export const taskWithMiddleware = hatchetWithMiddleware.task({
  name: 'task-with-middleware',
  fn: (input, _ctx) => {
    console.log('task', input.message); // string  (from TaskInput)
    console.log('task', input.first); // number  (from GlobalInputType)
    console.log('task', input.second); // number  (from GlobalInputType)
    console.log('task', input.dependency); // string  (from Pre Middleware)
    return {
      message: input.message,
      extra: 1,
    };
  },
});
// !!
```

The task's `input` type is the intersection of `TaskInput`, `GlobalInputType`, and the return type of the `before` middleware hook. The task's return type must satisfy `TaskOutput` and `GlobalOutputType`, while the caller receives the intersection of those with the `after` middleware return type.

#### Go

> **Info:** Middleware support for the Go SDK is coming soon. Join our [Discord](https://hatchet.run/discord) to stay up to date.

#### Ruby

Pass a `deps` hash when defining a task. The resolved dependency values are available inside the task block via `ctx.deps`.

```ruby
ASYNC_TASK_WITH_DEPS = HATCHET.task(
  name: "async_task_with_dependencies",
  deps: {
    sync_dep: sync_dep,
    async_dep: async_dep,
    sync_cm_dep: sync_cm_dep,
    async_cm_dep: async_cm_dep,
    chained_dep: chained_dep,
    chained_async_dep: chained_async_dep
  }
) do |input, ctx|
  {
    "sync_dep" => ctx.deps[:sync_dep],
    "async_dep" => ctx.deps[:async_dep],
    "async_cm_dep" => ctx.deps[:async_cm_dep],
    "sync_cm_dep" => ctx.deps[:sync_cm_dep],
    "chained_dep" => ctx.deps[:chained_dep],
    "chained_async_dep" => ctx.deps[:chained_async_dep]
  }
end

SYNC_TASK_WITH_DEPS = HATCHET.task(
  name: "sync_task_with_dependencies",
  deps: {
    sync_dep: sync_dep,
    async_dep: async_dep,
    sync_cm_dep: sync_cm_dep,
    async_cm_dep: async_cm_dep,
    chained_dep: chained_dep,
    chained_async_dep: chained_async_dep
  }
) do |input, ctx|
  {
    "sync_dep" => ctx.deps[:sync_dep],
    "async_dep" => ctx.deps[:async_dep],
    "async_cm_dep" => ctx.deps[:async_cm_dep],
    "sync_cm_dep" => ctx.deps[:sync_cm_dep],
    "chained_dep" => ctx.deps[:chained_dep],
    "chained_async_dep" => ctx.deps[:chained_async_dep]
  }
end

DURABLE_ASYNC_TASK_WITH_DEPS = HATCHET.durable_task(
  name: "durable_async_task_with_dependencies",
  deps: {
    sync_dep: sync_dep,
    async_dep: async_dep,
    sync_cm_dep: sync_cm_dep,
    async_cm_dep: async_cm_dep,
    chained_dep: chained_dep,
    chained_async_dep: chained_async_dep
  }
) do |input, ctx|
  {
    "sync_dep" => ctx.deps[:sync_dep],
    "async_dep" => ctx.deps[:async_dep],
    "async_cm_dep" => ctx.deps[:async_cm_dep],
    "sync_cm_dep" => ctx.deps[:sync_cm_dep],
    "chained_dep" => ctx.deps[:chained_dep],
    "chained_async_dep" => ctx.deps[:chained_async_dep]
  }
end

DURABLE_SYNC_TASK_WITH_DEPS = HATCHET.durable_task(
  name: "durable_sync_task_with_dependencies",
  deps: {
    sync_dep: sync_dep,
    async_dep: async_dep,
    sync_cm_dep: sync_cm_dep,
    async_cm_dep: async_cm_dep,
    chained_dep: chained_dep,
    chained_async_dep: chained_async_dep
  }
) do |input, ctx|
  {
    "sync_dep" => ctx.deps[:sync_dep],
    "async_dep" => ctx.deps[:async_dep],
    "async_cm_dep" => ctx.deps[:async_cm_dep],
    "sync_cm_dep" => ctx.deps[:sync_cm_dep],
    "chained_dep" => ctx.deps[:chained_dep],
    "chained_async_dep" => ctx.deps[:chained_async_dep]
  }
end

DI_WORKFLOW = HATCHET.workflow(name: "dependency-injection-workflow")

# Workflow tasks with dependencies follow the same pattern
DI_WORKFLOW.task(:wf_task_with_dependencies) do |input, ctx|
  {
    "sync_dep" => SYNC_DEPENDENCY_VALUE,
    "async_dep" => ASYNC_DEPENDENCY_VALUE
  }
end
```

## Running a Worker

#### Python

No special worker configuration is needed — dependencies are evaluated automatically each time a task runs.

#### Typescript

Workers are created from the same middleware-enabled client. No special setup is required — the middleware hooks are applied automatically when tasks execute.

```typescript
import { hatchetWithMiddleware } from './client';
import { taskWithMiddleware } from './workflow';

async function main() {
  const worker = await hatchetWithMiddleware.worker('task-with-middleware', {
    workflows: [taskWithMiddleware],
  });

  await worker.start();
}

if (require.main === module) {
  main();
}
```

#### Go

> **Info:** Middleware support for the Go SDK is coming soon. Join our [Discord](https://hatchet.run/discord) to stay up to date.

#### Ruby

No special worker configuration is needed — dependencies are resolved automatically before each task execution.

## Practical Examples

The examples below show TypeScript middleware for common production patterns. Each can be adapted to the Python dependency injection model by extracting the same logic into a dependency function.

### End-to-End Encryption

Encrypt sensitive input fields before they reach the Hatchet server, and decrypt the output on the way back. This ensures plaintext data never leaves your worker process.

```typescript
import { HatchetClient, HatchetMiddleware } from '@hatchet/v1';
import { randomUUID, createCipheriv, createDecipheriv, randomBytes } from 'crypto';
```

> **Info:** The `before` hook decrypts incoming data so your task function works with
>   plaintext. The `after` hook encrypts the output before it is stored. The
>   encryption key never leaves the worker environment.

### Offloading Large Payloads to S3

When task inputs or outputs exceed Hatchet's payload size limit (or you simply want to keep large blobs out of the control plane), upload them to S3 and pass a signed URL instead.

```typescript
import { S3Client, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';

const ALGORITHM = 'aes-256-gcm';
const KEY = Buffer.from(process.env.ENCRYPTION_KEY!, 'hex');

type EncryptedEnvelope = { ciphertext: string; iv: string; tag: string };

function encrypt(plaintext: string): EncryptedEnvelope {
  const iv = randomBytes(16);
  const cipher = createCipheriv(ALGORITHM, KEY, iv);
  const encrypted = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
  return {
    ciphertext: encrypted.toString('base64'),
    iv: iv.toString('base64'),
    tag: cipher.getAuthTag().toString('base64'),
  };
}

function decrypt(ciphertext: string, iv: string, tag: string): string {
  const decipher = createDecipheriv(ALGORITHM, KEY, Buffer.from(iv, 'base64'));
  decipher.setAuthTag(Buffer.from(tag, 'base64'));
  return decipher.update(ciphertext, 'base64', 'utf8') + decipher.final('utf8');
}

type EncryptedInput = { encrypted?: EncryptedEnvelope };

const e2eEncryption: HatchetMiddleware = {
  before: (input) => {
    if (!input.encrypted) {
      return input;
    }
    const { ciphertext, iv, tag } = input.encrypted;
    const decrypted = JSON.parse(decrypt(ciphertext, iv, tag));
    return { ...input, ...decrypted, encrypted: undefined };
  },
  after: (output) => {
    const payload = JSON.stringify(output);
    return { encrypted: encrypt(payload) };
  },
};

const encryptionClient = HatchetClient.init().withMiddleware(e2eEncryption);

const s3 = new S3Client({ region: process.env.AWS_REGION });
const BUCKET = process.env.S3_BUCKET!;
const PAYLOAD_THRESHOLD = 256 * 1024; // 256 KB

async function uploadToS3(data: unknown): Promise<string> {
  const key = `hatchet-payloads/${randomUUID()}.json`;
  await s3.send(
    new PutObjectCommand({
      Bucket: BUCKET,
      Key: key,
      Body: JSON.stringify(data),
      ContentType: 'application/json',
    })
  );
  return getSignedUrl(s3, new GetObjectCommand({ Bucket: BUCKET, Key: key }), {
    expiresIn: 3600,
  });
}

async function downloadFromS3(url: string): Promise<unknown> {
  const res = await fetch(url);
  return res.json();
}

type S3Input = { s3Url?: string };

const s3Offload: HatchetMiddleware = {
  before: async (input) => {
    if (input.s3Url) {
      const restored = (await downloadFromS3(input.s3Url)) as Record<string, any>;
      return { ...restored, s3Url: undefined };
    }
    return input;
  },
  after: async (output) => {
    const serialized = JSON.stringify(output);
    if (serialized.length > PAYLOAD_THRESHOLD) {
      const url = await uploadToS3(output);
      return { s3Url: url };
    }
    return output;
  },
};

const s3Client = HatchetClient.init().withMiddleware(s3Offload);
```


  The caller is responsible for uploading oversized inputs to S3 before triggering the task. The `before` hook only handles the download side. You can use the same `uploadToS3` helper on the caller side to upload the input and pass `{ __s3Url: url }` as the task input.


## FAQ

### What is Hatchet middleware and how does it differ from Express middleware?

Hatchet middleware runs **inside the worker process** around each task invocation — not on an HTTP request path. A `before` hook transforms input before the task runs, and an `after` hook transforms output after. Unlike Express middleware, there is no `next()` function; hooks return their result directly and the runner chains them automatically.

### Can I use middleware with both tasks and workflows?

Yes. Middleware is registered on the `HatchetClient` instance, so it applies to every task created from that client — whether the task is a standalone `client.task()` or part of a multi-step `client.workflow()`. Each step in a workflow will have middleware applied independently.

### Does middleware run on the server or on the worker?

Middleware runs entirely **on the worker**. The Hatchet server never sees or executes your middleware code. This is what makes patterns like end-to-end encryption possible — plaintext data stays within your infrastructure.

### What happens if my middleware throws an error?

If a `before` or `after` hook throws (or returns a rejected `Promise`), the task run fails with that error. There is no automatic retry of middleware itself, but the task's configured retry policy will still apply, re-running the task (and its middleware) from scratch.

### Can I use async/await in middleware hooks?

Yes. Both `before` and `after` hooks can be synchronous or asynchronous. If a hook returns a `Promise`, the worker will `await` it before proceeding to the next hook or the task function.

### How do I share state between `before` and `after` hooks?

The `after` hook receives the task input (after `before` hooks have run) as its third argument. Add fields in `before` (e.g. `startedAt`, `traceId`) and read them from `input` in `after`. There is no separate shared context object — the input itself is the carrier.

### Does middleware apply to child tasks spawned via fanout?

Middleware is scoped to the **client instance**. If a child task is defined on the same middleware-enabled client, its middleware will run when that child task executes. If the child task uses a different client instance, only that client's middleware (if any) applies.

### Can I selectively skip middleware for certain tasks?

Middleware applies to **all** tasks on a given client. To skip middleware for specific tasks, create a second client without middleware and define those tasks on it. This is a deliberate design choice — middleware is a cross-cutting concern, and selective opt-out is handled at the client boundary.

### Is there a performance overhead to using middleware?

Middleware hooks are plain JavaScript functions that run in-process on the worker. The overhead is the execution time of your hook code. For lightweight operations (adding a field, logging), the overhead is negligible. For heavier operations (network calls like S3 uploads or decryption), the task's total duration will include that time, so keep hooks as efficient as possible.

### What is the difference between global types and middleware types in TypeScript?

Global types (`HatchetClient.init()`) define fields that **callers must provide** when triggering a task. Middleware types (inferred from `withMiddleware` return values) define fields that are **injected at runtime** by the worker. Both end up on the task's `input` type, but only global types appear in the caller-facing `run()` signature.

### Can I use middleware for rate limiting or authentication?

Yes. A `before` hook can check rate limits, validate API keys, or verify JWTs before the task runs. If the check fails, throw an error to abort the task. However, for rate limiting specifically, consider using Hatchet's built-in [rate limiting](/home/rate-limits) feature, which operates at the scheduling layer and is more efficient than in-worker checks.

### How do I test middleware in isolation?

Middleware hooks are plain functions — you can unit-test them directly by calling them with mock input and a mock context object. For integration tests, the e2e test pattern of creating a client, attaching middleware, defining a task, starting a worker, and asserting on the result works well. See the [middleware example on GitHub](https://github.com/hatchet-dev/hatchet/tree/main/examples/typescript/middleware) for a complete test setup.
