🧠 HeyCMO
Features

X Stream Router

Real-time tweet ingestion that classifies each match into brand_mention, competitor, lead_signal, or noise β€” without spinning up an agent for every fire.

X Stream Router

The X Stream Router is HeyCMO's bridge between real-time X (Twitter) data and the agent layer. It listens to filtered tweet streams (rules registered against the X Filtered Stream API), classifies every match into one of four buckets, persists the result as a structured Event, and only fires a synchronous agent workflow for the small slice of matches that are time-sensitive. Everything else is queued for the next scheduled engagement run.

Configuration

PropertyValue
Phase1.1
Router moduleapps/api/agent/lib/x-stream-router.ts
Config loaderapps/api/agent/lib/x-stream-config.ts
Event type prefixx_stream_
Subscribed tox.tweet.matched (via events.ts)
Drained byengagement-response workflow
UI surface/dashboard activity feed (event_type filter x_stream_*)

Why a separate router

Without a router, every matched tweet would spin up a full agent workflow. A single noisy rule on a popular topic can fire 1,000+ matches per day; at $0.05–$0.20 per agent run, that's bankruptcy money for a $99/mo product.

The router avoids that cost in three steps:

  1. Cheap deterministic classifier decides which of four buckets a tweet belongs to β€” no LLM call per tweet.
  2. Structured Event row is persisted with event_type prefixed x_stream_* so dashboards and scheduled agents can pick it up later.
  3. Inline agent firing only for HIGH-signal brand mentions from authoritative authors β€” i.e. cases where seconds matter (a viral mention, a complaint going public). Everything else waits for the next scheduled engagement-response run.

The heuristics are intentionally simple and explainable. A future iteration can swap the classifier for an LLM call gated by signal strength (e.g. "if author has >5k followers AND tweet has >5 replies, classify with GPT-4o-mini") without touching the routing layer.

Classification buckets

BucketMeaningDownstream action
brand_mentionTweet mentions the customer's brand or handleDrained into engagement-response reply queue
competitorTweet matches a tracked competitor ruleQueued for next research run
lead_signalTweet matches an intent rule (e.g. "looking for…")Queued for lead-outreach enrichment
noiseMatch did not pass any signal thresholdArchived only β€” no further action

Event shape

Each classified match becomes a row in Event:

{
  customerId: string,
  type: 'x_stream_brand_mention' | 'x_stream_competitor' | 'x_stream_lead_signal' | 'x_stream_noise',
  details: {
    text: string,
    authorUsername: string,
    authorFollowers: number,
    url: string,
    tweetId: string,
    rule: string,        // which X stream rule matched
    score: number,       // 0..1 signal strength
  },
  createdAt: Date,
}

The drainer in engagement-response.ts reads only x_stream_brand_mention events from the last 24 hours, marks them consumed via the x_stream_consumed sibling-event pattern (so the table stays append-only β€” no schema migration needed for the consumed bit), and routes them through the comment-classifier alongside Composio-fetched comments.

Append-only consumption pattern

Rather than mutate the source Event row when an agent processes it, the router emits a paired x_stream_consumed event referencing the original event id. The db.events.markXStreamEventsConsumed helper handles the write; the db.events.listUnconsumedXStreamEvents helper does an anti-join against the consumed sibling so the next drain run skips events already handled.

This pattern means:

  • The Event table is fully append-only β€” every row is immutable history.
  • Re-running the engagement workflow is safe β€” already-consumed events are invisible to the next drain.
  • A future migration to a real consumed flag column can backfill from the sibling rows without data loss.

Where to look in the code

  • apps/api/agent/lib/x-stream-router.ts β€” classifier + dispatcher
  • apps/api/agent/lib/x-stream-config.ts β€” per-customer rule config loader
  • apps/api/agent/workflows/engagement-response.ts β€” drainXStreamBrandMentions function consumes events
  • apps/api/infra/__tests__/x-stream-events.integration.test.ts β€” end-to-end coverage

On this page