Use the event bus
Publish a domain event from an aggregate, subscribe a listener that reacts in another module.
The event bus is in-process and in-memory. It exists to keep modules decoupled at the code level — iam doesn't import notifications; instead it raises user.created and a listener in notifications reacts.
Implementation
apps/server/src/infrastructure/events/event-bus.ts:
export interface IEventBus {
publish(events: ReadonlyArray<DomainEvent>): Promise<void>;
subscribe(eventName: string, handler: EventHandler): void;
}
export class InMemoryEventBus implements IEventBus { /* ... */ }A handler that throws is logged and swallowed — one bad listener does not fail the publishing aggregate.
1. Raise an event from an aggregate
Inside the aggregate's domain method:
this.addDomainEvent({
name: 'user.created',
aggregateId: this.id,
occurredAt: new Date(),
payload: { email: this.email.value, locale: this.locale },
});Events accumulate on the aggregate but are not yet published.
2. Publish in the use case
After persisting the aggregate:
async execute(input: RegisterInput): Promise<Result<User, DomainError>> {
const userResult = User.create(input);
if (userResult.isErr()) return userResult;
await this.users.save(userResult.value);
await this.bus.publish(userResult.value.pullEvents());
return userResult;
}pullEvents() returns the accumulated events and clears them off the aggregate, so re-saving never re-publishes.
3. Subscribe a listener
In the receiving module's bootstrap (typically called from bootstrap/container.ts or a dedicated register-listeners.ts):
// apps/server/src/modules/notifications/infrastructure/listeners.ts
export const registerNotificationListeners = (deps: {
bus: IEventBus;
jobs: JobScheduler;
}) => {
deps.bus.subscribe('user.created', async (event) => {
await deps.jobs.enqueue('emails', {
kind: 'welcome',
to: { email: event.payload.email },
recipientName: event.payload.recipientName,
appName: 'UseDeploy',
locale: event.payload.locale,
});
});
};The listener does the minimum synchronous work — typically: enqueue a BullMQ job. Heavy work (sending the email, calling a third party) belongs in the worker.
4. Type your event names
Add the event name to a shared union if you want the type system to enforce that you only subscribe to existing events:
type ApplicationEvent =
| { name: 'user.created'; payload: { /* ... */ } }
| { name: 'subscription.activated'; payload: { /* ... */ } };When to use events vs a direct call
| Use an event when | Use a direct call when |
|---|---|
| Multiple modules might react | Exactly one module owns the next step |
| The reaction is best-effort (no transactional guarantee needed) | The next step must succeed for the operation to be considered done |
| The reaction is async / can be deferred | The result is needed for the HTTP response |
The bus is not a message queue. Events fire in-process; on a server crash mid-publish, in-flight events are lost. If you need durability, the listener should enqueue a BullMQ job and persist there. For cross-instance fan-out, layer Redis pub/sub on top — the in-memory bus is the default because most listeners just enqueue.
Testing
Inject InMemoryEventBus from the production code or build a recording double:
const recorded: DomainEvent[] = [];
const bus: IEventBus = {
publish: async (events) => { recorded.push(...events); },
subscribe: () => {},
};Assert against recorded after the use case runs. If the use case forgets to publish, the test fails.