Usar el event bus
Publicar un evento de dominio desde un aggregate, suscribir un listener que reaccione en otro módulo.
El event bus es in-process y en memoria. Existe para mantener los módulos desacoplados a nivel de código — iam no importa notifications; en cambio, levanta user.created y un listener en notifications reacciona.
Implementación
apps/server/src/infrastructure/events/event-bus.ts:
export interface IEventBus {
publish(events: ReadonlyArray<DomainEvent>): Promise<void>;
subscribe(eventName: string, handler: EventHandler): void;
}
export class InMemoryEventBus implements IEventBus { /* ... */ }Un handler que tira excepción es logueado y absorbido — un mal listener no falla el aggregate que publica.
1. Levantar un evento desde un aggregate
Dentro del método de dominio del aggregate:
this.addDomainEvent({
name: 'user.created',
aggregateId: this.id,
occurredAt: new Date(),
payload: { email: this.email.value, locale: this.locale },
});Los eventos se acumulan en el aggregate pero todavía no se publican.
2. Publicar en el use case
Después de persistir el aggregate:
async execute(input: RegisterInput): Promise<Result<User, DomainError>> {
const userResult = User.create(input);
if (userResult.isErr()) return userResult;
await this.users.save(userResult.value);
await this.bus.publish(userResult.value.pullEvents());
return userResult;
}pullEvents() devuelve los eventos acumulados y los borra del aggregate, así que re-guardar nunca re-publica.
3. Suscribir un listener
En el bootstrap del módulo receptor (típicamente llamado desde bootstrap/container.ts o un register-listeners.ts dedicado):
// apps/server/src/modules/notifications/infrastructure/listeners.ts
export const registerNotificationListeners = (deps: {
bus: IEventBus;
jobs: JobScheduler;
}) => {
deps.bus.subscribe('user.created', async (event) => {
await deps.jobs.enqueue('emails', {
kind: 'welcome',
to: { email: event.payload.email },
recipientName: event.payload.recipientName,
appName: 'UseDeploy',
locale: event.payload.locale,
});
});
};El listener hace el trabajo síncrono mínimo — típicamente: encolar un job de BullMQ. El trabajo pesado (mandar el email, llamar a un tercero) va en el worker.
4. Tipá los nombres de eventos
Agregá el nombre del evento a un union compartido si querés que el sistema de tipos imponga que sólo te suscribas a eventos existentes:
type ApplicationEvent =
| { name: 'user.created'; payload: { /* ... */ } }
| { name: 'subscription.activated'; payload: { /* ... */ } };Cuándo usar eventos vs llamada directa
| Usá un evento cuando | Usá llamada directa cuando |
|---|---|
| Múltiples módulos podrían reaccionar | Exactamente un módulo es dueño del próximo paso |
| La reacción es best-effort (no se necesita garantía transaccional) | El próximo paso debe tener éxito para que la operación se considere hecha |
| La reacción es async / puede diferirse | El resultado se necesita para la respuesta HTTP |
El bus no es una message queue. Los eventos disparan in-process; ante un crash del server a mitad del publish, los eventos in-flight se pierden. Si necesitás durabilidad, el listener debería encolar un job de BullMQ y persistir ahí. Para fan-out cross-instance, montá Redis pub/sub encima — el bus en memoria es el default porque la mayoría de los listeners simplemente encolan.
Testing
Inyectá InMemoryEventBus desde el código de producción o construí un double que registre:
const recorded: DomainEvent[] = [];
const bus: IEventBus = {
publish: async (events) => { recorded.push(...events); },
subscribe: () => {},
};Hacé asserts contra recorded después de que corre el use case. Si el use case se olvida de publicar, el test falla.