diff --git a/apps/start/src/components/insights/insight-card.tsx b/apps/start/src/components/insights/insight-card.tsx new file mode 100644 index 00000000..abeff89a --- /dev/null +++ b/apps/start/src/components/insights/insight-card.tsx @@ -0,0 +1,229 @@ +import { countries } from '@/translations/countries'; +import type { RouterOutputs } from '@/trpc/client'; +import { cn } from '@/utils/cn'; +import type { InsightPayload } from '@openpanel/validation'; +import { ArrowDown, ArrowUp, FilterIcon, RotateCcwIcon } from 'lucide-react'; +import { last } from 'ramda'; +import { useState } from 'react'; +import { SerieIcon } from '../report-chart/common/serie-icon'; +import { Badge } from '../ui/badge'; + +function formatWindowKind(windowKind: string): string { + switch (windowKind) { + case 'yesterday': + return 'Yesterday'; + case 'rolling_7d': + return '7 Days'; + case 'rolling_30d': + return '30 Days'; + } + return windowKind; +} + +interface InsightCardProps { + insight: RouterOutputs['insight']['list'][number]; + className?: string; + onFilter?: () => void; +} + +export function InsightCard({ + insight, + className, + onFilter, +}: InsightCardProps) { + const payload = insight.payload; + const dimensions = payload?.dimensions; + const availableMetrics = Object.entries(payload?.metrics ?? {}); + + // Pick what to display: prefer share if available (geo/devices), else primaryMetric + const [metricIndex, setMetricIndex] = useState( + availableMetrics.findIndex(([key]) => key === payload?.primaryMetric), + ); + const currentMetricKey = availableMetrics[metricIndex][0]; + const currentMetricEntry = availableMetrics[metricIndex][1]; + + const metricUnit = currentMetricEntry?.unit; + const currentValue = currentMetricEntry?.current ?? null; + const compareValue = currentMetricEntry?.compare ?? null; + + const direction = currentMetricEntry?.direction ?? 'flat'; + const isIncrease = direction === 'up'; + const isDecrease = direction === 'down'; + + const deltaText = + metricUnit === 'ratio' + ? `${Math.abs((currentMetricEntry?.delta ?? 0) * 100).toFixed(1)}pp` + : `${Math.abs((currentMetricEntry?.changePct ?? 0) * 100).toFixed(1)}%`; + + // Format metric values + const formatValue = (value: number | null): string => { + if (value == null) return '-'; + if (metricUnit === 'ratio') return `${(value * 100).toFixed(1)}%`; + return Math.round(value).toLocaleString(); + }; + + // Get the metric label + const metricKeyToLabel = (key: string) => + key === 'share' ? 'Share' : key === 'pageviews' ? 'Pageviews' : 'Sessions'; + + const metricLabel = metricKeyToLabel(currentMetricKey); + + const renderTitle = () => { + if ( + dimensions[0]?.key === 'country' || + dimensions[0]?.key === 'referrer_name' || + dimensions[0]?.key === 'device' + ) { + return ( + + {insight.displayName} + + ); + } + + if (insight.displayName.startsWith('http')) { + return ( + + + {dimensions[1]?.displayName} + + ); + } + + return insight.displayName; + }; + + return ( +
+
+ + {formatWindowKind(insight.windowKind)} + + {/* Severity: subtle dot instead of big pill */} + {insight.severityBand && ( +
+ + + {insight.severityBand} + +
+ )} +
+ {onFilter && ( +
+ {availableMetrics.length > 1 ? ( + + ) : ( +
+ )} + +
+ )} +
+ {renderTitle()} +
+ + {/* Metric row */} +
+
+
+
+ {metricLabel} +
+ +
+
+ {formatValue(currentValue)} +
+ + {/* Inline compare, smaller */} + {compareValue != null && ( +
+ vs {formatValue(compareValue)} +
+ )} +
+
+ + {/* Delta chip */} + +
+
+
+ ); +} + +function DeltaChip({ + isIncrease, + isDecrease, + deltaText, +}: { + isIncrease: boolean; + isDecrease: boolean; + deltaText: string; +}) { + return ( +
+ {isIncrease ? ( + + ) : isDecrease ? ( + + ) : null} + {deltaText} +
+ ); +} diff --git a/apps/start/src/components/overview/overview-insights.tsx b/apps/start/src/components/overview/overview-insights.tsx new file mode 100644 index 00000000..995bf3f7 --- /dev/null +++ b/apps/start/src/components/overview/overview-insights.tsx @@ -0,0 +1,75 @@ +import { useEventQueryFilters } from '@/hooks/use-event-query-filters'; +import { useTRPC } from '@/integrations/trpc/react'; +import { useQuery } from '@tanstack/react-query'; +import { InsightCard } from '../insights/insight-card'; +import { Skeleton } from '../skeleton'; +import { + Carousel, + CarouselContent, + CarouselItem, + CarouselNext, + CarouselPrevious, +} from '../ui/carousel'; + +interface OverviewInsightsProps { + projectId: string; +} + +export default function OverviewInsights({ projectId }: OverviewInsightsProps) { + const trpc = useTRPC(); + const [filters, setFilter] = useEventQueryFilters(); + const { data: insights, isLoading } = useQuery( + trpc.insight.list.queryOptions({ + projectId, + limit: 20, + }), + ); + + if (isLoading) { + const keys = Array.from({ length: 4 }, (_, i) => `insight-skeleton-${i}`); + return ( +
+ + + {keys.map((key) => ( + + + + ))} + + +
+ ); + } + + if (!insights || insights.length === 0) return null; + + return ( +
+ + + {insights.map((insight) => ( + + { + insight.payload.dimensions.forEach((dim) => { + void setFilter(dim.key, dim.value, 'is'); + }); + }} + /> + + ))} + + + + +
+ ); +} diff --git a/apps/start/src/components/sidebar-project-menu.tsx b/apps/start/src/components/sidebar-project-menu.tsx index 80c5c8dd..e1d480fb 100644 --- a/apps/start/src/components/sidebar-project-menu.tsx +++ b/apps/start/src/components/sidebar-project-menu.tsx @@ -17,6 +17,7 @@ import { LayoutPanelTopIcon, PlusIcon, SparklesIcon, + TrendingUpDownIcon, UndoDotIcon, UsersIcon, WallpaperIcon, @@ -39,13 +40,18 @@ export default function SidebarProjectMenu({ }: SidebarProjectMenuProps) { return ( <> -
Insights
+
Analytics
+ diff --git a/apps/start/src/components/sidebar.tsx b/apps/start/src/components/sidebar.tsx index 5562247d..cc7869eb 100644 --- a/apps/start/src/components/sidebar.tsx +++ b/apps/start/src/components/sidebar.tsx @@ -123,7 +123,7 @@ export function SidebarContainer({
diff --git a/apps/start/src/components/ui/carousel.tsx b/apps/start/src/components/ui/carousel.tsx index 68a123a1..664335d6 100644 --- a/apps/start/src/components/ui/carousel.tsx +++ b/apps/start/src/components/ui/carousel.tsx @@ -208,7 +208,7 @@ const CarouselPrevious = React.forwardRef< variant={variant} size={size} className={cn( - 'absolute h-10 w-10 rounded-full hover:scale-100 hover:translate-y-[-50%] transition-all duration-200', + 'absolute h-10 w-10 rounded-full hover:scale-100 hover:translate-y-[-50%] transition-all duration-200', orientation === 'horizontal' ? 'left-6 top-1/2 -translate-y-1/2' : '-top-12 left-1/2 -translate-x-1/2 rotate-90', diff --git a/apps/start/src/routeTree.gen.ts b/apps/start/src/routeTree.gen.ts index 735a873a..a75fc4b0 100644 --- a/apps/start/src/routeTree.gen.ts +++ b/apps/start/src/routeTree.gen.ts @@ -38,6 +38,7 @@ import { Route as AppOrganizationIdProjectIdReportsRouteImport } from './routes/ import { Route as AppOrganizationIdProjectIdReferencesRouteImport } from './routes/_app.$organizationId.$projectId.references' import { Route as AppOrganizationIdProjectIdRealtimeRouteImport } from './routes/_app.$organizationId.$projectId.realtime' import { Route as AppOrganizationIdProjectIdPagesRouteImport } from './routes/_app.$organizationId.$projectId.pages' +import { Route as AppOrganizationIdProjectIdInsightsRouteImport } from './routes/_app.$organizationId.$projectId.insights' import { Route as AppOrganizationIdProjectIdDashboardsRouteImport } from './routes/_app.$organizationId.$projectId.dashboards' import { Route as AppOrganizationIdProjectIdChatRouteImport } from './routes/_app.$organizationId.$projectId.chat' import { Route as AppOrganizationIdMembersTabsIndexRouteImport } from './routes/_app.$organizationId.members._tabs.index' @@ -273,6 +274,12 @@ const AppOrganizationIdProjectIdPagesRoute = path: '/pages', getParentRoute: () => AppOrganizationIdProjectIdRoute, } as any) +const AppOrganizationIdProjectIdInsightsRoute = + AppOrganizationIdProjectIdInsightsRouteImport.update({ + id: '/insights', + path: '/insights', + getParentRoute: () => AppOrganizationIdProjectIdRoute, + } as any) const AppOrganizationIdProjectIdDashboardsRoute = AppOrganizationIdProjectIdDashboardsRouteImport.update({ id: '/dashboards', @@ -495,6 +502,7 @@ export interface FileRoutesByFullPath { '/$organizationId/': typeof AppOrganizationIdIndexRoute '/$organizationId/$projectId/chat': typeof AppOrganizationIdProjectIdChatRoute '/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute + '/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute '/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -552,6 +560,7 @@ export interface FileRoutesByTo { '/$organizationId': typeof AppOrganizationIdIndexRoute '/$organizationId/$projectId/chat': typeof AppOrganizationIdProjectIdChatRoute '/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute + '/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute '/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -609,6 +618,7 @@ export interface FileRoutesById { '/_app/$organizationId/': typeof AppOrganizationIdIndexRoute '/_app/$organizationId/$projectId/chat': typeof AppOrganizationIdProjectIdChatRoute '/_app/$organizationId/$projectId/dashboards': typeof AppOrganizationIdProjectIdDashboardsRoute + '/_app/$organizationId/$projectId/insights': typeof AppOrganizationIdProjectIdInsightsRoute '/_app/$organizationId/$projectId/pages': typeof AppOrganizationIdProjectIdPagesRoute '/_app/$organizationId/$projectId/realtime': typeof AppOrganizationIdProjectIdRealtimeRoute '/_app/$organizationId/$projectId/references': typeof AppOrganizationIdProjectIdReferencesRoute @@ -677,6 +687,7 @@ export interface FileRouteTypes { | '/$organizationId/' | '/$organizationId/$projectId/chat' | '/$organizationId/$projectId/dashboards' + | '/$organizationId/$projectId/insights' | '/$organizationId/$projectId/pages' | '/$organizationId/$projectId/realtime' | '/$organizationId/$projectId/references' @@ -734,6 +745,7 @@ export interface FileRouteTypes { | '/$organizationId' | '/$organizationId/$projectId/chat' | '/$organizationId/$projectId/dashboards' + | '/$organizationId/$projectId/insights' | '/$organizationId/$projectId/pages' | '/$organizationId/$projectId/realtime' | '/$organizationId/$projectId/references' @@ -790,6 +802,7 @@ export interface FileRouteTypes { | '/_app/$organizationId/' | '/_app/$organizationId/$projectId/chat' | '/_app/$organizationId/$projectId/dashboards' + | '/_app/$organizationId/$projectId/insights' | '/_app/$organizationId/$projectId/pages' | '/_app/$organizationId/$projectId/realtime' | '/_app/$organizationId/$projectId/references' @@ -1085,6 +1098,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof AppOrganizationIdProjectIdPagesRouteImport parentRoute: typeof AppOrganizationIdProjectIdRoute } + '/_app/$organizationId/$projectId/insights': { + id: '/_app/$organizationId/$projectId/insights' + path: '/insights' + fullPath: '/$organizationId/$projectId/insights' + preLoaderRoute: typeof AppOrganizationIdProjectIdInsightsRouteImport + parentRoute: typeof AppOrganizationIdProjectIdRoute + } '/_app/$organizationId/$projectId/dashboards': { id: '/_app/$organizationId/$projectId/dashboards' path: '/dashboards' @@ -1528,6 +1548,7 @@ const AppOrganizationIdProjectIdSettingsRouteWithChildren = interface AppOrganizationIdProjectIdRouteChildren { AppOrganizationIdProjectIdChatRoute: typeof AppOrganizationIdProjectIdChatRoute AppOrganizationIdProjectIdDashboardsRoute: typeof AppOrganizationIdProjectIdDashboardsRoute + AppOrganizationIdProjectIdInsightsRoute: typeof AppOrganizationIdProjectIdInsightsRoute AppOrganizationIdProjectIdPagesRoute: typeof AppOrganizationIdProjectIdPagesRoute AppOrganizationIdProjectIdRealtimeRoute: typeof AppOrganizationIdProjectIdRealtimeRoute AppOrganizationIdProjectIdReferencesRoute: typeof AppOrganizationIdProjectIdReferencesRoute @@ -1548,6 +1569,8 @@ const AppOrganizationIdProjectIdRouteChildren: AppOrganizationIdProjectIdRouteCh AppOrganizationIdProjectIdChatRoute: AppOrganizationIdProjectIdChatRoute, AppOrganizationIdProjectIdDashboardsRoute: AppOrganizationIdProjectIdDashboardsRoute, + AppOrganizationIdProjectIdInsightsRoute: + AppOrganizationIdProjectIdInsightsRoute, AppOrganizationIdProjectIdPagesRoute: AppOrganizationIdProjectIdPagesRoute, AppOrganizationIdProjectIdRealtimeRoute: AppOrganizationIdProjectIdRealtimeRoute, diff --git a/apps/start/src/routes/_app.$organizationId.$projectId.index.tsx b/apps/start/src/routes/_app.$organizationId.$projectId.index.tsx index 15a4c95a..ed241c53 100644 --- a/apps/start/src/routes/_app.$organizationId.$projectId.index.tsx +++ b/apps/start/src/routes/_app.$organizationId.$projectId.index.tsx @@ -3,6 +3,7 @@ import { OverviewFiltersButtons, } from '@/components/overview/filters/overview-filters-buttons'; import { LiveCounter } from '@/components/overview/live-counter'; +import OverviewInsights from '@/components/overview/overview-insights'; import { OverviewInterval } from '@/components/overview/overview-interval'; import OverviewMetrics from '@/components/overview/overview-metrics'; import { OverviewRange } from '@/components/overview/overview-range'; @@ -50,6 +51,7 @@ function ProjectDashboard() {
+ diff --git a/apps/start/src/routes/_app.$organizationId.$projectId.insights.tsx b/apps/start/src/routes/_app.$organizationId.$projectId.insights.tsx new file mode 100644 index 00000000..2ccd0ab8 --- /dev/null +++ b/apps/start/src/routes/_app.$organizationId.$projectId.insights.tsx @@ -0,0 +1,431 @@ +import { FullPageEmptyState } from '@/components/full-page-empty-state'; +import { InsightCard } from '@/components/insights/insight-card'; +import { PageContainer } from '@/components/page-container'; +import { PageHeader } from '@/components/page-header'; +import { Skeleton } from '@/components/skeleton'; +import { + Carousel, + CarouselContent, + CarouselItem, + CarouselNext, + CarouselPrevious, +} from '@/components/ui/carousel'; +import { Input } from '@/components/ui/input'; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from '@/components/ui/select'; +import { TableButtons } from '@/components/ui/table'; +import { useTRPC } from '@/integrations/trpc/react'; +import { cn } from '@/utils/cn'; +import { PAGE_TITLES, createProjectTitle } from '@/utils/title'; +import { useQuery } from '@tanstack/react-query'; +import { createFileRoute, useNavigate } from '@tanstack/react-router'; +import { parseAsString, parseAsStringEnum, useQueryState } from 'nuqs'; +import { useMemo } from 'react'; + +export const Route = createFileRoute( + '/_app/$organizationId/$projectId/insights', +)({ + component: Component, + head: () => { + return { + meta: [ + { + title: createProjectTitle(PAGE_TITLES.INSIGHTS), + }, + ], + }; + }, +}); + +type SortOption = + | 'impact-desc' + | 'impact-asc' + | 'severity-desc' + | 'severity-asc' + | 'recent'; + +function getModuleDisplayName(moduleKey: string): string { + const displayNames: Record = { + geo: 'Geographic', + devices: 'Devices', + referrers: 'Referrers', + 'entry-pages': 'Entry Pages', + 'page-trends': 'Page Trends', + 'exit-pages': 'Exit Pages', + 'traffic-anomalies': 'Anomalies', + }; + return displayNames[moduleKey] || moduleKey.replace('-', ' '); +} + +function Component() { + const { projectId } = Route.useParams(); + const trpc = useTRPC(); + const { data: insights, isLoading } = useQuery( + trpc.insight.listAll.queryOptions({ + projectId, + limit: 500, + }), + ); + const navigate = useNavigate(); + + const [search, setSearch] = useQueryState( + 'search', + parseAsString.withDefault(''), + ); + const [moduleFilter, setModuleFilter] = useQueryState( + 'module', + parseAsString.withDefault('all'), + ); + const [windowKindFilter, setWindowKindFilter] = useQueryState( + 'window', + parseAsStringEnum([ + 'all', + 'yesterday', + 'rolling_7d', + 'rolling_30d', + ]).withDefault('all'), + ); + const [severityFilter, setSeverityFilter] = useQueryState( + 'severity', + parseAsStringEnum(['all', 'severe', 'moderate', 'low', 'none']).withDefault( + 'all', + ), + ); + const [directionFilter, setDirectionFilter] = useQueryState( + 'direction', + parseAsStringEnum(['all', 'up', 'down', 'flat']).withDefault('all'), + ); + const [sortBy, setSortBy] = useQueryState( + 'sort', + parseAsStringEnum([ + 'impact-desc', + 'impact-asc', + 'severity-desc', + 'severity-asc', + 'recent', + ]).withDefault('impact-desc'), + ); + + const filteredAndSorted = useMemo(() => { + if (!insights) return []; + + const filtered = insights.filter((insight) => { + // Search filter + if (search) { + const searchLower = search.toLowerCase(); + const matchesTitle = insight.title.toLowerCase().includes(searchLower); + const matchesSummary = insight.summary + ?.toLowerCase() + .includes(searchLower); + const matchesDimension = insight.dimensionKey + .toLowerCase() + .includes(searchLower); + if (!matchesTitle && !matchesSummary && !matchesDimension) { + return false; + } + } + + // Module filter + if (moduleFilter !== 'all' && insight.moduleKey !== moduleFilter) { + return false; + } + + // Window kind filter + if ( + windowKindFilter !== 'all' && + insight.windowKind !== windowKindFilter + ) { + return false; + } + + // Severity filter + if (severityFilter !== 'all') { + if (severityFilter === 'none' && insight.severityBand) return false; + if ( + severityFilter !== 'none' && + insight.severityBand !== severityFilter + ) + return false; + } + + // Direction filter + if (directionFilter !== 'all' && insight.direction !== directionFilter) { + return false; + } + + return true; + }); + + // Sort (create new array to avoid mutation) + const sorted = [...filtered].sort((a, b) => { + switch (sortBy) { + case 'impact-desc': + return (b.impactScore ?? 0) - (a.impactScore ?? 0); + case 'impact-asc': + return (a.impactScore ?? 0) - (b.impactScore ?? 0); + case 'severity-desc': { + const severityOrder: Record = { + severe: 3, + moderate: 2, + low: 1, + }; + const aSev = severityOrder[a.severityBand ?? ''] ?? 0; + const bSev = severityOrder[b.severityBand ?? ''] ?? 0; + return bSev - aSev; + } + case 'severity-asc': { + const severityOrder: Record = { + severe: 3, + moderate: 2, + low: 1, + }; + const aSev = severityOrder[a.severityBand ?? ''] ?? 0; + const bSev = severityOrder[b.severityBand ?? ''] ?? 0; + return aSev - bSev; + } + case 'recent': + return ( + new Date(b.firstDetectedAt ?? 0).getTime() - + new Date(a.firstDetectedAt ?? 0).getTime() + ); + default: + return 0; + } + }); + + return sorted; + }, [ + insights, + search, + moduleFilter, + windowKindFilter, + severityFilter, + directionFilter, + sortBy, + ]); + + // Group insights by module + const groupedByModule = useMemo(() => { + const groups = new Map(); + + for (const insight of filteredAndSorted) { + const existing = groups.get(insight.moduleKey) ?? []; + existing.push(insight); + groups.set(insight.moduleKey, existing); + } + + // Sort modules by impact (referrers first, then by average impact score) + return Array.from(groups.entries()).sort( + ([keyA, insightsA], [keyB, insightsB]) => { + // Referrers always first + if (keyA === 'referrers') return -1; + if (keyB === 'referrers') return 1; + + // Calculate average impact for each module + const avgImpactA = + insightsA.reduce((sum, i) => sum + (i.impactScore ?? 0), 0) / + insightsA.length; + const avgImpactB = + insightsB.reduce((sum, i) => sum + (i.impactScore ?? 0), 0) / + insightsB.length; + + // Sort by average impact (high to low) + return avgImpactB - avgImpactA; + }, + ); + }, [filteredAndSorted]); + + if (isLoading) { + return ( + + +
+ {Array.from({ length: 3 }, (_, i) => `section-${i}`).map((key) => ( +
+ + + + {Array.from({ length: 4 }, (_, i) => `skeleton-${i}`).map( + (cardKey) => ( + + + + ), + )} + + +
+ ))} +
+
+ ); + } + + return ( + + + + void setSearch(e.target.value || null)} + className="max-w-xs" + /> + + + + + + + {filteredAndSorted.length === 0 && !isLoading && ( + + )} + + {groupedByModule.length > 0 && ( +
+ {groupedByModule.map(([moduleKey, moduleInsights]) => ( +
+
+

+ {getModuleDisplayName(moduleKey)} +

+ + {moduleInsights.length}{' '} + {moduleInsights.length === 1 ? 'insight' : 'insights'} + +
+
+ + + {moduleInsights.map((insight, index) => ( + + { + const filterString = insight.payload?.dimensions + .map( + (dim) => + `${dim.key},is,${encodeURIComponent(dim.value)}`, + ) + .join(';'); + if (filterString) { + return () => { + navigate({ + to: '/$organizationId/$projectId', + from: Route.fullPath, + search: { + f: filterString, + }, + }); + }; + } + return undefined; + })()} + /> + + ))} + + + + +
+
+ ))} +
+ )} + + {filteredAndSorted.length > 0 && ( +
+ Showing {filteredAndSorted.length} of {insights?.length ?? 0} insights +
+ )} +
+ ); +} diff --git a/apps/start/src/utils/title.ts b/apps/start/src/utils/title.ts index e735636e..47b43fb8 100644 --- a/apps/start/src/utils/title.ts +++ b/apps/start/src/utils/title.ts @@ -90,6 +90,7 @@ export const PAGE_TITLES = { CHAT: 'AI Assistant', REALTIME: 'Realtime', REFERENCES: 'References', + INSIGHTS: 'Insights', // Profiles PROFILES: 'Profiles', PROFILE_EVENTS: 'Profile events', diff --git a/apps/worker/src/boot-cron.ts b/apps/worker/src/boot-cron.ts index 9650598e..a67c1e7f 100644 --- a/apps/worker/src/boot-cron.ts +++ b/apps/worker/src/boot-cron.ts @@ -34,6 +34,11 @@ export async function bootCron() { type: 'flushSessions', pattern: 1000 * 10, }, + { + name: 'insightsDaily', + type: 'insightsDaily', + pattern: '0 2 * * *', + }, ]; if (process.env.SELF_HOSTED && process.env.NODE_ENV === 'production') { diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index 55652de8..4b739afc 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -7,6 +7,7 @@ import { cronQueue, eventsGroupQueues, importQueue, + insightsQueue, miscQueue, notificationQueue, queueLogger, @@ -21,6 +22,7 @@ import { Worker as GroupWorker } from 'groupmq'; import { cronJob } from './jobs/cron'; import { incomingEvent } from './jobs/events.incoming-event'; import { importJob } from './jobs/import'; +import { insightsProjectJob } from './jobs/insights'; import { miscJob } from './jobs/misc'; import { notificationJob } from './jobs/notification'; import { sessionsJob } from './jobs/sessions'; @@ -49,7 +51,15 @@ function getEnabledQueues(): QueueName[] { logger.info('No ENABLED_QUEUES specified, starting all queues', { totalEventShards: EVENTS_GROUP_QUEUES_SHARDS, }); - return ['events', 'sessions', 'cron', 'notification', 'misc', 'import']; + return [ + 'events', + 'sessions', + 'cron', + 'notification', + 'misc', + 'import', + 'insights', + ]; } const queues = enabledQueuesEnv @@ -187,6 +197,17 @@ export async function bootWorkers() { logger.info('Started worker for import', { concurrency }); } + // Start insights worker + if (enabledQueues.includes('insights')) { + const concurrency = getConcurrencyFor('insights', 5); + const insightsWorker = new Worker(insightsQueue.name, insightsProjectJob, { + ...workerOptions, + concurrency, + }); + workers.push(insightsWorker); + logger.info('Started worker for insights', { concurrency }); + } + if (workers.length === 0) { logger.warn( 'No workers started. Check ENABLED_QUEUES environment variable.', diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 7a4686ab..f0aa6f56 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -6,6 +6,7 @@ import { cronQueue, eventsGroupQueues, importQueue, + insightsQueue, miscQueue, notificationQueue, sessionsQueue, @@ -42,6 +43,7 @@ async function start() { new BullMQAdapter(notificationQueue), new BullMQAdapter(miscQueue), new BullMQAdapter(importQueue), + new BullMQAdapter(insightsQueue), ], serverAdapter: serverAdapter, }); diff --git a/apps/worker/src/jobs/cron.ts b/apps/worker/src/jobs/cron.ts index b50e3beb..eee51b16 100644 --- a/apps/worker/src/jobs/cron.ts +++ b/apps/worker/src/jobs/cron.ts @@ -6,6 +6,7 @@ import type { CronQueuePayload } from '@openpanel/queue'; import { jobdeleteProjects } from './cron.delete-projects'; import { ping } from './cron.ping'; import { salt } from './cron.salt'; +import { insightsDailyJob } from './insights'; export async function cronJob(job: Job) { switch (job.data.type) { @@ -27,5 +28,8 @@ export async function cronJob(job: Job) { case 'deleteProjects': { return await jobdeleteProjects(job); } + case 'insightsDaily': { + return await insightsDailyJob(job); + } } } diff --git a/apps/worker/src/jobs/insights.ts b/apps/worker/src/jobs/insights.ts new file mode 100644 index 00000000..a0156e0d --- /dev/null +++ b/apps/worker/src/jobs/insights.ts @@ -0,0 +1,72 @@ +import { ch } from '@openpanel/db/src/clickhouse/client'; +import { + createEngine, + devicesModule, + entryPagesModule, + geoModule, + insightStore, + pageTrendsModule, + referrersModule, +} from '@openpanel/db/src/services/insights'; +import type { + CronQueuePayload, + InsightsQueuePayloadProject, +} from '@openpanel/queue'; +import { insightsQueue } from '@openpanel/queue'; +import type { Job } from 'bullmq'; + +const defaultEngineConfig = { + keepTopNPerModuleWindow: 20, + closeStaleAfterDays: 7, + dimensionBatchSize: 50, + globalThresholds: { + minTotal: 200, + minAbsDelta: 80, + minPct: 0.15, + }, +}; + +export async function insightsDailyJob(job: Job) { + const projectIds = await insightStore.listProjectIdsForCadence('daily'); + const date = new Date().toISOString().slice(0, 10); + + for (const projectId of projectIds) { + await insightsQueue.add( + 'insightsProject', + { + type: 'insightsProject', + payload: { projectId, date }, + }, + { + jobId: `daily:${date}:${projectId}`, // idempotent + }, + ); + } +} + +export async function insightsProjectJob( + job: Job, +) { + const { projectId, date } = job.data.payload; + const engine = createEngine({ + store: insightStore, + modules: [ + referrersModule, + entryPagesModule, + pageTrendsModule, + geoModule, + devicesModule, + ], + db: ch, + config: defaultEngineConfig, + }); + + const projectCreatedAt = await insightStore.getProjectCreatedAt(projectId); + + await engine.runProject({ + projectId, + cadence: 'daily', + now: new Date(date), + projectCreatedAt, + }); +} diff --git a/packages/constants/index.ts b/packages/constants/index.ts index 17cd2d15..aff1b11d 100644 --- a/packages/constants/index.ts +++ b/packages/constants/index.ts @@ -245,3 +245,259 @@ export function getDefaultIntervalByDates( return null; } + +export const countries = { + AF: 'Afghanistan', + AL: 'Albania', + DZ: 'Algeria', + AS: 'American Samoa', + AD: 'Andorra', + AO: 'Angola', + AI: 'Anguilla', + AQ: 'Antarctica', + AG: 'Antigua and Barbuda', + AR: 'Argentina', + AM: 'Armenia', + AW: 'Aruba', + AU: 'Australia', + AT: 'Austria', + AZ: 'Azerbaijan', + BS: 'Bahamas', + BH: 'Bahrain', + BD: 'Bangladesh', + BB: 'Barbados', + BY: 'Belarus', + BE: 'Belgium', + BZ: 'Belize', + BJ: 'Benin', + BM: 'Bermuda', + BT: 'Bhutan', + BO: 'Bolivia', + BQ: 'Bonaire, Sint Eustatius and Saba', + BA: 'Bosnia and Herzegovina', + BW: 'Botswana', + BV: 'Bouvet Island', + BR: 'Brazil', + IO: 'British Indian Ocean Territory', + BN: 'Brunei Darussalam', + BG: 'Bulgaria', + BF: 'Burkina Faso', + BI: 'Burundi', + CV: 'Cabo Verde', + KH: 'Cambodia', + CM: 'Cameroon', + CA: 'Canada', + KY: 'Cayman Islands', + CF: 'Central African Republic', + TD: 'Chad', + CL: 'Chile', + CN: 'China', + CX: 'Christmas Island', + CC: 'Cocos (Keeling) Islands', + CO: 'Colombia', + KM: 'Comoros', + CD: 'Congo (Democratic Republic)', + CG: 'Congo', + CK: 'Cook Islands', + CR: 'Costa Rica', + HR: 'Croatia', + CU: 'Cuba', + CW: 'Curaçao', + CY: 'Cyprus', + CZ: 'Czechia', + CI: "Côte d'Ivoire", + DK: 'Denmark', + DJ: 'Djibouti', + DM: 'Dominica', + DO: 'Dominican Republic', + EC: 'Ecuador', + EG: 'Egypt', + SV: 'El Salvador', + GQ: 'Equatorial Guinea', + ER: 'Eritrea', + EE: 'Estonia', + SZ: 'Eswatina', + ET: 'Ethiopia', + FK: 'Falkland Islands', + FO: 'Faroe Islands', + FJ: 'Fiji', + FI: 'Finland', + FR: 'France', + GF: 'French Guiana', + PF: 'French Polynesia', + TF: 'French Southern Territories', + GA: 'Gabon', + GM: 'Gambia', + GE: 'Georgia', + DE: 'Germany', + GH: 'Ghana', + GI: 'Gibraltar', + GR: 'Greece', + GL: 'Greenland', + GD: 'Grenada', + GP: 'Guadeloupe', + GU: 'Guam', + GT: 'Guatemala', + GG: 'Guernsey', + GN: 'Guinea', + GW: 'Guinea-Bissau', + GY: 'Guyana', + HT: 'Haiti', + HM: 'Heard Island and McDonald Islands', + VA: 'Holy See', + HN: 'Honduras', + HK: 'Hong Kong', + HU: 'Hungary', + IS: 'Iceland', + IN: 'India', + ID: 'Indonesia', + IR: 'Iran', + IQ: 'Iraq', + IE: 'Ireland', + IM: 'Isle of Man', + IL: 'Israel', + IT: 'Italy', + JM: 'Jamaica', + JP: 'Japan', + JE: 'Jersey', + JO: 'Jordan', + KZ: 'Kazakhstan', + KE: 'Kenya', + KI: 'Kiribati', + KP: "Korea (Democratic People's Republic)", + KR: 'Korea (Republic)', + KW: 'Kuwait', + KG: 'Kyrgyzstan', + LA: "Lao People's Democratic Republic", + LV: 'Latvia', + LB: 'Lebanon', + LS: 'Lesotho', + LR: 'Liberia', + LY: 'Libya', + LI: 'Liechtenstein', + LT: 'Lithuania', + LU: 'Luxembourg', + MO: 'Macao', + MG: 'Madagascar', + MW: 'Malawi', + MY: 'Malaysia', + MV: 'Maldives', + ML: 'Mali', + MT: 'Malta', + MH: 'Marshall Islands', + MQ: 'Martinique', + MR: 'Mauritania', + MU: 'Mauritius', + YT: 'Mayotte', + MX: 'Mexico', + FM: 'Micronesia', + MD: 'Moldova', + MC: 'Monaco', + MN: 'Mongolia', + ME: 'Montenegro', + MS: 'Montserrat', + MA: 'Morocco', + MZ: 'Mozambique', + MM: 'Myanmar', + NA: 'Namibia', + NR: 'Nauru', + NP: 'Nepal', + NL: 'Netherlands', + NC: 'New Caledonia', + NZ: 'New Zealand', + NI: 'Nicaragua', + NE: 'Niger', + NG: 'Nigeria', + NU: 'Niue', + NF: 'Norfolk Island', + MP: 'Northern Mariana Islands', + NO: 'Norway', + OM: 'Oman', + PK: 'Pakistan', + PW: 'Palau', + PS: 'Palestine, State of', + PA: 'Panama', + PG: 'Papua New Guinea', + PY: 'Paraguay', + PE: 'Peru', + PH: 'Philippines', + PN: 'Pitcairn', + PL: 'Poland', + PT: 'Portugal', + PR: 'Puerto Rico', + QA: 'Qatar', + MK: 'Republic of North Macedonia', + RO: 'Romania', + RU: 'Russian Federation', + RW: 'Rwanda', + RE: 'Réunion', + BL: 'Saint Barthélemy', + SH: 'Saint Helena, Ascension and Tristan da Cunha', + KN: 'Saint Kitts and Nevis', + LC: 'Saint Lucia', + MF: 'Saint Martin (French part)', + PM: 'Saint Pierre and Miquelon', + VC: 'Saint Vincent and the Grenadines', + WS: 'Samoa', + SM: 'San Marino', + ST: 'Sao Tome and Principe', + SA: 'Saudi Arabia', + SN: 'Senegal', + RS: 'Serbia', + SC: 'Seychelles', + SL: 'Sierra Leone', + SG: 'Singapore', + SX: 'Sint Maarten (Dutch part)', + SK: 'Slovakia', + SI: 'Slovenia', + SB: 'Solomon Islands', + SO: 'Somalia', + ZA: 'South Africa', + GS: 'South Georgia and the South Sandwich Islands', + SS: 'South Sudan', + ES: 'Spain', + LK: 'Sri Lanka', + SD: 'Sudan', + SR: 'Suriname', + SJ: 'Svalbard and Jan Mayen', + SE: 'Sweden', + CH: 'Switzerland', + SY: 'Syrian Arab Republic', + TW: 'Taiwan', + TJ: 'Tajikistan', + TZ: 'Tanzania, United Republic of', + TH: 'Thailand', + TL: 'Timor-Leste', + TG: 'Togo', + TK: 'Tokelau', + TO: 'Tonga', + TT: 'Trinidad and Tobago', + TN: 'Tunisia', + TR: 'Turkey', + TM: 'Turkmenistan', + TC: 'Turks and Caicos Islands', + TV: 'Tuvalu', + UG: 'Uganda', + UA: 'Ukraine', + AE: 'United Arab Emirates', + GB: 'United Kingdom', + US: 'United States', + UM: 'United States Minor Outlying Islands', + UY: 'Uruguay', + UZ: 'Uzbekistan', + VU: 'Vanuatu', + VE: 'Venezuela', + VN: 'Viet Nam', + VG: 'Virgin Islands (British)', + VI: 'Virgin Islands (U.S.)', + WF: 'Wallis and Futuna', + EH: 'Western Sahara', + YE: 'Yemen', + ZM: 'Zambia', + ZW: 'Zimbabwe', + AX: 'Åland Islands', +} as const; + +export function getCountry(code?: string) { + return countries[code as keyof typeof countries]; +} diff --git a/packages/db/code-migrations/4-add-sessions.ts b/packages/db/code-migrations/4-add-sessions.ts index e6fd9389..cf1c00bb 100644 --- a/packages/db/code-migrations/4-add-sessions.ts +++ b/packages/db/code-migrations/4-add-sessions.ts @@ -100,6 +100,9 @@ async function createOldSessions() { if (!row || row.count === '0') { return null; } + if (row.created_at.startsWith('1970')) { + return null; + } return new Date(row.created_at); } catch (e) { return defaultDate; diff --git a/packages/db/code-migrations/8-order-keys.ts b/packages/db/code-migrations/8-order-keys.ts index cb2b5bf2..d36aa2a9 100644 --- a/packages/db/code-migrations/8-order-keys.ts +++ b/packages/db/code-migrations/8-order-keys.ts @@ -139,7 +139,10 @@ export async function up() { const firstEventDateJson = await firstEventDateResponse.json<{ created_at: string; }>(); - if (firstEventDateJson[0]?.created_at) { + if ( + firstEventDateJson[0]?.created_at && + !firstEventDateJson[0]?.created_at.startsWith('1970') + ) { const firstEventDate = new Date(firstEventDateJson[0]?.created_at); // Step 2: Copy data from old tables to new tables (partitioned by month for efficiency) // Set endDate to first of next month to ensure we capture all data in the current month @@ -174,7 +177,10 @@ export async function up() { created_at: string; }>(); - if (firstSessionDateJson[0]?.created_at) { + if ( + firstSessionDateJson[0]?.created_at && + !firstSessionDateJson[0]?.created_at.startsWith('1970') + ) { const firstSessionDate = new Date( firstSessionDateJson[0]?.created_at ?? '', ); diff --git a/packages/db/index.ts b/packages/db/index.ts index 58042d3f..aaa9b5a7 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -28,4 +28,5 @@ export * from './src/types'; export * from './src/clickhouse/query-builder'; export * from './src/services/import.service'; export * from './src/services/overview.service'; +export * from './src/services/insights'; export * from './src/session-context'; diff --git a/packages/db/prisma/migrations/20251212192459_insights/migration.sql b/packages/db/prisma/migrations/20251212192459_insights/migration.sql new file mode 100644 index 00000000..859dbcd1 --- /dev/null +++ b/packages/db/prisma/migrations/20251212192459_insights/migration.sql @@ -0,0 +1,57 @@ +-- CreateEnum +CREATE TYPE "public"."InsightState" AS ENUM ('active', 'suppressed', 'closed'); + +-- CreateTable +CREATE TABLE "public"."project_insights" ( + "id" UUID NOT NULL DEFAULT gen_random_uuid(), + "projectId" TEXT NOT NULL, + "moduleKey" TEXT NOT NULL, + "dimensionKey" TEXT NOT NULL, + "windowKind" TEXT NOT NULL, + "state" "public"."InsightState" NOT NULL DEFAULT 'active', + "title" TEXT NOT NULL, + "summary" TEXT, + "payload" JSONB, + "currentValue" DOUBLE PRECISION, + "compareValue" DOUBLE PRECISION, + "changePct" DOUBLE PRECISION, + "direction" TEXT, + "impactScore" DOUBLE PRECISION NOT NULL DEFAULT 0, + "severityBand" TEXT, + "version" INTEGER NOT NULL DEFAULT 1, + "threadId" UUID NOT NULL DEFAULT gen_random_uuid(), + "windowStart" TIMESTAMP(3), + "windowEnd" TIMESTAMP(3), + "firstDetectedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "lastUpdatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "lastSeenAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "project_insights_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."insight_events" ( + "id" UUID NOT NULL DEFAULT gen_random_uuid(), + "insightId" UUID NOT NULL, + "eventKind" TEXT NOT NULL, + "changeFrom" JSONB, + "changeTo" JSONB, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "insight_events_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "project_insights_projectId_impactScore_idx" ON "public"."project_insights"("projectId", "impactScore" DESC); + +-- CreateIndex +CREATE INDEX "project_insights_projectId_moduleKey_windowKind_state_idx" ON "public"."project_insights"("projectId", "moduleKey", "windowKind", "state"); + +-- CreateIndex +CREATE UNIQUE INDEX "project_insights_projectId_moduleKey_dimensionKey_windowKin_key" ON "public"."project_insights"("projectId", "moduleKey", "dimensionKey", "windowKind", "state"); + +-- CreateIndex +CREATE INDEX "insight_events_insightId_createdAt_idx" ON "public"."insight_events"("insightId", "createdAt"); + +-- AddForeignKey +ALTER TABLE "public"."insight_events" ADD CONSTRAINT "insight_events_insightId_fkey" FOREIGN KEY ("insightId") REFERENCES "public"."project_insights"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/packages/db/prisma/migrations/20251217204808_insight_payload_default/migration.sql b/packages/db/prisma/migrations/20251217204808_insight_payload_default/migration.sql new file mode 100644 index 00000000..6ad792ee --- /dev/null +++ b/packages/db/prisma/migrations/20251217204808_insight_payload_default/migration.sql @@ -0,0 +1,9 @@ +/* + Warnings: + + - Made the column `payload` on table `project_insights` required. This step will fail if there are existing NULL values in that column. + +*/ +-- AlterTable +ALTER TABLE "public"."project_insights" ALTER COLUMN "payload" SET NOT NULL, +ALTER COLUMN "payload" SET DEFAULT '{}'; diff --git a/packages/db/prisma/migrations/20251217210920_insights/migration.sql b/packages/db/prisma/migrations/20251217210920_insights/migration.sql new file mode 100644 index 00000000..56a4b038 --- /dev/null +++ b/packages/db/prisma/migrations/20251217210920_insights/migration.sql @@ -0,0 +1,13 @@ +/* + Warnings: + + - You are about to drop the column `changePct` on the `project_insights` table. All the data in the column will be lost. + - You are about to drop the column `compareValue` on the `project_insights` table. All the data in the column will be lost. + - You are about to drop the column `currentValue` on the `project_insights` table. All the data in the column will be lost. + +*/ +-- AlterTable +ALTER TABLE "public"."project_insights" DROP COLUMN "changePct", +DROP COLUMN "compareValue", +DROP COLUMN "currentValue", +ADD COLUMN "displayName" TEXT NOT NULL DEFAULT ''; diff --git a/packages/db/prisma/schema.prisma b/packages/db/prisma/schema.prisma index 38a29105..3b4d5dfe 100644 --- a/packages/db/prisma/schema.prisma +++ b/packages/db/prisma/schema.prisma @@ -497,3 +497,58 @@ model Import { @@map("imports") } + +enum InsightState { + active + suppressed + closed +} + +model ProjectInsight { + id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid + projectId String + moduleKey String // e.g. "referrers", "entry-pages" + dimensionKey String // e.g. "referrer:instagram", "page:/pricing" + windowKind String // "yesterday" | "rolling_7d" | "rolling_30d" + state InsightState @default(active) + + title String + summary String? + displayName String @default("") + /// [IPrismaProjectInsightPayload] + payload Json @default("{}") // Rendered insight payload (typed) + + direction String? // "up" | "down" | "flat" + impactScore Float @default(0) + severityBand String? // "low" | "moderate" | "severe" + + version Int @default(1) + threadId String @default(dbgenerated("gen_random_uuid()")) @db.Uuid + + windowStart DateTime? + windowEnd DateTime? + + firstDetectedAt DateTime @default(now()) + lastUpdatedAt DateTime @default(now()) @updatedAt + lastSeenAt DateTime @default(now()) + + events InsightEvent[] + + @@unique([projectId, moduleKey, dimensionKey, windowKind, state]) + @@index([projectId, impactScore(sort: Desc)]) + @@index([projectId, moduleKey, windowKind, state]) + @@map("project_insights") +} + +model InsightEvent { + id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid + insightId String @db.Uuid + insight ProjectInsight @relation(fields: [insightId], references: [id], onDelete: Cascade) + eventKind String // "created" | "updated" | "severity_up" | "direction_flip" | "closed" | etc + changeFrom Json? + changeTo Json? + createdAt DateTime @default(now()) + + @@index([insightId, createdAt]) + @@map("insight_events") +} diff --git a/packages/db/src/clickhouse/query-builder.ts b/packages/db/src/clickhouse/query-builder.ts index ba2416c7..066b7b7c 100644 --- a/packages/db/src/clickhouse/query-builder.ts +++ b/packages/db/src/clickhouse/query-builder.ts @@ -43,7 +43,7 @@ class Expression { } export class Query { - private _select: string[] = []; + private _select: (string | Expression)[] = []; private _except: string[] = []; private _from?: string | Expression; private _where: WhereCondition[] = []; @@ -81,17 +81,19 @@ export class Query { // Select methods select( - columns: (string | null | undefined | false)[], + columns: (string | Expression | null | undefined | false)[], type: 'merge' | 'replace' = 'replace', ): Query { if (this._skipNext) return this as unknown as Query; if (type === 'merge') { this._select = [ ...this._select, - ...columns.filter((col): col is string => Boolean(col)), + ...columns.filter((col): col is string | Expression => Boolean(col)), ]; } else { - this._select = columns.filter((col): col is string => Boolean(col)); + this._select = columns.filter((col): col is string | Expression => + Boolean(col), + ); } return this as unknown as Query; } @@ -372,7 +374,14 @@ export class Query { if (this._select.length > 0) { parts.push( 'SELECT', - this._select.map((col) => this.escapeDate(col)).join(', '), + this._select + // Important: Expressions are treated as raw SQL; do not run escapeDate() + // on them, otherwise any embedded date strings get double-escaped + // (e.g. ''2025-12-16 23:59:59'') which ClickHouse rejects. + .map((col) => + col instanceof Expression ? col.toString() : this.escapeDate(col), + ) + .join(', '), ); } else { parts.push('SELECT *'); diff --git a/packages/db/src/prisma-client.ts b/packages/db/src/prisma-client.ts index e5900dd6..befe3f74 100644 --- a/packages/db/src/prisma-client.ts +++ b/packages/db/src/prisma-client.ts @@ -42,11 +42,11 @@ const getPrismaClient = () => { operation === 'update' || operation === 'delete' ) { - logger.info('Prisma operation', { - operation, - args, - model, - }); + // logger.info('Prisma operation', { + // operation, + // args, + // model, + // }); } return query(args); }, diff --git a/packages/db/src/services/insights/cached-clix.ts b/packages/db/src/services/insights/cached-clix.ts new file mode 100644 index 00000000..2dd772d6 --- /dev/null +++ b/packages/db/src/services/insights/cached-clix.ts @@ -0,0 +1,68 @@ +import crypto from 'node:crypto'; +import type { ClickHouseClient } from '@clickhouse/client'; +import { + type Query, + clix as originalClix, +} from '../../clickhouse/query-builder'; + +/** + * Creates a cached wrapper around clix that automatically caches query results + * based on query hash. This eliminates duplicate queries within the same module/window context. + * + * @param client - ClickHouse client + * @param cache - Optional cache Map to store query results + * @param timezone - Timezone for queries (defaults to UTC) + * @returns A function that creates cached Query instances (compatible with clix API) + */ +export function createCachedClix( + client: ClickHouseClient, + cache?: Map, + timezone?: string, +) { + function clixCached(): Query { + const query = originalClix(client, timezone); + const queryTimezone = timezone ?? 'UTC'; + + // Override execute() method to add caching + const originalExecute = query.execute.bind(query); + query.execute = async () => { + // Build the query SQL string + const querySQL = query.toSQL(); + + // Create cache key from query SQL + timezone + const cacheKey = crypto + .createHash('sha256') + .update(`${querySQL}|${queryTimezone}`) + .digest('hex'); + + // Check cache first + if (cache?.has(cacheKey)) { + return cache.get(cacheKey); + } + + // Execute query + const result = await originalExecute(); + + // Cache the result + if (cache) { + cache.set(cacheKey, result); + } + + return result; + }; + + return query; + } + + // Copy static methods from original clix + clixCached.exp = originalClix.exp; + clixCached.date = originalClix.date; + clixCached.datetime = originalClix.datetime; + clixCached.dynamicDatetime = originalClix.dynamicDatetime; + clixCached.toStartOf = originalClix.toStartOf; + clixCached.toStartOfInterval = originalClix.toStartOfInterval; + clixCached.toInterval = originalClix.toInterval; + clixCached.toDate = originalClix.toDate; + + return clixCached; +} diff --git a/packages/db/src/services/insights/engine.ts b/packages/db/src/services/insights/engine.ts new file mode 100644 index 00000000..086cb8f5 --- /dev/null +++ b/packages/db/src/services/insights/engine.ts @@ -0,0 +1,303 @@ +import { createCachedClix } from './cached-clix'; +import { materialDecision } from './material'; +import { defaultImpactScore, severityBand } from './scoring'; +import type { + Cadence, + ComputeContext, + ComputeResult, + InsightModule, + InsightStore, + WindowKind, +} from './types'; +import { resolveWindow } from './windows'; + +const DEFAULT_WINDOWS: WindowKind[] = [ + 'yesterday', + 'rolling_7d', + 'rolling_30d', +]; + +export interface EngineConfig { + keepTopNPerModuleWindow: number; // e.g. 5 + closeStaleAfterDays: number; // e.g. 7 + dimensionBatchSize: number; // e.g. 50 + globalThresholds: { + minTotal: number; // e.g. 200 + minAbsDelta: number; // e.g. 80 + minPct: number; // e.g. 0.15 + }; +} + +/** Simple gating to cut noise; modules can override via thresholds. */ +function passesThresholds( + r: ComputeResult, + mod: InsightModule, + cfg: EngineConfig, +): boolean { + const t = mod.thresholds ?? {}; + const minTotal = t.minTotal ?? cfg.globalThresholds.minTotal; + const minAbsDelta = t.minAbsDelta ?? cfg.globalThresholds.minAbsDelta; + const minPct = t.minPct ?? cfg.globalThresholds.minPct; + const cur = r.currentValue ?? 0; + const cmp = r.compareValue ?? 0; + const total = cur + cmp; + const absDelta = Math.abs(cur - cmp); + const pct = Math.abs(r.changePct ?? 0); + if (total < minTotal) return false; + if (absDelta < minAbsDelta) return false; + if (pct < minPct) return false; + return true; +} + +function chunk(arr: T[], size: number): T[][] { + if (size <= 0) return [arr]; + const out: T[][] = []; + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)); + return out; +} + +export function createEngine(args: { + store: InsightStore; + modules: InsightModule[]; + db: any; + logger?: Pick; + config: EngineConfig; +}) { + const { store, modules, db, config } = args; + const logger = args.logger ?? console; + + function isProjectOldEnoughForWindow( + projectCreatedAt: Date | null | undefined, + baselineStart: Date, + ): boolean { + if (!projectCreatedAt) return true; // best-effort; don't block if unknown + return projectCreatedAt.getTime() <= baselineStart.getTime(); + } + + async function runProject(opts: { + projectId: string; + cadence: Cadence; + now: Date; + projectCreatedAt?: Date | null; + }): Promise { + const { projectId, cadence, now, projectCreatedAt } = opts; + const projLogger = logger; + const eligible = modules.filter((m) => m.cadence.includes(cadence)); + + for (const mod of eligible) { + const windows = mod.windows ?? DEFAULT_WINDOWS; + for (const windowKind of windows) { + let window: ReturnType; + let ctx: ComputeContext; + try { + window = resolveWindow(windowKind, now); + if ( + !isProjectOldEnoughForWindow(projectCreatedAt, window.baselineStart) + ) { + continue; + } + // Initialize cache for this module+window combination. + // Cache is automatically garbage collected when context goes out of scope. + const cache = new Map(); + ctx = { + projectId, + window, + db, + now, + logger: projLogger, + clix: createCachedClix(db, cache), + }; + } catch (e) { + projLogger.error('[insights] failed to create compute context', { + projectId, + module: mod.key, + windowKind, + err: e, + }); + continue; + } + + // 1) enumerate dimensions + let dims: string[] = []; + try { + dims = mod.enumerateDimensions + ? await mod.enumerateDimensions(ctx) + : []; + } catch (e) { + // Important: enumeration failures should not abort the whole project run. + // Also avoid lifecycle close/suppression when we didn't actually evaluate dims. + projLogger.error('[insights] module enumerateDimensions failed', { + projectId, + module: mod.key, + windowKind, + err: e, + }); + continue; + } + const maxDims = mod.thresholds?.maxDims ?? 25; + if (dims.length > maxDims) dims = dims.slice(0, maxDims); + + if (dims.length === 0) { + // Still do lifecycle close / suppression based on "nothing emitted" + await store.closeMissingActiveInsights({ + projectId, + moduleKey: mod.key, + windowKind, + seenDimensionKeys: [], + now, + staleDays: config.closeStaleAfterDays, + }); + + await store.applySuppression({ + projectId, + moduleKey: mod.key, + windowKind, + keepTopN: config.keepTopNPerModuleWindow, + now, + }); + + continue; + } + + // 2) compute in batches + const seen: string[] = []; + const dimBatches = chunk(dims, config.dimensionBatchSize); + for (const batch of dimBatches) { + let results: ComputeResult[] = []; + try { + results = await mod.computeMany(ctx, batch); + } catch (e) { + projLogger.error('[insights] module computeMany failed', { + projectId, + module: mod.key, + windowKind, + err: e, + }); + continue; + } + + for (const r of results) { + if (!r?.ok) continue; + if (!r.dimensionKey) continue; + + // 3) gate noise + if (!passesThresholds(r, mod, config)) continue; + + // 4) score + const impact = mod.score + ? mod.score(r, ctx) + : defaultImpactScore(r); + const sev = severityBand(r.changePct); + + // 5) dedupe/material change requires loading prev identity + const prev = await store.getActiveInsightByIdentity({ + projectId, + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + windowKind, + }); + + const decision = materialDecision(prev, { + changePct: r.changePct, + direction: r.direction, + }); + + // 6) render + const card = mod.render(r, ctx); + + // 7) upsert + const persisted = await store.upsertInsight({ + projectId, + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + window, + card, + metrics: { + direction: r.direction, + impactScore: impact, + severityBand: sev, + }, + now, + decision, + prev, + }); + + seen.push(r.dimensionKey); + + // 8) events only when material + if (!prev) { + await store.insertEvent({ + projectId, + insightId: persisted.id, + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + windowKind, + eventKind: 'created', + changeFrom: null, + changeTo: { + title: card.title, + changePct: r.changePct, + direction: r.direction, + impact, + severityBand: sev, + }, + now, + }); + } else if (decision.material) { + const eventKind = + decision.reason === 'direction_flip' + ? 'direction_flip' + : decision.reason === 'severity_change' + ? sev && prev.severityBand && sev > prev.severityBand + ? 'severity_up' + : 'severity_down' + : 'updated'; + + await store.insertEvent({ + projectId, + insightId: persisted.id, + moduleKey: mod.key, + dimensionKey: r.dimensionKey, + windowKind, + eventKind, + changeFrom: { + direction: prev.direction, + impactScore: prev.impactScore, + severityBand: prev.severityBand, + }, + changeTo: { + changePct: r.changePct, + direction: r.direction, + impactScore: impact, + severityBand: sev, + }, + now, + }); + } + } + } + + // 10) lifecycle: close missing insights for this module/window + await store.closeMissingActiveInsights({ + projectId, + moduleKey: mod.key, + windowKind, + seenDimensionKeys: seen, + now, + staleDays: config.closeStaleAfterDays, + }); + + // 11) suppression: keep top N + await store.applySuppression({ + projectId, + moduleKey: mod.key, + windowKind, + keepTopN: config.keepTopNPerModuleWindow, + now, + }); + } + } + } + + return { runProject }; +} diff --git a/packages/db/src/services/insights/index.ts b/packages/db/src/services/insights/index.ts new file mode 100644 index 00000000..2d606504 --- /dev/null +++ b/packages/db/src/services/insights/index.ts @@ -0,0 +1,8 @@ +export * from './types'; +export * from './windows'; +export * from './scoring'; +export * from './material'; +export * from './engine'; +export * from './store'; +export * from './utils'; +export * from './modules'; diff --git a/packages/db/src/services/insights/material.ts b/packages/db/src/services/insights/material.ts new file mode 100644 index 00000000..13699b31 --- /dev/null +++ b/packages/db/src/services/insights/material.ts @@ -0,0 +1,43 @@ +import { severityBand as band } from './scoring'; +import type { MaterialDecision, PersistedInsight } from './types'; + +export function materialDecision( + prev: PersistedInsight | null, + next: { + changePct?: number; + direction?: 'up' | 'down' | 'flat'; + }, +): MaterialDecision { + const nextBand = band(next.changePct); + if (!prev) { + return { material: true, reason: 'created', newSeverityBand: nextBand }; + } + + // direction flip is always meaningful + const prevDir = (prev.direction ?? 'flat') as any; + const nextDir = next.direction ?? 'flat'; + if (prevDir !== nextDir && (nextDir === 'up' || nextDir === 'down')) { + return { + material: true, + reason: 'direction_flip', + newSeverityBand: nextBand, + }; + } + + // severity band change + const prevBand = (prev.severityBand ?? null) as any; + if (prevBand !== nextBand && nextBand !== null) { + return { + material: true, + reason: 'severity_change', + newSeverityBand: nextBand, + }; + } + + // Otherwise: treat as non-material (silent refresh). You can add deadband crossing here if you store prior changePct. + return { + material: false, + reason: 'none', + newSeverityBand: prevBand ?? nextBand, + }; +} diff --git a/packages/db/src/services/insights/modules/devices.module.ts b/packages/db/src/services/insights/modules/devices.module.ts new file mode 100644 index 00000000..9afea524 --- /dev/null +++ b/packages/db/src/services/insights/modules/devices.module.ts @@ -0,0 +1,275 @@ +import { TABLE_NAMES, formatClickhouseDate } from '../../../clickhouse/client'; +import type { + ComputeContext, + ComputeResult, + InsightModule, + RenderedCard, +} from '../types'; +import { + buildLookupMap, + computeChangePct, + computeDirection, + computeMedian, + getEndOfDay, + getWeekday, + selectTopDimensions, +} from '../utils'; + +async function fetchDeviceAggregates(ctx: ComputeContext): Promise<{ + currentMap: Map; + baselineMap: Map; + totalCurrent: number; + totalBaseline: number; +}> { + if (ctx.window.kind === 'yesterday') { + const [currentResults, baselineResults, totals] = await Promise.all([ + ctx + .clix() + .select<{ device: string; cnt: number }>(['device', 'count(*) as cnt']) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['device']) + .execute(), + ctx + .clix() + .select<{ date: string; device: string; cnt: number }>([ + 'toDate(created_at) as date', + 'device', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'device']) + .execute(), + ctx + .clix() + .select<{ cur_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${formatClickhouseDate(ctx.window.start)}' AND '${formatClickhouseDate(getEndOfDay(ctx.window.end))}') as cur_total`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap(currentResults, (r) => r.device); + + const targetWeekday = getWeekday(ctx.window.start); + const aggregated = new Map(); + for (const r of baselineResults) { + if (!aggregated.has(r.device)) { + aggregated.set(r.device, []); + } + const entries = aggregated.get(r.device)!; + const existing = entries.find((e) => e.date === r.date); + if (existing) { + existing.cnt += Number(r.cnt ?? 0); + } else { + entries.push({ date: r.date, cnt: Number(r.cnt ?? 0) }); + } + } + + const baselineMap = new Map(); + for (const [deviceType, entries] of aggregated) { + const sameWeekdayValues = entries + .filter((e) => getWeekday(new Date(e.date)) === targetWeekday) + .map((e) => e.cnt) + .sort((a, b) => a - b); + + if (sameWeekdayValues.length > 0) { + baselineMap.set(deviceType, computeMedian(sameWeekdayValues)); + } + } + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = + baselineMap.size > 0 + ? Array.from(baselineMap.values()).reduce((sum, val) => sum + val, 0) + : 0; + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; + } + + const curStart = formatClickhouseDate(ctx.window.start); + const curEnd = formatClickhouseDate(getEndOfDay(ctx.window.end)); + const baseStart = formatClickhouseDate(ctx.window.baselineStart); + const baseEnd = formatClickhouseDate(getEndOfDay(ctx.window.baselineEnd)); + + const [results, totals] = await Promise.all([ + ctx + .clix() + .select<{ device: string; cur: number; base: number }>([ + 'device', + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['device']) + .execute(), + ctx + .clix() + .select<{ cur_total: number; base_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur_total`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base_total`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + results, + (r) => r.device, + (r) => Number(r.cur ?? 0), + ); + + const baselineMap = buildLookupMap( + results, + (r) => r.device, + (r) => Number(r.base ?? 0), + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = totals[0]?.base_total ?? 0; + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; +} + +export const devicesModule: InsightModule = { + key: 'devices', + cadence: ['daily'], + thresholds: { minTotal: 100, minAbsDelta: 0, minPct: 0.08, maxDims: 5 }, + + async enumerateDimensions(ctx) { + const { currentMap, baselineMap } = await fetchDeviceAggregates(ctx); + const topDims = selectTopDimensions( + currentMap, + baselineMap, + this.thresholds?.maxDims ?? 5, + ); + return topDims.map((dim) => `device:${dim}`); + }, + + async computeMany(ctx, dimensionKeys): Promise { + const { currentMap, baselineMap, totalCurrent, totalBaseline } = + await fetchDeviceAggregates(ctx); + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('device:')) continue; + const deviceType = dimKey.replace('device:', ''); + + const currentValue = currentMap.get(deviceType) ?? 0; + const compareValue = baselineMap.get(deviceType) ?? 0; + + const currentShare = totalCurrent > 0 ? currentValue / totalCurrent : 0; + const compareShare = totalBaseline > 0 ? compareValue / totalBaseline : 0; + + const shareShiftPp = (currentShare - compareShare) * 100; + const changePct = computeChangePct(currentValue, compareValue); + const direction = computeDirection(changePct); + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + shareShiftPp, + currentShare, + compareShare, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const device = result.dimensionKey.replace('device:', ''); + const changePct = result.changePct ?? 0; + const isIncrease = changePct >= 0; + + const sessionsCurrent = result.currentValue ?? 0; + const sessionsCompare = result.compareValue ?? 0; + const shareCurrent = Number(result.extra?.currentShare ?? 0); + const shareCompare = Number(result.extra?.compareShare ?? 0); + + return { + title: `${device} ${isIncrease ? '↑' : '↓'} ${Math.abs(changePct * 100).toFixed(0)}%`, + summary: `${ctx.window.label}. Device traffic change.`, + displayName: device, + payload: { + kind: 'insight_v1', + dimensions: [{ key: 'device', value: device, displayName: device }], + primaryMetric: 'sessions', + metrics: { + sessions: { + current: sessionsCurrent, + compare: sessionsCompare, + delta: sessionsCurrent - sessionsCompare, + changePct: sessionsCompare > 0 ? (result.changePct ?? 0) : null, + direction: result.direction ?? 'flat', + unit: 'count', + }, + share: { + current: shareCurrent, + compare: shareCompare, + delta: shareCurrent - shareCompare, + changePct: + shareCompare > 0 + ? (shareCurrent - shareCompare) / shareCompare + : null, + direction: + shareCurrent - shareCompare > 0.0005 + ? 'up' + : shareCurrent - shareCompare < -0.0005 + ? 'down' + : 'flat', + unit: 'ratio', + }, + }, + extra: { + // keep module-specific flags/fields if needed later + }, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/modules/entry-pages.module.ts b/packages/db/src/services/insights/modules/entry-pages.module.ts new file mode 100644 index 00000000..ea92df93 --- /dev/null +++ b/packages/db/src/services/insights/modules/entry-pages.module.ts @@ -0,0 +1,287 @@ +import { TABLE_NAMES, formatClickhouseDate } from '../../../clickhouse/client'; +import type { + ComputeContext, + ComputeResult, + InsightModule, + RenderedCard, +} from '../types'; +import { + buildLookupMap, + computeChangePct, + computeDirection, + computeWeekdayMedians, + getEndOfDay, + getWeekday, + selectTopDimensions, +} from '../utils'; + +const DELIMITER = '|||'; + +async function fetchEntryPageAggregates(ctx: ComputeContext): Promise<{ + currentMap: Map; + baselineMap: Map; + totalCurrent: number; + totalBaseline: number; +}> { + if (ctx.window.kind === 'yesterday') { + const [currentResults, baselineResults, totals] = await Promise.all([ + ctx + .clix() + .select<{ entry_origin: string; entry_path: string; cnt: number }>([ + 'entry_origin', + 'entry_path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['entry_origin', 'entry_path']) + .execute(), + ctx + .clix() + .select<{ + date: string; + entry_origin: string; + entry_path: string; + cnt: number; + }>([ + 'toDate(created_at) as date', + 'entry_origin', + 'entry_path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'entry_origin', 'entry_path']) + .execute(), + ctx + .clix() + .select<{ cur_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${formatClickhouseDate(ctx.window.start)}' AND '${formatClickhouseDate(getEndOfDay(ctx.window.end))}') as cur_total`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + currentResults, + (r) => `${r.entry_origin || ''}${DELIMITER}${r.entry_path || '/'}`, + ); + + const targetWeekday = getWeekday(ctx.window.start); + const baselineMap = computeWeekdayMedians( + baselineResults, + targetWeekday, + (r) => `${r.entry_origin || ''}${DELIMITER}${r.entry_path || '/'}`, + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = Array.from(baselineMap.values()).reduce( + (sum, val) => sum + val, + 0, + ); + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; + } + + const curStart = formatClickhouseDate(ctx.window.start); + const curEnd = formatClickhouseDate(getEndOfDay(ctx.window.end)); + const baseStart = formatClickhouseDate(ctx.window.baselineStart); + const baseEnd = formatClickhouseDate(getEndOfDay(ctx.window.baselineEnd)); + + const [results, totals] = await Promise.all([ + ctx + .clix() + .select<{ + entry_origin: string; + entry_path: string; + cur: number; + base: number; + }>([ + 'entry_origin', + 'entry_path', + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['entry_origin', 'entry_path']) + .execute(), + ctx + .clix() + .select<{ cur_total: number; base_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur_total`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base_total`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + results, + (r) => `${r.entry_origin || ''}${DELIMITER}${r.entry_path || '/'}`, + (r) => Number(r.cur ?? 0), + ); + + const baselineMap = buildLookupMap( + results, + (r) => `${r.entry_origin || ''}${DELIMITER}${r.entry_path || '/'}`, + (r) => Number(r.base ?? 0), + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = totals[0]?.base_total ?? 0; + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; +} + +export const entryPagesModule: InsightModule = { + key: 'entry-pages', + cadence: ['daily'], + thresholds: { minTotal: 100, minAbsDelta: 30, minPct: 0.2, maxDims: 100 }, + + async enumerateDimensions(ctx) { + const { currentMap, baselineMap } = await fetchEntryPageAggregates(ctx); + const topDims = selectTopDimensions( + currentMap, + baselineMap, + this.thresholds?.maxDims ?? 100, + ); + return topDims.map((dim) => `entry:${dim}`); + }, + + async computeMany(ctx, dimensionKeys): Promise { + const { currentMap, baselineMap, totalCurrent, totalBaseline } = + await fetchEntryPageAggregates(ctx); + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('entry:')) continue; + const originPath = dimKey.replace('entry:', ''); + + const currentValue = currentMap.get(originPath) ?? 0; + const compareValue = baselineMap.get(originPath) ?? 0; + + const currentShare = totalCurrent > 0 ? currentValue / totalCurrent : 0; + const compareShare = totalBaseline > 0 ? compareValue / totalBaseline : 0; + + const shareShiftPp = (currentShare - compareShare) * 100; + const changePct = computeChangePct(currentValue, compareValue); + const direction = computeDirection(changePct); + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + shareShiftPp, + currentShare, + compareShare, + isNew: compareValue === 0 && currentValue > 0, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const originPath = result.dimensionKey.replace('entry:', ''); + const [origin, path] = originPath.split(DELIMITER); + const displayValue = origin ? `${origin}${path}` : path || '/'; + const pct = ((result.changePct ?? 0) * 100).toFixed(1); + const isIncrease = (result.changePct ?? 0) >= 0; + const isNew = result.extra?.isNew as boolean | undefined; + + const title = isNew + ? `New entry page: ${displayValue}` + : `Entry page ${displayValue} ${isIncrease ? '↑' : '↓'} ${Math.abs(Number(pct))}%`; + + const sessionsCurrent = result.currentValue ?? 0; + const sessionsCompare = result.compareValue ?? 0; + const shareCurrent = Number(result.extra?.currentShare ?? 0); + const shareCompare = Number(result.extra?.compareShare ?? 0); + + return { + title, + summary: `${ctx.window.label}. Sessions ${sessionsCurrent} vs ${sessionsCompare}.`, + displayName: displayValue, + payload: { + kind: 'insight_v1', + dimensions: [ + { key: 'origin', value: origin ?? '', displayName: origin ?? '' }, + { key: 'path', value: path ?? '', displayName: path ?? '' }, + ], + primaryMetric: 'sessions', + metrics: { + sessions: { + current: sessionsCurrent, + compare: sessionsCompare, + delta: sessionsCurrent - sessionsCompare, + changePct: sessionsCompare > 0 ? (result.changePct ?? 0) : null, + direction: result.direction ?? 'flat', + unit: 'count', + }, + share: { + current: shareCurrent, + compare: shareCompare, + delta: shareCurrent - shareCompare, + changePct: + shareCompare > 0 + ? (shareCurrent - shareCompare) / shareCompare + : null, + direction: + shareCurrent - shareCompare > 0.0005 + ? 'up' + : shareCurrent - shareCompare < -0.0005 + ? 'down' + : 'flat', + unit: 'ratio', + }, + }, + extra: { + isNew: result.extra?.isNew, + }, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/modules/geo.module.ts b/packages/db/src/services/insights/modules/geo.module.ts new file mode 100644 index 00000000..e848507f --- /dev/null +++ b/packages/db/src/services/insights/modules/geo.module.ts @@ -0,0 +1,271 @@ +import { getCountry } from '@openpanel/constants'; +import { TABLE_NAMES, formatClickhouseDate } from '../../../clickhouse/client'; +import type { + ComputeContext, + ComputeResult, + InsightModule, + RenderedCard, +} from '../types'; +import { + buildLookupMap, + computeChangePct, + computeDirection, + computeWeekdayMedians, + getEndOfDay, + getWeekday, + selectTopDimensions, +} from '../utils'; + +async function fetchGeoAggregates(ctx: ComputeContext): Promise<{ + currentMap: Map; + baselineMap: Map; + totalCurrent: number; + totalBaseline: number; +}> { + if (ctx.window.kind === 'yesterday') { + const [currentResults, baselineResults, totals] = await Promise.all([ + ctx + .clix() + .select<{ country: string; cnt: number }>([ + 'country', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['country']) + .execute(), + ctx + .clix() + .select<{ date: string; country: string; cnt: number }>([ + 'toDate(created_at) as date', + 'country', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'country']) + .execute(), + ctx + .clix() + .select<{ cur_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${formatClickhouseDate(ctx.window.start)}' AND '${formatClickhouseDate(getEndOfDay(ctx.window.end))}') as cur_total`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + currentResults, + (r) => r.country || 'unknown', + ); + + const targetWeekday = getWeekday(ctx.window.start); + const baselineMap = computeWeekdayMedians( + baselineResults, + targetWeekday, + (r) => r.country || 'unknown', + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = Array.from(baselineMap.values()).reduce( + (sum, val) => sum + val, + 0, + ); + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; + } + + const curStart = formatClickhouseDate(ctx.window.start); + const curEnd = formatClickhouseDate(getEndOfDay(ctx.window.end)); + const baseStart = formatClickhouseDate(ctx.window.baselineStart); + const baseEnd = formatClickhouseDate(getEndOfDay(ctx.window.baselineEnd)); + + const [results, totals] = await Promise.all([ + ctx + .clix() + .select<{ country: string; cur: number; base: number }>([ + 'country', + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['country']) + .execute(), + ctx + .clix() + .select<{ cur_total: number; base_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur_total`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base_total`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + results, + (r) => r.country || 'unknown', + (r) => Number(r.cur ?? 0), + ); + + const baselineMap = buildLookupMap( + results, + (r) => r.country || 'unknown', + (r) => Number(r.base ?? 0), + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = totals[0]?.base_total ?? 0; + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; +} + +export const geoModule: InsightModule = { + key: 'geo', + cadence: ['daily'], + thresholds: { minTotal: 100, minAbsDelta: 0, minPct: 0.08, maxDims: 30 }, + + async enumerateDimensions(ctx) { + const { currentMap, baselineMap } = await fetchGeoAggregates(ctx); + const topDims = selectTopDimensions( + currentMap, + baselineMap, + this.thresholds?.maxDims ?? 30, + ); + return topDims.map((dim) => `country:${dim}`); + }, + + async computeMany(ctx, dimensionKeys): Promise { + const { currentMap, baselineMap, totalCurrent, totalBaseline } = + await fetchGeoAggregates(ctx); + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('country:')) continue; + const country = dimKey.replace('country:', ''); + + const currentValue = currentMap.get(country) ?? 0; + const compareValue = baselineMap.get(country) ?? 0; + + const currentShare = totalCurrent > 0 ? currentValue / totalCurrent : 0; + const compareShare = totalBaseline > 0 ? compareValue / totalBaseline : 0; + + const shareShiftPp = (currentShare - compareShare) * 100; + const changePct = computeChangePct(currentValue, compareValue); + const direction = computeDirection(changePct); + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + shareShiftPp, + currentShare, + compareShare, + isNew: compareValue === 0 && currentValue > 0, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const country = result.dimensionKey.replace('country:', ''); + const changePct = result.changePct ?? 0; + const isIncrease = changePct >= 0; + const isNew = result.extra?.isNew as boolean | undefined; + const displayName = getCountry(country); + + const title = isNew + ? `New traffic from: ${displayName}` + : `${displayName} ${isIncrease ? '↑' : '↓'} ${Math.abs(changePct * 100).toFixed(0)}%`; + + const sessionsCurrent = result.currentValue ?? 0; + const sessionsCompare = result.compareValue ?? 0; + const shareCurrent = Number(result.extra?.currentShare ?? 0); + const shareCompare = Number(result.extra?.compareShare ?? 0); + + return { + title, + summary: `${ctx.window.label}. Traffic change from ${displayName}.`, + displayName, + payload: { + kind: 'insight_v1', + dimensions: [ + { key: 'country', value: country, displayName: displayName }, + ], + primaryMetric: 'sessions', + metrics: { + sessions: { + current: sessionsCurrent, + compare: sessionsCompare, + delta: sessionsCurrent - sessionsCompare, + changePct: sessionsCompare > 0 ? (result.changePct ?? 0) : null, + direction: result.direction ?? 'flat', + unit: 'count', + }, + share: { + current: shareCurrent, + compare: shareCompare, + delta: shareCurrent - shareCompare, + changePct: + shareCompare > 0 + ? (shareCurrent - shareCompare) / shareCompare + : null, + direction: + shareCurrent - shareCompare > 0.0005 + ? 'up' + : shareCurrent - shareCompare < -0.0005 + ? 'down' + : 'flat', + unit: 'ratio', + }, + }, + extra: { + isNew: result.extra?.isNew, + }, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/modules/index.ts b/packages/db/src/services/insights/modules/index.ts new file mode 100644 index 00000000..72b45147 --- /dev/null +++ b/packages/db/src/services/insights/modules/index.ts @@ -0,0 +1,5 @@ +export { referrersModule } from './referrers.module'; +export { entryPagesModule } from './entry-pages.module'; +export { pageTrendsModule } from './page-trends.module'; +export { geoModule } from './geo.module'; +export { devicesModule } from './devices.module'; diff --git a/packages/db/src/services/insights/modules/page-trends.module.ts b/packages/db/src/services/insights/modules/page-trends.module.ts new file mode 100644 index 00000000..3da83cef --- /dev/null +++ b/packages/db/src/services/insights/modules/page-trends.module.ts @@ -0,0 +1,298 @@ +import { TABLE_NAMES, formatClickhouseDate } from '../../../clickhouse/client'; +import type { + ComputeContext, + ComputeResult, + InsightModule, + RenderedCard, +} from '../types'; +import { + buildLookupMap, + computeChangePct, + computeDirection, + computeWeekdayMedians, + getEndOfDay, + getWeekday, + selectTopDimensions, +} from '../utils'; + +const DELIMITER = '|||'; + +async function fetchPageTrendAggregates(ctx: ComputeContext): Promise<{ + currentMap: Map; + baselineMap: Map; + totalCurrent: number; + totalBaseline: number; +}> { + if (ctx.window.kind === 'yesterday') { + const [currentResults, baselineResults, totals] = await Promise.all([ + ctx + .clix() + .select<{ origin: string; path: string; cnt: number }>([ + 'origin', + 'path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['origin', 'path']) + .execute(), + ctx + .clix() + .select<{ date: string; origin: string; path: string; cnt: number }>([ + 'toDate(created_at) as date', + 'origin', + 'path', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'origin', 'path']) + .execute(), + ctx + .clix() + .select<{ cur_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${formatClickhouseDate(ctx.window.start)}' AND '${formatClickhouseDate(getEndOfDay(ctx.window.end))}') as cur_total`, + ), + ]) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + currentResults, + (r) => `${r.origin || ''}${DELIMITER}${r.path || '/'}`, + ); + + const targetWeekday = getWeekday(ctx.window.start); + const baselineMap = computeWeekdayMedians( + baselineResults, + targetWeekday, + (r) => `${r.origin || ''}${DELIMITER}${r.path || '/'}`, + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = Array.from(baselineMap.values()).reduce( + (sum, val) => sum + val, + 0, + ); + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; + } + + const curStart = formatClickhouseDate(ctx.window.start); + const curEnd = formatClickhouseDate(getEndOfDay(ctx.window.end)); + const baseStart = formatClickhouseDate(ctx.window.baselineStart); + const baseEnd = formatClickhouseDate(getEndOfDay(ctx.window.baselineEnd)); + + const [results, totals] = await Promise.all([ + ctx + .clix() + .select<{ origin: string; path: string; cur: number; base: number }>([ + 'origin', + 'path', + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base`, + ), + ]) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['origin', 'path']) + .execute(), + ctx + .clix() + .select<{ cur_total: number; base_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur_total`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base_total`, + ), + ]) + .from(TABLE_NAMES.events) + .where('project_id', '=', ctx.projectId) + .where('name', '=', 'screen_view') + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + results, + (r) => `${r.origin || ''}${DELIMITER}${r.path || '/'}`, + (r) => Number(r.cur ?? 0), + ); + + const baselineMap = buildLookupMap( + results, + (r) => `${r.origin || ''}${DELIMITER}${r.path || '/'}`, + (r) => Number(r.base ?? 0), + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = totals[0]?.base_total ?? 0; + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; +} + +export const pageTrendsModule: InsightModule = { + key: 'page-trends', + cadence: ['daily'], + // Share-based thresholds (values in basis points: 100 = 1%) + // minTotal: require at least 0.5% combined share (current + baseline) + // minAbsDelta: require at least 0.5 percentage point shift + // minPct: require at least 25% relative change in share + thresholds: { minTotal: 50, minAbsDelta: 50, minPct: 0.25, maxDims: 100 }, + + async enumerateDimensions(ctx) { + const { currentMap, baselineMap } = await fetchPageTrendAggregates(ctx); + const topDims = selectTopDimensions( + currentMap, + baselineMap, + this.thresholds?.maxDims ?? 100, + ); + return topDims.map((dim) => `page:${dim}`); + }, + + async computeMany(ctx, dimensionKeys): Promise { + const { currentMap, baselineMap, totalCurrent, totalBaseline } = + await fetchPageTrendAggregates(ctx); + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('page:')) continue; + const originPath = dimKey.replace('page:', ''); + + const pageviewsCurrent = currentMap.get(originPath) ?? 0; + const pageviewsCompare = baselineMap.get(originPath) ?? 0; + + const currentShare = + totalCurrent > 0 ? pageviewsCurrent / totalCurrent : 0; + const compareShare = + totalBaseline > 0 ? pageviewsCompare / totalBaseline : 0; + + // Use share values in basis points (100 = 1%) for thresholding + // This makes thresholds intuitive: minAbsDelta=50 means 0.5pp shift + const currentShareBp = currentShare * 10000; + const compareShareBp = compareShare * 10000; + + const shareShiftPp = (currentShare - compareShare) * 100; + // changePct is relative change in share, not absolute pageviews + const shareChangePct = computeChangePct(currentShare, compareShare); + const direction = computeDirection(shareChangePct); + + results.push({ + ok: true, + dimensionKey: dimKey, + // Use share in basis points for threshold checks + currentValue: currentShareBp, + compareValue: compareShareBp, + changePct: shareChangePct, + direction, + extra: { + // Keep absolute values for display + pageviewsCurrent, + pageviewsCompare, + shareShiftPp, + currentShare, + compareShare, + isNew: pageviewsCompare === 0 && pageviewsCurrent > 0, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const originPath = result.dimensionKey.replace('page:', ''); + const [origin, path] = originPath.split(DELIMITER); + const displayValue = origin ? `${origin}${path}` : path || '/'; + + // Get absolute pageviews from extra (currentValue/compareValue are now share-based) + const pageviewsCurrent = Number(result.extra?.pageviewsCurrent ?? 0); + const pageviewsCompare = Number(result.extra?.pageviewsCompare ?? 0); + const shareCurrent = Number(result.extra?.currentShare ?? 0); + const shareCompare = Number(result.extra?.compareShare ?? 0); + const shareShiftPp = Number(result.extra?.shareShiftPp ?? 0); + const isNew = result.extra?.isNew as boolean | undefined; + + // Display share shift in percentage points + const isIncrease = shareShiftPp >= 0; + const shareShiftDisplay = Math.abs(shareShiftPp).toFixed(1); + + const title = isNew + ? `New page getting views: ${displayValue}` + : `Page ${displayValue} share ${isIncrease ? '↑' : '↓'} ${shareShiftDisplay}pp`; + + return { + title, + summary: `${ctx.window.label}. Share ${(shareCurrent * 100).toFixed(1)}% vs ${(shareCompare * 100).toFixed(1)}%.`, + displayName: displayValue, + payload: { + kind: 'insight_v1', + dimensions: [ + { key: 'origin', value: origin ?? '', displayName: origin ?? '' }, + { key: 'path', value: path ?? '', displayName: path ?? '' }, + ], + primaryMetric: 'share', + metrics: { + pageviews: { + current: pageviewsCurrent, + compare: pageviewsCompare, + delta: pageviewsCurrent - pageviewsCompare, + changePct: + pageviewsCompare > 0 + ? (pageviewsCurrent - pageviewsCompare) / pageviewsCompare + : null, + direction: + pageviewsCurrent > pageviewsCompare + ? 'up' + : pageviewsCurrent < pageviewsCompare + ? 'down' + : 'flat', + unit: 'count', + }, + share: { + current: shareCurrent, + compare: shareCompare, + delta: shareCurrent - shareCompare, + changePct: result.changePct ?? null, // This is now share-based + direction: result.direction ?? 'flat', + unit: 'ratio', + }, + }, + extra: { + isNew: result.extra?.isNew, + shareShiftPp, + }, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/modules/referrers.module.ts b/packages/db/src/services/insights/modules/referrers.module.ts new file mode 100644 index 00000000..2b9d4efd --- /dev/null +++ b/packages/db/src/services/insights/modules/referrers.module.ts @@ -0,0 +1,275 @@ +import { TABLE_NAMES, formatClickhouseDate } from '../../../clickhouse/client'; +import type { + ComputeContext, + ComputeResult, + InsightModule, + RenderedCard, +} from '../types'; +import { + buildLookupMap, + computeChangePct, + computeDirection, + computeWeekdayMedians, + getEndOfDay, + getWeekday, + selectTopDimensions, +} from '../utils'; + +async function fetchReferrerAggregates(ctx: ComputeContext): Promise<{ + currentMap: Map; + baselineMap: Map; + totalCurrent: number; + totalBaseline: number; +}> { + if (ctx.window.kind === 'yesterday') { + const [currentResults, baselineResults, totals] = await Promise.all([ + ctx + .clix() + .select<{ referrer_name: string; cnt: number }>([ + 'referrer_name', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.start, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['referrer_name']) + .execute(), + ctx + .clix() + .select<{ date: string; referrer_name: string; cnt: number }>([ + 'toDate(created_at) as date', + 'referrer_name', + 'count(*) as cnt', + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.baselineEnd), + ]) + .groupBy(['date', 'referrer_name']) + .execute(), + ctx + .clix() + .select<{ cur_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${formatClickhouseDate(ctx.window.start)}' AND '${formatClickhouseDate(getEndOfDay(ctx.window.end))}') as cur_total`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + currentResults, + (r) => r.referrer_name || 'direct', + ); + + const targetWeekday = getWeekday(ctx.window.start); + const baselineMap = computeWeekdayMedians( + baselineResults, + targetWeekday, + (r) => r.referrer_name || 'direct', + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = Array.from(baselineMap.values()).reduce( + (sum, val) => sum + val, + 0, + ); + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; + } + + const curStart = formatClickhouseDate(ctx.window.start); + const curEnd = formatClickhouseDate(getEndOfDay(ctx.window.end)); + const baseStart = formatClickhouseDate(ctx.window.baselineStart); + const baseEnd = formatClickhouseDate(getEndOfDay(ctx.window.baselineEnd)); + + const [results, totals] = await Promise.all([ + ctx + .clix() + .select<{ referrer_name: string; cur: number; base: number }>([ + 'referrer_name', + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .groupBy(['referrer_name']) + .execute(), + ctx + .clix() + .select<{ cur_total: number; base_total: number }>([ + ctx.clix.exp( + `countIf(created_at BETWEEN '${curStart}' AND '${curEnd}') as cur_total`, + ), + ctx.clix.exp( + `countIf(created_at BETWEEN '${baseStart}' AND '${baseEnd}') as base_total`, + ), + ]) + .from(TABLE_NAMES.sessions) + .where('project_id', '=', ctx.projectId) + .where('sign', '=', 1) + .where('created_at', 'BETWEEN', [ + ctx.window.baselineStart, + getEndOfDay(ctx.window.end), + ]) + .execute(), + ]); + + const currentMap = buildLookupMap( + results, + (r) => r.referrer_name || 'direct', + (r) => Number(r.cur ?? 0), + ); + + const baselineMap = buildLookupMap( + results, + (r) => r.referrer_name || 'direct', + (r) => Number(r.base ?? 0), + ); + + const totalCurrent = totals[0]?.cur_total ?? 0; + const totalBaseline = totals[0]?.base_total ?? 0; + + return { currentMap, baselineMap, totalCurrent, totalBaseline }; +} + +export const referrersModule: InsightModule = { + key: 'referrers', + cadence: ['daily'], + thresholds: { minTotal: 100, minAbsDelta: 20, minPct: 0.15, maxDims: 50 }, + + async enumerateDimensions(ctx) { + const { currentMap, baselineMap } = await fetchReferrerAggregates(ctx); + const topDims = selectTopDimensions( + currentMap, + baselineMap, + this.thresholds?.maxDims ?? 50, + ); + return topDims.map((dim) => `referrer:${dim}`); + }, + + async computeMany(ctx, dimensionKeys): Promise { + const { currentMap, baselineMap, totalCurrent, totalBaseline } = + await fetchReferrerAggregates(ctx); + const results: ComputeResult[] = []; + + for (const dimKey of dimensionKeys) { + if (!dimKey.startsWith('referrer:')) continue; + const referrerName = dimKey.replace('referrer:', ''); + + const currentValue = currentMap.get(referrerName) ?? 0; + const compareValue = baselineMap.get(referrerName) ?? 0; + + const currentShare = totalCurrent > 0 ? currentValue / totalCurrent : 0; + const compareShare = totalBaseline > 0 ? compareValue / totalBaseline : 0; + + const shareShiftPp = (currentShare - compareShare) * 100; + const changePct = computeChangePct(currentValue, compareValue); + const direction = computeDirection(changePct); + + results.push({ + ok: true, + dimensionKey: dimKey, + currentValue, + compareValue, + changePct, + direction, + extra: { + shareShiftPp, + currentShare, + compareShare, + isNew: compareValue === 0 && currentValue > 0, + isGone: currentValue === 0 && compareValue > 0, + }, + }); + } + + return results; + }, + + render(result, ctx): RenderedCard { + const referrer = result.dimensionKey.replace('referrer:', ''); + const pct = ((result.changePct ?? 0) * 100).toFixed(1); + const isIncrease = (result.changePct ?? 0) >= 0; + const isNew = result.extra?.isNew as boolean | undefined; + + const title = isNew + ? `New traffic source: ${referrer}` + : `Traffic from ${referrer} ${isIncrease ? '↑' : '↓'} ${Math.abs(Number(pct))}%`; + + const sessionsCurrent = result.currentValue ?? 0; + const sessionsCompare = result.compareValue ?? 0; + const shareCurrent = Number(result.extra?.currentShare ?? 0); + const shareCompare = Number(result.extra?.compareShare ?? 0); + + return { + title, + summary: `${ctx.window.label}. Sessions ${sessionsCurrent} vs ${sessionsCompare}.`, + displayName: referrer, + payload: { + kind: 'insight_v1', + dimensions: [ + { + key: 'referrer_name', + value: referrer, + displayName: referrer, + }, + ], + primaryMetric: 'sessions', + metrics: { + sessions: { + current: sessionsCurrent, + compare: sessionsCompare, + delta: sessionsCurrent - sessionsCompare, + changePct: sessionsCompare > 0 ? (result.changePct ?? 0) : null, + direction: result.direction ?? 'flat', + unit: 'count', + }, + share: { + current: shareCurrent, + compare: shareCompare, + delta: shareCurrent - shareCompare, + changePct: + shareCompare > 0 + ? (shareCurrent - shareCompare) / shareCompare + : null, + direction: + shareCurrent - shareCompare > 0.0005 + ? 'up' + : shareCurrent - shareCompare < -0.0005 + ? 'down' + : 'flat', + unit: 'ratio', + }, + }, + extra: { + isNew: result.extra?.isNew, + isGone: result.extra?.isGone, + }, + }, + }; + }, +}; diff --git a/packages/db/src/services/insights/scoring.ts b/packages/db/src/services/insights/scoring.ts new file mode 100644 index 00000000..7aff4a61 --- /dev/null +++ b/packages/db/src/services/insights/scoring.ts @@ -0,0 +1,18 @@ +import type { ComputeResult } from './types'; + +export function defaultImpactScore(r: ComputeResult): number { + const vol = (r.currentValue ?? 0) + (r.compareValue ?? 0); + const pct = Math.abs(r.changePct ?? 0); + // stable-ish: bigger change + bigger volume => higher impact + return Math.log1p(vol) * (pct * 100); +} + +export function severityBand( + changePct?: number | null, +): 'low' | 'moderate' | 'severe' | null { + const p = Math.abs(changePct ?? 0); + if (p < 0.1) return null; + if (p < 0.5) return 'low'; + if (p < 1) return 'moderate'; + return 'severe'; +} diff --git a/packages/db/src/services/insights/store.ts b/packages/db/src/services/insights/store.ts new file mode 100644 index 00000000..9e05316e --- /dev/null +++ b/packages/db/src/services/insights/store.ts @@ -0,0 +1,343 @@ +import { Prisma, db } from '../../prisma-client'; +import type { + Cadence, + InsightStore, + PersistedInsight, + RenderedCard, + WindowKind, + WindowRange, +} from './types'; + +export const insightStore: InsightStore = { + async listProjectIdsForCadence(cadence: Cadence): Promise { + const projects = await db.project.findMany({ + where: { + deleteAt: null, + eventsCount: { gt: 10_000 }, + updatedAt: { gt: new Date(Date.now() - 1000 * 60 * 60 * 24) }, + organization: { + subscriptionStatus: 'active', + }, + }, + select: { id: true }, + }); + return projects.map((p) => p.id); + }, + + async getProjectCreatedAt(projectId: string): Promise { + const project = await db.project.findFirst({ + where: { id: projectId, deleteAt: null }, + select: { createdAt: true }, + }); + return project?.createdAt ?? null; + }, + + async getActiveInsightByIdentity({ + projectId, + moduleKey, + dimensionKey, + windowKind, + }): Promise { + const insight = await db.projectInsight.findFirst({ + where: { + projectId, + moduleKey, + dimensionKey, + windowKind, + state: 'active', + }, + }); + + if (!insight) return null; + + return { + id: insight.id, + projectId: insight.projectId, + moduleKey: insight.moduleKey, + dimensionKey: insight.dimensionKey, + windowKind: insight.windowKind as WindowKind, + state: insight.state as 'active' | 'suppressed' | 'closed', + version: insight.version, + impactScore: insight.impactScore, + lastSeenAt: insight.lastSeenAt, + lastUpdatedAt: insight.lastUpdatedAt, + direction: insight.direction, + severityBand: insight.severityBand, + }; + }, + + async upsertInsight({ + projectId, + moduleKey, + dimensionKey, + window, + card, + metrics, + now, + decision, + prev, + }): Promise { + const baseData = { + projectId, + moduleKey, + dimensionKey, + windowKind: window.kind, + state: prev?.state === 'closed' ? 'active' : (prev?.state ?? 'active'), + title: card.title, + summary: card.summary ?? null, + displayName: card.displayName, + payload: card.payload, + direction: metrics.direction ?? null, + impactScore: metrics.impactScore, + severityBand: metrics.severityBand ?? null, + version: prev ? (decision.material ? prev.version + 1 : prev.version) : 1, + windowStart: window.start, + windowEnd: window.end, + lastSeenAt: now, + lastUpdatedAt: now, + }; + + // Try to find existing insight first + const existing = prev + ? await db.projectInsight.findFirst({ + where: { + projectId, + moduleKey, + dimensionKey, + windowKind: window.kind, + state: prev.state, + }, + }) + : null; + + let insight: any; + if (existing) { + // Update existing + insight = await db.projectInsight.update({ + where: { id: existing.id }, + data: { + ...baseData, + threadId: existing.threadId, // Preserve threadId + }, + }); + } else { + // Create new - need to check if there's a closed/suppressed one to reopen + const closed = await db.projectInsight.findFirst({ + where: { + projectId, + moduleKey, + dimensionKey, + windowKind: window.kind, + state: { in: ['closed', 'suppressed'] }, + }, + orderBy: { lastUpdatedAt: 'desc' }, + }); + + if (closed) { + // Reopen and update + insight = await db.projectInsight.update({ + where: { id: closed.id }, + data: { + ...baseData, + state: 'active', + threadId: closed.threadId, // Preserve threadId + }, + }); + } else { + // Create new + insight = await db.projectInsight.create({ + data: { + ...baseData, + firstDetectedAt: now, + }, + }); + } + } + + return { + id: insight.id, + projectId: insight.projectId, + moduleKey: insight.moduleKey, + dimensionKey: insight.dimensionKey, + windowKind: insight.windowKind as WindowKind, + state: insight.state as 'active' | 'suppressed' | 'closed', + version: insight.version, + impactScore: insight.impactScore, + lastSeenAt: insight.lastSeenAt, + lastUpdatedAt: insight.lastUpdatedAt, + direction: insight.direction, + severityBand: insight.severityBand, + }; + }, + + async insertEvent({ + projectId, + insightId, + moduleKey, + dimensionKey, + windowKind, + eventKind, + changeFrom, + changeTo, + now, + }): Promise { + await db.insightEvent.create({ + data: { + insightId, + eventKind, + changeFrom: changeFrom + ? (changeFrom as Prisma.InputJsonValue) + : Prisma.DbNull, + changeTo: changeTo + ? (changeTo as Prisma.InputJsonValue) + : Prisma.DbNull, + createdAt: now, + }, + }); + }, + + async closeMissingActiveInsights({ + projectId, + moduleKey, + windowKind, + seenDimensionKeys, + now, + staleDays, + }): Promise { + const staleDate = new Date(now); + staleDate.setDate(staleDate.getDate() - staleDays); + + const result = await db.projectInsight.updateMany({ + where: { + projectId, + moduleKey, + windowKind, + state: 'active', + lastSeenAt: { lt: staleDate }, + dimensionKey: { notIn: seenDimensionKeys }, + }, + data: { + state: 'closed', + lastUpdatedAt: now, + }, + }); + + return result.count; + }, + + async applySuppression({ + projectId, + moduleKey, + windowKind, + keepTopN, + now, + }): Promise<{ suppressed: number; unsuppressed: number }> { + // Get all active insights for this module/window, ordered by impactScore desc + const insights = await db.projectInsight.findMany({ + where: { + projectId, + moduleKey, + windowKind, + state: { in: ['active', 'suppressed'] }, + }, + orderBy: { impactScore: 'desc' }, + }); + + if (insights.length === 0) { + return { suppressed: 0, unsuppressed: 0 }; + } + + let suppressed = 0; + let unsuppressed = 0; + + // For "yesterday" insights, suppress any that are stale (windowEnd is not actually yesterday) + // This prevents showing confusing insights like "Yesterday traffic dropped" when it's from 2+ days ago + if (windowKind === 'yesterday') { + const yesterday = new Date(now); + yesterday.setUTCHours(0, 0, 0, 0); + yesterday.setUTCDate(yesterday.getUTCDate() - 1); + const yesterdayTime = yesterday.getTime(); + + for (const insight of insights) { + // If windowEnd is null, consider it stale + const isStale = insight.windowEnd + ? new Date(insight.windowEnd).setUTCHours(0, 0, 0, 0) !== + yesterdayTime + : true; + + if (isStale && insight.state === 'active') { + await db.projectInsight.update({ + where: { id: insight.id }, + data: { state: 'suppressed', lastUpdatedAt: now }, + }); + suppressed++; + } + } + + // Filter to only non-stale insights for top-N logic + const freshInsights = insights.filter((insight) => { + if (!insight.windowEnd) return false; + const windowEndTime = new Date(insight.windowEnd).setUTCHours( + 0, + 0, + 0, + 0, + ); + return windowEndTime === yesterdayTime; + }); + + const topN = freshInsights.slice(0, keepTopN); + const belowN = freshInsights.slice(keepTopN); + + for (const insight of belowN) { + if (insight.state === 'active') { + await db.projectInsight.update({ + where: { id: insight.id }, + data: { state: 'suppressed', lastUpdatedAt: now }, + }); + suppressed++; + } + } + + for (const insight of topN) { + if (insight.state === 'suppressed') { + await db.projectInsight.update({ + where: { id: insight.id }, + data: { state: 'active', lastUpdatedAt: now }, + }); + unsuppressed++; + } + } + + return { suppressed, unsuppressed }; + } + + // For non-yesterday windows, apply standard top-N suppression + const topN = insights.slice(0, keepTopN); + const belowN = insights.slice(keepTopN); + + // Suppress those below top N + for (const insight of belowN) { + if (insight.state === 'active') { + await db.projectInsight.update({ + where: { id: insight.id }, + data: { state: 'suppressed', lastUpdatedAt: now }, + }); + suppressed++; + } + } + + // Unsuppress those in top N + for (const insight of topN) { + if (insight.state === 'suppressed') { + await db.projectInsight.update({ + where: { id: insight.id }, + data: { state: 'active', lastUpdatedAt: now }, + }); + unsuppressed++; + } + } + + return { suppressed, unsuppressed }; + }, +}; diff --git a/packages/db/src/services/insights/types.ts b/packages/db/src/services/insights/types.ts new file mode 100644 index 00000000..91728fc7 --- /dev/null +++ b/packages/db/src/services/insights/types.ts @@ -0,0 +1,191 @@ +import type { + InsightDimension, + InsightMetricEntry, + InsightMetricKey, + InsightPayload, +} from '@openpanel/validation'; + +export type Cadence = 'daily'; + +export type WindowKind = 'yesterday' | 'rolling_7d' | 'rolling_30d'; + +export interface WindowRange { + kind: WindowKind; + start: Date; // inclusive + end: Date; // inclusive (or exclusive, but be consistent) + baselineStart: Date; + baselineEnd: Date; + label: string; // e.g. "Yesterday" / "Last 7 days" +} + +export interface ComputeContext { + projectId: string; + window: WindowRange; + db: any; // your DB client + now: Date; + logger: Pick; + /** + * Cached clix function that automatically caches query results based on query hash. + * This eliminates duplicate queries within the same module+window context. + * Use this instead of importing clix directly to benefit from automatic caching. + */ + clix: ReturnType; +} + +export interface ComputeResult { + ok: boolean; + dimensionKey: string; // e.g. "referrer:instagram" / "page:/pricing" + currentValue?: number; + compareValue?: number; + changePct?: number; // -0.15 = -15% + direction?: 'up' | 'down' | 'flat'; + extra?: Record; // share delta pp, rank, sparkline, etc. +} + +// Types imported from @openpanel/validation: +// - InsightMetricKey +// - InsightMetricEntry +// - InsightDimension +// - InsightPayload + +/** + * Render should be deterministic and safe to call multiple times. + * Returns the shape that matches ProjectInsight create input. + * The payload contains all metric data and display metadata. + */ +export interface RenderedCard { + title: string; + summary?: string; + displayName: string; + payload: InsightPayload; // Contains dimensions, primaryMetric, metrics, extra +} + +/** Optional per-module thresholds (the engine can still apply global defaults) */ +export interface ModuleThresholds { + minTotal?: number; // min current+baseline + minAbsDelta?: number; // min abs(current-compare) + minPct?: number; // min abs(changePct) + maxDims?: number; // cap enumerateDimensions +} + +export interface InsightModule { + key: string; + cadence: Cadence[]; + /** Optional per-module override; engine applies a default if omitted. */ + windows?: WindowKind[]; + thresholds?: ModuleThresholds; + enumerateDimensions?(ctx: ComputeContext): Promise; + /** Preferred path: batch compute many dimensions in one go. */ + computeMany( + ctx: ComputeContext, + dimensionKeys: string[], + ): Promise; + /** Must not do DB reads; just format output. */ + render(result: ComputeResult, ctx: ComputeContext): RenderedCard; + /** Score decides what to show (top-N). */ + score?(result: ComputeResult, ctx: ComputeContext): number; + /** Optional: compute "drivers" for AI explain step */ + drivers?( + result: ComputeResult, + ctx: ComputeContext, + ): Promise>; +} + +/** Insight row shape returned from persistence (minimal fields engine needs). */ +export interface PersistedInsight { + id: string; + projectId: string; + moduleKey: string; + dimensionKey: string; + windowKind: WindowKind; + state: 'active' | 'suppressed' | 'closed'; + version: number; + impactScore: number; + lastSeenAt: Date; + lastUpdatedAt: Date; + direction?: string | null; + severityBand?: string | null; +} + +/** Material change decision used for events/notifications. */ +export type MaterialReason = + | 'created' + | 'direction_flip' + | 'severity_change' + | 'cross_deadband' + | 'reopened' + | 'none'; + +export interface MaterialDecision { + material: boolean; + reason: MaterialReason; + newSeverityBand?: 'low' | 'moderate' | 'severe' | null; +} + +/** + * Persistence interface: implement with Postgres. + * Keep engine independent of query builder choice. + */ +export interface InsightStore { + listProjectIdsForCadence(cadence: Cadence): Promise; + /** Used by the engine/worker to decide if a window has enough baseline history. */ + getProjectCreatedAt(projectId: string): Promise; + getActiveInsightByIdentity(args: { + projectId: string; + moduleKey: string; + dimensionKey: string; + windowKind: WindowKind; + }): Promise; + upsertInsight(args: { + projectId: string; + moduleKey: string; + dimensionKey: string; + window: WindowRange; + card: RenderedCard; + metrics: { + direction?: 'up' | 'down' | 'flat'; + impactScore: number; + severityBand?: string | null; + }; + now: Date; + decision: MaterialDecision; + prev: PersistedInsight | null; + }): Promise; + insertEvent(args: { + projectId: string; + insightId: string; + moduleKey: string; + dimensionKey: string; + windowKind: WindowKind; + eventKind: + | 'created' + | 'updated' + | 'severity_up' + | 'severity_down' + | 'direction_flip' + | 'closed' + | 'reopened' + | 'suppressed' + | 'unsuppressed'; + changeFrom?: Record | null; + changeTo?: Record | null; + now: Date; + }): Promise; + /** Mark insights as not seen this run if you prefer lifecycle via closeMissing() */ + closeMissingActiveInsights(args: { + projectId: string; + moduleKey: string; + windowKind: WindowKind; + seenDimensionKeys: string[]; + now: Date; + staleDays: number; // close if not seen for X days + }): Promise; // count closed + /** Enforce top-N display by suppressing below-threshold insights. */ + applySuppression(args: { + projectId: string; + moduleKey: string; + windowKind: WindowKind; + keepTopN: number; + now: Date; + }): Promise<{ suppressed: number; unsuppressed: number }>; +} diff --git a/packages/db/src/services/insights/utils.ts b/packages/db/src/services/insights/utils.ts new file mode 100644 index 00000000..3dc5eb57 --- /dev/null +++ b/packages/db/src/services/insights/utils.ts @@ -0,0 +1,151 @@ +/** + * Shared utilities for insight modules + */ + +/** + * Get UTC weekday (0 = Sunday, 6 = Saturday) + */ +export function getWeekday(date: Date): number { + return date.getUTCDay(); +} + +/** + * Compute median of a sorted array of numbers + */ +export function computeMedian(sortedValues: number[]): number { + if (sortedValues.length === 0) return 0; + const mid = Math.floor(sortedValues.length / 2); + return sortedValues.length % 2 === 0 + ? ((sortedValues[mid - 1] ?? 0) + (sortedValues[mid] ?? 0)) / 2 + : (sortedValues[mid] ?? 0); +} + +/** + * Compute weekday medians from daily breakdown data. + * Groups by dimension, filters to matching weekday, computes median per dimension. + * + * @param data - Array of { date, dimension, cnt } rows + * @param targetWeekday - Weekday to filter to (0-6) + * @param getDimension - Function to extract normalized dimension from row + * @returns Map of dimension -> median value + */ +export function computeWeekdayMedians( + data: T[], + targetWeekday: number, + getDimension: (row: T) => string, +): Map { + // Group by dimension, filtered to target weekday + const byDimension = new Map(); + + for (const row of data) { + const rowWeekday = getWeekday(new Date((row as any).date)); + if (rowWeekday !== targetWeekday) continue; + + const dim = getDimension(row); + const values = byDimension.get(dim) ?? []; + values.push(Number((row as any).cnt ?? 0)); + byDimension.set(dim, values); + } + + // Compute median per dimension + const result = new Map(); + for (const [dim, values] of byDimension) { + values.sort((a, b) => a - b); + result.set(dim, computeMedian(values)); + } + + return result; +} + +/** + * Compute change percentage between current and compare values + */ +export function computeChangePct( + currentValue: number, + compareValue: number, +): number { + return compareValue > 0 + ? (currentValue - compareValue) / compareValue + : currentValue > 0 + ? 1 + : 0; +} + +/** + * Determine direction based on change percentage + */ +export function computeDirection( + changePct: number, + threshold = 0.05, +): 'up' | 'down' | 'flat' { + return changePct > threshold + ? 'up' + : changePct < -threshold + ? 'down' + : 'flat'; +} + +/** + * Get end of day timestamp (23:59:59.999) for a given date. + * Used to ensure BETWEEN queries include the full day. + */ +export function getEndOfDay(date: Date): Date { + const end = new Date(date); + end.setUTCHours(23, 59, 59, 999); + return end; +} + +/** + * Build a lookup map from query results. + * Aggregates counts by key, handling duplicate keys by summing values. + * + * @param results - Array of result rows + * @param getKey - Function to extract the key from each row + * @param getCount - Function to extract the count from each row (defaults to 'cnt' field) + * @returns Map of key -> aggregated count + */ +export function buildLookupMap( + results: T[], + getKey: (row: T) => string, + getCount: (row: T) => number = (row) => Number((row as any).cnt ?? 0), +): Map { + const map = new Map(); + for (const row of results) { + const key = getKey(row); + const cnt = getCount(row); + map.set(key, (map.get(key) ?? 0) + cnt); + } + return map; +} + +/** + * Select top-N dimensions by ranking on greatest(current, baseline). + * This preserves union behavior: dimensions with high values in either period are included. + * + * @param currentMap - Map of dimension -> current value + * @param baselineMap - Map of dimension -> baseline value + * @param maxDims - Maximum number of dimensions to return + * @returns Array of dimension keys, ranked by greatest(current, baseline) + */ +export function selectTopDimensions( + currentMap: Map, + baselineMap: Map, + maxDims: number, +): string[] { + // Merge all dimensions from both maps + const allDims = new Set(); + for (const dim of currentMap.keys()) allDims.add(dim); + for (const dim of baselineMap.keys()) allDims.add(dim); + + // Rank by greatest(current, baseline) + const ranked = Array.from(allDims) + .map((dim) => ({ + dim, + maxValue: Math.max(currentMap.get(dim) ?? 0, baselineMap.get(dim) ?? 0), + })) + .sort((a, b) => b.maxValue - a.maxValue) + .slice(0, maxDims) + .map((x) => x.dim); + + return ranked; +} diff --git a/packages/db/src/services/insights/windows.ts b/packages/db/src/services/insights/windows.ts new file mode 100644 index 00000000..d2d8870b --- /dev/null +++ b/packages/db/src/services/insights/windows.ts @@ -0,0 +1,59 @@ +import type { WindowKind, WindowRange } from './types'; + +function atUtcMidnight(d: Date) { + const x = new Date(d); + x.setUTCHours(0, 0, 0, 0); + return x; +} + +function addDays(d: Date, days: number) { + const x = new Date(d); + x.setUTCDate(x.getUTCDate() + days); + return x; +} + +/** + * Convention: end is inclusive (end of day). If you prefer exclusive, adapt consistently. + */ +export function resolveWindow(kind: WindowKind, now: Date): WindowRange { + const today0 = atUtcMidnight(now); + const yesterday0 = addDays(today0, -1); + if (kind === 'yesterday') { + const start = yesterday0; + const end = yesterday0; + // Baseline: median of last 4 same weekdays -> engine/module implements the median. + // Here we just define the candidate range; module queries last 28 days and filters weekday. + const baselineStart = addDays(yesterday0, -28); + const baselineEnd = addDays(yesterday0, -1); + return { kind, start, end, baselineStart, baselineEnd, label: 'Yesterday' }; + } + if (kind === 'rolling_7d') { + const end = yesterday0; + const start = addDays(end, -6); // 7 days inclusive + const baselineEnd = addDays(start, -1); + const baselineStart = addDays(baselineEnd, -6); + return { + kind, + start, + end, + baselineStart, + baselineEnd, + label: 'Last 7 days', + }; + } + // rolling_30d + { + const end = yesterday0; + const start = addDays(end, -29); + const baselineEnd = addDays(start, -1); + const baselineStart = addDays(baselineEnd, -29); + return { + kind, + start, + end, + baselineStart, + baselineEnd, + label: 'Last 30 days', + }; + } +} diff --git a/packages/db/src/session-consistency.ts b/packages/db/src/session-consistency.ts index b75129c6..fdab0db0 100644 --- a/packages/db/src/session-consistency.ts +++ b/packages/db/src/session-consistency.ts @@ -180,11 +180,11 @@ export function sessionConsistency() { // For write operations with session: cache WAL LSN after write if (isWriteOperation(operation)) { - logger.info('Prisma operation', { - operation, - args, - model, - }); + // logger.info('Prisma operation', { + // operation, + // args, + // model, + // }); const result = await query(args); diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 7e525f25..ee1a915d 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -3,6 +3,7 @@ import type { IIntegrationConfig, INotificationRuleConfig, IProjectFilters, + InsightPayload, } from '@openpanel/validation'; import type { IClickhouseBotEvent, @@ -18,6 +19,7 @@ declare global { type IPrismaIntegrationConfig = IIntegrationConfig; type IPrismaNotificationPayload = INotificationPayload; type IPrismaProjectFilters = IProjectFilters[]; + type IPrismaProjectInsightPayload = InsightPayload; type IPrismaClickhouseEvent = IClickhouseEvent; type IPrismaClickhouseProfile = IClickhouseProfile; type IPrismaClickhouseBotEvent = IClickhouseBotEvent; diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index d21bf4e7..11526826 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -111,13 +111,18 @@ export type CronQueuePayloadProject = { type: 'deleteProjects'; payload: undefined; }; +export type CronQueuePayloadInsightsDaily = { + type: 'insightsDaily'; + payload: undefined; +}; export type CronQueuePayload = | CronQueuePayloadSalt | CronQueuePayloadFlushEvents | CronQueuePayloadFlushSessions | CronQueuePayloadFlushProfiles | CronQueuePayloadPing - | CronQueuePayloadProject; + | CronQueuePayloadProject + | CronQueuePayloadInsightsDaily; export type MiscQueuePayloadTrialEndingSoon = { type: 'trialEndingSoon'; @@ -235,6 +240,21 @@ export const importQueue = new Queue( }, ); +export type InsightsQueuePayloadProject = { + type: 'insightsProject'; + payload: { projectId: string; date: string }; +}; + +export const insightsQueue = new Queue( + getQueueName('insights'), + { + connection: getRedisQueue(), + defaultJobOptions: { + removeOnComplete: 100, + }, + }, +); + export function addTrialEndingSoonJob(organizationId: string, delay: number) { return miscQueue.add( 'misc', diff --git a/packages/trpc/src/root.ts b/packages/trpc/src/root.ts index d852c9c1..517db51b 100644 --- a/packages/trpc/src/root.ts +++ b/packages/trpc/src/root.ts @@ -5,6 +5,7 @@ import { clientRouter } from './routers/client'; import { dashboardRouter } from './routers/dashboard'; import { eventRouter } from './routers/event'; import { importRouter } from './routers/import'; +import { insightRouter } from './routers/insight'; import { integrationRouter } from './routers/integration'; import { notificationRouter } from './routers/notification'; import { onboardingRouter } from './routers/onboarding'; @@ -47,6 +48,7 @@ export const appRouter = createTRPCRouter({ overview: overviewRouter, realtime: realtimeRouter, chat: chatRouter, + insight: insightRouter, }); // export type definition of API diff --git a/packages/trpc/src/routers/insight.ts b/packages/trpc/src/routers/insight.ts new file mode 100644 index 00000000..51d2c5a9 --- /dev/null +++ b/packages/trpc/src/routers/insight.ts @@ -0,0 +1,102 @@ +import { db } from '@openpanel/db'; +import { z } from 'zod'; +import { getProjectAccess } from '../access'; +import { TRPCAccessError } from '../errors'; +import { createTRPCRouter, protectedProcedure } from '../trpc'; + +export const insightRouter = createTRPCRouter({ + list: protectedProcedure + .input( + z.object({ + projectId: z.string(), + limit: z.number().min(1).max(100).optional().default(50), + }), + ) + .query(async ({ input: { projectId, limit }, ctx }) => { + const access = await getProjectAccess({ + userId: ctx.session.userId, + projectId, + }); + + if (!access) { + throw TRPCAccessError('You do not have access to this project'); + } + + // Fetch more insights than needed to account for deduplication + const allInsights = await db.projectInsight.findMany({ + where: { + projectId, + state: 'active', + moduleKey: { + notIn: ['page-trends', 'entry-pages'], + }, + }, + orderBy: { + impactScore: 'desc', + }, + take: limit * 3, // Fetch 3x to account for deduplication + }); + + // WindowKind priority: yesterday (1) > rolling_7d (2) > rolling_30d (3) + const windowKindPriority: Record = { + yesterday: 1, + rolling_7d: 2, + rolling_30d: 3, + }; + + // Group by moduleKey + dimensionKey, keep only highest priority windowKind + const deduplicated = new Map(); + for (const insight of allInsights) { + const key = `${insight.moduleKey}:${insight.dimensionKey}`; + const existing = deduplicated.get(key); + const currentPriority = windowKindPriority[insight.windowKind] ?? 999; + const existingPriority = existing + ? (windowKindPriority[existing.windowKind] ?? 999) + : 999; + + // Keep if no existing, or if current has higher priority (lower number) + if (!existing || currentPriority < existingPriority) { + deduplicated.set(key, insight); + } + } + + // Convert back to array, sort by impactScore, and limit + const insights = Array.from(deduplicated.values()) + .sort((a, b) => (b.impactScore ?? 0) - (a.impactScore ?? 0)) + .slice(0, limit) + .map(({ impactScore, ...rest }) => rest); // Remove impactScore from response + + return insights; + }), + + listAll: protectedProcedure + .input( + z.object({ + projectId: z.string(), + limit: z.number().min(1).max(500).optional().default(200), + }), + ) + .query(async ({ input: { projectId, limit }, ctx }) => { + const access = await getProjectAccess({ + userId: ctx.session.userId, + projectId, + }); + + if (!access) { + throw TRPCAccessError('You do not have access to this project'); + } + + const insights = await db.projectInsight.findMany({ + where: { + projectId, + state: 'active', + }, + orderBy: { + impactScore: 'desc', + }, + take: limit, + }); + + return insights; + }), +}); diff --git a/packages/validation/index.ts b/packages/validation/index.ts index bc09a9e7..273d53f8 100644 --- a/packages/validation/index.ts +++ b/packages/validation/index.ts @@ -1,2 +1,3 @@ export * from './src/index'; export * from './src/types.validation'; +export * from './src/types.insights'; diff --git a/packages/validation/src/index.ts b/packages/validation/src/index.ts index bc8777ee..62162d71 100644 --- a/packages/validation/src/index.ts +++ b/packages/validation/src/index.ts @@ -553,3 +553,5 @@ export const zCreateImport = z.object({ }); export type ICreateImport = z.infer; + +export * from './types.insights'; diff --git a/packages/validation/src/types.insights.ts b/packages/validation/src/types.insights.ts new file mode 100644 index 00000000..7d2e13cf --- /dev/null +++ b/packages/validation/src/types.insights.ts @@ -0,0 +1,43 @@ +export type InsightMetricKey = 'sessions' | 'pageviews' | 'share'; + +export type InsightMetricUnit = 'count' | 'ratio'; + +export interface InsightMetricEntry { + current: number; + compare: number; + delta: number; + changePct: number | null; + direction: 'up' | 'down' | 'flat'; + unit: InsightMetricUnit; +} + +export interface InsightDimension { + key: string; + value: string; + displayName?: string; +} + +export interface InsightExtra { + [key: string]: unknown; + currentShare?: number; + compareShare?: number; + shareShiftPp?: number; + isNew?: boolean; + isGone?: boolean; +} + +/** + * Shared payload shape for insights cards. This is embedded in DB rows and + * shipped to the frontend, so it must remain backwards compatible. + */ +export interface InsightPayload { + kind?: 'insight_v1'; + dimensions: InsightDimension[]; + primaryMetric: InsightMetricKey; + metrics: Partial>; + + /** + * Module-specific extra data. + */ + extra?: Record; +} diff --git a/sh/docker-build b/sh/docker-build index 821bc121..764efb00 100755 --- a/sh/docker-build +++ b/sh/docker-build @@ -38,7 +38,7 @@ docker buildx create --name multi-arch-builder --use || true build_image() { local app=$1 local image_name="lindesvard/openpanel-$app" - local full_version="$image_name:$VERSION-$PRERELEASE" + local full_version="$image_name:$VERSION" # Use apps/start/Dockerfile for dashboard app local dockerfile="apps/$app/Dockerfile" @@ -47,10 +47,10 @@ build_image() { fi if [ -n "$PRERELEASE" ]; then - echo "(pre-release) Building multi-architecture image for $full_version" + echo "(pre-release) Building multi-architecture image for $full_version-$PRERELEASE" docker buildx build \ --platform linux/amd64,linux/arm64 \ - -t "$full_version" \ + -t "$full_version-$PRERELEASE" \ --build-arg DATABASE_URL="postgresql://p@p:5432/p" \ -f "$dockerfile" \ --push \