Event-Driven Architecture
Introduzione
Quando hai piu' server MCP che operano nello stesso dominio, nasce l'esigenza di coordinamento. L'architettura event-driven permette ai server di comunicare in modo disaccoppiato: un server pubblica un evento, altri server reagiscono senza conoscersi direttamente.
Il Problema del Coordinamento
Immagina due server: time-tracking (traccia ore lavorate) e project-economics (gestisce budget). Quando si logga del tempo, il budget dovrebbe aggiornarsi. Senza eventi:
ACCOPPIAMENTO DIRETTO (da evitare)
time-tracking project-economics
| |
| ── import e chiamata diretta -> |
| store.updateBudget(...) |
| |
I server si conoscono e dipendono l'uno dall'altro.
Aggiungere un terzo server richiede modificare time-tracking.
Con un event bus:
DISACCOPPIAMENTO VIA EVENTI
time-tracking EventBus project-economics
| | |
| ── publish ──> | |
| "time:logged" | ── deliver ────────────────> |
| | "time:logged" |
| | |
| | retrospective-manager |
| | | |
| | ── deliver ────────────────> |
| | "time:logged" |
I server non si conoscono. Il time-tracking pubblica un evento e non sa chi lo riceve. Aggiungere un nuovo subscriber non richiede modifiche al publisher.
Interfaccia EventBus
Definisci un'interfaccia astratta per il bus:
// bus.ts
export type EventHandler<T = unknown> = (payload: T) => void | Promise<void>;
export type PatternHandler = (event: string, payload: unknown) => void | Promise<void>;
export interface EventBus {
/** Pubblica un evento con il suo payload */
publish(event: string, payload: unknown): Promise<void>;
/** Sottoscriviti a un evento specifico */
subscribe(event: string, handler: EventHandler): () => void;
/** Sottoscriviti con pattern wildcard (es. "time:*") */
subscribePattern(pattern: string, handler: PatternHandler): () => void;
/** Rimuovi tutte le sottoscrizioni */
clear(): void;
}
Il metodo subscribe ritorna una funzione di unsubscribe, utile per il cleanup.
Implementazione Locale (In-Process)
Per server che girano nello stesso processo, un EventEmitter di Node.js basta:
// local-bus.ts
import { EventEmitter } from "node:events";
import micromatch from "micromatch";
import type { EventBus, EventHandler, PatternHandler } from "./bus.js";
interface PatternSubscription {
pattern: string;
handler: PatternHandler;
}
export class LocalEventBus implements EventBus {
private emitter = new EventEmitter();
private patternSubs: PatternSubscription[] = [];
constructor() {
this.emitter.setMaxListeners(100);
}
async publish(event: string, payload: unknown): Promise<void> {
// Notifica subscriber esatti
this.emitter.emit(event, payload);
// Notifica subscriber pattern
for (const sub of this.patternSubs) {
if (micromatch.isMatch(event, sub.pattern)) {
try {
await sub.handler(event, payload);
} catch {
// Errori nei subscriber non bloccano la pubblicazione
}
}
}
}
subscribe(event: string, handler: EventHandler): () => void {
this.emitter.on(event, handler);
return () => this.emitter.off(event, handler);
}
subscribePattern(pattern: string, handler: PatternHandler): () => void {
const sub: PatternSubscription = { pattern, handler };
this.patternSubs.push(sub);
return () => {
const index = this.patternSubs.indexOf(sub);
if (index >= 0) this.patternSubs.splice(index, 1);
};
}
clear(): void {
this.emitter.removeAllListeners();
this.patternSubs = [];
}
}
micromatch supporta glob pattern: "time:*" matcha "time:logged", "time:anomaly", etc. Il pattern "*" matcha qualsiasi evento.
Type-Safety sugli Eventi
Definisci una mappa di eventi tipizzati:
// events.ts
export interface EventMap {
// Time tracking
"time:entry-logged": {
taskId: string;
durationMinutes: number;
userId?: string;
};
"time:anomaly-detected": {
type: string;
description: string;
userId?: string;
};
// Project economics
"economics:budget-alert": {
projectName: string;
percentageUsed: number;
severity: "warning" | "critical";
};
"economics:cost-updated": {
projectName: string;
totalSpent: number;
};
// Sprint
"scrum:sprint-completed": {
sprintName: string;
completedPoints: number;
totalPoints: number;
};
}
export type EventName = keyof EventMap;
export type EventPayload<E extends EventName> = EventMap[E];
Rendi l'interfaccia EventBus type-safe:
export interface TypedEventBus {
publish<E extends EventName>(event: E, payload: EventPayload<E>): Promise<void>;
subscribe<E extends EventName>(event: E, handler: (payload: EventPayload<E>) => void): () => void;
subscribePattern(pattern: string, handler: PatternHandler): () => void;
clear(): void;
}
Ora il compilatore verifica che ogni publish() invii il payload corretto:
// Corretto: payload matcha EventMap["time:entry-logged"]
eventBus.publish("time:entry-logged", {
taskId: "TASK-1",
durationMinutes: 60,
});
// Errore TypeScript: 'durationMinutes' mancante
eventBus.publish("time:entry-logged", {
taskId: "TASK-1",
});
Wiring: Pubblicazione negli Handler dei Tool
Il pattern standard e' passare eventBus opzionale ai tool:
// tools/log-time.ts
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import type { EventBus } from "../bus.js";
import type { TimeStore } from "../services/time-store.js";
import { z } from "zod";
export function registerLogTime(
server: McpServer,
store: TimeStore,
eventBus?: EventBus, // Opzionale: funziona anche senza
): void {
server.tool(
"log-time",
"Registra tempo lavorato su un task",
{
taskId: z.string(),
durationMinutes: z.number().int().positive(),
},
async ({ taskId, durationMinutes }) => {
try {
const entry = store.logTime({ taskId, durationMinutes });
// Fire-and-forget: pubblica l'evento senza attendere i subscriber
eventBus?.publish("time:entry-logged", {
taskId,
durationMinutes,
});
return {
content: [{ type: "text", text: JSON.stringify(entry, null, 2) }],
};
} catch (error) {
return {
content: [{
type: "text",
text: `Errore: ${error instanceof Error ? error.message : String(error)}`,
}],
isError: true,
};
}
},
);
}
Pattern Fire-and-Forget
eventBus?.publish("time:entry-logged", payload);
?.(optional chaining): seeventBuse'undefined, non fa nulla- Non si usa
await: la pubblicazione non deve rallentare il tool - Errori nei subscriber sono catturati dal bus, mai propagati al tool
Collaboration Handlers
I server che reagiscono agli eventi definiscono i loro handler in un file collaboration.ts:
// collaboration.ts
import type { EventBus } from "../bus.js";
import type { EconomicsStore } from "../services/economics-store.js";
export function setupCollaborationHandlers(
eventBus: EventBus,
store: EconomicsStore,
): void {
// Quando il time-tracking logga tempo, aggiorna i costi
eventBus.subscribe("time:entry-logged", (payload) => {
const data = payload as {
taskId: string;
durationMinutes: number;
};
const hourlyRate = 50; // Default rate
const cost = (data.durationMinutes / 60) * hourlyRate;
store.addLaborCost(data.taskId, cost);
});
// Quando uno sprint finisce, salva una snapshot per analisi
eventBus.subscribe("scrum:sprint-completed", (payload) => {
const data = payload as {
sprintName: string;
completedPoints: number;
};
store.saveSprintSnapshot(data.sprintName, data.completedPoints);
});
}
Wiring nella Server Factory
// server.ts
export function createProjectEconomicsServer(options?: {
eventBus?: EventBus;
storeOptions?: { inMemory?: boolean };
}) {
const server = new McpServer({
name: "project-economics",
version: "1.0.0",
});
const store = new EconomicsStore(options?.storeOptions);
registerSetBudget(server, store);
registerLogCost(server, store, options?.eventBus);
// Attiva la collaborazione solo se c'e' un event bus
if (options?.eventBus) {
setupCollaborationHandlers(options.eventBus, store);
}
return { server, store };
}
Pattern Wildcard: Workflow Orchestrator
Il pattern subscribePattern("*", handler) permette di intercettare tutti gli eventi. E' il cuore di un workflow orchestrator:
// Il workflow orchestrator ascolta TUTTI gli eventi
eventBus.subscribePattern("*", async (event, payload) => {
// Cerca workflow il cui trigger matcha questo evento
const workflows = store.getActiveWorkflowsByTrigger(event);
for (const workflow of workflows) {
// Verifica le condizioni del trigger
if (matchesConditions(workflow.triggerConditions, payload)) {
await executeWorkflow(workflow, payload);
}
}
});
MockEventBus per i Test
Per i test, usa un bus che registra gli eventi pubblicati:
export class MockEventBus implements EventBus {
public published: Array<{ event: string; payload: unknown }> = [];
async publish(event: string, payload: unknown): Promise<void> {
this.published.push({ event, payload });
}
subscribe(): () => void {
return () => {};
}
subscribePattern(): () => void {
return () => {};
}
clear(): void {
this.published = [];
}
// Utility per asserzioni
wasPublished(eventName: string): boolean {
return this.published.some((e) => e.event === eventName);
}
getPublishedEvents(eventName: string) {
return this.published.filter((e) => e.event === eventName);
}
}
Test:
it("should publish time:entry-logged event", async () => {
const eventBus = new MockEventBus();
const { server } = createTimeTrackingServer({ eventBus, storeOptions: { inMemory: true } });
const harness = await createTestHarness(server);
await harness.client.callTool({
name: "log-time",
arguments: { taskId: "TASK-1", durationMinutes: 60 },
});
expect(eventBus.wasPublished("time:entry-logged")).toBe(true);
const events = eventBus.getPublishedEvents("time:entry-logged");
expect(events[0].payload).toEqual({
taskId: "TASK-1",
durationMinutes: 60,
});
});
Riepilogo
In questo capitolo hai imparato:
- Perche' l'architettura event-driven e' essenziale per server multipli
- Come implementare un EventBus locale con EventEmitter + micromatch
- Type-safety sugli eventi con EventMap tipizzata
- Il pattern fire-and-forget per pubblicare eventi dai tool
- Collaboration handlers per reagire agli eventi di altri server
- Pattern wildcard per orchestratori
- MockEventBus per testing
Prossimo: Comunicazione Cross-Server