---
summary: "Multi-stage flow \u2014 scope, two queues, a routing rule that moves matched\
  \ messages between them, end-to-end producer and consumer."
title: Build a routed pipeline
path: tutorials/build-a-pipeline
status: published
---

You'll go from zero to a routed pipeline: messages enter at an entry queue, the routing engine pushes premium-customer messages to a priority queue, and a single consumer drains both. It takes roughly 15 minutes.

## 1. Plan the shape

Before any API calls, settle these:

- **Scope name.** One scope per logical pipeline. We'll use `orders`.
- **Queues.** `new-orders` (entry) and `priority-orders` (where premium customers' messages go).
- **Routing condition.** "If `labels.customer_tier == 'premium'`, route to `priority-orders`."
- **Consumer.** A single worker that claims from both queues.

## 2. Create the scope and both queues

Create the scope, then both queues inside it. The entry queue uses `fifo` ordering, so messages are claimed in arrival order. The priority queue uses `priority` ordering, so each premium order's `priority_tier`/`priority_score` decides which one gets claimed first.

```bash
SCOPE=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug": "orders", "display_name": "Orders"}' | jq -r .data.id)

ENTRY=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/queues" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug":"new-orders","display_name":"New Orders","ordering":"fifo"}' | jq -r .data.id)

PRIORITY=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/queues" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug":"priority-orders","display_name":"Priority Orders","ordering":"priority"}' | jq -r .data.id)
```

```python
import os

import httpx

# Shared request settings; the later Python steps reuse H, HOST and the IDs below.
H = {"Authorization": f"Bearer {os.environ['SCAIGRID_API_KEY']}"}
HOST = os.environ["SCAIGRID_HOST"]

# Create the scope and keep its ID.
SCOPE = httpx.post(f"{HOST}/v1/modules/scaiqueue/scopes", headers=H,
    json={"slug": "orders", "display_name": "Orders"}).json()["data"]["id"]
# Entry queue: FIFO ordering.
ENTRY = httpx.post(f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues", headers=H,
    json={"slug": "new-orders", "display_name": "New Orders", "ordering": "fifo"}).json()["data"]["id"]
# Priority queue: priority ordering.
PRIORITY = httpx.post(f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues", headers=H,
    json={"slug": "priority-orders", "display_name": "Priority Orders",
          "ordering": "priority"}).json()["data"]["id"]
```

```javascript
const H = { "Authorization": `Bearer ${process.env.SCAIGRID_API_KEY}`,
            "Content-Type": "application/json" };
const HOST = process.env.SCAIGRID_HOST;
const scope = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes`,
  {method:"POST", headers:H, body:JSON.stringify({slug:"orders",display_name:"Orders"})})).json()).data;
const entry = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes/${scope.id}/queues`,
  {method:"POST", headers:H, body:JSON.stringify({slug:"new-orders",display_name:"New Orders",ordering:"fifo"})})).json()).data;
const priority = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes/${scope.id}/queues`,
  {method:"POST", headers:H, body:JSON.stringify({slug:"priority-orders",display_name:"Priority Orders",ordering:"priority"})})).json()).data;
```

## 3. Add the routing rule

Rules belong to a scope and are evaluated on every publish, in ascending `priority` order. The first rule that matches wins.

```bash
curl -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/routing-rules" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"Premium → priority queue\",
    \"priority\": 10,
    \"enabled\": true,
    \"trigger\": {\"event\": \"message_published\"},
    \"conditions\": [{\"field\": \"labels.customer_tier\", \"op\": \"eq\", \"value\": \"premium\"}],
    \"action\": {\"type\": \"route_to\", \"target_queue_id\": \"$PRIORITY\"}
  }"
```
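
The same rule can also be created from Python, which sidesteps the shell escaping in the curl body. This is a sketch rather than an official client: `premium_rule` and `create_rule` are hypothetical helper names, the fields simply mirror the curl request above, and `post` is assumed to be any `httpx.post`-compatible callable.

```python
def premium_rule(target_queue_id: str) -> dict:
    """Build the routing-rule body used in the curl example above."""
    return {
        "name": "Premium → priority queue",
        "priority": 10,
        "enabled": True,
        "trigger": {"event": "message_published"},
        "conditions": [
            {"field": "labels.customer_tier", "op": "eq", "value": "premium"}
        ],
        "action": {"type": "route_to", "target_queue_id": target_queue_id},
    }


def create_rule(post, host: str, scope_id: str, headers: dict, rule: dict) -> dict:
    """POST the rule to the scope and return the decoded JSON response.

    `post` is injected (for example httpx.post) so the helper is easy to test.
    """
    resp = post(
        f"{host}/v1/modules/scaiqueue/scopes/{scope_id}/routing-rules",
        headers=headers,
        json=rule,
    )
    resp.raise_for_status()  # fail loudly instead of continuing without a rule
    return resp.json()
```

With the names from step 2 in scope, the call would look like `create_rule(httpx.post, HOST, SCOPE, H, premium_rule(PRIORITY))`.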

Dry-run a hypothetical message against the rules before committing real traffic:

```bash
curl -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/routing-rules/test" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"trigger": "message_published", "message": {"labels": {"customer_tier": "premium"}}}'
```

The response lists the rules that would match without applying any of them.
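
To make first-match-wins concrete, here is a local sketch of how such an evaluation could work. It is an illustration only, not the engine's implementation: `get_path`, `matches`, and `first_match` are hypothetical helpers, only the `eq` operator is modelled, and it assumes every condition on a rule must hold for the rule to match.

```python
def get_path(message: dict, dotted: str):
    """Resolve a dotted field such as 'labels.customer_tier'; None if absent."""
    node = message
    for key in dotted.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def matches(rule: dict, message: dict) -> bool:
    """True when the rule is enabled and all of its `eq` conditions hold."""
    if not rule.get("enabled", True):
        return False
    return all(
        c["op"] == "eq" and get_path(message, c["field"]) == c["value"]
        for c in rule["conditions"]
    )


def first_match(rules: list, message: dict):
    """Check rules in ascending priority; return the first match, else None."""
    for rule in sorted(rules, key=lambda r: r["priority"]):
        if matches(rule, message):
            return rule
    return None


# Two illustrative rules; the catch-all exists only for this example.
rules = [
    {"name": "catch-all", "priority": 50, "enabled": True, "conditions": []},
    {"name": "premium", "priority": 10, "enabled": True,
     "conditions": [{"field": "labels.customer_tier", "op": "eq", "value": "premium"}]},
]
winner = first_match(rules, {"labels": {"customer_tier": "premium"}})
print(winner["name"])  # premium
```

Both rules match the premium message here, but `premium` has the lower `priority` value, so it is checked first and wins.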

## 4. Publish

Publish two messages — one premium, one standard — to the entry queue. The routing rule fires on the premium one and emits a routed copy on `priority-orders`.

```python
for tier in ("standard", "premium"):
    httpx.post(
        f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues/{ENTRY}/messages",
        headers={**H, "Content-Type": "application/json"},
        json={
            "type": "order.new",
            "body": {"order_id": f"ord_{tier}"},
            "labels": {"customer_tier": tier},
            "correlation_id": f"corr_{tier}",
        },
    )
```

## 5. Consume from both queues

A single worker can claim from either queue. Loop, claim, do work, complete.

```python
import time

# Reuses httpx, HOST, H, SCOPE, ENTRY and PRIORITY from the earlier steps.
for queue_id in (ENTRY, PRIORITY):
    while True:
        # Claim up to 5 messages with a 60-second visibility timeout.
        claimed = httpx.post(
            f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues/{queue_id}/messages/claim",
            headers={**H, "Content-Type": "application/json"},
            json={"batch_size": 5, "visibility_timeout_s": 60},
        ).json()["data"]
        if not claimed:
            break  # nothing left to claim; move on to the next queue
        for m in claimed:
            # ... do the work using m["body"], m["labels"], m["correlation_id"] ...
            # Mark the message complete and attach a response payload.
            httpx.post(
                f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/messages/{m['id']}/complete",
                headers={**H, "Content-Type": "application/json"},
                json={"response": {"shipped": True}},
            )
        time.sleep(0.1)  # brief pause between batches
```
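
The loop above drains `new-orders` completely before it looks at `priority-orders`. If premium orders should be handled first, one option is to re-check the queues in preference order after every batch. A sketch of that idea, where `drain`, `claim`, and `complete` are hypothetical names and the two callables are assumed to wrap the claim and complete requests shown above:

```python
import time
from typing import Callable, Iterable


def drain(queue_ids: Iterable[str],
          claim: Callable[[str], list],
          complete: Callable[[dict], None],
          idle_sleep_s: float = 0.1) -> int:
    """Process messages until every queue is empty; return the count handled.

    Queues are checked in the order given on each pass, so after every batch
    the loop goes back to the first (most preferred) queue.
    """
    queue_ids = list(queue_ids)
    handled = 0
    while True:
        for queue_id in queue_ids:
            batch = claim(queue_id)
            if batch:
                for message in batch:
                    # ... do the work here ...
                    complete(message)
                    handled += 1
                break  # restart from the preferred queue
        else:
            return handled  # every queue came back empty
        time.sleep(idle_sleep_s)
```

Calling `drain((PRIORITY, ENTRY), claim=..., complete=...)` would then serve the priority queue whenever it has work and fall back to the entry queue otherwise.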

## 6. Trace one order across both queues

The `correlation_id` you set survives the routing step. Pull the chain back out:

```bash
curl "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/messages?correlation_id=corr_premium" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY"
```

You'll see the original entry-queue row and the routed copy on `priority-orders`. For the full audit trail (publish, rule_applied, claim, complete events), use `GET /scopes/{scope_id}/audit/trace/{correlation_id}`.
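
If you make both lookups from Python, small URL builders keep the correlation ID safely encoded. This is a sketch under one stated assumption: the audit path above is written without a prefix, and the code assumes it sits under the same `/v1/modules/scaiqueue` base as the other calls. `messages_url` and `audit_trace_url` are hypothetical helper names.

```python
from urllib.parse import quote, urlencode

# Assumed base path, copied from the other requests in this tutorial.
BASE = "/v1/modules/scaiqueue"


def messages_url(host: str, scope_id: str, correlation_id: str) -> str:
    """URL for listing a scope's messages filtered by correlation ID."""
    query = urlencode({"correlation_id": correlation_id})
    return f"{host}{BASE}/scopes/{scope_id}/messages?{query}"


def audit_trace_url(host: str, scope_id: str, correlation_id: str) -> str:
    """URL for the audit trace; the ID is escaped because it is a path segment."""
    encoded = quote(correlation_id, safe="")
    return f"{host}{BASE}/scopes/{scope_id}/audit/trace/{encoded}"
```

Either URL can then be fetched with `httpx.get(url, headers=H)` using the names from step 2.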

## What you'd add next

- **Retry policy.** Set `max_retries` and a sensible `visibility_timeout_s` on publish; failed messages back off and eventually land in `_dead_letter`.
- **Subscriptions.** If you want a side-channel observer (analytics, audit), create a subscription rather than a competing consumer.
- **Schemas.** Register an `order.new` schema and validate payloads before publishing; this catches typos before they reach a worker.
- **More rules.** Routing supports chains; the engine has a 5-hop circuit breaker so you can't loop yourself into starvation.
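
To see why a hop limit matters for chained rules, here is a toy model of a hop-limited routing chain. It is not the engine's code: how the engine counts hops and what it does when the breaker trips are not covered here, and `follow_route` and the `routes` mapping are hypothetical.

```python
MAX_HOPS = 5  # the hop limit mentioned in the list above


def follow_route(start_queue: str, routes: dict, max_hops: int = MAX_HOPS) -> list:
    """Follow queue -> next-queue routes, stopping after at most max_hops hops.

    `routes` maps a queue slug to the queue its matching rule routes to.
    Returns the list of queues visited, starting with `start_queue`.
    """
    path = [start_queue]
    current = start_queue
    for _ in range(max_hops):
        nxt = routes.get(current)
        if nxt is None:
            break  # no rule routes onward; the message stays here
        path.append(nxt)
        current = nxt
    return path


# A normal one-hop route, as built in this tutorial.
print(follow_route("new-orders", {"new-orders": "priority-orders"}))
# ['new-orders', 'priority-orders']

# Two rules that route to each other: the walk stops after 5 hops.
print(len(follow_route("a", {"a": "b", "b": "a"})) - 1)
# 5
```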
