Build a routed pipeline
You're going from zero to a routed pipeline where messages enter at the first queue, the routing engine pushes premium-customer messages to a priority queue, and a consumer drains both. Roughly 15 minutes.
1. Plan the shape
Before any API calls, settle these:
- Scope name. One scope per logical pipeline. We'll use orders.
- Queues. new-orders (entry) and priority-orders (where premium customers' messages go).
- Routing condition. "If labels.customer_tier == 'premium', route to priority-orders."
- Consumer. A single worker that claims from both queues.
2. Create the scope and both queues
Create the scope, then both queues inside it. The entry queue uses FIFO ordering, so messages are claimed in arrival order. The priority queue uses priority ordering, so each premium order's priority_tier and priority_score decide which one is claimed first.
| SCOPE=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug": "orders", "display_name": "Orders"}' | jq -r .data.id)
ENTRY=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/queues" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug":"new-orders","display_name":"New Orders","ordering":"fifo"}' | jq -r .data.id)
PRIORITY=$(curl -s -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/queues" \
  -H "Authorization: Bearer $SCAIGRID_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"slug":"priority-orders","display_name":"Priority Orders","ordering":"priority"}' | jq -r .data.id)
|
| import os
import httpx
# Shared auth header and host for every call in this walkthrough.
H = {"Authorization": f"Bearer {os.environ['SCAIGRID_API_KEY']}"}
HOST = os.environ["SCAIGRID_HOST"]
SCOPE = httpx.post(
    f"{HOST}/v1/modules/scaiqueue/scopes", headers=H,
    json={"slug": "orders", "display_name": "Orders"},
).json()["data"]["id"]
ENTRY = httpx.post(
    f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues", headers=H,
    json={"slug": "new-orders", "display_name": "New Orders", "ordering": "fifo"},
).json()["data"]["id"]
PRIORITY = httpx.post(
    f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues", headers=H,
    json={"slug": "priority-orders", "display_name": "Priority Orders", "ordering": "priority"},
).json()["data"]["id"]
|
| const H = { "Authorization": `Bearer ${process.env.SCAIGRID_API_KEY}`,
"Content-Type": "application/json" };
const HOST = process.env.SCAIGRID_HOST;
const scope = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes`,
{method:"POST", headers:H, body:JSON.stringify({slug:"orders",display_name:"Orders"})})).json()).data;
const entry = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes/${scope.id}/queues`,
{method:"POST", headers:H, body:JSON.stringify({slug:"new-orders",display_name:"New Orders",ordering:"fifo"})})).json()).data;
const priority = (await (await fetch(`${HOST}/v1/modules/scaiqueue/scopes/${scope.id}/queues`,
{method:"POST", headers:H, body:JSON.stringify({slug:"priority-orders",display_name:"Priority Orders",ordering:"priority"})})).json()).data;
|
3. Add the routing rule
The rules on a scope are evaluated on each publish. They are tried in ascending priority order (lowest number first), and the first rule that matches wins.
| curl -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/routing-rules" \
-H "Authorization: Bearer $SCAIGRID_API_KEY" \
-H "Content-Type: application/json" \
-d "{
\"name\": \"Premium → priority queue\",
\"priority\": 10,
\"enabled\": true,
\"trigger\": {\"event\": \"message_published\"},
\"conditions\": [{\"field\": \"labels.customer_tier\", \"op\": \"eq\", \"value\": \"premium\"}],
\"action\": {\"type\": \"route_to\", \"target_queue_id\": \"$PRIORITY\"}
}"
|
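To make "first matching rule wins" concrete, here is a minimal local sketch of that evaluation order. It is not the routing engine's code: the Rule class, the helper names, the bulk-orders queue, and the assumption that all conditions in a rule must hold (with only the eq operator modelled) are illustrative.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Rule:
    # Hypothetical local stand-in for a routing rule; not the service's schema.
    name: str
    priority: int
    enabled: bool
    conditions: list
    target_queue: str


def lookup(message: dict, path: str) -> Any:
    """Resolve a dotted path such as 'labels.customer_tier' in a nested dict."""
    value: Any = message
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def matches(rule: Rule, message: dict) -> bool:
    """Assumption: every condition must hold; only the 'eq' operator is modelled."""
    return all(
        cond["op"] == "eq" and lookup(message, cond["field"]) == cond["value"]
        for cond in rule.conditions
    )


def first_match(rules: list, message: dict) -> Optional[Rule]:
    """Return the enabled matching rule with the lowest priority number, if any."""
    for rule in sorted(rules, key=lambda r: r.priority):
        if rule.enabled and matches(rule, message):
            return rule
    return None


premium_cond = {"field": "labels.customer_tier", "op": "eq", "value": "premium"}
rules = [
    # Hypothetical second rule, listed first to show that list order is irrelevant.
    Rule("Premium to bulk", 20, True, [premium_cond], "bulk-orders"),
    Rule("Premium → priority queue", 10, True, [premium_cond], "priority-orders"),
]

winner = first_match(rules, {"labels": {"customer_tier": "premium"}})
print(winner.name if winner else "no rule matched")
```

Both rules match a premium message here, but the one with priority 10 is tried before the one with priority 20, so it decides the route. A standard message matches neither rule and stays on the entry queue.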
Dry-run a hypothetical message against the rules before committing real traffic:
| curl -X POST "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/routing-rules/test" \
-H "Authorization: Bearer $SCAIGRID_API_KEY" \
-H "Content-Type: application/json" \
-d '{"trigger": "message_published", "message": {"labels": {"customer_tier": "premium"}}}'
|
The response lists the rules that would match without applying any of them.
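If you'd rather run the dry-run from Python, the sketch below builds the same request with the standard library so you can inspect it before sending. The build_dry_run helper and the placeholder host, key, and scope ID are illustrative; the endpoint path and payload are copied from the curl call above, and the response shape is not assumed.

```python
import json
import urllib.request


def build_dry_run(host: str, api_key: str, scope_id: str, labels: dict) -> urllib.request.Request:
    """Build (but do not send) the POST for the routing-rules test endpoint."""
    body = {"trigger": "message_published", "message": {"labels": labels}}
    return urllib.request.Request(
        url=f"{host}/v1/modules/scaiqueue/scopes/{scope_id}/routing-rules/test",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values; substitute SCAIGRID_HOST, SCAIGRID_API_KEY and your scope ID.
req = build_dry_run("https://scaigrid.example", "test-key", "scope_123",
                    {"customer_tier": "standard"})
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req).read()
```

Trying a standard-tier message as well as a premium one is a quick way to confirm that the rule matches only the traffic you intend.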
4. Publish
Publish two messages — one premium, one standard — to the entry queue. The routing rule fires on the premium one and emits a routed copy on priority-orders.
| for tier in ("standard", "premium"):
    # Same payload for both messages; only the customer_tier label differs.
    httpx.post(
        f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues/{ENTRY}/messages",
        headers={**H, "Content-Type": "application/json"},
        json={
            "type": "order.new",
            "body": {"order_id": f"ord_{tier}"},
            "labels": {"customer_tier": tier},
            "correlation_id": f"corr_{tier}",
        },
    )
|
5. Consume from both queues
A single worker can claim from either queue. The loop below drains new-orders first and then priority-orders: claim a batch, do the work, complete each message.
| import time
# Drain the entry queue first, then the priority queue.
for queue_id in (ENTRY, PRIORITY):
    while True:
        # Claim up to 5 messages with a 60-second visibility timeout.
        claimed = httpx.post(
            f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/queues/{queue_id}/messages/claim",
            headers={**H, "Content-Type": "application/json"},
            json={"batch_size": 5, "visibility_timeout_s": 60},
        ).json()["data"]
        if not claimed:
            break  # nothing left on this queue, move on to the next one
        for m in claimed:
            # ... do the work using m["body"], m["labels"], m["correlation_id"] ...
            httpx.post(
                f"{HOST}/v1/modules/scaiqueue/scopes/{SCOPE}/messages/{m['id']}/complete",
                headers={**H, "Content-Type": "application/json"},
                json={"response": {"shipped": True}},
            )
        time.sleep(0.1)  # brief pause between claim batches
|
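If premium orders should be served before standard ones, one option is to re-check the priority queue after every batch instead of draining the queues one after another. The sketch below shows that polling order with an in-memory stand-in so it runs without the service. The drain function, the claim and complete callables, and the fake backlog are hypothetical; in the real worker they would wrap the claim and complete calls shown above.

```python
from typing import Callable, Iterable


def drain(queues: Iterable[str],
          claim: Callable[[str, int], list],
          complete: Callable[[str], None],
          handle: Callable[[dict], None],
          batch_size: int = 5) -> int:
    """Process messages until every queue is empty.

    Queues are checked in the given order, and after each non-empty batch the
    scan restarts from the first queue, so earlier queues are served first.
    """
    queues = list(queues)
    done = 0
    while True:
        for queue_id in queues:
            batch = claim(queue_id, batch_size)
            if batch:
                for message in batch:
                    handle(message)
                    complete(message["id"])
                    done += 1
                break  # restart the scan from the first queue
        else:
            return done  # every queue came back empty


# In-memory stand-in for the two queues (illustrative data only).
backlog = {
    "priority-orders": [{"id": "m2", "body": {"order_id": "ord_premium"}}],
    "new-orders": [
        {"id": "m1", "body": {"order_id": "ord_standard"}},
        {"id": "m3", "body": {"order_id": "ord_premium"}},
    ],
}
seen = []
completed = []


def fake_claim(queue_id: str, batch_size: int) -> list:
    """Pop up to batch_size messages from the in-memory backlog."""
    batch = backlog[queue_id][:batch_size]
    backlog[queue_id] = backlog[queue_id][batch_size:]
    return batch


processed = drain(["priority-orders", "new-orders"], fake_claim,
                  completed.append, lambda m: seen.append(m["id"]))
print(processed, seen)
```

Passing the queues in the order priority-orders, new-orders means a premium order that arrives mid-run is picked up before the next batch of standard orders.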
6. Trace one order across both queues
The correlation_id you set survives the routing step. Pull the chain back out:
| curl "$SCAIGRID_HOST/v1/modules/scaiqueue/scopes/$SCOPE/messages?correlation_id=corr_premium" \
-H "Authorization: Bearer $SCAIGRID_API_KEY"
|
You'll see the original entry-queue row and the routed copy on priority-orders. For the full audit trail (publish, rule_applied, claim, complete events), use GET /scopes/{scope_id}/audit/trace/{correlation_id}.
What you'd add next
- Retry policy. Set max_retries and a sensible visibility_timeout_s on publish; failed messages back off and eventually land in _dead_letter.
- Subscriptions. If you want a side-channel observer (analytics, audit), create a subscription rather than a competing consumer.
- Schemas. Register an order.new schema and validate payloads before publishing. This catches typos before they reach a worker.
- More rules. Routing supports chains; the engine has a 5-hop circuit breaker so you can't loop yourself into starvation.
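To illustrate the hop limit mentioned in the last bullet, here is a small local model of following a chain of route_to hops and giving up after five. Only the limit of 5 comes from the text above. The function, the exception, and the idea of raising an error when the limit is hit are assumptions; how the real engine reports a tripped breaker is not covered here.

```python
MAX_HOPS = 5  # the hop limit described above


class RoutingLoopError(RuntimeError):
    """Raised by this sketch when a chain exceeds the hop limit (hypothetical)."""


def follow_chain(start: str, next_queue: dict, max_hops: int = MAX_HOPS) -> list:
    """Follow queue-to-queue hops from `start` and return the path taken.

    `next_queue` maps a queue name to the queue its matching rule routes to.
    """
    path = [start]
    current = start
    while current in next_queue:
        if len(path) - 1 >= max_hops:
            raise RoutingLoopError(
                f"gave up after {max_hops} hops: {' -> '.join(path)}"
            )
        current = next_queue[current]
        path.append(current)
    return path


# The pipeline from this guide: a single hop.
print(follow_chain("new-orders", {"new-orders": "priority-orders"}))

# A misconfigured pair of rules that route to each other.
try:
    follow_chain("a", {"a": "b", "b": "a"})
except RoutingLoopError as exc:
    print(exc)
```

A bounded hop count turns an accidental rule cycle into a visible failure rather than a message that circulates forever.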