fix(flowchart-workshop): fix LLM streaming for generation and reconnection

- Fix a race condition where the watch endpoint couldn't find the active
  generation because the generate route hadn't registered it yet. The
  workshop page now triggers /generate before connecting to /watch
  (see the sketch below).

- Add a polling fallback in the watch endpoint (up to 3s) for edge cases
  where the generate route is still starting up (sketched below).

- Show the progress panel during regeneration. It was previously rendered
  only when !hasDraft, so regenerating an existing draft gave no visible
  progress.

- Add comprehensive logging throughout the generation pipeline for debugging.

- Improve the generation registry with subscriber management and
  accumulated reasoning text to support reconnection (see the registry
  sketch below).
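
A condensed sketch of the new client-side ordering (the real change is in the
workshop page diff below; wrapping it in a standalone helper here is purely
illustrative):

    // Trigger /generate first (fire-and-forget), then connect to /watch.
    async function connectWatchAfterGenerate(
      sessionId: string,
      topicDescription: string,
      needsGeneration: boolean
    ): Promise<Response> {
      if (needsGeneration) {
        // Don't await the streaming response; just get the route running so it
        // registers the generation in the in-memory registry.
        fetch(`/api/flowchart-workshop/sessions/${sessionId}/generate`, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ topicDescription }),
        }).catch((err) => console.error('Generate request failed:', err))

        // Give the server a moment to register before /watch looks it up.
        await new Promise((resolve) => setTimeout(resolve, 200))
      }
      return fetch(`/api/flowchart-workshop/sessions/${sessionId}/watch`)
    }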
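
The watch endpoint's polling fallback, as a rough helper (getGeneration and
isGenerationActive are the registry functions in the diff below; the import
path is an assumption):

    import { getGeneration, isGenerationActive } from './generation-registry' // path assumed

    // Poll the in-memory registry briefly; return the generation state, or null
    // so the caller can fall back to the session state stored in the DB.
    async function waitForGeneration(sessionId: string, maxWaitMs = 3000, pollIntervalMs = 100) {
      const startTime = Date.now()
      while (Date.now() - startTime < maxWaitMs) {
        if (isGenerationActive(sessionId)) return getGeneration(sessionId)
        await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
      }
      return null
    }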
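
And a simplified sketch of the registry's subscriber management (types trimmed;
the real module below also tracks status, accumulated output, results, and
delayed cleanup):

    type Subscriber = (event: { type: string; text?: string; data?: unknown }) => void

    interface GenerationState {
      accumulatedReasoning: string
      subscribers: Set<Subscriber>
    }

    const activeGenerations = new Map<string, GenerationState>()

    function subscribe(sessionId: string, subscriber: Subscriber): (() => void) | null {
      const state = activeGenerations.get(sessionId)
      if (!state) return null
      state.subscribers.add(subscriber)
      // The caller holds on to the returned unsubscribe and calls it on disconnect.
      return () => {
        state.subscribers.delete(subscriber)
      }
    }

    function broadcast(sessionId: string, event: { type: string; text?: string }): void {
      const state = activeGenerations.get(sessionId)
      if (!state) return
      // Accumulate reasoning text so a reconnecting watcher can be seeded with it.
      if (event.type === 'reasoning' && event.text) {
        state.accumulatedReasoning += event.text
      }
      for (const subscriber of state.subscribers) {
        subscriber(event)
      }
    }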

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: Thomas Hallock
Date: 2026-01-23 13:02:34 -06:00
Parent: f916358614
Commit: 325e0f483e
6 changed files with 301 additions and 29 deletions

View File

@@ -49,6 +49,12 @@ export async function POST(request: Request, { params }: RouteParams) {
const url = new URL(request.url);
const debug = url.searchParams.get("debug") === "true";
// Always log route hit for debugging
console.log(`[generate] POST /api/flowchart-workshop/sessions/${id}/generate`, {
debug,
timestamp: new Date().toISOString(),
});
if (debug) {
console.log("[generate] Debug mode enabled");
}
@@ -162,7 +168,9 @@ export async function POST(request: Request, { params }: RouteParams) {
} | null = null;
// Start tracking this generation in the registry (for reconnection support)
console.log(`[generate] Starting generation registry for session ${id}`);
const generationState = startGeneration(id);
console.log(`[generate] Generation state created`, { sessionId: generationState.sessionId, status: generationState.status });
// Throttled save of reasoning text to database (for durability)
let lastReasoningSaveTime = 0;
@@ -226,6 +234,12 @@ Return the result as a JSON object matching the GeneratedFlowchartSchema.`;
// Stream the LLM response with reasoning
// Use debug option to enable detailed logging in the LLM client
console.log(`[generate] Creating LLM stream`, {
provider: "openai",
model: "gpt-5.2",
promptLength: fullPrompt.length,
debug,
});
const llmStream = llm.stream({
provider: "openai",
model: "gpt-5.2",
@@ -236,18 +250,26 @@ Return the result as a JSON object matching the GeneratedFlowchartSchema.`;
summary: "auto",
},
timeoutMs: 300_000, // 5 minutes for complex flowchart generation
debug, // Enable LLM client debug logging if debug=true
debug: true, // ALWAYS enable LLM client debug logging for now
});
console.log(`[generate] LLM stream created, starting iteration`);
// Forward all stream events to the client AND broadcast to registry subscribers
// The for-await loop processes all LLM events regardless of client state
let eventCount = 0;
console.log(`[generate] Entering event loop...`);
for await (const event of llmStream as AsyncGenerator<
StreamEvent<GeneratedFlowchart>,
void,
unknown
>) {
eventCount++;
console.log(`[generate] LLM event #${eventCount}:`, event.type);
switch (event.type) {
case "started": {
console.log(`[generate] LLM started event`, { responseId: event.responseId });
const startedData = {
responseId: event.responseId,
message: "Generating flowchart...",
@@ -258,6 +280,13 @@ Return the result as a JSON object matching the GeneratedFlowchartSchema.`;
}
case "reasoning": {
if (eventCount <= 5 || eventCount % 50 === 0) {
console.log(`[generate] Reasoning event #${eventCount}`, {
textLength: event.text?.length,
isDelta: event.isDelta,
summaryIndex: event.summaryIndex,
});
}
const reasoningData = {
text: event.text,
summaryIndex: event.summaryIndex,
@@ -273,6 +302,12 @@ Return the result as a JSON object matching the GeneratedFlowchartSchema.`;
}
case "output_delta": {
if (eventCount <= 5 || eventCount % 50 === 0) {
console.log(`[generate] Output delta event #${eventCount}`, {
textLength: event.text?.length,
outputIndex: event.outputIndex,
});
}
const outputData = {
text: event.text,
outputIndex: event.outputIndex,
@@ -283,7 +318,7 @@ Return the result as a JSON object matching the GeneratedFlowchartSchema.`;
}
case "error":
console.error("[generate] LLM error:", event.message, event.code);
console.error("[generate] LLM error event:", event.message, event.code);
// This is an LLM error, not a client error
llmError = { message: event.message, code: event.code };
sendEvent("error", {
@@ -294,11 +329,16 @@ Return the result as a JSON object matching the GeneratedFlowchartSchema.`;
break;
case "complete":
console.log(`[generate] LLM complete event`, {
hasData: !!event.data,
usage: event.usage,
});
finalResult = event.data;
usage = event.usage;
break;
}
}
console.log(`[generate] Event loop finished, total events: ${eventCount}`);
} catch (error) {
// This catch is for unexpected errors (network issues, etc.)
// NOT for client disconnect (those are caught in sendEvent)
@@ -310,6 +350,11 @@ Return the result as a JSON object matching the GeneratedFlowchartSchema.`;
// ALWAYS update database based on LLM result, regardless of client connection
// This is the key fix: DB operations happen outside the try-catch for client errors
console.log(`[generate] Post-loop processing`, {
hasError: !!llmError,
hasFinalResult: !!finalResult,
hasUsage: !!usage,
});
if (llmError) {
// LLM failed - update session to error state, clear reasoning
await db

View File

@@ -31,6 +31,11 @@ interface RouteParams {
export async function GET(request: Request, { params }: RouteParams) {
const { id } = await params
// Always log route hit for debugging
console.log(`[watch] GET /api/flowchart-workshop/sessions/${id}/watch`, {
timestamp: new Date().toISOString(),
})
if (!id) {
return new Response(JSON.stringify({ error: 'Session ID required' }), {
status: 400,
@@ -50,7 +55,7 @@ export async function GET(request: Request, { params }: RouteParams) {
}
// Verify session exists and belongs to user
const session = await db.query.workshopSessions.findFirst({
let session = await db.query.workshopSessions.findFirst({
where: and(eq(schema.workshopSessions.id, id), eq(schema.workshopSessions.userId, userId)),
})
@@ -87,11 +92,64 @@ export async function GET(request: Request, { params }: RouteParams) {
}
// Check if there's an active generation in the registry
const activeGeneration = getGeneration(id)
// If not found immediately, poll briefly in case the generate route is still starting up
let activeGeneration = getGeneration(id)
let isActive = isGenerationActive(id)
if (activeGeneration && isGenerationActive(id)) {
console.log(`[watch] Initial generation state check`, {
sessionId: id,
hasActiveGeneration: !!activeGeneration,
isActive,
sessionState: session.state,
})
// If no active generation found AND session is not complete (refining state),
// the generate route may still be starting up. Poll briefly.
// This handles the race condition where watch connects before generate registers.
const shouldPoll = !isActive && session.state !== 'refining'
if (shouldPoll) {
console.log(`[watch] No active generation yet (state=${session.state}), polling...`)
const maxWaitMs = 3000 // Wait up to 3 seconds
const pollIntervalMs = 100
const startTime = Date.now()
while (Date.now() - startTime < maxWaitMs) {
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
activeGeneration = getGeneration(id)
isActive = isGenerationActive(id)
if (isActive) {
console.log(`[watch] Found active generation after ${Date.now() - startTime}ms`)
break
}
}
if (!isActive) {
console.log(`[watch] No active generation found after ${maxWaitMs}ms polling`)
}
// Re-fetch session in case state changed during polling (e.g., generation completed)
const refreshedSession = await db.query.workshopSessions.findFirst({
where: and(eq(schema.workshopSessions.id, id), eq(schema.workshopSessions.userId, userId)),
})
if (refreshedSession) {
session = refreshedSession
console.log(`[watch] Refreshed session state: ${session.state}`)
}
}
console.log(`[watch] Final generation state`, {
sessionId: id,
hasActiveGeneration: !!activeGeneration,
isActive,
status: activeGeneration?.status,
accumulatedReasoningLength: activeGeneration?.accumulatedReasoning?.length,
subscriberCount: activeGeneration?.subscribers?.size,
})
if (activeGeneration && isActive) {
// LIVE PATH: Subscribe to the active generation stream
console.log(`[watch] Subscribing to live generation for session ${id}`)
console.log(`[watch] LIVE PATH: Subscribing to live generation for session ${id}`)
// Send accumulated content so far (seeds the client)
sendEvent('state', {
@@ -101,8 +159,16 @@ export async function GET(request: Request, { params }: RouteParams) {
})
// Subscribe to live updates
console.log(`[watch] Subscribing to registry for session ${id}`)
let watchEventCount = 0
const unsubscribe = subscribe(id, (event: StreamEvent) => {
watchEventCount++
if (watchEventCount <= 5 || watchEventCount % 50 === 0) {
console.log(`[watch] Received event #${watchEventCount}:`, event.type)
}
if (!clientConnected) {
console.log(`[watch] Client disconnected, unsubscribing`)
unsubscribe?.()
return
}
@@ -123,12 +189,14 @@ export async function GET(request: Request, { params }: RouteParams) {
break
}
case 'complete': {
console.log(`[watch] Received complete event, closing stream`)
sendEvent('complete', event.data)
unsubscribe?.()
closeStream()
break
}
case 'error': {
console.log(`[watch] Received error event:`, event.data)
const data = event.data as { message: string }
sendEvent('error', data)
unsubscribe?.()
@@ -137,13 +205,18 @@ export async function GET(request: Request, { params }: RouteParams) {
}
}
})
console.log(`[watch] Subscription established, unsubscribe available: ${!!unsubscribe}`)
// Clean up on disconnect
// Note: The stream will stay open until generation completes or client disconnects
// The registry will clean up the subscription if the subscriber throws
} else {
// DB PATH: Generation is not active, return DB state
console.log(`[watch] No active generation for session ${id}, returning DB state`)
console.log(`[watch] DB PATH: No active generation for session ${id}`, {
sessionState: session.state,
hasDraftDefinition: !!session.draftDefinitionJson,
hasReasoningText: !!session.currentReasoningText,
})
// Send current state from DB
sendEvent('state', {

View File

@@ -304,14 +304,43 @@ export default function WorkshopPage() {
const watchController = new AbortController()
abortControllerRef.current = watchController
// Connect to watch endpoint
// Determine if we need to trigger generation (session is 'initial' with topic but no draft)
const needsGeneration =
session.state === 'initial' && session.topicDescription && !session.draftDefinitionJson
// Connect to watch endpoint (after triggering generation if needed)
const connectWatch = async () => {
// If session needs generation, trigger it FIRST before connecting to watch
// This ensures the registry entry exists when we connect
if (needsGeneration) {
console.log(`[workshop-client] Triggering generation for session ${sessionId}`)
try {
// Fire-and-forget POST to /generate - don't await the full response
// Just ensure the request starts processing before we connect to watch
fetch(`/api/flowchart-workshop/sessions/${sessionId}/generate`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ topicDescription: session.topicDescription }),
}).catch((err) => {
console.error('[workshop-client] Generate request failed:', err)
})
// Give the server a moment to register the generation
await new Promise((resolve) => setTimeout(resolve, 200))
} catch (err) {
console.error('[workshop-client] Failed to trigger generation:', err)
}
}
console.log(`[workshop-client] Connecting to watch endpoint for session ${sessionId}`)
try {
const response = await fetch(`/api/flowchart-workshop/sessions/${sessionId}/watch`, {
signal: watchController.signal,
})
console.log(`[workshop-client] Watch response:`, { status: response.status, ok: response.ok, hasBody: !!response.body })
if (!response.ok || !response.body) {
console.error(`[workshop-client] Watch connection failed`, { status: response.status })
dispatch({ type: 'STREAM_ERROR', message: 'Failed to connect to watch stream' })
return
}
@@ -319,10 +348,15 @@ export default function WorkshopPage() {
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
let watchEventCount = 0
console.log(`[workshop-client] Starting to read watch stream`)
while (true) {
const { done, value } = await reader.read()
if (done) break
if (done) {
console.log(`[workshop-client] Watch stream done, total events: ${watchEventCount}`)
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
@@ -335,6 +369,10 @@ export default function WorkshopPage() {
} else if (line.startsWith('data: ') && currentEvent) {
try {
const data = JSON.parse(line.slice(6))
watchEventCount++
if (watchEventCount <= 5 || watchEventCount % 50 === 0) {
console.log(`[workshop-client] Watch event #${watchEventCount}:`, currentEvent, data?.reasoningText?.length ? `(reasoning: ${data.reasoningText.length} chars)` : '')
}
switch (currentEvent) {
case 'state':
@@ -473,6 +511,8 @@ export default function WorkshopPage() {
const handleGenerate = useCallback(async () => {
if (!session?.topicDescription) return
console.log(`[workshop-client] handleGenerate called for session ${sessionId}`)
// Reset and start
dispatch({ type: 'START_STREAMING', streamType: 'generate' })
setIsProgressPanelExpanded(true)
@@ -481,6 +521,7 @@ export default function WorkshopPage() {
abortControllerRef.current = new AbortController()
try {
console.log(`[workshop-client] Fetching generate endpoint`)
const response = await fetch(`/api/flowchart-workshop/sessions/${sessionId}/generate`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
@@ -488,23 +529,42 @@ export default function WorkshopPage() {
signal: abortControllerRef.current.signal,
})
console.log(`[workshop-client] Generate response received:`, { status: response.status, ok: response.ok })
// Track events for logging
let generateEventCount = 0
// Parse the SSE stream
await parseFlowchartSSE(
response,
{
onStarted: (responseId) => {
generateEventCount++
console.log(`[workshop-client] Generate onStarted:`, responseId)
dispatch({ type: 'STREAM_STARTED', responseId })
},
onProgress: (stage, message) => {
generateEventCount++
console.log(`[workshop-client] Generate onProgress:`, stage, message)
dispatch({ type: 'STREAM_PROGRESS', stage, message })
},
onReasoning: (text, isDelta) => {
generateEventCount++
if (generateEventCount <= 5 || generateEventCount % 50 === 0) {
console.log(`[workshop-client] Generate onReasoning #${generateEventCount}:`, { textLength: text?.length, isDelta })
}
dispatch({ type: 'STREAM_REASONING', text, append: isDelta })
},
onOutputDelta: (text) => {
generateEventCount++
if (generateEventCount <= 5 || generateEventCount % 50 === 0) {
console.log(`[workshop-client] Generate onOutputDelta #${generateEventCount}:`, { textLength: text?.length })
}
dispatch({ type: 'STREAM_OUTPUT', text, append: true })
},
onComplete: (result) => {
generateEventCount++
console.log(`[workshop-client] Generate onComplete`, { totalEvents: generateEventCount, hasResult: !!result })
dispatch({ type: 'STREAM_COMPLETE', result })
// Update session with the generated content
if (isGenerateResult(result)) {
@@ -526,20 +586,24 @@ export default function WorkshopPage() {
}
},
onError: (message) => {
console.error(`[workshop-client] Generate onError:`, message)
dispatch({ type: 'STREAM_ERROR', message })
setError(message)
},
onCancelled: () => {
console.log(`[workshop-client] Generate onCancelled`)
dispatch({ type: 'STREAM_CANCELLED' })
},
},
abortControllerRef.current.signal
)
console.log(`[workshop-client] parseFlowchartSSE completed`)
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') {
console.log(`[workshop-client] Generate aborted`)
dispatch({ type: 'STREAM_CANCELLED' })
} else {
console.error('Generation failed:', err)
console.error('[workshop-client] Generation failed:', err)
const message = err instanceof Error ? err.message : 'Generation failed'
dispatch({ type: 'STREAM_ERROR', message })
setError(message)
@@ -1208,6 +1272,19 @@ export default function WorkshopPage() {
{isExportingPDF ? 'Exporting...' : 'Download PDF'}
</button>
</div>
{/* Progress panel during regeneration */}
{isGenerating && streamingState.streamType === 'generate' && (
<div className={css({ marginBottom: '4' })}>
<GenerationProgressPanel
isExpanded={isProgressPanelExpanded}
onToggle={() => setIsProgressPanelExpanded(!isProgressPanelExpanded)}
status={streamingState.status}
progressMessage={progressMessage || 'Regenerating flowchart...'}
reasoningText={streamingState.reasoningText}
onCancel={handleCancel}
/>
</div>
)}
<DebugMermaidDiagram
mermaidContent={session.draftMermaidContent || ''}
currentNodeId=""

View File

@@ -110,20 +110,9 @@ export function CreateFlowchartModal({ open, onOpenChange }: CreateFlowchartModa
const { session } = await response.json()
// Fire off generation immediately (fire-and-forget)
// The generate route is resilient to client disconnect, so it will
// continue processing even after we navigate away
fetch(`/api/flowchart-workshop/sessions/${session.id}/generate`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ topicDescription }),
}).catch((err) => {
// Log but don't block navigation - the session page will show the error
console.error('Failed to start generation:', err)
})
// Navigate to the workshop immediately
// The page will connect to the watch endpoint and see the live stream
// The workshop page will trigger generation and connect to the watch endpoint
console.log(`[create-modal] Navigating to /flowchart/workshop/${session.id}`)
router.push(`/flowchart/workshop/${session.id}`)
} catch (err) {
setError(err instanceof Error ? err.message : 'Failed to create session')

View File

@@ -53,6 +53,13 @@ const activeGenerations = new Map<string, GenerationState>()
*/
export function startGeneration(sessionId: string): GenerationState {
// Clean up any existing state for this session
const existing = activeGenerations.get(sessionId)
if (existing) {
console.log(`[registry] Cleaning up existing generation for session ${sessionId}`, {
previousStatus: existing.status,
previousSubscriberCount: existing.subscribers.size,
})
}
activeGenerations.delete(sessionId)
const state: GenerationState = {
@@ -65,6 +72,9 @@ export function startGeneration(sessionId: string): GenerationState {
}
activeGenerations.set(sessionId, state)
console.log(`[registry] Started generation for session ${sessionId}`, {
totalActiveGenerations: activeGenerations.size,
})
return state
}
@@ -89,21 +99,50 @@ export function isGenerationActive(sessionId: string): boolean {
*/
export function subscribe(sessionId: string, subscriber: Subscriber): (() => void) | null {
const state = activeGenerations.get(sessionId)
if (!state) return null
if (!state) {
console.log(`[registry] Subscribe failed: no generation state for session ${sessionId}`)
return null
}
state.subscribers.add(subscriber)
console.log(`[registry] New subscriber added for session ${sessionId}`, {
subscriberCount: state.subscribers.size,
status: state.status,
})
return () => {
state.subscribers.delete(subscriber)
console.log(`[registry] Subscriber removed for session ${sessionId}`, {
subscriberCount: state.subscribers.size,
})
}
}
// Track broadcast counts per session to avoid log spam
const broadcastCounts = new Map<string, number>()
/**
* Broadcast an event to all subscribers of a generation
*/
export function broadcast(sessionId: string, event: StreamEvent): void {
const state = activeGenerations.get(sessionId)
if (!state) return
if (!state) {
console.log(`[registry] Broadcast failed: no generation state for session ${sessionId}`)
return
}
// Count broadcasts for this session
const count = (broadcastCounts.get(sessionId) ?? 0) + 1
broadcastCounts.set(sessionId, count)
// Log first few and then periodically
if (count <= 3 || count % 100 === 0) {
console.log(`[registry] Broadcast #${count}`, {
sessionId,
eventType: event.type,
subscriberCount: state.subscribers.size,
})
}
// Update accumulated content
if (event.type === 'reasoning') {
@@ -125,7 +164,7 @@ export function broadcast(sessionId: string, event: StreamEvent): void {
subscriber(event)
} catch (err) {
// Remove failed subscribers
console.error('[generation-registry] Subscriber error:', err)
console.error('[registry] Subscriber error:', err)
state.subscribers.delete(subscriber)
}
}
@@ -136,7 +175,16 @@ export function broadcast(sessionId: string, event: StreamEvent): void {
*/
export function completeGeneration(sessionId: string, result: unknown): void {
const state = activeGenerations.get(sessionId)
if (!state) return
if (!state) {
console.log(`[registry] completeGeneration: no state for session ${sessionId}`)
return
}
console.log(`[registry] Completing generation for session ${sessionId}`, {
subscriberCount: state.subscribers.size,
accumulatedReasoningLength: state.accumulatedReasoning.length,
accumulatedOutputLength: state.accumulatedOutput.length,
})
state.status = 'complete'
state.result = result
@@ -144,8 +192,12 @@ export function completeGeneration(sessionId: string, result: unknown): void {
// Broadcast completion to all subscribers
broadcast(sessionId, { type: 'complete', data: result })
// Clean broadcast count
broadcastCounts.delete(sessionId)
// Clean up after a delay (give subscribers time to receive the event)
setTimeout(() => {
console.log(`[registry] Cleaning up completed generation for session ${sessionId}`)
activeGenerations.delete(sessionId)
}, 5000)
}
@@ -155,7 +207,15 @@ export function completeGeneration(sessionId: string, result: unknown): void {
*/
export function failGeneration(sessionId: string, error: string): void {
const state = activeGenerations.get(sessionId)
if (!state) return
if (!state) {
console.log(`[registry] failGeneration: no state for session ${sessionId}`)
return
}
console.log(`[registry] Failing generation for session ${sessionId}`, {
error,
subscriberCount: state.subscribers.size,
})
state.status = 'error'
state.error = error
@@ -163,8 +223,12 @@ export function failGeneration(sessionId: string, error: string): void {
// Broadcast error to all subscribers
broadcast(sessionId, { type: 'error', data: { message: error } })
// Clean broadcast count
broadcastCounts.delete(sessionId)
// Clean up after a delay
setTimeout(() => {
console.log(`[registry] Cleaning up failed generation for session ${sessionId}`)
activeGenerations.delete(sessionId)
}, 5000)
}

View File

@@ -121,6 +121,8 @@ export async function parseFlowchartSSE(
callbacks: FlowchartSSEEvents,
signal?: AbortSignal
): Promise<void> {
console.log(`[sse-parser] parseFlowchartSSE called`, { status: response.status, ok: response.ok })
if (!response.ok) {
const errorText = await response.text()
let errorMessage = 'Request failed'
@@ -130,33 +132,46 @@
} catch {
errorMessage = errorText || errorMessage
}
console.error(`[sse-parser] Response not OK:`, errorMessage)
callbacks.onError?.(errorMessage, String(response.status))
return
}
const reader = response.body?.getReader()
if (!reader) {
console.error(`[sse-parser] No response body`)
callbacks.onError?.('No response body')
return
}
const decoder = new TextDecoder()
let buffer = ''
let chunkCount = 0
let eventCount = 0
console.log(`[sse-parser] Starting to read response body`)
try {
while (true) {
// Check for cancellation
if (signal?.aborted) {
console.log(`[sse-parser] Signal aborted`)
callbacks.onCancelled?.()
break
}
const { done, value } = await reader.read()
chunkCount++
if (done) {
console.log(`[sse-parser] Reader done after ${chunkCount} chunks, ${eventCount} events`)
break
}
if (chunkCount <= 3 || chunkCount % 100 === 0) {
console.log(`[sse-parser] Chunk #${chunkCount} received, size: ${value?.length}`)
}
buffer += decoder.decode(value, { stream: true })
// Process complete events (separated by double newlines)
@@ -180,6 +195,11 @@ export async function parseFlowchartSSE(
if (!eventType || !eventData) continue
eventCount++
if (eventCount <= 5 || eventCount % 50 === 0) {
console.log(`[sse-parser] Event #${eventCount}: ${eventType}`)
}
try {
const data = JSON.parse(eventData)
@@ -213,17 +233,21 @@ export async function parseFlowchartSSE(
break
}
} catch (parseError) {
console.error('Failed to parse SSE event data:', parseError, eventData)
console.error('[sse-parser] Failed to parse SSE event data:', parseError, eventData)
}
}
}
console.log(`[sse-parser] Reader loop finished normally, total events: ${eventCount}`)
} catch (error) {
if (signal?.aborted) {
console.log(`[sse-parser] Caught abort error`)
callbacks.onCancelled?.()
} else {
console.error(`[sse-parser] Stream error:`, error)
callbacks.onError?.(error instanceof Error ? error.message : 'Stream error')
}
} finally {
console.log(`[sse-parser] Releasing reader lock`)
reader.releaseLock()
}
}