fix(flowchart): implement proper task queue for concurrent example generation

Replace sequential example generation with a proper task queue system that
correctly handles concurrent requests to the Web Worker pool.

Root cause of previous issues: Each worker stored only ONE resolve/reject
callback, so concurrent requests would overwrite each other's callbacks,
causing promises to never resolve or resolve with wrong data.

Solution:
- Add unique requestId to all worker messages for request/response matching
- Implement task queue with dispatch logic for pending work
- Track pending requests in a Map keyed by requestId
- Workers echo back requestId so responses match their originating requests
- Both /flowchart page and workshop page now generate concurrently

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Thomas Hallock 2026-01-20 20:51:36 -06:00
parent de720ab39b
commit e9c4bb1ed8
7 changed files with 277 additions and 171 deletions

View File

@ -90,7 +90,11 @@
"Bash(python3 -m py_compile:*)",
"Bash(python3:*)",
"Bash(while read f)",
"Bash(do sed -i '' 's/var\\(--nav-height\\)/var\\(--app-nav-height, 72px\\)/g' \"$f\" echo \"Fixed: $f\" done)"
"Bash(do sed -i '' 's/var\\(--nav-height\\)/var\\(--app-nav-height, 72px\\)/g' \"$f\" echo \"Fixed: $f\" done)",
"mcp__sqlite__list_tables",
"mcp__sqlite__describe_table",
"mcp__sqlite__write_query",
"Bash(git diff:*)"
],
"deny": [],
"ask": []

View File

@ -180,7 +180,7 @@ export default function FlowchartPickerPage() {
}, [])
// Generate examples for ALL flowcharts (published + drafts) in a single unified effect
// Processes sequentially because the worker pool cancels concurrent requests
// Uses concurrent generation - the worker pool properly queues requests via request IDs
useEffect(() => {
if (isLoadingPublished || isLoadingDrafts) return
@ -233,50 +233,49 @@ export default function FlowchartPickerPage() {
generatingIdsRef.current.add(item.id)
}
// Generate examples sequentially
async function generateAll() {
for (const item of itemsToGenerate) {
try {
let executable: ExecutableFlowchart
let mermaidContent: string | undefined
// Generate examples for a single item
async function generateForItem(item: ItemToGenerate): Promise<void> {
try {
let executable: ExecutableFlowchart
let mermaidContent: string | undefined
if (item.type === 'published-hardcoded') {
const flowchartData = getFlowchart(item.id)
if (!flowchartData) continue
executable = await loadFlowchart(flowchartData.definition, flowchartData.mermaid)
mermaidContent = flowchartData.mermaid
} else if (item.type === 'published-database') {
const response = await fetch(`/api/flowcharts/${item.id}`)
if (!response.ok) continue
const data = await response.json()
const { definition, mermaid } = data.flowchart as {
definition: FlowchartDefinition
mermaid: string
}
executable = await loadFlowchart(definition, mermaid)
mermaidContent = mermaid
} else {
// Draft
const definition: FlowchartDefinition = JSON.parse(item.definitionJson!)
executable = await loadFlowchart(definition, item.mermaidContent!)
mermaidContent = item.mermaidContent
if (item.type === 'published-hardcoded') {
const flowchartData = getFlowchart(item.id)
if (!flowchartData) return
executable = await loadFlowchart(flowchartData.definition, flowchartData.mermaid)
mermaidContent = flowchartData.mermaid
} else if (item.type === 'published-database') {
const response = await fetch(`/api/flowcharts/${item.id}`)
if (!response.ok) return
const data = await response.json()
const { definition, mermaid } = data.flowchart as {
definition: FlowchartDefinition
mermaid: string
}
const diagnosticReport = diagnoseFlowchart(executable.definition, mermaidContent)
const examples = await generateExamplesAsync(executable, 10, {})
setCardExamples((prev) =>
new Map(prev).set(item.id, { flowchart: executable, examples, diagnosticReport })
)
} catch (err) {
console.error(`[Examples] Failed to generate for ${item.title}:`, err)
} finally {
generatingIdsRef.current.delete(item.id)
executable = await loadFlowchart(definition, mermaid)
mermaidContent = mermaid
} else {
// Draft
const definition: FlowchartDefinition = JSON.parse(item.definitionJson!)
executable = await loadFlowchart(definition, item.mermaidContent!)
mermaidContent = item.mermaidContent
}
const diagnosticReport = diagnoseFlowchart(executable.definition, mermaidContent)
const examples = await generateExamplesAsync(executable, 10, {})
setCardExamples((prev) =>
new Map(prev).set(item.id, { flowchart: executable, examples, diagnosticReport })
)
} catch (err) {
console.error(`[Examples] Failed to generate for ${item.title}:`, err)
} finally {
generatingIdsRef.current.delete(item.id)
}
}
generateAll()
// Generate all examples concurrently - the worker pool handles queuing
Promise.all(itemsToGenerate.map(generateForItem))
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [isLoadingPublished, isLoadingDrafts, publishedFlowcharts, draftSessions, draftDiagnostics])
@ -1005,7 +1004,9 @@ export default function FlowchartPickerPage() {
onClick={() => handleResumeDraft(session.id)}
flowchart={cardData?.flowchart}
examples={cardData?.examples}
diagnosticReport={cardData?.diagnosticReport ?? draftDiagnostics.get(session.id)}
diagnosticReport={
cardData?.diagnosticReport ?? draftDiagnostics.get(session.id)
}
actions={[
{
label: 'Edit',

View File

@ -86,19 +86,6 @@ export default function WorkshopPage() {
// Examples for worksheet generation
const [worksheetExamples, setWorksheetExamples] = useState<GeneratedExample[]>([])
// Track if WorksheetDebugPanel is generating (to sequence with FlowchartExampleGrid)
// Initialize to true since we start on worksheet tab - WorksheetDebugPanel will set to false when done
const [isDebugPanelGenerating, setIsDebugPanelGenerating] = useState(true)
const handleDebugPanelGenerationStart = useCallback(() => setIsDebugPanelGenerating(true), [])
const handleDebugPanelGenerationComplete = useCallback(() => setIsDebugPanelGenerating(false), [])
// Reset generation state when flowchart changes (to re-sequence generation)
useEffect(() => {
if (executableFlowchart && activeTab === 'worksheet') {
setIsDebugPanelGenerating(true)
}
}, [executableFlowchart?.definition.id, activeTab])
// Generate examples when flowchart changes
useEffect(() => {
if (!executableFlowchart) {
@ -1216,12 +1203,7 @@ export default function WorkshopPage() {
Create PDF Worksheet
</button>
{/* Debug Panel - shows generated examples with answers */}
<WorksheetDebugPanel
flowchart={executableFlowchart}
problemCount={10}
onGenerationStart={handleDebugPanelGenerationStart}
onGenerationComplete={handleDebugPanelGenerationComplete}
/>
<WorksheetDebugPanel flowchart={executableFlowchart} problemCount={10} />
</div>
)}
{activeTab === 'worksheet' && !executableFlowchart && (
@ -1251,7 +1233,6 @@ export default function WorkshopPage() {
// Navigate to test page - it will use the passed values
router.push(`/flowchart/workshop/${sessionId}/test`)
}}
waitForReady={activeTab === 'worksheet' && isDebugPanelGenerating}
/>
</div>
</div>
@ -1678,13 +1659,10 @@ function ExamplesTab({
definition,
flowchart,
onTestExample,
waitForReady = false,
}: {
definition: FlowchartDefinition | null
flowchart: ExecutableFlowchart | null
onTestExample: (values: Record<string, ProblemValue>) => void
/** When true, wait before generating examples (to sequence with other generators) */
waitForReady?: boolean
}) {
if (!definition) {
return (
@ -1709,7 +1687,6 @@ function ExamplesTab({
onSelect={onTestExample}
compact={true}
enableCaching={false} // Don't cache in workshop - always show fresh examples
waitForReady={waitForReady}
/>
<p
className={css({

View File

@ -40,12 +40,6 @@ export interface FlowchartExampleGridProps {
showDifficultyFilter?: boolean
/** Compact mode for smaller displays (default: false) */
compact?: boolean
/**
* When true, wait before generating examples. Use this to sequence generation
* with other components that share the web worker pool.
* When undefined/false, generation starts immediately.
*/
waitForReady?: boolean
}
/**
@ -68,7 +62,6 @@ export function FlowchartExampleGrid({
showDice = true,
showDifficultyFilter = true,
compact = false,
waitForReady = false,
}: FlowchartExampleGridProps) {
// Displayed examples
const [displayedExamples, setDisplayedExamples] = useState<GeneratedExample[]>([])
@ -110,15 +103,7 @@ export function FlowchartExampleGrid({
}, [flowchart, analysis])
// Load/generate examples on mount or when flowchart changes
// If waitForReady is true, wait until it becomes false before generating
useEffect(() => {
// If we need to wait, don't start generation yet
if (waitForReady) {
setIsLoading(true)
setError(null)
return
}
// Create a flag to track if this effect is still active (for cleanup)
let isActive = true
@ -174,7 +159,7 @@ export function FlowchartExampleGrid({
return () => {
isActive = false
}
}, [flowchart, exampleCount, constraints, storageKey, waitForReady])
}, [flowchart, exampleCount, constraints, storageKey])
// Calculate difficulty range for visual indicators
const difficultyRange = useMemo(() => {
@ -394,7 +379,7 @@ export function FlowchartExampleGrid({
color: { base: 'gray.500', _dark: 'gray.400' },
})}
>
{waitForReady ? 'Waiting...' : 'Generating examples...'}
Generating examples...
</div>
)
}

View File

@ -13,10 +13,6 @@ interface WorksheetDebugPanelProps {
flowchart: ExecutableFlowchart
/** Number of problems to generate (default: 10) */
problemCount?: number
/** Called when example generation starts */
onGenerationStart?: () => void
/** Called when example generation completes (success or error) */
onGenerationComplete?: () => void
}
/** Difficulty tier type */
@ -26,12 +22,7 @@ type DifficultyTier = 'easy' | 'medium' | 'hard'
* Debug panel for testing worksheet generation.
* Shows generated problems with their computed answers, raw values, and difficulty tiers.
*/
export function WorksheetDebugPanel({
flowchart,
problemCount = 10,
onGenerationStart,
onGenerationComplete,
}: WorksheetDebugPanelProps) {
export function WorksheetDebugPanel({ flowchart, problemCount = 10 }: WorksheetDebugPanelProps) {
const [examples, setExamples] = useState<GeneratedExample[]>([])
const [isLoading, setIsLoading] = useState(true)
const [error, setError] = useState<string | null>(null)
@ -46,7 +37,6 @@ export function WorksheetDebugPanel({
const generateExamples = useCallback(async () => {
setIsLoading(true)
setError(null)
onGenerationStart?.()
try {
const newExamples = await generateExamplesAsync(flowchart, problemCount, {
positiveAnswersOnly: false,
@ -57,9 +47,8 @@ export function WorksheetDebugPanel({
setError(err instanceof Error ? err.message : 'Failed to generate examples')
} finally {
setIsLoading(false)
onGenerationComplete?.()
}
}, [flowchart, problemCount, onGenerationStart, onGenerationComplete])
}, [flowchart, problemCount])
// Calculate difficulty range
const difficultyRange = useMemo(() => {

View File

@ -3,6 +3,8 @@
*
* Provides a promise-based API for generating examples off the main thread,
* with support for parallel execution across multiple workers.
*
* Uses a task queue to properly handle concurrent requests without callback clobbering.
*/
import type { ExecutableFlowchart } from './schema'
@ -15,15 +17,58 @@ import type { WorkerResponse } from './example-generator.worker'
const WORKER_COUNT =
typeof navigator !== 'undefined' ? Math.max(2, Math.min(navigator.hardwareConcurrency - 1, 6)) : 4
// Unique ID generator for requests
let nextRequestId = 0
function generateRequestId(): string {
return `req-${nextRequestId++}-${Date.now()}`
}
/**
* A task to be processed by a worker
*/
interface WorkerTask {
requestId: string
type: 'generate' | 'generate-partial'
flowchart: ExecutableFlowchart
constraints: GenerationConstraints
// For 'generate' type
count?: number
// For 'generate-partial' type
pathIndices?: number[]
}
/**
* Tracks a pending request's state
*/
interface PendingRequest {
requestId: string
totalTasks: number
completedTasks: number
results: GeneratedExample[][] // Results from each task
errors: Error[]
resolve: (examples: GeneratedExample[]) => void
reject: (error: Error) => void
count: number // Final count for selection
}
/**
* Worker state - just tracks if busy
*/
interface WorkerState {
worker: Worker
busy: boolean
resolve: ((examples: GeneratedExample[]) => void) | null
reject: ((error: Error) => void) | null
currentRequestId: string | null // Which request this worker is processing
}
// The worker pool
let workerPool: WorkerState[] | null = null
// Task queue - tasks waiting to be processed
const taskQueue: WorkerTask[] = []
// Pending requests - map of requestId to request state
const pendingRequests = new Map<string, PendingRequest>()
/**
* Initialize the worker pool lazily
*/
@ -35,29 +80,17 @@ function getWorkerPool(): WorkerState[] {
const state: WorkerState = {
worker,
busy: false,
resolve: null,
reject: null,
currentRequestId: null,
}
worker.onmessage = (event: MessageEvent<WorkerResponse>) => {
const { type } = event.data
if (type === 'result') {
state.resolve?.(event.data.examples)
} else if (type === 'error') {
state.reject?.(new Error(event.data.message))
}
state.busy = false
state.resolve = null
state.reject = null
handleWorkerResponse(state, event.data)
}
worker.onerror = (error) => {
console.error('Web worker error:', error)
const message = error.message || 'Web worker failed to load or execute'
state.reject?.(new Error(message))
state.busy = false
state.resolve = null
state.reject = null
handleWorkerError(state, new Error(message))
}
workerPool.push(state)
@ -66,10 +99,117 @@ function getWorkerPool(): WorkerState[] {
return workerPool
}
/**
* Handle a response from a worker
*/
function handleWorkerResponse(workerState: WorkerState, response: WorkerResponse): void {
const { requestId } = response
const request = pendingRequests.get(requestId)
// Mark worker as free
workerState.busy = false
workerState.currentRequestId = null
if (request) {
if (response.type === 'result') {
request.results.push(response.examples)
request.completedTasks++
} else if (response.type === 'error') {
request.errors.push(new Error(response.message))
request.completedTasks++
}
// Check if all tasks for this request are complete
if (request.completedTasks >= request.totalTasks) {
pendingRequests.delete(requestId)
if (request.errors.length > 0 && request.results.length === 0) {
// All tasks failed
request.reject(request.errors[0])
} else {
// Merge all results and finalize
const allExamples = request.results.flat()
const finalExamples = mergeAndFinalizeExamples(allExamples, request.count)
request.resolve(finalExamples)
}
}
}
// Try to dispatch next task from queue
dispatchNextTask()
}
/**
* Handle an error from a worker (not a task error, but a worker failure)
*/
function handleWorkerError(workerState: WorkerState, error: Error): void {
const requestId = workerState.currentRequestId
workerState.busy = false
workerState.currentRequestId = null
if (requestId) {
const request = pendingRequests.get(requestId)
if (request) {
request.errors.push(error)
request.completedTasks++
// Check if all tasks for this request are complete
if (request.completedTasks >= request.totalTasks) {
pendingRequests.delete(requestId)
if (request.errors.length > 0 && request.results.length === 0) {
request.reject(request.errors[0])
} else {
const allExamples = request.results.flat()
const finalExamples = mergeAndFinalizeExamples(allExamples, request.count)
request.resolve(finalExamples)
}
}
}
}
// Try to dispatch next task from queue
dispatchNextTask()
}
/**
* Find a free worker and dispatch the next task from the queue
*/
function dispatchNextTask(): void {
if (taskQueue.length === 0) return
const pool = getWorkerPool()
const freeWorker = pool.find((w) => !w.busy)
if (!freeWorker) return
const task = taskQueue.shift()!
freeWorker.busy = true
freeWorker.currentRequestId = task.requestId
if (task.type === 'generate') {
freeWorker.worker.postMessage({
type: 'generate',
requestId: task.requestId,
flowchart: task.flowchart,
count: task.count,
constraints: task.constraints,
})
} else {
freeWorker.worker.postMessage({
type: 'generate-partial',
requestId: task.requestId,
flowchart: task.flowchart,
pathIndices: task.pathIndices,
constraints: task.constraints,
})
}
}
/**
* Generate diverse examples using parallel Web Workers.
*
* Splits paths across multiple workers for faster generation.
* Properly handles concurrent requests via a task queue.
*/
export async function generateExamplesAsync(
flowchart: ExecutableFlowchart,
@ -77,82 +217,83 @@ export async function generateExamplesAsync(
constraints: GenerationConstraints
): Promise<GeneratedExample[]> {
const pool = getWorkerPool()
const requestId = generateRequestId()
// Analyze flowchart to get path count
const analysis = analyzeFlowchart(flowchart)
const totalPaths = analysis.paths.length
// If very few paths, just use single worker (overhead not worth it)
// If very few paths, just use single task
if (totalPaths <= 2) {
return generateExamplesSingleWorker(flowchart, count, constraints)
return new Promise((resolve, reject) => {
// Create request tracking
const request: PendingRequest = {
requestId,
totalTasks: 1,
completedTasks: 0,
results: [],
errors: [],
resolve,
reject,
count,
}
pendingRequests.set(requestId, request)
// Add single task to queue
taskQueue.push({
requestId,
type: 'generate',
flowchart,
count,
constraints,
})
// Try to dispatch immediately
dispatchNextTask()
})
}
// Split paths among workers
// Split paths among workers - create multiple tasks
const pathsPerWorker = Math.ceil(totalPaths / pool.length)
const workerTasks: Promise<GeneratedExample[]>[] = []
const tasks: WorkerTask[] = []
for (let i = 0; i < pool.length; i++) {
const startIdx = i * pathsPerWorker
const endIdx = Math.min(startIdx + pathsPerWorker, totalPaths)
if (startIdx >= totalPaths) break // No more paths to assign
if (startIdx >= totalPaths) break
const pathIndices = Array.from({ length: endIdx - startIdx }, (_, j) => startIdx + j)
const workerState = pool[i]
const task = new Promise<GeneratedExample[]>((resolve, reject) => {
workerState.resolve = resolve
workerState.reject = reject
workerState.busy = true
workerState.worker.postMessage({
type: 'generate-partial',
flowchart,
pathIndices,
constraints,
})
})
workerTasks.push(task)
}
// Wait for all workers to complete
const results = await Promise.all(workerTasks)
// Merge results from all workers
const allExamples = results.flat()
// Finalize selection (fast, done on main thread)
return mergeAndFinalizeExamples(allExamples, count)
}
/**
* Fallback to single worker for simple flowcharts
*/
function generateExamplesSingleWorker(
flowchart: ExecutableFlowchart,
count: number,
constraints: GenerationConstraints
): Promise<GeneratedExample[]> {
const pool = getWorkerPool()
const workerState = pool[0]
return new Promise((resolve, reject) => {
// Cancel any pending request
if (workerState.reject) {
workerState.reject(new Error('Cancelled by new request'))
}
workerState.resolve = resolve
workerState.reject = reject
workerState.busy = true
workerState.worker.postMessage({
type: 'generate',
tasks.push({
requestId,
type: 'generate-partial',
flowchart,
count,
pathIndices,
constraints,
})
}
return new Promise((resolve, reject) => {
// Create request tracking
const request: PendingRequest = {
requestId,
totalTasks: tasks.length,
completedTasks: 0,
results: [],
errors: [],
resolve,
reject,
count,
}
pendingRequests.set(requestId, request)
// Add all tasks to queue
taskQueue.push(...tasks)
// Try to dispatch as many as we can
for (let i = 0; i < tasks.length; i++) {
dispatchNextTask()
}
})
}
@ -166,4 +307,7 @@ export function terminateExampleWorkers(): void {
}
workerPool = null
}
// Clear any pending state
taskQueue.length = 0
pendingRequests.clear()
}

View File

@ -15,6 +15,7 @@ import { generateDiverseExamples, generateExamplesForPaths } from './loader'
export interface GenerateExamplesRequest {
type: 'generate'
requestId: string // Unique ID to match response to request
flowchart: ExecutableFlowchart
count: number
constraints: GenerationConstraints
@ -22,6 +23,7 @@ export interface GenerateExamplesRequest {
export interface GenerateExamplesPartialRequest {
type: 'generate-partial'
requestId: string // Unique ID to match response to request
flowchart: ExecutableFlowchart
pathIndices: number[] // Which paths this worker should process
constraints: GenerationConstraints
@ -29,11 +31,13 @@ export interface GenerateExamplesPartialRequest {
export interface GenerateExamplesResponse {
type: 'result'
requestId: string // Echo back the request ID
examples: GeneratedExample[]
}
export interface GenerateExamplesError {
type: 'error'
requestId: string // Echo back the request ID
message: string
}
@ -43,22 +47,24 @@ export type WorkerResponse = GenerateExamplesResponse | GenerateExamplesError
// Handle incoming messages
self.onmessage = (event: MessageEvent<WorkerMessage>) => {
const data = event.data
const { requestId } = data
try {
if (data.type === 'generate') {
// Full generation mode
const examples = generateDiverseExamples(data.flowchart, data.count, data.constraints)
const response: GenerateExamplesResponse = { type: 'result', examples }
const response: GenerateExamplesResponse = { type: 'result', requestId, examples }
self.postMessage(response)
} else if (data.type === 'generate-partial') {
// Partial generation mode - only process assigned paths
const examples = generateExamplesForPaths(data.flowchart, data.pathIndices, data.constraints)
const response: GenerateExamplesResponse = { type: 'result', examples }
const response: GenerateExamplesResponse = { type: 'result', requestId, examples }
self.postMessage(response)
}
} catch (error) {
const response: GenerateExamplesError = {
type: 'error',
requestId,
message: error instanceof Error ? error.message : 'Unknown error',
}
self.postMessage(response)