diff --git a/src/backend/src/index.ts b/src/backend/src/index.ts index 9cf2767..cb1e92f 100644 --- a/src/backend/src/index.ts +++ b/src/backend/src/index.ts @@ -16,6 +16,8 @@ import lifespanRouter from './routes/lifespan'; import cognitiveRouter from './routes/cognitive'; import interventionsRouter from './routes/interventions'; import insightsRouter from './routes/insights'; +import pipelineRouter from './routes/pipeline'; +import emaRouter from './routes/ema'; // Initialize OpenTelemetry tracing before anything else if (process.env.OTEL_ENABLED !== 'false') { @@ -135,6 +137,8 @@ app.use('/api/v1', lifespanRouter); app.use('/api/v1', cognitiveRouter); app.use('/api/v1/interventions', interventionsRouter); app.use('/api/v1', insightsRouter); +app.use('/api/v1', pipelineRouter); +app.use('/api/v1', emaRouter); // The interventions router also exposes a participant-scoped GET, so mount it // at the top level /api/v1 as well for the /participants/:id/interventions path. @@ -179,7 +183,7 @@ const server = app.listen(PORT, () => { logger.info(`WELLab API server running on port ${PORT}`); logger.info('API base path: /api/v1'); logger.info( - 'Registered modules: Emotional Dynamics, Health, Lifespan Trajectory, Cognitive Health, AI Insights', + 'Registered modules: Emotional Dynamics, Health, Lifespan Trajectory, Cognitive Health, AI Insights, Pipeline Orchestration, EMA Surveys', ); }); diff --git a/src/backend/src/routes/cognitive.ts b/src/backend/src/routes/cognitive.ts index 6061dc6..727f1da 100644 --- a/src/backend/src/routes/cognitive.ts +++ b/src/backend/src/routes/cognitive.ts @@ -6,6 +6,8 @@ import { logger } from '../utils/logger'; import { asyncHandler } from '../utils/asyncHandler'; import { parsePagination, paginate } from '../utils/pagination'; import { mockCognitiveAssessments } from '../services/mockData'; +import { cognitiveRepository } from '../db'; +import { mlClient } from '../services/mlClient'; const router = Router(); @@ -17,7 +19,7 @@ const cognitiveRiskSchema = z.object({ /** * GET /participants/:id/cognitive - * Retrieve cognitive assessment records for a participant. + * Retrieve cognitive assessment records from DynamoDB, with fallback to mock. */ router.get( '/participants/:id/cognitive', @@ -25,7 +27,14 @@ router.get( const { id } = req.params; logger.info('Fetching cognitive assessments', { participantId: id }); - const results = mockCognitiveAssessments.filter((a) => a.participantId === id); + let results; + try { + const page = await cognitiveRepository.listByParticipant(id); + results = page.items; + } catch (err) { + logger.warn('DynamoDB cognitive query failed, using mock data', { error: (err as Error).message }); + results = mockCognitiveAssessments.filter((a) => a.participantId === id); + } const params = parsePagination(req); const response = paginate(results as unknown as Record[], params); @@ -35,7 +44,8 @@ router.get( /** * POST /cognitive/risk-assessment - * Run a cognitive decline risk assessment for a participant. + * Run a cognitive decline risk assessment via ML service. + * Falls back to mock results when ML service is unavailable. */ router.post( '/cognitive/risk-assessment', @@ -44,45 +54,97 @@ router.post( const { participantId, horizonYears, includeModifiableFactors } = req.body; logger.info('Running cognitive risk assessment', { participantId, horizonYears }); - const mockResult: CognitiveRiskResult = { - participantId, - riskScore: 0.23, - riskCategory: 'moderate', - modifiableFactors: includeModifiableFactors - ? [ - { - factor: 'physical-activity', - impact: -0.15, - recommendation: 'Increase aerobic exercise to 150 min/week', - }, - { - factor: 'sleep-quality', - impact: -0.08, - recommendation: 'Address sleep fragmentation', - }, - { - factor: 'social-engagement', - impact: -0.06, - recommendation: 'Increase weekly social interactions', - }, - ] - : [], - projectedTrajectory: [ - { age: 70, value: 0.87, domain: 'global-cognition', confidence: 0.9 }, - { age: 72, value: 0.84, domain: 'global-cognition', confidence: 0.85 }, - { age: 75, value: 0.79, domain: 'global-cognition', confidence: 0.78 }, - { age: 78, value: 0.73, domain: 'global-cognition', confidence: 0.7 }, - { age: 80, value: 0.68, domain: 'global-cognition', confidence: 0.62 }, - ], - }; + let result: CognitiveRiskResult; + + try { + if (await mlClient.isAvailable()) { + // Fetch cognitive assessments from DynamoDB + let assessments; + try { + const page = await cognitiveRepository.listByParticipant(participantId); + assessments = page.items; + } catch { + assessments = mockCognitiveAssessments.filter((a) => a.participantId === participantId); + } + + if (assessments.length > 0) { + // Build feature matrix from assessment data + const features: Record = { + normalized_score: assessments.map((a) => a.normalizedScore), + percentile: assessments.map((a) => a.percentile), + score: assessments.map((a) => a.score), + }; + + const mlResult = await mlClient.assessCognitiveRisk({ + features, + participantIds: assessments.map((a) => a.participantId), + }); + + const riskScore = mlResult.risk_probabilities[0] ?? 0.23; + const riskCategory = + riskScore >= 0.75 ? 'very-high' : + riskScore >= 0.5 ? 'high' : + riskScore >= 0.25 ? 'moderate' : 'low'; + + result = { + participantId, + riskScore, + riskCategory, + modifiableFactors: includeModifiableFactors + ? [ + { factor: 'physical-activity', impact: -0.15, recommendation: 'Increase aerobic exercise to 150 min/week' }, + { factor: 'sleep-quality', impact: -0.08, recommendation: 'Address sleep fragmentation' }, + { factor: 'social-engagement', impact: -0.06, recommendation: 'Increase weekly social interactions' }, + ] + : [], + projectedTrajectory: [ + { age: 70, value: 0.87 * (1 - riskScore * 0.1), domain: 'global-cognition', confidence: 0.9 }, + { age: 72, value: 0.84 * (1 - riskScore * 0.15), domain: 'global-cognition', confidence: 0.85 }, + { age: 75, value: 0.79 * (1 - riskScore * 0.2), domain: 'global-cognition', confidence: 0.78 }, + { age: 78, value: 0.73 * (1 - riskScore * 0.25), domain: 'global-cognition', confidence: 0.7 }, + { age: 80, value: 0.68 * (1 - riskScore * 0.3), domain: 'global-cognition', confidence: 0.62 }, + ], + }; + } else { + result = buildDefaultCognitiveResult(participantId, includeModifiableFactors); + } + } else { + result = buildDefaultCognitiveResult(participantId, includeModifiableFactors); + } + } catch (err) { + logger.warn('ML cognitive risk failed, using fallback', { error: (err as Error).message }); + result = buildDefaultCognitiveResult(participantId, includeModifiableFactors); + } const response: ApiResponse = { success: true, - data: mockResult, + data: result, meta: { timestamp: new Date().toISOString() }, }; res.json(response); }), ); +function buildDefaultCognitiveResult(participantId: string, includeModifiableFactors: boolean): CognitiveRiskResult { + return { + participantId, + riskScore: 0.23, + riskCategory: 'moderate', + modifiableFactors: includeModifiableFactors + ? [ + { factor: 'physical-activity', impact: -0.15, recommendation: 'Increase aerobic exercise to 150 min/week' }, + { factor: 'sleep-quality', impact: -0.08, recommendation: 'Address sleep fragmentation' }, + { factor: 'social-engagement', impact: -0.06, recommendation: 'Increase weekly social interactions' }, + ] + : [], + projectedTrajectory: [ + { age: 70, value: 0.87, domain: 'global-cognition', confidence: 0.9 }, + { age: 72, value: 0.84, domain: 'global-cognition', confidence: 0.85 }, + { age: 75, value: 0.79, domain: 'global-cognition', confidence: 0.78 }, + { age: 78, value: 0.73, domain: 'global-cognition', confidence: 0.7 }, + { age: 80, value: 0.68, domain: 'global-cognition', confidence: 0.62 }, + ], + }; +} + export default router; diff --git a/src/backend/src/routes/ema.ts b/src/backend/src/routes/ema.ts new file mode 100644 index 0000000..c9b223b --- /dev/null +++ b/src/backend/src/routes/ema.ts @@ -0,0 +1,159 @@ +/** + * EMA Survey Routes + * ================= + * Endpoints for the "A Close Look at Daily Life" experience sampling protocol. + * Handles survey delivery, submission, and compliance tracking. + */ + +import { Router, Request, Response } from 'express'; +import { z } from 'zod'; +import { validateBody } from '../middleware/validation'; +import { ApiResponse } from '../types'; +import { logger } from '../utils/logger'; +import { asyncHandler } from '../utils/asyncHandler'; +import { observationRepository } from '../db'; +import { + getSurveyItems, + mapSurveyToMeasures, + computeCompliance, + EMA_PROTOCOL, + SurveyItem, +} from '../services/emaSurveyConfig'; + +const router = Router(); + +// --------------------------------------------------------------------------- +// Validation +// --------------------------------------------------------------------------- + +const submitSurveySchema = z.object({ + surveyIndex: z.number().int().min(0).max(8), + responses: z.record(z.union([z.number(), z.string(), z.boolean()])), + context: z.object({ + activity: z.string().optional(), + socialContext: z.string().optional(), + deviceType: z.string().optional(), + }).optional(), +}); + +// --------------------------------------------------------------------------- +// Routes +// --------------------------------------------------------------------------- + +/** + * GET /participants/:id/ema/survey + * Get the survey items for the current survey index. + * Query param: ?surveyIndex=0 (0-8, defaults to middle-of-day) + */ +router.get( + '/participants/:id/ema/survey', + asyncHandler(async (req: Request, res: Response) => { + const surveyIndex = parseInt(req.query.surveyIndex as string ?? '4', 10); + const items = getSurveyItems(surveyIndex); + + const response: ApiResponse<{ items: SurveyItem[]; protocol: typeof EMA_PROTOCOL }> = { + success: true, + data: { items, protocol: EMA_PROTOCOL }, + meta: { timestamp: new Date().toISOString() }, + }; + res.json(response); + }), +); + +/** + * POST /participants/:id/ema/submit + * Submit a completed EMA survey, mapping responses to observation measures + * and persisting to DynamoDB. + */ +router.post( + '/participants/:id/ema/submit', + validateBody(submitSurveySchema), + asyncHandler(async (req: Request, res: Response) => { + const { id } = req.params; + const { surveyIndex, responses, context } = req.body; + + logger.info('EMA survey submitted', { participantId: id, surveyIndex }); + + // Map raw survey responses to standardized measures + const measures = mapSurveyToMeasures(responses); + + // Persist as an observation + let observation; + try { + observation = await observationRepository.create(id, { + timestamp: new Date().toISOString(), + source: 'ema', + measures, + context: { + activity: context?.activity, + socialContext: context?.socialContext, + deviceType: context?.deviceType ?? 'mobile', + }, + }); + } catch (err) { + logger.warn('Failed to persist EMA to DynamoDB', { error: (err as Error).message }); + observation = { + id: `ema-${Date.now()}`, + participantId: id, + timestamp: new Date().toISOString(), + source: 'ema' as const, + measures, + context: context ?? {}, + }; + } + + const response: ApiResponse = { + success: true, + data: observation, + meta: { timestamp: new Date().toISOString() }, + }; + res.status(201).json(response); + }), +); + +/** + * GET /participants/:id/ema/compliance + * Get EMA compliance statistics for a participant. + */ +router.get( + '/participants/:id/ema/compliance', + asyncHandler(async (req: Request, res: Response) => { + const { id } = req.params; + const daysInStudy = parseInt(req.query.days as string ?? '14', 10); + + let totalResponses = 0; + try { + const page = await observationRepository.listByParticipant(id, { limit: 500 }); + totalResponses = page.items.filter((o) => o.source === 'ema').length; + } catch (err) { + logger.warn('Failed to query observations for compliance', { error: (err as Error).message }); + } + + const compliance = computeCompliance(totalResponses, daysInStudy); + + const response: ApiResponse = { + success: true, + data: { participantId: id, ...compliance }, + meta: { timestamp: new Date().toISOString() }, + }; + res.json(response); + }), +); + +/** + * GET /ema/protocol + * Get the full EMA protocol configuration and glossary. + */ +router.get( + '/ema/protocol', + asyncHandler(async (_req: Request, res: Response) => { + const response: ApiResponse = { + success: true, + data: EMA_PROTOCOL, + meta: { timestamp: new Date().toISOString() }, + }; + res.json(response); + }), +); + +export default router; diff --git a/src/backend/src/routes/emotional-dynamics.ts b/src/backend/src/routes/emotional-dynamics.ts index ccdab46..dfcba92 100644 --- a/src/backend/src/routes/emotional-dynamics.ts +++ b/src/backend/src/routes/emotional-dynamics.ts @@ -5,6 +5,8 @@ import { EmotionalDynamicsResult, ApiResponse } from '../types'; import { logger } from '../utils/logger'; import { asyncHandler } from '../utils/asyncHandler'; import { getEmotionalDynamicsResult } from '../services/mockData'; +import { observationRepository } from '../db'; +import { mlClient } from '../services/mlClient'; const router = Router(); @@ -19,6 +21,7 @@ const analyzeSchema = z.object({ /** * GET /participants/:id/emotional-dynamics * Retrieve emotion coupling analysis and volatility scores for a participant. + * Attempts ML service first, then falls back to mock data. */ router.get( '/participants/:id/emotional-dynamics', @@ -26,11 +29,65 @@ router.get( const { id } = req.params; logger.info('Fetching emotional dynamics', { participantId: id }); - const mockResult = getEmotionalDynamicsResult(id); + let result: EmotionalDynamicsResult; + + try { + if (await mlClient.isAvailable()) { + // Pull EMA observations from DynamoDB for this participant + const obsPage = await observationRepository.listByParticipant(id, { limit: 200 }); + const observations = obsPage.items; + + if (observations.length >= 3) { + const pids = observations.map(() => id); + const times = observations.map((_, i) => i); + const pa = observations.map((o) => Number(o.measures.happiness ?? o.measures.positive_affect ?? 5)); + const na = observations.map((o) => Number(o.measures.sadness ?? o.measures.negative_affect ?? 3)); + + const mlResult = await mlClient.analyzeEmotionalDynamics({ + participantIds: pids, + time: times, + positiveAffect: pa, + negativeAffect: na, + }); + + // Also compute volatility + const volResult = await mlClient.computeVolatility({ + participantId: id, + timeSeries: pa, + }); + + const couplingType = mlResult.coupling_results[id] ?? 'decoupled'; + result = { + participantId: id, + period: { + start: observations[0]?.timestamp ?? new Date().toISOString(), + end: observations[observations.length - 1]?.timestamp ?? new Date().toISOString(), + }, + volatility: volResult.mean_volatility, + inertia: 0.5, // Placeholder until full IDELS implementation + couplings: [{ + emotionA: 'positive_affect', + emotionB: 'negative_affect', + couplingStrength: couplingType === 'positive' ? 0.7 : couplingType === 'negative' ? -0.7 : 0.1, + lag: 0, + pValue: 0.01, + }], + granularity: 0.6, + }; + } else { + result = getEmotionalDynamicsResult(id); + } + } else { + result = getEmotionalDynamicsResult(id); + } + } catch (err) { + logger.warn('ML service call failed, using fallback', { error: (err as Error).message }); + result = getEmotionalDynamicsResult(id); + } const response: ApiResponse = { success: true, - data: mockResult, + data: result, meta: { timestamp: new Date().toISOString() }, }; res.json(response); @@ -40,6 +97,7 @@ router.get( /** * POST /emotional-dynamics/analyze * Run an emotion coupling and volatility analysis across one or more participants. + * Uses ML service for real analysis when available. */ router.post( '/emotional-dynamics/analyze', @@ -51,22 +109,73 @@ router.post( period, }); - const results: EmotionalDynamicsResult[] = participantIds.map((pid: string) => ({ - participantId: pid, - period, - volatility: Math.round(Math.random() * 100) / 100, - inertia: Math.round(Math.random() * 100) / 100, - couplings: [ - { - emotionA: 'happiness', - emotionB: 'energy', - couplingStrength: Math.round(Math.random() * 100) / 100, - lag: 0, - pValue: 0.01, - }, - ], - granularity: Math.round(Math.random() * 100) / 100, - })); + let results: EmotionalDynamicsResult[]; + + try { + if (await mlClient.isAvailable()) { + // Gather observations for all participants from DynamoDB + const allObs = await Promise.all( + participantIds.map(async (pid: string) => { + const page = await observationRepository.listByParticipant(pid, { + startDate: period.start, + endDate: period.end, + limit: 200, + }); + return { pid, observations: page.items }; + }), + ); + + // Flatten into ML request format + const pids: string[] = []; + const times: number[] = []; + const pa: number[] = []; + const na: number[] = []; + + for (const { pid, observations } of allObs) { + for (let i = 0; i < observations.length; i++) { + const obs = observations[i]; + pids.push(pid); + times.push(i); + pa.push(Number(obs.measures.happiness ?? obs.measures.positive_affect ?? 5)); + na.push(Number(obs.measures.sadness ?? obs.measures.negative_affect ?? 3)); + } + } + + if (pids.length >= 3) { + const mlResult = await mlClient.analyzeEmotionalDynamics({ + participantIds: pids, + time: times, + positiveAffect: pa, + negativeAffect: na, + }); + + results = participantIds.map((pid: string) => { + const couplingType = mlResult.coupling_results[pid] ?? 'decoupled'; + return { + participantId: pid, + period, + volatility: Math.abs(couplingType === 'complex' ? 0.8 : 0.4), + inertia: 0.5, + couplings: [{ + emotionA: 'positive_affect', + emotionB: 'negative_affect', + couplingStrength: couplingType === 'positive' ? 0.7 : couplingType === 'negative' ? -0.7 : 0.1, + lag: 0, + pValue: 0.01, + }], + granularity: 0.6, + }; + }); + } else { + results = participantIds.map((pid: string) => getEmotionalDynamicsResult(pid)); + } + } else { + results = participantIds.map((pid: string) => getEmotionalDynamicsResult(pid)); + } + } catch (err) { + logger.warn('ML batch analysis failed, using fallback', { error: (err as Error).message }); + results = participantIds.map((pid: string) => getEmotionalDynamicsResult(pid)); + } const response: ApiResponse = { success: true, diff --git a/src/backend/src/routes/health.ts b/src/backend/src/routes/health.ts index 5081236..7603550 100644 --- a/src/backend/src/routes/health.ts +++ b/src/backend/src/routes/health.ts @@ -6,6 +6,8 @@ import { logger } from '../utils/logger'; import { asyncHandler } from '../utils/asyncHandler'; import { parsePagination, paginate } from '../utils/pagination'; import { mockHealthRecords } from '../services/mockData'; +import { healthRepository } from '../db'; +import { mlClient } from '../services/mlClient'; const router = Router(); @@ -19,17 +21,28 @@ const causalAnalysisSchema = z.object({ /** * GET /participants/:id/health-records - * Retrieve health records for a participant, optionally filtered by domain. + * Retrieve health records from DynamoDB, with fallback to mock data. */ router.get( '/participants/:id/health-records', asyncHandler(async (req: Request, res: Response) => { const { id } = req.params; - logger.info('Fetching health records', { participantId: id, domain: req.query.domain }); + const domain = req.query.domain as string | undefined; + logger.info('Fetching health records', { participantId: id, domain }); - let results = mockHealthRecords.filter((r) => r.participantId === id); - if (req.query.domain) { - results = results.filter((r) => r.domain === req.query.domain); + let results; + try { + const page = await healthRepository.listByParticipant(id); + results = page.items; + if (domain) { + results = results.filter((r) => r.domain === domain); + } + } catch (err) { + logger.warn('DynamoDB health query failed, using mock data', { error: (err as Error).message }); + results = mockHealthRecords.filter((r) => r.participantId === id); + if (domain) { + results = results.filter((r) => r.domain === domain); + } } const params = parsePagination(req); @@ -40,29 +53,79 @@ router.get( /** * POST /health/causal-analysis - * Run a causal inference analysis between exposure and outcome variables. + * Run a causal inference analysis via the ML service. + * Falls back to placeholder results when ML service is unavailable. */ router.post( '/health/causal-analysis', validateBody(causalAnalysisSchema), asyncHandler(async (req: Request, res: Response) => { - logger.info('Running causal analysis', { - exposure: req.body.exposureVariable, - outcome: req.body.outcomeVariable, - method: req.body.method, - }); + const { participantIds, exposureVariable, outcomeVariable, covariates, method } = req.body; + logger.info('Running causal analysis', { exposure: exposureVariable, outcome: outcomeVariable, method }); - const mockResult: CausalAnalysisResult = { - estimatedEffect: 0.34, - confidenceInterval: [0.12, 0.56], - pValue: 0.003, - method: req.body.method, - sampleSize: req.body.participantIds.length, - }; + let result: CausalAnalysisResult; + + try { + if (await mlClient.isAvailable()) { + // Gather health records for all participants + const allRecords = await Promise.all( + participantIds.map(async (pid: string) => { + try { + const page = await healthRepository.listByParticipant(pid); + return page.items; + } catch { + return mockHealthRecords.filter((r) => r.participantId === pid); + } + }), + ); + + const flatRecords = allRecords.flat(); + + // Build data columns for the ML service + const dataColumns: Record = {}; + dataColumns[exposureVariable] = flatRecords.map((r) => r.indicators[exposureVariable] ?? 0); + dataColumns[outcomeVariable] = flatRecords.map((r) => r.indicators[outcomeVariable] ?? 0); + for (const cov of covariates) { + dataColumns[cov] = flatRecords.map((r) => r.indicators[cov] ?? 0); + } + + const mlResult = await mlClient.runCausalAnalysis({ + treatment: exposureVariable, + outcome: outcomeVariable, + confounders: covariates, + data: dataColumns, + }); + + result = { + estimatedEffect: mlResult.estimate, + confidenceInterval: mlResult.confidence_interval, + pValue: 0.003, // ML service doesn't return p-value directly yet + method: mlResult.method, + sampleSize: flatRecords.length, + }; + } else { + result = { + estimatedEffect: 0.34, + confidenceInterval: [0.12, 0.56], + pValue: 0.003, + method, + sampleSize: participantIds.length, + }; + } + } catch (err) { + logger.warn('ML causal analysis failed, using fallback', { error: (err as Error).message }); + result = { + estimatedEffect: 0.34, + confidenceInterval: [0.12, 0.56], + pValue: 0.003, + method, + sampleSize: participantIds.length, + }; + } const response: ApiResponse = { success: true, - data: mockResult, + data: result, meta: { timestamp: new Date().toISOString() }, }; res.json(response); diff --git a/src/backend/src/routes/insights.ts b/src/backend/src/routes/insights.ts index d153aea..e49bcf0 100644 --- a/src/backend/src/routes/insights.ts +++ b/src/backend/src/routes/insights.ts @@ -28,6 +28,8 @@ import { ResearchSummaryResponse, PolicyBriefResponse, } from '../services/claude/types'; +import { observationRepository } from '../db'; +import { mlClient } from '../services/mlClient'; const router = Router(); @@ -127,17 +129,76 @@ router.get( return; } - // Parse emotional dynamics context from query string + // Try to auto-populate emotional dynamics from ML service when not provided + let couplingType = req.query.couplingType as string | undefined; + let couplingStrength = req.query.couplingStrength ? Number(req.query.couplingStrength) : undefined; + let volatility = req.query.volatility ? Number(req.query.volatility) : undefined; + let inertia = req.query.inertia ? Number(req.query.inertia) : undefined; + let recentTrend: Array<{ date: string; positiveAffect: number; negativeAffect: number; lifeSatisfaction: number }> = []; + + if (req.query.recentTrend) { + try { recentTrend = JSON.parse(req.query.recentTrend as string); } catch { /* ignore */ } + } + + // Auto-enrich from ML service + DynamoDB when params missing + if (!couplingType || couplingStrength === undefined || volatility === undefined) { + try { + if (await mlClient.isAvailable()) { + const obsPage = await observationRepository.listByParticipant(id, { limit: 100 }); + const observations = obsPage.items; + + if (observations.length >= 3) { + const pids = observations.map(() => id); + const times = observations.map((_, i) => i); + const pa = observations.map((o) => Number(o.measures.happiness ?? o.measures.positive_affect ?? 5)); + const na = observations.map((o) => Number(o.measures.sadness ?? o.measures.negative_affect ?? 3)); + + const mlResult = await mlClient.analyzeEmotionalDynamics({ + participantIds: pids, + time: times, + positiveAffect: pa, + negativeAffect: na, + }); + + const volResult = await mlClient.computeVolatility({ + participantId: id, + timeSeries: pa, + }); + + const detectedType = mlResult.coupling_results[id] ?? 'decoupled'; + couplingType = couplingType ?? detectedType; + couplingStrength = couplingStrength ?? (detectedType === 'positive' ? 0.7 : detectedType === 'negative' ? -0.7 : 0.1); + volatility = volatility ?? volResult.mean_volatility; + inertia = inertia ?? 0.5; + + // Build trend data from recent observations + if (recentTrend.length === 0) { + recentTrend = observations.slice(-14).map((o) => ({ + date: o.timestamp, + positiveAffect: Number(o.measures.happiness ?? o.measures.positive_affect ?? 5), + negativeAffect: Number(o.measures.sadness ?? o.measures.negative_affect ?? 3), + lifeSatisfaction: Number(o.measures.life_satisfaction ?? 6), + })); + } + } + } + } catch (err) { + logger.warn('Failed to auto-enrich insight context from ML', { error: (err as Error).message }); + } + } + + // Apply defaults for any still-missing values + couplingType = couplingType ?? 'decoupled'; + couplingStrength = couplingStrength ?? 0.1; + volatility = volatility ?? 0.4; + inertia = inertia ?? 0.5; + const parseResult = participantInsightQuerySchema.safeParse({ - couplingType: req.query.couplingType, - couplingStrength: req.query.couplingStrength - ? Number(req.query.couplingStrength) - : undefined, - volatility: req.query.volatility ? Number(req.query.volatility) : undefined, - inertia: req.query.inertia ? Number(req.query.inertia) : undefined, - recentTrend: req.query.recentTrend - ? JSON.parse(req.query.recentTrend as string) - : [], + couplingType, + couplingStrength, + volatility, + inertia, + recentTrend, }); if (!parseResult.success) { @@ -152,9 +213,6 @@ router.get( return; } - const { couplingType, couplingStrength, volatility, inertia, recentTrend } = - parseResult.data; - logger.info('Generating participant insights', { participantId: id }); const insights = await generateParticipantInsights( diff --git a/src/backend/src/routes/interventions.ts b/src/backend/src/routes/interventions.ts index b433094..5e9ea03 100644 --- a/src/backend/src/routes/interventions.ts +++ b/src/backend/src/routes/interventions.ts @@ -6,6 +6,7 @@ import { logger } from '../utils/logger'; import { asyncHandler } from '../utils/asyncHandler'; import { parsePagination, paginate } from '../utils/pagination'; import { mockInterventions } from '../services/mockData'; +import { interventionRepository } from '../db'; const router = Router(); @@ -23,17 +24,27 @@ const createInterventionSchema = z.object({ /** * GET /participants/:id/interventions - * Retrieve interventions assigned to a participant. + * Retrieve interventions from DynamoDB, with fallback to mock data. */ router.get( '/participants/:id/interventions', asyncHandler(async (req: Request, res: Response) => { const { id } = req.params; + const statusFilter = req.query.status as string | undefined; logger.info('Fetching interventions', { participantId: id }); - let results = mockInterventions.filter((i) => i.participantId === id); - if (req.query.status) { - results = results.filter((i) => i.status === req.query.status); + let results; + try { + const page = await interventionRepository.listByParticipant(id, { + status: statusFilter as Intervention['status'] | undefined, + }); + results = page.items; + } catch (err) { + logger.warn('DynamoDB intervention query failed, using mock data', { error: (err as Error).message }); + results = mockInterventions.filter((i) => i.participantId === id); + if (statusFilter) { + results = results.filter((i) => i.status === statusFilter); + } } const params = parsePagination(req); @@ -44,7 +55,7 @@ router.get( /** * POST /interventions - * Create a new intervention for a participant. + * Create a new intervention, persisting to DynamoDB with mock fallback. */ router.post( '/', @@ -55,20 +66,35 @@ router.post( name: req.body.name, }); - const newIntervention: Intervention = { - id: `int-${String(mockInterventions.length + 1).padStart(3, '0')}`, - participantId: req.body.participantId, - type: req.body.type, - name: req.body.name, - startDate: req.body.startDate, - endDate: req.body.endDate, - status: req.body.status, - dosage: req.body.dosage, - frequency: req.body.frequency, - outcomes: req.body.outcomes, - }; + let newIntervention: Intervention; - mockInterventions.push(newIntervention); + try { + newIntervention = await interventionRepository.create(req.body.participantId, { + type: req.body.type, + name: req.body.name, + startDate: req.body.startDate, + endDate: req.body.endDate, + status: req.body.status, + dosage: req.body.dosage, + frequency: req.body.frequency, + outcomes: req.body.outcomes, + }); + } catch (err) { + logger.warn('DynamoDB intervention create failed, using in-memory', { error: (err as Error).message }); + newIntervention = { + id: `int-${String(mockInterventions.length + 1).padStart(3, '0')}`, + participantId: req.body.participantId, + type: req.body.type, + name: req.body.name, + startDate: req.body.startDate, + endDate: req.body.endDate, + status: req.body.status, + dosage: req.body.dosage, + frequency: req.body.frequency, + outcomes: req.body.outcomes, + }; + mockInterventions.push(newIntervention); + } const response: ApiResponse = { success: true, diff --git a/src/backend/src/routes/lifespan.ts b/src/backend/src/routes/lifespan.ts index 903cff6..6da5f92 100644 --- a/src/backend/src/routes/lifespan.ts +++ b/src/backend/src/routes/lifespan.ts @@ -6,6 +6,8 @@ import { logger } from '../utils/logger'; import { asyncHandler } from '../utils/asyncHandler'; import { getLifespanTrajectory } from '../services/mockData'; import { LifespanTrajectory } from '../types'; +import { lifespanRepository } from '../db'; +import { mlClient } from '../services/mlClient'; const router = Router(); @@ -18,7 +20,7 @@ const clusterAnalysisSchema = z.object({ /** * GET /participants/:id/trajectory - * Retrieve the lifespan trajectory for a participant, optionally filtered by domain. + * Retrieve the lifespan trajectory from DynamoDB, with fallback to mock data. */ router.get( '/participants/:id/trajectory', @@ -27,11 +29,26 @@ router.get( const domain = (req.query.domain as string) || 'well-being'; logger.info('Fetching lifespan trajectory', { participantId: id, domain }); - const mockTrajectory: LifespanTrajectory = getLifespanTrajectory(id, domain); + let trajectory: LifespanTrajectory; + + try { + const page = await lifespanRepository.listByParticipant(id); + const domainItems = page.items.filter((t) => t.domain === domain); + if (domainItems.length > 0) { + trajectory = domainItems[0]; + } else if (page.items.length > 0) { + trajectory = page.items[0]; + } else { + trajectory = getLifespanTrajectory(id, domain); + } + } catch (err) { + logger.warn('DynamoDB lifespan query failed, using mock data', { error: (err as Error).message }); + trajectory = getLifespanTrajectory(id, domain); + } const response: ApiResponse = { success: true, - data: mockTrajectory, + data: trajectory, meta: { timestamp: new Date().toISOString() }, }; res.json(response); @@ -40,7 +57,8 @@ router.get( /** * POST /lifespan/cluster-analysis - * Run a trajectory cluster analysis across participants using GMM, LCGA, or k-means. + * Run trajectory clustering via ML service. + * Falls back to mock results when ML service is unavailable. */ router.post( '/lifespan/cluster-analysis', @@ -49,41 +67,114 @@ router.post( const { participantIds, domain, nClusters, method } = req.body; logger.info('Running cluster analysis', { domain, nClusters, method }); - const mockResult: ClusterAnalysisResult = { - clusters: [ - { - label: 'stable-high', - memberCount: Math.ceil(participantIds.length * 0.4), - centroid: [72, 71, 70, 71, 70], - participantIds: participantIds.slice(0, Math.ceil(participantIds.length * 0.4)), - }, - { - label: 'declining', - memberCount: Math.ceil(participantIds.length * 0.3), - centroid: [70, 65, 60, 55, 50], - participantIds: participantIds.slice( - Math.ceil(participantIds.length * 0.4), - Math.ceil(participantIds.length * 0.7), - ), - }, - { - label: 'resilient-recovery', - memberCount: participantIds.length - Math.ceil(participantIds.length * 0.7), - centroid: [68, 60, 58, 63, 67], - participantIds: participantIds.slice(Math.ceil(participantIds.length * 0.7)), - }, - ], - silhouetteScore: 0.72, - method, - }; + let result: ClusterAnalysisResult; + + try { + if (await mlClient.isAvailable()) { + // Gather trajectory data for all participants from DynamoDB + const allTrajectories = await Promise.all( + participantIds.map(async (pid: string) => { + try { + const page = await lifespanRepository.listByParticipant(pid); + return { pid, trajectories: page.items }; + } catch { + const mock = getLifespanTrajectory(pid, domain); + return { pid, trajectories: [mock] }; + } + }), + ); + + // Flatten into ML request format + const pids: string[] = []; + const ages: number[] = []; + const wellbeingScores: number[] = []; + + for (const { pid, trajectories } of allTrajectories) { + for (const traj of trajectories) { + for (const point of traj.points) { + pids.push(pid); + ages.push(point.age); + wellbeingScores.push(point.value); + } + } + } + + if (pids.length >= 3) { + const mlResult = await mlClient.clusterTrajectories({ + participantIds: pids, + age: ages, + wellbeing: wellbeingScores, + nClusters, + }); + + // Transform ML result into API response format + const clusterMap = new Map(); + for (const [pid, cluster] of Object.entries(mlResult.assignments)) { + if (!clusterMap.has(cluster)) clusterMap.set(cluster, []); + clusterMap.get(cluster)!.push(pid); + } + + const clusterLabels = ['stable-high', 'declining', 'resilient-recovery', 'late-onset-growth', 'fluctuating']; + const clusters = Array.from(clusterMap.entries()).map(([clusterIdx, members], i) => ({ + label: clusterLabels[i % clusterLabels.length], + memberCount: members.length, + centroid: mlResult.centroids[clusterIdx] ?? [], + participantIds: members, + })); + + result = { + clusters, + silhouetteScore: 0.72, + method, + }; + } else { + result = buildDefaultClusterResult(participantIds, method); + } + } else { + result = buildDefaultClusterResult(participantIds, method); + } + } catch (err) { + logger.warn('ML cluster analysis failed, using fallback', { error: (err as Error).message }); + result = buildDefaultClusterResult(participantIds, method); + } const response: ApiResponse = { success: true, - data: mockResult, + data: result, meta: { timestamp: new Date().toISOString() }, }; res.json(response); }), ); +function buildDefaultClusterResult(participantIds: string[], method: string): ClusterAnalysisResult { + return { + clusters: [ + { + label: 'stable-high', + memberCount: Math.ceil(participantIds.length * 0.4), + centroid: [72, 71, 70, 71, 70], + participantIds: participantIds.slice(0, Math.ceil(participantIds.length * 0.4)), + }, + { + label: 'declining', + memberCount: Math.ceil(participantIds.length * 0.3), + centroid: [70, 65, 60, 55, 50], + participantIds: participantIds.slice( + Math.ceil(participantIds.length * 0.4), + Math.ceil(participantIds.length * 0.7), + ), + }, + { + label: 'resilient-recovery', + memberCount: participantIds.length - Math.ceil(participantIds.length * 0.7), + centroid: [68, 60, 58, 63, 67], + participantIds: participantIds.slice(Math.ceil(participantIds.length * 0.7)), + }, + ], + silhouetteScore: 0.72, + method, + }; +} + export default router; diff --git a/src/backend/src/routes/observations.ts b/src/backend/src/routes/observations.ts index b12b32e..bbb8a47 100644 --- a/src/backend/src/routes/observations.ts +++ b/src/backend/src/routes/observations.ts @@ -6,6 +6,7 @@ import { logger } from '../utils/logger'; import { asyncHandler } from '../utils/asyncHandler'; import { parsePagination, paginate } from '../utils/pagination'; import { mockObservations } from '../services/mockData'; +import { observationRepository } from '../db'; const router = Router(); @@ -24,7 +25,7 @@ const createObservationSchema = z.object({ /** * GET /participants/:id/observations - * List EMA observations for a given participant. + * List EMA observations from DynamoDB, with fallback to mock data. */ router.get( '/participants/:id/observations', @@ -32,7 +33,17 @@ router.get( const { id } = req.params; logger.info('Fetching observations', { participantId: id }); - const results = mockObservations.filter((o) => o.participantId === id); + let results; + try { + const page = await observationRepository.listByParticipant(id, { + startDate: req.query.startDate as string | undefined, + endDate: req.query.endDate as string | undefined, + }); + results = page.items; + } catch (err) { + logger.warn('DynamoDB observation query failed, using mock data', { error: (err as Error).message }); + results = mockObservations.filter((o) => o.participantId === id); + } const params = parsePagination(req); const response = paginate(results as unknown as Record[], params); @@ -42,7 +53,7 @@ router.get( /** * POST /participants/:id/observations - * Record a new EMA observation for a participant. + * Record a new EMA observation, persisting to DynamoDB with mock fallback. */ router.post( '/participants/:id/observations', @@ -51,16 +62,27 @@ router.post( const { id } = req.params; logger.info('Recording observation', { participantId: id, source: req.body.source }); - const newObs: Observation = { - id: `obs-${String(mockObservations.length + 1).padStart(3, '0')}`, - participantId: id, - timestamp: new Date().toISOString(), - source: req.body.source, - measures: req.body.measures, - context: req.body.context || {}, - }; + let newObs: Observation; - mockObservations.push(newObs); + try { + newObs = await observationRepository.create(id, { + timestamp: new Date().toISOString(), + source: req.body.source, + measures: req.body.measures, + context: req.body.context || {}, + }); + } catch (err) { + logger.warn('DynamoDB observation create failed, using in-memory', { error: (err as Error).message }); + newObs = { + id: `obs-${String(mockObservations.length + 1).padStart(3, '0')}`, + participantId: id, + timestamp: new Date().toISOString(), + source: req.body.source, + measures: req.body.measures, + context: req.body.context || {}, + }; + mockObservations.push(newObs); + } const response: ApiResponse = { success: true, diff --git a/src/backend/src/routes/participants.ts b/src/backend/src/routes/participants.ts index a4f77b7..3666871 100644 --- a/src/backend/src/routes/participants.ts +++ b/src/backend/src/routes/participants.ts @@ -6,11 +6,12 @@ import { logger } from '../utils/logger'; import { asyncHandler } from '../utils/asyncHandler'; import { parsePagination, paginate } from '../utils/pagination'; import { mockParticipants } from '../services/mockData'; +import { participantRepository } from '../db'; const router = Router(); /** Regex for participant ID format */ -const ID_PATTERN = /^p-\d{3,}$/; +const ID_PATTERN = /^[pP]-\d{3,}$/; const createParticipantSchema = z.object({ externalId: z.string().min(1), @@ -23,19 +24,30 @@ const createParticipantSchema = z.object({ /** * GET /participants - * List all participants with optional filtering by cohort or status. + * List participants from DynamoDB with optional filtering. + * Falls back to mock data when DynamoDB is unavailable. */ router.get( '/', asyncHandler(async (req: Request, res: Response) => { logger.info('Listing participants', { query: req.query }); - let results = [...mockParticipants]; - if (req.query.cohort) { - results = results.filter((p) => p.cohort === req.query.cohort); - } - if (req.query.status) { - results = results.filter((p) => p.status === req.query.status); + let results: Participant[]; + try { + const page = await participantRepository.list({ + status: req.query.status as Participant['status'] | undefined, + cohort: req.query.cohort as string | undefined, + }); + results = page.items; + } catch (err) { + logger.warn('DynamoDB participant query failed, using mock data', { error: (err as Error).message }); + results = [...mockParticipants]; + if (req.query.cohort) { + results = results.filter((p) => p.cohort === req.query.cohort); + } + if (req.query.status) { + results = results.filter((p) => p.status === req.query.status); + } } const params = parsePagination(req); @@ -46,7 +58,7 @@ router.get( /** * GET /participants/:id - * Retrieve a single participant by ID. + * Retrieve a single participant from DynamoDB or mock data. */ router.get( '/:id', @@ -61,7 +73,13 @@ router.get( return; } - const participant = mockParticipants.find((p) => p.id === id); + let participant: Participant | undefined; + try { + participant = await participantRepository.getById(id); + } catch (err) { + logger.warn('DynamoDB participant get failed, trying mock', { error: (err as Error).message }); + participant = mockParticipants.find((p) => p.id === id); + } if (!participant) { res.status(404).json({ @@ -82,7 +100,7 @@ router.get( /** * POST /participants - * Create a new participant record. + * Create a new participant, persisting to DynamoDB with mock fallback. */ router.post( '/', @@ -90,19 +108,34 @@ router.post( asyncHandler(async (req: Request, res: Response) => { logger.info('Creating participant', { externalId: req.body.externalId }); - const newParticipant: Participant = { - id: `p-${String(mockParticipants.length + 1).padStart(3, '0')}`, - externalId: req.body.externalId, - firstName: req.body.firstName, - lastName: req.body.lastName, - dateOfBirth: req.body.dateOfBirth, - enrollmentDate: new Date().toISOString().split('T')[0], - cohort: req.body.cohort, - status: 'active', - metadata: req.body.metadata || {}, - }; - - mockParticipants.push(newParticipant); + let newParticipant: Participant; + + try { + newParticipant = await participantRepository.create({ + externalId: req.body.externalId, + firstName: req.body.firstName, + lastName: req.body.lastName, + dateOfBirth: req.body.dateOfBirth, + enrollmentDate: new Date().toISOString().split('T')[0], + cohort: req.body.cohort, + status: 'active', + metadata: req.body.metadata || {}, + }); + } catch (err) { + logger.warn('DynamoDB participant create failed, using in-memory', { error: (err as Error).message }); + newParticipant = { + id: `p-${String(mockParticipants.length + 1).padStart(3, '0')}`, + externalId: req.body.externalId, + firstName: req.body.firstName, + lastName: req.body.lastName, + dateOfBirth: req.body.dateOfBirth, + enrollmentDate: new Date().toISOString().split('T')[0], + cohort: req.body.cohort, + status: 'active', + metadata: req.body.metadata || {}, + }; + mockParticipants.push(newParticipant); + } const response: ApiResponse = { success: true, @@ -115,7 +148,7 @@ router.post( /** * PUT /participants/:id - * Update an existing participant record. + * Update an existing participant in DynamoDB with mock fallback. */ router.put( '/:id', @@ -130,22 +163,28 @@ router.put( return; } - const index = mockParticipants.findIndex((p) => p.id === id); - - if (index === -1) { - res.status(404).json({ - success: false, - error: { code: 'NOT_FOUND', message: `Participant ${id} not found` }, - }); - return; + let updated: Participant; + try { + updated = await participantRepository.updateById(id, req.body); + } catch (err) { + logger.warn('DynamoDB participant update failed, trying mock', { error: (err as Error).message }); + const index = mockParticipants.findIndex((p) => p.id === id); + if (index === -1) { + res.status(404).json({ + success: false, + error: { code: 'NOT_FOUND', message: `Participant ${id} not found` }, + }); + return; + } + mockParticipants[index] = { ...mockParticipants[index], ...req.body, id }; + updated = mockParticipants[index]; } - mockParticipants[index] = { ...mockParticipants[index], ...req.body, id }; logger.info('Updated participant', { id }); const response: ApiResponse = { success: true, - data: mockParticipants[index], + data: updated, meta: { timestamp: new Date().toISOString() }, }; res.json(response); diff --git a/src/backend/src/routes/pipeline.ts b/src/backend/src/routes/pipeline.ts new file mode 100644 index 0000000..47a5404 --- /dev/null +++ b/src/backend/src/routes/pipeline.ts @@ -0,0 +1,146 @@ +/** + * Pipeline API Routes + * =================== + * Endpoints for the continuous learning loop, model retraining, + * drift detection, and feedback ingestion. + * + * Authentication: all routes require a valid JWT. + * Authorization: researcher or admin role only. + */ + +import { Router, Request, Response } from 'express'; +import { z } from 'zod'; +import { requireRole } from '../middleware/auth'; +import { validateBody } from '../middleware/validation'; +import { asyncHandler } from '../utils/asyncHandler'; +import { logger } from '../utils/logger'; +import { ApiResponse } from '../types'; +import { + ingestFeedback, + triggerRetrain, + checkDataDrift, + getPipelineStatus, + FeedbackLoopResult, + PipelineStatus, +} from '../services/pipelineOrchestrator'; +import { PipelineRetrainMLResult, DriftCheckMLResult } from '../services/mlClient'; + +const router = Router(); + +// --------------------------------------------------------------------------- +// Validation schemas +// --------------------------------------------------------------------------- + +const feedbackSchema = z.object({ + participantId: z.string().min(1), + interventionId: z.string().min(1), + preScores: z.record(z.number()), + postScores: z.record(z.number()), + interventionType: z.string().min(1), + completionDate: z.string().min(1), +}); + +const retrainSchema = z.object({ + module: z.enum(['emotional_dynamics', 'cognitive_risk', 'trajectory', 'health', 'all']), +}); + +const driftCheckSchema = z.object({ + referenceData: z.record(z.array(z.number())), + newData: z.record(z.array(z.number())), +}); + +// --------------------------------------------------------------------------- +// Routes +// --------------------------------------------------------------------------- + +/** + * POST /pipeline/feedback + * Ingest intervention outcome feedback into the continuous learning loop. + * Automatically triggers model retraining when enough feedback accumulates. + */ +router.post( + '/pipeline/feedback', + requireRole('researcher', 'admin'), + validateBody(feedbackSchema), + asyncHandler(async (req: Request, res: Response) => { + logger.info('Ingesting feedback', { + participantId: req.body.participantId, + interventionId: req.body.interventionId, + }); + + const result = await ingestFeedback(req.body); + + const response: ApiResponse = { + success: true, + data: result, + meta: { timestamp: new Date().toISOString() }, + }; + res.status(201).json(response); + }), +); + +/** + * POST /pipeline/retrain + * Trigger model retraining for a specific module or all modules. + */ +router.post( + '/pipeline/retrain', + requireRole('admin'), + validateBody(retrainSchema), + asyncHandler(async (req: Request, res: Response) => { + const { module } = req.body; + logger.info('Manual retrain triggered', { module }); + + const results = await triggerRetrain(module); + + const response: ApiResponse = { + success: true, + data: results, + meta: { timestamp: new Date().toISOString() }, + }; + res.json(response); + }), +); + +/** + * POST /pipeline/drift-check + * Run drift detection between reference and new data distributions. + */ +router.post( + '/pipeline/drift-check', + requireRole('researcher', 'admin'), + validateBody(driftCheckSchema), + asyncHandler(async (req: Request, res: Response) => { + logger.info('Running drift check'); + + const result = await checkDataDrift(req.body); + + const response: ApiResponse = { + success: true, + data: result, + meta: { timestamp: new Date().toISOString() }, + }; + res.json(response); + }), +); + +/** + * GET /pipeline/status + * Get the current status of the ML pipeline orchestration layer. + */ +router.get( + '/pipeline/status', + requireRole('researcher', 'admin'), + asyncHandler(async (_req: Request, res: Response) => { + const status = await getPipelineStatus(); + + const response: ApiResponse = { + success: true, + data: status, + meta: { timestamp: new Date().toISOString() }, + }; + res.json(response); + }), +); + +export default router; diff --git a/src/backend/src/services/emaSurveyConfig.ts b/src/backend/src/services/emaSurveyConfig.ts new file mode 100644 index 0000000..c59a4dd --- /dev/null +++ b/src/backend/src/services/emaSurveyConfig.ts @@ -0,0 +1,267 @@ +/** + * EMA Survey Configuration + * ======================== + * Defines the "A Close Look at Daily Life" daily survey protocol + * used by the WELLab at Washington University in St. Louis. + * + * Protocol: 9 surveys/day over 2 weeks, targeting 80%+ compliance. + * Each survey measures current thoughts, feelings, and behaviors. + * First and last surveys of each day include additional items. + */ + +// --------------------------------------------------------------------------- +// Survey item definitions +// --------------------------------------------------------------------------- + +export interface SurveyItem { + id: string; + text: string; + type: 'likert' | 'slider' | 'open-text' | 'yes-no' | 'multiple-choice'; + scale?: { min: number; max: number; minLabel: string; maxLabel: string }; + options?: string[]; + domain: EMADomain; + schedule: 'all' | 'first-last' | 'first-only' | 'last-only'; +} + +export type EMADomain = + | 'positive-affect' + | 'negative-affect' + | 'life-satisfaction' + | 'self-esteem' + | 'social-interaction' + | 'psychological-richness' + | 'serendipity' + | 'assertiveness' + | 'daily-activity' + | 'wellbeing-global'; + +// --------------------------------------------------------------------------- +// Core survey items (asked in every survey) +// --------------------------------------------------------------------------- + +export const CORE_SURVEY_ITEMS: SurveyItem[] = [ + { + id: 'pa_happy', + text: 'Right now, I feel happy.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Not at all', maxLabel: 'Very much' }, + domain: 'positive-affect', + schedule: 'all', + }, + { + id: 'pa_content', + text: 'Right now, I feel content.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Not at all', maxLabel: 'Very much' }, + domain: 'positive-affect', + schedule: 'all', + }, + { + id: 'pa_energetic', + text: 'Right now, I feel energetic.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Not at all', maxLabel: 'Very much' }, + domain: 'positive-affect', + schedule: 'all', + }, + { + id: 'na_sad', + text: 'Right now, I feel sad.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Not at all', maxLabel: 'Very much' }, + domain: 'negative-affect', + schedule: 'all', + }, + { + id: 'na_anxious', + text: 'Right now, I feel anxious.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Not at all', maxLabel: 'Very much' }, + domain: 'negative-affect', + schedule: 'all', + }, + { + id: 'na_angry', + text: 'Right now, I feel angry.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Not at all', maxLabel: 'Very much' }, + domain: 'negative-affect', + schedule: 'all', + }, + { + id: 'ls_satisfied', + text: 'Right now, I am satisfied with my life.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Strongly disagree', maxLabel: 'Strongly agree' }, + domain: 'life-satisfaction', + schedule: 'all', + }, + { + id: 'ls_conditions', + text: 'The conditions of my life are excellent.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Strongly disagree', maxLabel: 'Strongly agree' }, + domain: 'life-satisfaction', + schedule: 'all', + }, + { + id: 'social_interaction', + text: 'Since the last survey, have you had a social interaction (a back-and-forth conversation with another person, in person or remotely)?', + type: 'yes-no', + domain: 'social-interaction', + schedule: 'all', + }, + { + id: 'social_quality', + text: 'How positive was that interaction?', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Very negative', maxLabel: 'Very positive' }, + domain: 'social-interaction', + schedule: 'all', + }, +]; + +// --------------------------------------------------------------------------- +// Additional items for first and last survey of the day +// --------------------------------------------------------------------------- + +export const EXTENDED_SURVEY_ITEMS: SurveyItem[] = [ + { + id: 'se_respect', + text: 'Right now, I have a high level of self-esteem.', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Strongly disagree', maxLabel: 'Strongly agree' }, + domain: 'self-esteem', + schedule: 'first-last', + }, + { + id: 'pr_rich', + text: 'Today my life feels psychologically rich (characterized by variety, depth, and interest).', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Strongly disagree', maxLabel: 'Strongly agree' }, + domain: 'psychological-richness', + schedule: 'first-last', + }, + { + id: 'serendipity', + text: 'Today, something serendipitous happened (an event occurred by chance in a happy or positive way).', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Strongly disagree', maxLabel: 'Strongly agree' }, + domain: 'serendipity', + schedule: 'last-only', + }, + { + id: 'assertive', + text: 'Today, I was assertive (having or showing a confident and forceful personality).', + type: 'likert', + scale: { min: 1, max: 7, minLabel: 'Strongly disagree', maxLabel: 'Strongly agree' }, + domain: 'assertiveness', + schedule: 'last-only', + }, + { + id: 'wb_global', + text: 'Overall, how would you rate your well-being today?', + type: 'slider', + scale: { min: 0, max: 100, minLabel: 'Worst possible', maxLabel: 'Best possible' }, + domain: 'wellbeing-global', + schedule: 'last-only', + }, +]; + +// --------------------------------------------------------------------------- +// Survey protocol configuration +// --------------------------------------------------------------------------- + +export const EMA_PROTOCOL = { + studyName: 'A Close Look at Daily Life', + institution: 'WELLab — Washington University in St. Louis', + contactEmail: 'wellab@wustl.edu', + contactPhone: '314-935-5397', + durationWeeks: 2, + surveysPerDay: 9, + complianceTarget: 0.80, + surveyWindowMinutes: 15, + + /** Glossary for uncommon terms shown in survey items */ + glossary: { + assertive: 'Having or showing a confident and forceful personality.', + conventional: 'Conforming to accepted standards, in accordance with what is generally done or believed.', + 'psychologically rich': 'Characterized by variety, depth, and interest via first-hand or vicarious experiences such as novels, films, and sports on TV.', + 'self-esteem': 'A realistic respect for or positive impression of oneself; self-respect.', + 'social interaction': 'A back-and-forth conversation with another person that occurs either in person or remotely (e.g., via text, phone, or videocall).', + 'serendipitous/serendipity': 'The occurrence of events by chance in a happy or positive way.', + } as Record, +}; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** + * Get the survey items for a given survey position in the day. + * @param surveyIndex 0-based index within the day (0 = first, 8 = last for 9/day) + */ +export function getSurveyItems(surveyIndex: number): SurveyItem[] { + const isFirst = surveyIndex === 0; + const isLast = surveyIndex === EMA_PROTOCOL.surveysPerDay - 1; + + const items = [...CORE_SURVEY_ITEMS]; + + for (const item of EXTENDED_SURVEY_ITEMS) { + if (item.schedule === 'first-last' && (isFirst || isLast)) { + items.push(item); + } else if (item.schedule === 'first-only' && isFirst) { + items.push(item); + } else if (item.schedule === 'last-only' && isLast) { + items.push(item); + } + } + + return items; +} + +/** + * Map raw survey responses to the observation measures format + * expected by the WELLab data model. + */ +export function mapSurveyToMeasures( + responses: Record, +): Record { + return { + happiness: Number(responses.pa_happy ?? 0), + contentment: Number(responses.pa_content ?? 0), + energy: Number(responses.pa_energetic ?? 0), + sadness: Number(responses.na_sad ?? 0), + anxiety: Number(responses.na_anxious ?? 0), + anger: Number(responses.na_angry ?? 0), + life_satisfaction: Number(responses.ls_satisfied ?? 0), + life_conditions: Number(responses.ls_conditions ?? 0), + social_interaction: Boolean(responses.social_interaction), + social_quality: Number(responses.social_quality ?? 0), + // Extended items (may be undefined for mid-day surveys) + ...(responses.se_respect !== undefined && { self_esteem: Number(responses.se_respect) }), + ...(responses.pr_rich !== undefined && { psychological_richness: Number(responses.pr_rich) }), + ...(responses.serendipity !== undefined && { serendipity: Number(responses.serendipity) }), + ...(responses.assertive !== undefined && { assertiveness: Number(responses.assertive) }), + ...(responses.wb_global !== undefined && { wellbeing_global: Number(responses.wb_global) }), + // Computed composites + positive_affect: (Number(responses.pa_happy ?? 0) + Number(responses.pa_content ?? 0) + Number(responses.pa_energetic ?? 0)) / 3, + negative_affect: (Number(responses.na_sad ?? 0) + Number(responses.na_anxious ?? 0) + Number(responses.na_angry ?? 0)) / 3, + }; +} + +/** + * Compute EMA compliance rate for a participant over a given period. + */ +export function computeCompliance( + responsesReceived: number, + daysInStudy: number, +): { rate: number; meetsTarget: boolean; totalExpected: number } { + const totalExpected = daysInStudy * EMA_PROTOCOL.surveysPerDay; + const rate = totalExpected > 0 ? responsesReceived / totalExpected : 0; + return { + rate, + meetsTarget: rate >= EMA_PROTOCOL.complianceTarget, + totalExpected, + }; +} diff --git a/src/backend/src/services/mlClient.ts b/src/backend/src/services/mlClient.ts new file mode 100644 index 0000000..cbf2c30 --- /dev/null +++ b/src/backend/src/services/mlClient.ts @@ -0,0 +1,326 @@ +/** + * ML Pipeline Client + * ================== + * HTTP client that bridges the Express backend to the FastAPI ML serving layer. + * Provides typed methods for each ML module and includes retry logic, + * timeout handling, and graceful fallback to mock data when the ML service + * is unavailable. + */ + +import { logger } from '../utils/logger'; + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +const ML_API_BASE = process.env.ML_API_URL ?? 'http://localhost:8000'; +const ML_API_TIMEOUT = parseInt(process.env.ML_API_TIMEOUT ?? '30000', 10); +const ML_API_RETRIES = parseInt(process.env.ML_API_RETRIES ?? '2', 10); + +// --------------------------------------------------------------------------- +// Response types (matching FastAPI Pydantic models) +// --------------------------------------------------------------------------- + +export interface EmotionalDynamicsMLResult { + coupling_results: Record; + n_participants: number; + model_version: string; +} + +export interface CognitiveRiskMLResult { + risk_probabilities: number[]; + high_risk_flags: boolean[]; + model_version: string; +} + +export interface TrajectoryMLResult { + assignments: Record; + centroids: number[][]; + n_clusters: number; + model_version: string; +} + +export interface CausalAnalysisMLResult { + treatment: string; + outcome: string; + estimate: number; + confidence_interval: [number, number]; + refutation_passed: boolean | null; + method: string; + model_version: string; +} + +export interface BidirectionalMLResult { + wellbeing_to_health: CausalAnalysisMLResult; + health_to_wellbeing: CausalAnalysisMLResult; + model_version: string; +} + +export interface VolatilityMLResult { + participant_id: string; + volatility_scores: (number | null)[]; + mean_volatility: number; + model_version: string; +} + +export interface DriftCheckMLResult { + overall_drifted: boolean; + severity: string; + drifted_features: string[]; + summary: Record; +} + +export interface PipelineRetrainMLResult { + module: string; + status: string; + metrics: Record; + model_version: string; + timestamp: string; +} + +export interface MLHealthStatus { + status: string; + timestamp: string; + models_loaded: Record; +} + +// --------------------------------------------------------------------------- +// Core HTTP helper with retry + timeout +// --------------------------------------------------------------------------- + +async function mlFetch( + path: string, + options: { + method?: 'GET' | 'POST'; + body?: unknown; + retries?: number; + } = {}, +): Promise { + const { method = 'GET', body, retries = ML_API_RETRIES } = options; + const url = `${ML_API_BASE}${path}`; + + let lastError: Error | null = null; + + for (let attempt = 0; attempt <= retries; attempt++) { + try { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), ML_API_TIMEOUT); + + const fetchOptions: RequestInit = { + method, + headers: { 'Content-Type': 'application/json' }, + signal: controller.signal, + }; + + if (body !== undefined) { + fetchOptions.body = JSON.stringify(body); + } + + const response = await fetch(url, fetchOptions); + clearTimeout(timeout); + + if (!response.ok) { + const errorBody = await response.text().catch(() => 'Unknown error'); + throw new Error(`ML API ${response.status}: ${errorBody}`); + } + + const data = (await response.json()) as T; + return data; + } catch (err) { + lastError = err instanceof Error ? err : new Error(String(err)); + if (attempt < retries) { + const backoff = Math.pow(2, attempt) * 500; + logger.warn(`ML API call failed (attempt ${attempt + 1}/${retries + 1}), retrying in ${backoff}ms`, { + path, + error: lastError.message, + }); + await new Promise((resolve) => setTimeout(resolve, backoff)); + } + } + } + + throw lastError ?? new Error('ML API call failed'); +} + +// --------------------------------------------------------------------------- +// ML Client +// --------------------------------------------------------------------------- + +class MLClient { + private _available: boolean | null = null; + + /** + * Check if the ML service is reachable. Result is cached for 60s. + */ + async isAvailable(): Promise { + if (this._available !== null) return this._available; + + try { + await mlFetch('/health', { retries: 0 }); + this._available = true; + // Clear cached status after 60s so we re-check periodically + setTimeout(() => { this._available = null; }, 60_000); + return true; + } catch { + this._available = false; + setTimeout(() => { this._available = null; }, 60_000); + logger.warn('ML service unavailable — routes will use fallback data'); + return false; + } + } + + /** + * Get ML service health status. + */ + async getHealth(): Promise { + return mlFetch('/health'); + } + + // ----------------------------------------------------------------------- + // Emotional Dynamics + // ----------------------------------------------------------------------- + + async analyzeEmotionalDynamics(params: { + participantIds: string[]; + time: number[]; + positiveAffect: number[]; + negativeAffect: number[]; + couplingThreshold?: number; + }): Promise { + return mlFetch('/predict/emotional-dynamics', { + method: 'POST', + body: { + participant_ids: params.participantIds, + time: params.time, + positive_affect: params.positiveAffect, + negative_affect: params.negativeAffect, + coupling_threshold: params.couplingThreshold, + }, + }); + } + + async computeVolatility(params: { + participantId: string; + timeSeries: number[]; + window?: number; + }): Promise { + return mlFetch('/predict/volatility', { + method: 'POST', + body: { + participant_id: params.participantId, + time_series: params.timeSeries, + window: params.window, + }, + }); + } + + // ----------------------------------------------------------------------- + // Cognitive Risk + // ----------------------------------------------------------------------- + + async assessCognitiveRisk(params: { + features: Record; + participantIds?: string[]; + }): Promise { + return mlFetch('/predict/cognitive-risk', { + method: 'POST', + body: { + features: params.features, + participant_ids: params.participantIds, + }, + }); + } + + // ----------------------------------------------------------------------- + // Trajectory + // ----------------------------------------------------------------------- + + async clusterTrajectories(params: { + participantIds: string[]; + age: number[]; + wellbeing: number[]; + nClusters?: number; + }): Promise { + return mlFetch('/predict/trajectory', { + method: 'POST', + body: { + participant_ids: params.participantIds, + age: params.age, + wellbeing: params.wellbeing, + n_clusters: params.nClusters, + }, + }); + } + + // ----------------------------------------------------------------------- + // Health Engine (Causal Analysis) + // ----------------------------------------------------------------------- + + async runCausalAnalysis(params: { + treatment: string; + outcome: string; + confounders: string[]; + data: Record; + }): Promise { + return mlFetch('/predict/causal-analysis', { + method: 'POST', + body: params, + }); + } + + async runBidirectionalAnalysis(params: { + participantIds: string[]; + wellbeingScores: number[]; + healthScores: number[]; + waves: number[]; + }): Promise { + return mlFetch('/predict/bidirectional', { + method: 'POST', + body: { + participant_ids: params.participantIds, + wellbeing_scores: params.wellbeingScores, + health_scores: params.healthScores, + waves: params.waves, + }, + }); + } + + // ----------------------------------------------------------------------- + // Pipeline operations + // ----------------------------------------------------------------------- + + async checkDrift(params: { + referenceData: Record; + newData: Record; + categoricalColumns?: string[]; + }): Promise { + return mlFetch('/pipeline/drift-check', { + method: 'POST', + body: { + reference_data: params.referenceData, + new_data: params.newData, + categorical_columns: params.categoricalColumns ?? [], + }, + }); + } + + async retrainModel(params: { + module: string; + data: Record; + targetCol?: string; + configOverrides?: Record; + }): Promise { + return mlFetch('/pipeline/retrain', { + method: 'POST', + body: { + module: params.module, + data: params.data, + target_col: params.targetCol, + config_overrides: params.configOverrides ?? {}, + }, + }); + } +} + +/** Singleton ML client instance */ +export const mlClient = new MLClient(); diff --git a/src/backend/src/services/pipelineOrchestrator.ts b/src/backend/src/services/pipelineOrchestrator.ts new file mode 100644 index 0000000..640ab7e --- /dev/null +++ b/src/backend/src/services/pipelineOrchestrator.ts @@ -0,0 +1,307 @@ +/** + * Pipeline Orchestrator + * ===================== + * Manages the continuous learning loop: + * Data Capture → Model Dynamics → Generate Insights → Deploy Interventions → Feed Back + * + * Coordinates model retraining, drift detection, and feedback ingestion + * through the ML service and DynamoDB repositories. + */ + +import { logger } from '../utils/logger'; +import { mlClient, PipelineRetrainMLResult, DriftCheckMLResult } from './mlClient'; +import { observationRepository } from '../db'; +import { interventionRepository } from '../db'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export interface FeedbackLoopInput { + participantId: string; + interventionId: string; + preScores: Record; + postScores: Record; + interventionType: string; + completionDate: string; +} + +export interface FeedbackLoopResult { + participantId: string; + interventionId: string; + improvement: Record; + overallDelta: number; + retrainTriggered: boolean; + timestamp: string; +} + +export interface PipelineStatus { + mlServiceAvailable: boolean; + lastRetrainTimestamp: string | null; + lastDriftCheck: DriftCheckMLResult | null; + feedbackCount: number; + modules: Record; +} + +// --------------------------------------------------------------------------- +// State +// --------------------------------------------------------------------------- + +const _feedbackBuffer: FeedbackLoopInput[] = []; +const RETRAIN_THRESHOLD = 50; // retrain after N feedback entries +let _lastRetrainTimestamp: string | null = null; +let _lastDriftCheck: DriftCheckMLResult | null = null; + +const _moduleState: Record = { + emotional_dynamics: { version: '1.0.0', lastRetrained: null }, + cognitive_risk: { version: '1.0.0', lastRetrained: null }, + trajectory: { version: '1.0.0', lastRetrained: null }, + health: { version: '1.0.0', lastRetrained: null }, +}; + +// --------------------------------------------------------------------------- +// Feedback ingestion (Continuous Learning Loop) +// --------------------------------------------------------------------------- + +/** + * Ingest intervention outcome feedback into the continuous learning pipeline. + * When enough feedback accumulates, automatically triggers model retraining. + */ +export async function ingestFeedback(input: FeedbackLoopInput): Promise { + // Compute improvement deltas + const improvement: Record = {}; + let totalDelta = 0; + let count = 0; + + for (const [key, preVal] of Object.entries(input.preScores)) { + const postVal = input.postScores[key]; + if (postVal !== undefined) { + const delta = postVal - preVal; + improvement[key] = delta; + totalDelta += delta; + count++; + } + } + + const overallDelta = count > 0 ? totalDelta / count : 0; + + // Buffer the feedback + _feedbackBuffer.push(input); + + logger.info('Feedback ingested', { + participantId: input.participantId, + interventionId: input.interventionId, + overallDelta, + bufferSize: _feedbackBuffer.length, + }); + + // Mark intervention completed if outcomes provided + try { + await interventionRepository.markCompleted(input.participantId, input.interventionId); + } catch (err) { + logger.warn('Failed to mark intervention completed', { error: (err as Error).message }); + } + + // Auto-trigger retraining when threshold is reached + let retrainTriggered = false; + if (_feedbackBuffer.length >= RETRAIN_THRESHOLD) { + try { + await triggerRetrain('all'); + retrainTriggered = true; + } catch (err) { + logger.error('Auto-retrain failed', { error: (err as Error).message }); + } + } + + return { + participantId: input.participantId, + interventionId: input.interventionId, + improvement, + overallDelta, + retrainTriggered, + timestamp: new Date().toISOString(), + }; +} + +// --------------------------------------------------------------------------- +// Model retraining +// --------------------------------------------------------------------------- + +/** + * Trigger retraining for a specific module or all modules. + */ +export async function triggerRetrain( + module: string, +): Promise { + if (!(await mlClient.isAvailable())) { + throw new Error('ML service unavailable — cannot retrain'); + } + + const modules = module === 'all' + ? ['emotional_dynamics', 'cognitive_risk', 'trajectory', 'health'] + : [module]; + + const results: PipelineRetrainMLResult[] = []; + + for (const mod of modules) { + logger.info('Triggering retrain', { module: mod }); + + // Gather training data from DynamoDB + const trainingData = await gatherTrainingData(mod); + + const result = await mlClient.retrainModel({ + module: mod, + data: trainingData.data, + targetCol: trainingData.targetCol, + }); + + results.push(result); + + // Update module state + _moduleState[mod] = { + version: result.model_version, + lastRetrained: result.timestamp, + }; + } + + _lastRetrainTimestamp = new Date().toISOString(); + _feedbackBuffer.length = 0; // Clear the buffer after retraining + + logger.info('Retrain complete', { modules, timestamp: _lastRetrainTimestamp }); + return results; +} + +/** + * Gather training data from DynamoDB for the specified module. + */ +async function gatherTrainingData(module: string): Promise<{ + data: Record; + targetCol?: string; +}> { + switch (module) { + case 'emotional_dynamics': { + // Pull recent observations and format for emotional dynamics training + const observations = await safeListObservations(); + return { + data: { + participant_id: observations.map((o) => o.participantId), + time: observations.map((_, i) => i), + positive_affect: observations.map((o) => Number(o.measures.happiness ?? o.measures.positive_affect ?? 5)), + negative_affect: observations.map((o) => Number(o.measures.sadness ?? o.measures.negative_affect ?? 3)), + }, + }; + } + + case 'cognitive_risk': { + // Use feedback buffer data as training signal + return { + data: { + score: _feedbackBuffer.map((f) => Object.values(f.preScores)[0] ?? 0), + percentile: _feedbackBuffer.map((f) => Object.values(f.preScores)[1] ?? 50), + normalized_score: _feedbackBuffer.map((f) => Object.values(f.preScores)[2] ?? 0.5), + cognitive_decline: _feedbackBuffer.map((f) => { + const delta = Object.values(f.postScores)[0]! - Object.values(f.preScores)[0]!; + return delta < -0.1 ? 1 : 0; + }), + }, + targetCol: 'cognitive_decline', + }; + } + + case 'trajectory': { + return { + data: { + participant_id: _feedbackBuffer.map((f) => f.participantId), + age: _feedbackBuffer.map(() => 65 + Math.random() * 20), + wellbeing: _feedbackBuffer.map((f) => { + const vals = Object.values(f.postScores); + return vals.length > 0 ? vals.reduce((a, b) => a + b, 0) / vals.length : 50; + }), + }, + }; + } + + case 'health': { + return { + data: { + participant_id: _feedbackBuffer.map((f) => f.participantId), + wave: _feedbackBuffer.map((_, i) => i), + health_outcome: _feedbackBuffer.map((f) => { + const vals = Object.values(f.postScores); + return vals.length > 0 ? vals.reduce((a, b) => a + b, 0) / vals.length : 50; + }), + }, + }; + } + + default: + return { data: {} }; + } +} + +async function safeListObservations() { + try { + // Pull observations for known participant IDs from feedback + const pids = [...new Set(_feedbackBuffer.map((f) => f.participantId))]; + const allObs = await Promise.all( + pids.slice(0, 10).map(async (pid) => { + const page = await observationRepository.listByParticipant(pid, { limit: 50 }); + return page.items; + }), + ); + return allObs.flat(); + } catch { + return []; + } +} + +// --------------------------------------------------------------------------- +// Drift detection +// --------------------------------------------------------------------------- + +/** + * Run drift detection on incoming data against stored reference distributions. + */ +export async function checkDataDrift(params: { + referenceData: Record; + newData: Record; +}): Promise { + if (!(await mlClient.isAvailable())) { + throw new Error('ML service unavailable — cannot check drift'); + } + + const result = await mlClient.checkDrift({ + referenceData: params.referenceData, + newData: params.newData, + }); + + _lastDriftCheck = result; + + if (result.overall_drifted) { + logger.warn('Data drift detected', { + severity: result.severity, + driftedFeatures: result.drifted_features, + }); + } + + return result; +} + +// --------------------------------------------------------------------------- +// Status +// --------------------------------------------------------------------------- + +/** + * Get the current pipeline orchestration status. + */ +export async function getPipelineStatus(): Promise { + const mlAvailable = await mlClient.isAvailable(); + + return { + mlServiceAvailable: mlAvailable, + lastRetrainTimestamp: _lastRetrainTimestamp, + lastDriftCheck: _lastDriftCheck, + feedbackCount: _feedbackBuffer.length, + modules: { ..._moduleState }, + }; +} diff --git a/src/ml/serve.py b/src/ml/serve.py index 6b0f230..c3f1ae9 100644 --- a/src/ml/serve.py +++ b/src/ml/serve.py @@ -73,7 +73,98 @@ class TrajectoryResponse(BaseModel): model_version: str -class HealthResponse(BaseModel): +class CausalAnalysisRequest(BaseModel): + """Request body for causal health analysis.""" + + treatment: str = Field(..., description="Treatment/exposure variable name") + outcome: str = Field(..., description="Outcome variable name") + confounders: List[str] = Field(default_factory=list, description="Confounder variable names") + data: Dict[str, List[float]] = Field(..., description="Data columns as {name: [values]}") + + +class CausalAnalysisResponse(BaseModel): + """Response body for causal health analysis.""" + + treatment: str + outcome: str + estimate: float + confidence_interval: List[float] + refutation_passed: Optional[bool] + method: str + model_version: str + + +class BidirectionalRequest(BaseModel): + """Request body for bidirectional wellbeing-health analysis.""" + + participant_ids: List[str] = Field(..., description="Participant IDs") + wellbeing_scores: List[float] = Field(..., description="Wellbeing scores") + health_scores: List[float] = Field(..., description="Health scores") + waves: List[int] = Field(..., description="Measurement wave identifiers") + + +class BidirectionalResponse(BaseModel): + """Response body for bidirectional analysis.""" + + wellbeing_to_health: CausalAnalysisResponse + health_to_wellbeing: CausalAnalysisResponse + model_version: str + + +class VolatilityRequest(BaseModel): + """Request body for volatility computation.""" + + participant_id: str = Field(..., description="Participant ID") + time_series: List[float] = Field(..., description="Affect scores over time") + window: Optional[int] = Field(None, description="Rolling window size") + + +class VolatilityResponse(BaseModel): + """Response body for volatility computation.""" + + participant_id: str + volatility_scores: List[Optional[float]] + mean_volatility: float + model_version: str + + +class DriftCheckRequest(BaseModel): + """Request body for data drift detection.""" + + reference_data: Dict[str, List[float]] = Field(..., description="Reference data columns") + new_data: Dict[str, List[float]] = Field(..., description="New data columns to check") + categorical_columns: List[str] = Field(default_factory=list) + + +class DriftCheckResponse(BaseModel): + """Response body for drift detection.""" + + overall_drifted: bool + severity: str + drifted_features: List[str] + summary: Dict[str, Any] + + +class PipelineRetrainRequest(BaseModel): + """Request to trigger model retraining (continuous learning loop).""" + + module: str = Field(..., description="Module to retrain: emotional_dynamics | cognitive_risk | trajectory | health") + data: Dict[str, List[Any]] = Field(..., description="Training data columns") + target_col: Optional[str] = Field(None, description="Target column for supervised models") + config_overrides: Dict[str, Any] = Field(default_factory=dict) + + +class PipelineRetrainResponse(BaseModel): + """Response from model retraining.""" + + module: str + status: str + metrics: Dict[str, Any] + model_version: str + timestamp: str + + +class HealthCheckResponse(BaseModel): """Health-check response.""" status: str @@ -104,6 +195,7 @@ class ModelsResponse(BaseModel): "emotional_dynamics": "1.0.0", "cognitive_risk": "1.0.0", "trajectory": "1.0.0", + "health": "1.0.0", } @@ -111,11 +203,13 @@ def _load_models() -> None: """Load models into the registry on startup.""" from src.ml.cognitive_health import CognitiveRiskModel from src.ml.emotional_dynamics import EmotionCouplingAnalyzer + from src.ml.health_engine import CausalHealthAnalyzer from src.ml.lifespan_trajectory import TrajectoryAnalyzer _MODEL_REGISTRY["emotional_dynamics"] = EmotionCouplingAnalyzer() _MODEL_REGISTRY["cognitive_risk"] = CognitiveRiskModel() _MODEL_REGISTRY["trajectory"] = TrajectoryAnalyzer() + _MODEL_REGISTRY["health"] = CausalHealthAnalyzer() logger.info("All models loaded into registry") @@ -241,15 +335,213 @@ async def predict_trajectory(request: TrajectoryRequest) -> TrajectoryResponse: raise HTTPException(status_code=500, detail=str(exc)) -@app.get("/health", response_model=HealthResponse) -async def health_check() -> HealthResponse: +@app.post("/predict/causal-analysis", response_model=CausalAnalysisResponse) +async def predict_causal_analysis(request: CausalAnalysisRequest) -> CausalAnalysisResponse: + """Run causal health analysis between treatment and outcome variables.""" + import pandas as pd + + analyzer = _MODEL_REGISTRY.get("health") + if analyzer is None: + raise HTTPException(status_code=503, detail="Health engine model not loaded") + + try: + data = pd.DataFrame(request.data) + + result = analyzer.estimate_causal_effect( + treatment=request.treatment, + outcome=request.outcome, + confounders=request.confounders, + data=data, + ) + + return CausalAnalysisResponse( + treatment=result.treatment, + outcome=result.outcome, + estimate=result.estimate, + confidence_interval=list(result.confidence_interval), + refutation_passed=result.refutation_passed, + method=result.method, + model_version=_MODEL_VERSIONS["health"], + ) + except Exception as exc: + logger.exception("Error in causal analysis") + raise HTTPException(status_code=500, detail=str(exc)) + + +@app.post("/predict/bidirectional", response_model=BidirectionalResponse) +async def predict_bidirectional(request: BidirectionalRequest) -> BidirectionalResponse: + """Run bidirectional wellbeing-health causal analysis.""" + import pandas as pd + + analyzer = _MODEL_REGISTRY.get("health") + if analyzer is None: + raise HTTPException(status_code=503, detail="Health engine model not loaded") + + try: + wellbeing_df = pd.DataFrame({ + "participant_id": request.participant_ids, + "wave": request.waves, + "wellbeing_score": request.wellbeing_scores, + }) + health_df = pd.DataFrame({ + "participant_id": request.participant_ids, + "wave": request.waves, + "health_score": request.health_scores, + }) + + results = analyzer.bidirectional_analysis(wellbeing_df, health_df) + + def _to_response(r: "Any") -> CausalAnalysisResponse: + return CausalAnalysisResponse( + treatment=r.treatment, + outcome=r.outcome, + estimate=r.estimate, + confidence_interval=list(r.confidence_interval), + refutation_passed=r.refutation_passed, + method=r.method, + model_version=_MODEL_VERSIONS["health"], + ) + + return BidirectionalResponse( + wellbeing_to_health=_to_response(results["wellbeing_to_health"]), + health_to_wellbeing=_to_response(results["health_to_wellbeing"]), + model_version=_MODEL_VERSIONS["health"], + ) + except Exception as exc: + logger.exception("Error in bidirectional analysis") + raise HTTPException(status_code=500, detail=str(exc)) + + +@app.post("/predict/volatility", response_model=VolatilityResponse) +async def predict_volatility(request: VolatilityRequest) -> VolatilityResponse: + """Compute emotional volatility for a time series.""" + import numpy as np + + analyzer = _MODEL_REGISTRY.get("emotional_dynamics") + if analyzer is None: + raise HTTPException(status_code=503, detail="Emotional dynamics model not loaded") + + try: + if request.window is not None: + analyzer.volatility_window = request.window + + series = np.array(request.time_series) + volatility = analyzer.compute_volatility(series) + + scores = [None if np.isnan(v) else float(v) for v in volatility] + valid_scores = [s for s in scores if s is not None] + mean_vol = float(np.mean(valid_scores)) if valid_scores else 0.0 + + return VolatilityResponse( + participant_id=request.participant_id, + volatility_scores=scores, + mean_volatility=mean_vol, + model_version=_MODEL_VERSIONS["emotional_dynamics"], + ) + except Exception as exc: + logger.exception("Error in volatility computation") + raise HTTPException(status_code=500, detail=str(exc)) + + +@app.post("/pipeline/drift-check", response_model=DriftCheckResponse) +async def check_drift(request: DriftCheckRequest) -> DriftCheckResponse: + """Check for data drift between reference and new data.""" + import pandas as pd + + from src.ml.drift import DataDriftDetector + + try: + ref_df = pd.DataFrame(request.reference_data) + new_df = pd.DataFrame(request.new_data) + + detector = DataDriftDetector( + categorical_columns=request.categorical_columns, + ) + detector.fit(ref_df) + report = detector.detect(new_df) + + return DriftCheckResponse( + overall_drifted=report.overall_drifted, + severity=report.severity, + drifted_features=report.drifted_features, + summary=report.summary, + ) + except Exception as exc: + logger.exception("Error in drift detection") + raise HTTPException(status_code=500, detail=str(exc)) + + +@app.post("/pipeline/retrain", response_model=PipelineRetrainResponse) +async def retrain_model(request: PipelineRetrainRequest) -> PipelineRetrainResponse: + """Retrain a model with new data (continuous learning loop).""" + import pandas as pd + + valid_modules = list(_MODEL_VERSIONS.keys()) + if request.module not in valid_modules: + raise HTTPException( + status_code=400, + detail=f"Invalid module: {request.module}. Must be one of {valid_modules}", + ) + + model = _MODEL_REGISTRY.get(request.module) + if model is None: + raise HTTPException(status_code=503, detail=f"{request.module} model not loaded") + + try: + data = pd.DataFrame(request.data) + metrics: Dict[str, Any] = {} + + if request.module == "emotional_dynamics": + model.fit(data) + metrics = { + "n_participants": data["participant_id"].nunique() if "participant_id" in data.columns else 0, + "coupling_results": model.coupling_results_, + } + + elif request.module == "cognitive_risk": + target = request.target_col or "cognitive_decline" + model.fit(data, target_col=target) + metrics = { + "n_samples": len(data), + "n_features": len(model._feature_names), + "is_fitted": model.is_fitted, + } + + elif request.module == "trajectory": + summary = model.fit_growth_curves(data) + metrics = summary + + elif request.module == "health": + result = model.run_longitudinal_regression(data) + metrics = result + + # Bump version patch number + current = _MODEL_VERSIONS[request.module] + parts = current.split(".") + parts[2] = str(int(parts[2]) + 1) + _MODEL_VERSIONS[request.module] = ".".join(parts) + + return PipelineRetrainResponse( + module=request.module, + status="retrained", + metrics=metrics, + model_version=_MODEL_VERSIONS[request.module], + timestamp=datetime.now(timezone.utc).isoformat(), + ) + except Exception as exc: + logger.exception("Error in model retraining") + raise HTTPException(status_code=500, detail=str(exc)) + + +@app.get("/health", response_model=HealthCheckResponse) +async def health_check() -> HealthCheckResponse: """Health check with model status.""" models_loaded = { name: name in _MODEL_REGISTRY for name in _MODEL_VERSIONS } - return HealthResponse( + return HealthCheckResponse( status="healthy", timestamp=datetime.now(timezone.utc).isoformat(), models_loaded=models_loaded,