
Integrate ML service with continuous learning pipeline and EMA surveys#5

Merged
dougdevitre merged 1 commit into main from
claude/integrate-new-features-smYFv
Apr 6, 2026

Conversation

@dougdevitre (Owner)

Summary

This PR establishes the complete bridge between the Express backend and FastAPI ML serving layer, implementing the continuous learning loop (Data Capture → Model Dynamics → Generate Insights → Deploy Interventions → Feedback). It adds EMA survey infrastructure, pipeline orchestration for model retraining and drift detection, and integrates ML predictions throughout the API routes with graceful fallbacks to mock data.

Key Changes

ML Service Integration

  • New mlClient.ts: HTTP client with typed methods for all ML modules (emotional dynamics, cognitive risk, trajectory, causal analysis, bidirectional analysis, volatility, drift detection, and model retraining). Includes retry logic with exponential backoff, timeout handling, and health check caching.
  • New pipelineOrchestrator.ts: Manages the continuous learning loop—ingests intervention feedback, buffers it, auto-triggers retraining at thresholds, and coordinates drift detection. Maintains module state and training data gathering from DynamoDB.
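The orchestrator's buffer-then-retrain behavior can be sketched roughly as follows. The names (`makeFeedbackBuffer`, `InterventionFeedback`) and the trigger wiring are illustrative assumptions, not the actual `pipelineOrchestrator.ts` exports:

```typescript
// Minimal sketch of a feedback buffer that auto-triggers retraining once a
// threshold is reached (the PR uses 50 entries). Names are hypothetical.
export interface InterventionFeedback {
  participantId: string;
  preScores: Record<string, number>;
  postScores: Record<string, number>;
}

export function makeFeedbackBuffer(
  onThreshold: (batch: InterventionFeedback[]) => void,
  threshold = 50,
) {
  const buffer: InterventionFeedback[] = [];
  return {
    ingest(entry: InterventionFeedback): number {
      buffer.push(entry);
      if (buffer.length >= threshold) {
        // Hand the full batch off for retraining and reset the buffer.
        onThreshold(buffer.splice(0, buffer.length));
      }
      return buffer.length;
    },
    size: () => buffer.length,
  };
}
```

In the real orchestrator the `onThreshold` callback would gather training data from DynamoDB and call the ML service's retrain endpoint.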

EMA Survey Protocol

  • New emaSurveyConfig.ts: Implements "A Close Look at Daily Life" protocol with 9 surveys/day over 2 weeks. Defines core items (affect, life satisfaction, social interaction) and extended items (self-esteem, psychological richness, serendipity) with proper scheduling. Includes compliance tracking and response-to-measures mapping.
  • New ema.ts routes: Endpoints for survey delivery (GET /participants/:id/ema/survey) and submission (POST /participants/:id/ema/submit), with automatic persistence to DynamoDB and fallback handling.
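As a rough illustration of the scheduling described above (9 prompts/day over a 2-week protocol), here is a minimal sketch. `buildSchedule`, the 8:00–22:00 waking window, and placing extended items on the first and last prompt of the day are assumptions, not the actual `emaSurveyConfig.ts` behavior:

```typescript
// Illustrative EMA scheduling sketch: 9 prompts/day spread evenly across a
// waking window, for 14 days. All names and defaults are hypothetical.
export interface EmaPrompt {
  day: number;       // 0-based study day
  hour: number;      // fractional hour of day for the prompt
  extended: boolean; // whether extended items (self-esteem, richness, ...) are included
}

export function buildSchedule(
  days = 14,
  promptsPerDay = 9,
  wakeHour = 8,
  sleepHour = 22,
): EmaPrompt[] {
  const span = sleepHour - wakeHour;
  const prompts: EmaPrompt[] = [];
  for (let day = 0; day < days; day++) {
    for (let i = 0; i < promptsPerDay; i++) {
      prompts.push({
        day,
        // Spread prompts evenly across the waking window.
        hour: wakeHour + (span * i) / (promptsPerDay - 1),
        // Assumption: extended items only on the first and last prompt.
        extended: i === 0 || i === promptsPerDay - 1,
      });
    }
  }
  return prompts;
}

// Compliance is the fraction of delivered prompts that were answered;
// the protocol targets 80%.
export function complianceRate(answered: number, delivered: number): number {
  return delivered > 0 ? answered / delivered : 0;
}
```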

Pipeline API

  • New pipeline.ts routes: Endpoints for feedback ingestion (POST /pipeline/feedback), manual retraining (POST /pipeline/retrain), drift detection (POST /pipeline/drift-check), and status monitoring (GET /pipeline/status). All routes require researcher/admin authorization.

Route Enhancements with ML Integration

  • emotional-dynamics.ts: Queries DynamoDB observations and calls ML service for emotion coupling analysis and volatility computation; falls back to mock data when ML unavailable.
  • cognitive.ts: Integrates ML-based cognitive risk assessment with DynamoDB fallback.
  • health.ts: Adds causal analysis endpoints via ML service for treatment-outcome relationships and bidirectional wellbeing-health analysis.
  • lifespan.ts: Retrieves trajectory data from DynamoDB with ML-powered cluster analysis; graceful fallback to mock results.
  • insights.ts: Auto-enriches emotional dynamics context from ML service when query parameters are missing.
  • participants.ts, observations.ts, interventions.ts: Updated to query DynamoDB repositories with mock data fallback.
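The DynamoDB-first, mock-fallback read these routes share can be sketched generically; `listWithFallback` is a hypothetical helper, not an actual export of any of the files above:

```typescript
// Generic sketch of the graceful-fallback read pattern: try the repository,
// and serve mock data if the call throws. Names are illustrative.
export async function listWithFallback<T>(
  fetchFromDb: () => Promise<T[]>,
  mockData: T[],
): Promise<{ items: T[]; source: 'dynamodb' | 'mock' }> {
  try {
    return { items: await fetchFromDb(), source: 'dynamodb' };
  } catch {
    // DynamoDB (or the ML service) is unavailable: fall back to mock data
    // so the API stays functional.
    return { items: mockData, source: 'mock' };
  }
}
```

A route handler would then call, e.g., `listWithFallback(() => repository.listByParticipant(id).then((p) => p.items), mockResults)`.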

FastAPI ML Service Extensions

  • serve.py: Added request/response models for causal analysis, bidirectional analysis, volatility, drift detection, and pipeline retraining. New endpoints: /predict/causal-analysis, /predict/bidirectional, /predict/volatility, /predict/drift-check, /pipeline/retrain. Integrated CausalHealthAnalyzer into model registry.

Implementation Details

  • Graceful Degradation: All routes attempt ML service calls but seamlessly fall back to mock/cached data if unavailable, ensuring the API remains functional during ML service downtime.
  • Continuous Learning: Feedback buffer auto-triggers retraining after 50 entries; training data is gathered from DynamoDB repositories per module.
  • Type Safety: Full TypeScript interfaces for all ML request/response types matching FastAPI Pydantic models.
  • Retry & Timeout: ML client implements exponential backoff (2 retries by default) and 30-second timeout with configurable environment variables.
  • Health Caching: ML service availability is cached for 60 seconds to avoid repeated health checks.

https://claude.ai/code/session_01RDjuYsc96BWr7Y1FGkmbgU

…EMA survey protocol

Wire all backend routes to DynamoDB repositories with graceful fallback to mock
data. Add ML client service bridging Express backend to FastAPI ML serving layer.
Expand FastAPI serve.py with health engine, bidirectional analysis, volatility,
drift detection, and model retraining endpoints. Implement continuous learning
loop via pipeline orchestrator with feedback ingestion and auto-retrain.
Add EMA survey configuration matching the "A Close Look at Daily Life" protocol
(9 surveys/day, 2-week period, 80% compliance target). Wire insight generation
to auto-enrich from ML service when query params not provided.

https://claude.ai/code/session_01RDjuYsc96BWr7Y1FGkmbgU
@dougdevitre dougdevitre requested a review from Copilot April 6, 2026 22:12
@dougdevitre dougdevitre merged commit 9c6c7b3 into main Apr 6, 2026
8 checks passed

Copilot AI left a comment


Pull request overview

This PR connects the Express backend to the FastAPI ML serving layer and introduces a continuous-learning “pipeline” surface area (feedback → drift checks → retraining) alongside new EMA survey configuration and endpoints, with DynamoDB-first reads/writes and mock fallbacks.

Changes:

  • Added a typed ML HTTP client plus a pipeline orchestrator that buffers feedback, triggers retraining, and runs drift checks.
  • Implemented EMA protocol configuration + new EMA routes for survey delivery, submission, and compliance stats.
  • Extended the FastAPI service with new prediction/pipeline endpoints (causal analysis, bidirectional, volatility, drift-check, retrain) and integrated ML calls into multiple existing routes with fallback behavior.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 15 comments.

Summary per file:

| File | Description |
| --- | --- |
| src/ml/serve.py | Adds new Pydantic models and endpoints for causal/bidirectional/volatility/drift/retrain; registers health model. |
| src/backend/src/services/mlClient.ts | New typed HTTP client with retry/timeout and health caching for the ML service. |
| src/backend/src/services/pipelineOrchestrator.ts | New orchestrator for feedback ingestion, drift checks, and retraining with training-data gathering. |
| src/backend/src/services/emaSurveyConfig.ts | Defines EMA protocol items, scheduling, response mapping, and compliance computation. |
| src/backend/src/routes/pipeline.ts | New secured API endpoints for feedback ingestion, retrain, drift-check, and pipeline status. |
| src/backend/src/routes/ema.ts | New EMA survey delivery/submission/compliance/protocol endpoints backed by DynamoDB observations. |
| src/backend/src/routes/emotional-dynamics.ts | Uses DynamoDB observations + ML service for coupling/volatility; falls back to mock results. |
| src/backend/src/routes/cognitive.ts | Reads cognitive assessments from DynamoDB and uses ML risk scoring with fallback. |
| src/backend/src/routes/health.ts | Reads health records from DynamoDB and calls ML causal analysis with fallback. |
| src/backend/src/routes/lifespan.ts | Reads trajectories from DynamoDB and calls ML clustering with fallback. |
| src/backend/src/routes/insights.ts | Auto-enriches missing insight context using DynamoDB observations + ML calls. |
| src/backend/src/routes/participants.ts | Switches to DynamoDB repository with mock fallback; adjusts participant ID regex. |
| src/backend/src/routes/observations.ts | Switches to DynamoDB repository with mock fallback for listing/creating observations. |
| src/backend/src/routes/interventions.ts | Switches to DynamoDB repository with mock fallback for listing/creating interventions. |
| src/backend/src/index.ts | Registers the new pipeline and EMA routers. |
Comments suppressed due to low confidence (1)

src/backend/src/routes/insights.ts:222

  • After participantInsightQuerySchema.safeParse(...) succeeds, the code no longer uses parseResult.data to get the validated/narrowed values. As written, couplingType/couplingStrength/etc. remain the broader inferred types and can cause type errors (and risks diverging from Zod defaults/coercions). Prefer destructuring from parseResult.data and use those values when calling generateParticipantInsights.
    const parseResult = participantInsightQuerySchema.safeParse({
      couplingType,
      couplingStrength,
      volatility,
      inertia,
      recentTrend,
    });

    if (!parseResult.success) {
      res.status(400).json({
        success: false,
        error: {
          code: 'VALIDATION_ERROR',
          message: 'Invalid insight request parameters',
          details: parseResult.error.flatten(),
        },
      });
      return;
    }

    logger.info('Generating participant insights', { participantId: id });

    const insights = await generateParticipantInsights(
      id,
      { couplingType, couplingStrength, volatility, inertia },
      recentTrend,
    );


Comment on lines 13 to +14
  /** Regex for participant ID format */
- const ID_PATTERN = /^p-\d{3,}$/;
+ const ID_PATTERN = /^[pP]-\d{3,}$/;

Copilot AI Apr 6, 2026


ID_PATTERN was widened to accept uppercase P-..., but other routes (e.g. insights) still validate lowercase-only, and both the mock data and the likely DynamoDB IDs are lowercase (p-001). This makes ID validation inconsistent across the API and can lead to accepting IDs here that will 404 elsewhere. Consider keeping lowercase-only or normalizing incoming IDs to lowercase before validation/lookup across all participant-scoped routes.
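One way to implement the normalization suggestion, as a hedged sketch (`normalizeParticipantId` is a hypothetical helper, not code from this PR):

```typescript
// Normalize incoming IDs to lowercase, then apply a single strict pattern,
// so every participant-scoped route validates consistently.
const ID_PATTERN = /^p-\d{3,}$/;

export function normalizeParticipantId(raw: string): string | null {
  const id = raw.trim().toLowerCase();
  return ID_PATTERN.test(id) ? id : null;
}
```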

  logger.info('Fetching cognitive assessments', { participantId: id });

- const results = mockCognitiveAssessments.filter((a) => a.participantId === id);
+ let results;

Copilot AI Apr 6, 2026


let results; is an implicit any under strict TypeScript settings and will fail compilation. Please give this variable an explicit type (e.g., CognitiveAssessment[] or the repository page item type) before using it with paginate.

Suggested change
- let results;
+ let results: (typeof mockCognitiveAssessments)[number][];

Comment on lines +62 to +68
let assessments;
try {
const page = await cognitiveRepository.listByParticipant(participantId);
assessments = page.items;
} catch {
assessments = mockCognitiveAssessments.filter((a) => a.participantId === participantId);
}

Copilot AI Apr 6, 2026


let assessments; is also an implicit any in strict mode and will fail compilation. Add an explicit type (e.g. CognitiveAssessment[]) so the subsequent .length/.map usage is type-safe.

Comment on lines +33 to +39
let results;
try {
const page = await healthRepository.listByParticipant(id);
results = page.items;
if (domain) {
results = results.filter((r) => r.domain === domain);
}

Copilot AI Apr 6, 2026


let results; is an implicit any under strict TypeScript settings and will fail compilation. Please type this variable explicitly (e.g. HealthRecord[]) before passing it to paginate / filtering by domain.

Comment on lines +92 to +103
const mlResult = await mlClient.runCausalAnalysis({
treatment: exposureVariable,
outcome: outcomeVariable,
confounders: covariates,
data: dataColumns,
});

result = {
estimatedEffect: mlResult.estimate,
confidenceInterval: mlResult.confidence_interval,
pValue: 0.003, // ML service doesn't return p-value directly yet
method: mlResult.method,

Copilot AI Apr 6, 2026


The API accepts a method in the request body, but it is not sent to the ML service, and the response method is taken from mlResult.method (ML-side default) rather than the requested method. This makes the method parameter effectively non-functional/misleading. Either plumb method through to the FastAPI request model + analyzer, or remove it from the public API and document the fixed method used.

Suggested change
- const mlResult = await mlClient.runCausalAnalysis({
-   treatment: exposureVariable,
-   outcome: outcomeVariable,
-   confounders: covariates,
-   data: dataColumns,
- });
- result = {
-   estimatedEffect: mlResult.estimate,
-   confidenceInterval: mlResult.confidence_interval,
-   pValue: 0.003, // ML service doesn't return p-value directly yet
-   method: mlResult.method,
+ const mlRequest: Parameters<typeof mlClient.runCausalAnalysis>[0] & { method?: typeof method } = {
+   treatment: exposureVariable,
+   outcome: outcomeVariable,
+   confounders: covariates,
+   data: dataColumns,
+   method,
+ };
+ const mlResult = await mlClient.runCausalAnalysis(mlRequest);
+ result = {
+   estimatedEffect: mlResult.estimate,
+   confidenceInterval: mlResult.confidence_interval,
+   pValue: 0.003, // ML service doesn't return p-value directly yet
+   method: method ?? mlResult.method,

Comment on lines +197 to +203
return {
data: {
score: _feedbackBuffer.map((f) => Object.values(f.preScores)[0] ?? 0),
percentile: _feedbackBuffer.map((f) => Object.values(f.preScores)[1] ?? 50),
normalized_score: _feedbackBuffer.map((f) => Object.values(f.preScores)[2] ?? 0.5),
cognitive_decline: _feedbackBuffer.map((f) => {
const delta = Object.values(f.postScores)[0]! - Object.values(f.preScores)[0]!;

Copilot AI Apr 6, 2026


In the cognitive_risk training data builder, Object.values(...)[0]! uses non-null assertions and will throw when preScores/postScores are empty or don't share the same keys. This can break the auto-retrain path once the buffer reaches the threshold. Consider selecting explicit score keys, validating presence, and handling missing values safely before computing deltas.

Suggested change
- return {
-   data: {
-     score: _feedbackBuffer.map((f) => Object.values(f.preScores)[0] ?? 0),
-     percentile: _feedbackBuffer.map((f) => Object.values(f.preScores)[1] ?? 50),
-     normalized_score: _feedbackBuffer.map((f) => Object.values(f.preScores)[2] ?? 0.5),
-     cognitive_decline: _feedbackBuffer.map((f) => {
-       const delta = Object.values(f.postScores)[0]! - Object.values(f.preScores)[0]!;
+ const firstPreScoresWithValues = _feedbackBuffer.find(
+   (f) => Object.keys(f.preScores).length > 0,
+ )?.preScores;
+ const fallbackPreScoreKeys = firstPreScoresWithValues ? Object.keys(firstPreScoresWithValues) : [];
+ const scoreKey = (firstPreScoresWithValues && 'score' in firstPreScoresWithValues)
+   ? 'score'
+   : fallbackPreScoreKeys[0];
+ const percentileKey = (firstPreScoresWithValues && 'percentile' in firstPreScoresWithValues)
+   ? 'percentile'
+   : fallbackPreScoreKeys[1];
+ const normalizedScoreKey = (firstPreScoresWithValues && 'normalized_score' in firstPreScoresWithValues)
+   ? 'normalized_score'
+   : fallbackPreScoreKeys[2];
+ return {
+   data: {
+     score: _feedbackBuffer.map((f) =>
+       scoreKey && typeof f.preScores[scoreKey] === 'number' ? f.preScores[scoreKey] : 0,
+     ),
+     percentile: _feedbackBuffer.map((f) =>
+       percentileKey && typeof f.preScores[percentileKey] === 'number' ? f.preScores[percentileKey] : 50,
+     ),
+     normalized_score: _feedbackBuffer.map((f) =>
+       normalizedScoreKey && typeof f.preScores[normalizedScoreKey] === 'number'
+         ? f.preScores[normalizedScoreKey]
+         : 0.5,
+     ),
+     cognitive_decline: _feedbackBuffer.map((f) => {
+       const sharedKeys = Object.keys(f.preScores).filter((key) => key in f.postScores);
+       const deltaKey = scoreKey && scoreKey in f.postScores && scoreKey in f.preScores
+         ? scoreKey
+         : sharedKeys[0];
+       if (!deltaKey) {
+         return 0;
+       }
+       const preValue = f.preScores[deltaKey];
+       const postValue = f.postScores[deltaKey];
+       if (typeof preValue !== 'number' || typeof postValue !== 'number') {
+         return 0;
+       }
+       const delta = postValue - preValue;

Comment on lines +212 to +220
return {
data: {
participant_id: _feedbackBuffer.map((f) => f.participantId),
age: _feedbackBuffer.map(() => 65 + Math.random() * 20),
wellbeing: _feedbackBuffer.map((f) => {
const vals = Object.values(f.postScores);
return vals.length > 0 ? vals.reduce((a, b) => a + b, 0) / vals.length : 50;
}),
},

Copilot AI Apr 6, 2026


Training data generation for the trajectory retrain path uses Math.random() to synthesize ages. This makes retraining non-reproducible and can easily degrade models (and complicate debugging/auditing). Prefer sourcing age (or time) from persisted participant/trajectory records, or disable/guard trajectory retrains until real training columns are available.

Suggested change
- return {
-   data: {
-     participant_id: _feedbackBuffer.map((f) => f.participantId),
-     age: _feedbackBuffer.map(() => 65 + Math.random() * 20),
-     wellbeing: _feedbackBuffer.map((f) => {
-       const vals = Object.values(f.postScores);
-       return vals.length > 0 ? vals.reduce((a, b) => a + b, 0) / vals.length : 50;
-     }),
-   },
+ logger.warn('Skipping trajectory retraining dataset generation because persisted age/time training columns are not available');
+ return {
+   data: {},

Comment on lines +87 to +108
// Flatten into ML request format
const pids: string[] = [];
const ages: number[] = [];
const wellbeingScores: number[] = [];

for (const { pid, trajectories } of allTrajectories) {
for (const traj of trajectories) {
for (const point of traj.points) {
pids.push(pid);
ages.push(point.age);
wellbeingScores.push(point.value);
}
}
}

if (pids.length >= 3) {
const mlResult = await mlClient.clusterTrajectories({
participantIds: pids,
age: ages,
wellbeing: wellbeingScores,
nClusters,
});

Copilot AI Apr 6, 2026


The ML call is gated by if (pids.length >= 3), but pids counts data points, not unique participants. Since nClusters is at least 2, the service can still be called with fewer unique participants than nClusters, which will make KMeans fail. Gate on new Set(pids).size >= nClusters (or >= 2) instead, and/or fall back when there are too few unique participants.
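The suggested gate can be sketched as a small predicate (`canCluster` is a hypothetical helper, not code from this PR):

```typescript
// Gate clustering on unique participants rather than raw data points, so the
// ML service is never asked for more clusters than there are participants.
export function canCluster(pids: string[], nClusters: number): boolean {
  return new Set(pids).size >= Math.max(nClusters, 2);
}
```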

Comment on lines +424 to +429
try:
if request.window is not None:
analyzer.volatility_window = request.window

series = np.array(request.time_series)
volatility = analyzer.compute_volatility(series)

Copilot AI Apr 6, 2026


/predict/volatility mutates shared model state via analyzer.volatility_window = request.window. Because the analyzer instance is global, concurrent requests (or different clients) can race and affect each other’s results. Prefer passing window as an argument to compute_volatility, cloning the analyzer per request, or restoring the previous value in a finally block.

Comment on lines +236 to +241
anger: Number(responses.na_angry ?? 0),
life_satisfaction: Number(responses.ls_satisfied ?? 0),
life_conditions: Number(responses.ls_conditions ?? 0),
social_interaction: Boolean(responses.social_interaction),
social_quality: Number(responses.social_quality ?? 0),
// Extended items (may be undefined for mid-day surveys)

Copilot AI Apr 6, 2026


social_interaction is derived via Boolean(responses.social_interaction), which will coerce any non-empty string (including 'false') to true. Since responses allows string | boolean, this can silently flip answers. Consider only accepting booleans for this item (validation) or parsing 'true'/'false' strings explicitly.
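Explicit 'true'/'false' parsing, as suggested, could look like this sketch (`parseBooleanResponse` is a hypothetical helper, not code from this PR):

```typescript
// Accept real booleans and the exact strings 'true'/'false'; return null for
// anything else so validation can reject it instead of silently coercing.
// Note Boolean('false') === true, which is the bug being avoided here.
export function parseBooleanResponse(value: string | boolean): boolean | null {
  if (typeof value === 'boolean') return value;
  const v = value.trim().toLowerCase();
  if (v === 'true') return true;
  if (v === 'false') return false;
  return null;
}
```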
