X Stream Router
Real-time tweet ingestion that classifies each match into brand_mention, competitor, lead_signal, or noise β without spinning up an agent for every fire.
X Stream Router
The X Stream Router is HeyCMO's bridge between real-time X (Twitter) data and the agent layer. It listens to filtered tweet streams (rules registered against the X Filtered Stream API), classifies every match into one of four buckets, persists the result as a structured Event, and only fires a synchronous agent workflow for the small slice of matches that are time-sensitive. Everything else is queued for the next scheduled engagement run.
Configuration
| Property | Value |
|---|---|
| Phase | 1.1 |
| Router module | apps/api/agent/lib/x-stream-router.ts |
| Config loader | apps/api/agent/lib/x-stream-config.ts |
| Event type prefix | x_stream_ |
| Subscribed to | x.tweet.matched (via events.ts) |
| Drained by | engagement-response workflow |
| UI surface | /dashboard activity feed (event_type filter x_stream_*) |
Why a separate router
Without a router, every matched tweet would spin up a full agent workflow. A single noisy rule on a popular topic can fire 1,000+ matches per day; at $0.05β$0.20 per agent run, that's bankruptcy money for a $99/mo product.
The router avoids that cost in three steps:
- Cheap deterministic classifier decides which of four buckets a tweet belongs to β no LLM call per tweet.
- Structured
Eventrow is persisted withevent_typeprefixedx_stream_*so dashboards and scheduled agents can pick it up later. - Inline agent firing only for HIGH-signal brand mentions from authoritative authors β i.e. cases where seconds matter (a viral mention, a complaint going public). Everything else waits for the next scheduled engagement-response run.
The heuristics are intentionally simple and explainable. A future iteration can swap the classifier for an LLM call gated by signal strength (e.g. "if author has >5k followers AND tweet has >5 replies, classify with GPT-4o-mini") without touching the routing layer.
Classification buckets
| Bucket | Meaning | Downstream action |
|---|---|---|
brand_mention | Tweet mentions the customer's brand or handle | Drained into engagement-response reply queue |
competitor | Tweet matches a tracked competitor rule | Queued for next research run |
lead_signal | Tweet matches an intent rule (e.g. "looking forβ¦") | Queued for lead-outreach enrichment |
noise | Match did not pass any signal threshold | Archived only β no further action |
Event shape
Each classified match becomes a row in Event:
{
customerId: string,
type: 'x_stream_brand_mention' | 'x_stream_competitor' | 'x_stream_lead_signal' | 'x_stream_noise',
details: {
text: string,
authorUsername: string,
authorFollowers: number,
url: string,
tweetId: string,
rule: string, // which X stream rule matched
score: number, // 0..1 signal strength
},
createdAt: Date,
}The drainer in engagement-response.ts reads only x_stream_brand_mention events from the last 24 hours, marks them consumed via the x_stream_consumed sibling-event pattern (so the table stays append-only β no schema migration needed for the consumed bit), and routes them through the comment-classifier alongside Composio-fetched comments.
Append-only consumption pattern
Rather than mutate the source Event row when an agent processes it, the router emits a paired x_stream_consumed event referencing the original event id. The db.events.markXStreamEventsConsumed helper handles the write; the db.events.listUnconsumedXStreamEvents helper does an anti-join against the consumed sibling so the next drain run skips events already handled.
This pattern means:
- The
Eventtable is fully append-only β every row is immutable history. - Re-running the engagement workflow is safe β already-consumed events are invisible to the next drain.
- A future migration to a real consumed flag column can backfill from the sibling rows without data loss.
Where to look in the code
apps/api/agent/lib/x-stream-router.tsβ classifier + dispatcherapps/api/agent/lib/x-stream-config.tsβ per-customer rule config loaderapps/api/agent/workflows/engagement-response.tsβdrainXStreamBrandMentionsfunction consumes eventsapps/api/infra/__tests__/x-stream-events.integration.test.tsβ end-to-end coverage
Related
- Engagement Response β the workflow that drains brand-mention events
- Provenance & RSS β where every published reply lands for audit
Dogfooding (Customer
heycmo runs heycmo's own marketing through the same agent infrastructure customers use β same caps, same approval queue, same provenance.
Engagement Response
Sam (Community Manager) replies to comments, mentions, and DMs across LinkedIn, X, Instagram, and Facebook β with a default-on approval queue.