Platform
ScaiWave ScaiGrid ScaiCore ScaiBot ScaiDrive ScaiKey Models Tools & Services
Solutions
Organisations Developers Internet Service Providers Managed Service Providers AI-in-a-Box
Resources
Support Documentation Blog Downloads
Company
About Research Careers Investment Opportunities Contact
Log in

Build a routed pipeline

You're going from zero to a routed pipeline where messages enter at the first queue, the routing engine pushes premium-customer messages to a priority queue, and a consumer drains both. Roughly 15 minutes.

1. Plan the shape#

Before any API calls, settle these:

  • Scope name. One scope per logical pipeline. We'll use orders.
  • Queues. new-orders (entry) and priority-orders (where premium customers' messages go).
  • Routing condition. "If labels.customer_tier == 'premium', route to priority-orders."
  • Consumer. A single worker that claims from both queues.

2. Create the scope and both queues#

Create the scope, then both queues inside it. The entry queue is FIFO so first-in goes first; the priority queue is priority-ordered so when premium orders arrive their priority_tier/priority_score decide which gets claimed first.

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
SCOPE=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug": "orders", "display_name": "Orders"}' | jq -r .data.id)

ENTRY=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/queues" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug":"new-orders","display_name":"New Orders","ordering":"fifo"}' | jq -r .data.id)

PRIORITY=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/queues" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug":"priority-orders","display_name":"Priority Orders","ordering":"priority"}' | jq -r .data.id)
python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import httpx, os
H = {"Authorization": f"Bearer {os.environ['SCAIGRID_API_KEY']}"}
HOST = os.environ["SCAIGRID_HOST"]
SCOPE = httpx.post(f"{HOST}/v1/modules/scaiqueue/scopes", headers=H,
    json={"slug": "orders", "display_name": "Orders"}).json()["data"]["id"]
ENTRY = httpx.post(f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues", headers=H,
    json={"slug": "new-orders", "display_name": "New Orders", "ordering": "fifo"}).json()["data"]["id"]
PRIORITY = httpx.post(f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues", headers=H,
    json={"slug": "priority-orders", "display_name": "Priority Orders",
          "ordering": "priority"}).json()["data"]["id"]
javascript
1
2
3
4
5
6
7
8
9
const H = { "Authorization": `Bearer ${process.env.SCAIGRID_API_KEY}`,
            "Content-Type": "application/json" };
const HOST = process.env.SCAIGRID_HOST;
const scope = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes`,
  {method:"POST", headers:H, body:JSON.stringify({slug:"orders",display_name:"Orders"})})).json()).data;
const entry = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes/${scope.id}/queues`,
  {method:"POST", headers:H, body:JSON.stringify({slug:"new-orders",display_name:"New Orders",ordering:"fifo"})})).json()).data;
const priority = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes/${scope.id}/queues`,
  {method:"POST", headers:H, body:JSON.stringify({slug:"priority-orders",display_name:"Priority Orders",ordering:"priority"})})).json()).data;

3. Add the routing rule#

A rule on a scope is evaluated on each publish. The first matching rule (in priority-ascending order) wins.

bash
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
curl -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/routing-rules" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"Premium → priority queue\",
    \"priority\": 10,
    \"enabled\": true,
    \"trigger\": {\"event\": \"message_published\"},
    \"conditions\": [{\"field\": \"labels.customer_tier\", \"op\": \"eq\", \"value\": \"premium\"}],
    \"action\": {\"type\": \"route_to\", \"target_queue_id\": \"$PRIORITY\"}
  }"

Dry-run a hypothetical message against the rules before committing real traffic:

bash
1
2
3
4
curl -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/routing-rules/test" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"trigger": "message_published", "message": {"labels": {"customer_tier": "premium"}}}'

The response lists the rules that would match without applying any of them.

4. Publish#

Publish two messages — one premium, one standard — to the entry queue. The routing rule fires on the premium one and emits a routed copy on priority-orders.

python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
for tier in ("standard", "premium"):
    httpx.post(
        f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues/{ENTRY}/messages",
        headers={**H, "Content-Type": "application/json"},
        json={
            "type": "order.new",
            "body": {"order_id": f"ord_{tier}"},
            "labels": {"customer_tier": tier},
            "correlation_id": f"corr_{tier}",
        },
    )

5. Consume from both queues#

A single worker can claim from either queue. Loop, claim, do work, complete.

python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import time
for queue_id in (ENTRY, PRIORITY):
    while True:
        claimed = httpx.post(
            f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues/{queue_id}/messages/claim",
            headers={**H, "Content-Type": "application/json"},
            json={"batch_size": 5, "visibility_timeout_s": 60},
        ).json()["data"]
        if not claimed:
            break
        for m in claimed:
            # ... do the work using m["body"], m["labels"], m["correlation_id"] ...
            httpx.post(
                f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/messages/{m['id']}/complete",
                headers={**H, "Content-Type": "application/json"},
                json={"response": {"shipped": True}},
            )
        time.sleep(0.1)

6. Trace one order across both queues#

The correlation_id you set survives the routing step. Pull the chain back out:

bash
1
2
curl "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/messages?correlation_id=corr_premium" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY"

You'll see the original entry-queue row and the routed copy on priority-orders. For the full audit trail (publish, rule_applied, claim, complete events), use GET /scopes/{scope_id}/audit/trace/{correlation_id}.

What you'd add next#

  • Retry policy. Set max_retries and a sensible visibility_timeout_s on publish; failed messages back off and eventually land in _dead_letter.
  • Subscriptions. If you want a side-channel observer (analytics, audit), create a subscription rather than a competing consumer.
  • Schemas. Register a order.new schema and validate payloads before publishing — catches typos before they reach a worker.
  • More rules. Routing supports chains; the engine has a 5-hop circuit breaker so you can't loop yourself into starvation.
Updated 2026-05-18 15:01:32 View source (.md) rev 12