Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Drop 4 unused activityRelations indexes (already dropped on prod 2026-03-27,
-- see ACTIVITYRELATIONS_INDEX_CLEANUP.md — IF EXISTS guards for idempotency)
--
-- The four objects are one named constraint (removed via ALTER TABLE) and three
-- plain indexes (removed via DROP INDEX).
-- NOTE(review): the "_key" suffix suggests the constraint is a UNIQUE constraint
-- backed by an index — confirm against the table definition.
alter table "activityRelations" drop constraint if exists "activityRelations_activityId_memberId_key";

-- CONCURRENTLY avoids blocking concurrent reads/writes on the table while the
-- index is removed. PostgreSQL does not allow DROP INDEX CONCURRENTLY inside a
-- transaction block.
-- NOTE(review): confirm the migration runner executes this file outside a transaction.
drop index concurrently if exists "ix_activityRelations_memberId_segmentId_include";
drop index concurrently if exists "ix_activityRelations_organizationId_segmentId_include";
drop index concurrently if exists "ix_activityRelations_platform_username";

-- Separate from the cleanup above: add an index on organizationSegmentsAgg keyed by
-- ("organizationId", "segmentId") that also stores "memberCount" as an INCLUDE column.
-- IF NOT EXISTS makes the statement safe to re-run.
create index concurrently if not exists idx_osa_org_segment_membercount
on "organizationSegmentsAgg" ("organizationId", "segmentId")
include ("memberCount");
67 changes: 40 additions & 27 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {
} from '@crowd/data-access-layer'
import { IDbActivityRelation } from '@crowd/data-access-layer/src/activityRelations/types'
import { DbStore, arePrimitivesDbEqual } from '@crowd/data-access-layer/src/database'
import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
import {
IActivityRelationCreateOrUpdateData,
IDbActivity,
Expand Down Expand Up @@ -56,7 +55,7 @@ import {
} from '@crowd/types'

import { IActivityUpdateData, ISentimentActivityInput } from './activity.data'
import MemberService from './member.service'
import MemberService, { mergeIfAllowed } from './member.service'
import { IProcessActivityResult } from './types'

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down Expand Up @@ -293,6 +292,28 @@ export default class ActivityService extends LoggerBase {
}
}

// When activity.username is set but differs from the member's platform identity value,
// override it so the member lookup and the identity insert use the same key.
// Example: git activities set activity.username to the author display name (e.g. "John Doe")
// while the identity stores the email (e.g. "john.doe@example.com"). Without this correction
// the lookup misses the existing member, creating an unnecessary orphan member.
if (username && member) {
const platformIdentity = member.identities.find(
(i) =>
i.platform === platform &&
i.type === MemberIdentityType.USERNAME &&
i.value &&
i.verified,
)
if (platformIdentity && platformIdentity.value !== username) {
this.log.debug(
{ platform, originalUsername: username, correctedUsername: platformIdentity.value },
'Overriding activity.username with member platform identity value',
)
activity.username = platformIdentity.value
}
}

member.identities = member.identities.filter((i) => i.value)

if (!username) {
Expand Down Expand Up @@ -1721,32 +1742,24 @@ export default class ActivityService extends LoggerBase {
const originalId = metadata.memberWithIdentity as string
const targetId = metadata.memberIdToUpdate as string

// but first check memberNoMerge table
const noMergeMemberIds = await getMemberNoMerge(this.pgQx, [originalId, targetId])

const noMerge = singleOrDefault(
noMergeMemberIds,
(m) =>
(m.memberId === originalId && m.noMergeId === targetId) ||
(m.memberId === targetId && m.noMergeId === originalId),
)

if (noMerge) {
metadata.noMerge = true
} else {
try {
await this.pgQx.tx(async (txPgQx) => {
const service = new CommonMemberService(txPgQx, this.temporal, this.log)
await service.merge(originalId, targetId)
})

try {
const merged = await mergeIfAllowed(
this.pgQx,
this.temporal,
this.log,
originalId,
targetId,
)
if (merged) {
return originalId
} catch (err) {
metadata.mergeError = {
errorMessage: err?.message ?? '<no error message>',
errorStack: err?.stack,
err,
}
} else {
metadata.noMerge = true
}
} catch (err) {
metadata.mergeError = {
errorMessage: err?.message ?? '<no error message>',
errorStack: err?.stack,
err,
}
}
}
Expand Down
230 changes: 221 additions & 9 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ import {
isObjectEmpty,
singleOrDefault,
} from '@crowd/common'
import { BotDetectionService } from '@crowd/common_services'
import { BotDetectionService, CommonMemberService } from '@crowd/common_services'
import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer'
import { findIdentitiesForMembers, findMembersByVerifiedUsernames } from '@crowd/data-access-layer'
import {
findIdentitiesForMembers,
findMemberIdByVerifiedIdentity,
findMembersByVerifiedUsernames,
moveToNewMember,
} from '@crowd/data-access-layer'
import { DbStore } from '@crowd/data-access-layer/src/database'
import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
import IntegrationRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/integration.repo'
import {
IDbMember,
Expand Down Expand Up @@ -60,6 +66,36 @@ function orgCacheKey(org: IOrganization): string | null {
return null
}

/**
 * Merges two members unless a no-merge record links them.
 *
 * Fetches `memberNoMerge` entries for both ids through `getMemberNoMerge` and looks for an
 * entry that pairs them in either direction. When such an entry exists a warning is logged
 * and no merge happens. Otherwise `CommonMemberService.merge(primaryId, secondaryId)` is
 * executed inside a transaction opened on `pgQx`.
 *
 * NOTE(review): how `singleOrDefault` behaves when more than one entry matches (e.g. both
 * directions stored) is defined in `@crowd/common` — confirm it cannot reject here.
 *
 * @param pgQx - query executor used for the no-merge lookup and to open the transaction
 * @param temporal - Temporal client handed to `CommonMemberService`
 * @param log - logger used for the skip warning and handed to `CommonMemberService`
 * @param primaryId - member id passed as the first argument to `merge`
 * @param secondaryId - member id passed as the second argument to `merge`
 * @returns `true` when the merge was performed, `false` when a no-merge entry prevented it
 * @throws any error raised by the no-merge lookup or by the merge transaction
 */
export async function mergeIfAllowed(
  pgQx: QueryExecutor,
  temporal: TemporalClient,
  log: Logger,
  primaryId: string,
  secondaryId: string,
): Promise<boolean> {
  const candidates = await getMemberNoMerge(pgQx, [primaryId, secondaryId])

  // A no-merge entry may be stored in either direction, so test both orientations.
  const blockingEntry = singleOrDefault(candidates, (entry) => {
    const forward = entry.memberId === primaryId && entry.noMergeId === secondaryId
    const reverse = entry.memberId === secondaryId && entry.noMergeId === primaryId
    return forward || reverse
  })

  if (!blockingEntry) {
    // Run the merge in its own transaction.
    await pgQx.tx(async (tx) => {
      const mergeService = new CommonMemberService(tx, temporal, log)
      await mergeService.merge(primaryId, secondaryId)
    })
    return true
  }

  log.warn({ primaryId, secondaryId }, 'Members are marked as no-merge — skipping merge')
  return false
}

export default class MemberService extends LoggerBase {
private readonly memberRepo: MemberRepository
private readonly pgQx: QueryExecutor
Expand Down Expand Up @@ -91,8 +127,18 @@ export default class MemberService extends LoggerBase {
try {
this.log.debug('Creating a new member!')

// prevent empty identity handles
data.identities = data.identities.filter((i) => i.value)
// filter empty handles and deduplicate by (platform, value, type, verified)
data.identities = data.identities.filter(
(identity, idx) =>
!!identity.value &&
data.identities.findIndex(
(j) =>
j.platform === identity.platform &&
j.value === identity.value &&
j.type === identity.type &&
j.verified === identity.verified,
) === idx,
)

if (data.identities.length === 0) {
throw new Error('Member must have at least one identity!')
Expand Down Expand Up @@ -154,20 +200,174 @@ export default class MemberService extends LoggerBase {
'memberService -> create -> createMember',
)

let insertedCount: number
try {
await logExecutionTimeV2(
insertedCount = await logExecutionTimeV2(
() => this.memberRepo.insertIdentities(id, integrationId, data.identities),
this.log,
'memberService -> create -> insertIdentities',
)
} catch (err) {
this.log.error(err, { memberId: id }, 'Error while inserting identities!')
this.log.error(err, { memberId: id }, 'Error inserting member identities!')
await logExecutionTimeV2(
async () => this.memberRepo.destroyMemberAfterError(id, false),
async () => this.memberRepo.destroyMemberAfterError(id, true),
this.log,
'memberService -> create -> destroyMemberAfterError',
)
throw new ApplicationError('Error while inserting identities for a new member!', err)
throw err
}

if (insertedCount < data.identities.length) {
// At least one verified identity conflicted. Walk every verified identity to:
// (a) find the existing member(s) that own the conflicting ones, and
// (b) collect identities that were successfully inserted for the orphan.
let existingMemberId: string | null = null
const orphanVerifiedIdentities: IMemberIdentity[] = []

for (const identity of data.identities.filter((i) => i.verified)) {
const owner = await findMemberIdByVerifiedIdentity(
this.pgQx,
identity.platform,
identity.value,
identity.type,
)

if (!owner) {
// The identity disappeared between INSERT and SELECT — unusual race condition.
// Cannot safely resolve; schedule orphan deletion and throw.
this.log.error(
{ orphanMemberId: id, identity },
'Verified identity not found after conflict detection — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Identity conflict during member creation: owner not found for identity (${identity.platform}, ${identity.value}, ${identity.type})`,
)
} else if (owner === id) {
// Successfully inserted for the orphan — will be moved to the existing member below
orphanVerifiedIdentities.push(identity)
} else if (!existingMemberId) {
// First conflicting owner found
existingMemberId = owner
} else if (existingMemberId !== owner) {
// A second conflicting owner — two existing members each own a different verified
// identity of this incoming member, so the data source asserts they are the same
// person. Auto-merge the second into the first.
this.log.warn(
{
orphanMemberId: id,
primaryMemberId: existingMemberId,
secondaryMemberId: owner,
identity,
},
'Multiple conflicting verified identities belong to different existing members — merging automatically',
)
let merged: boolean
try {
merged = await mergeIfAllowed(
this.pgQx,
this.temporal,
this.log,
existingMemberId,
owner,
)
} catch (mergeErr) {
this.log.error(
mergeErr,
{
orphanMemberId: id,
primaryMemberId: existingMemberId,
secondaryMemberId: owner,
},
'Auto-merge of conflicting members failed — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Identity conflict during member creation: auto-merge of members ${existingMemberId} and ${owner} failed for identity (${identity.platform}, ${identity.value}, ${identity.type})`,
)
}
if (!merged) {
this.log.error(
{
orphanMemberId: id,
primaryMemberId: existingMemberId,
secondaryMemberId: owner,
},
'Auto-merge prevented by noMerge record — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Identity conflict during member creation: members ${existingMemberId} and ${owner} are marked as no-merge but share identity (${identity.platform}, ${identity.value}, ${identity.type})`,
)
}
// existingMemberId (primary) survives; owner (secondary) was absorbed
this.log.info(
{
orphanMemberId: id,
survivingMemberId: existingMemberId,
mergedMemberId: owner,
identity,
},
'Auto-merge of conflicting members succeeded',
)
}
// else: owner === existingMemberId — same member owns this identity too, nothing to do
}

if (existingMemberId) {
// Move any verified identities that were inserted for the orphan to the existing
// member so they are not lost when the orphan is cascade-deleted.
// UPDATE memberId rather than INSERT to avoid unique constraint violations.
for (const identity of orphanVerifiedIdentities) {
try {
await moveToNewMember(this.pgQx, {
oldMemberId: id,
newMemberId: existingMemberId,
platform: identity.platform,
value: identity.value,
type: identity.type,
})
} catch (moveErr) {
this.log.error(
moveErr,
{ orphanMemberId: id, existingMemberId, identity },
'Failed to move orphan verified identity to existing member — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Failed to move identity (${identity.platform}, ${identity.value}, ${identity.type}) from orphan ${id} to existing member ${existingMemberId}`,
)
}
}
this.log.warn(
{
orphanMemberId: id,
existingMemberId,
transferredIdentities: orphanVerifiedIdentities.length,
},
'Identity conflict during member creation — reusing existing member, scheduling orphan deletion',
)
await logExecutionTimeV2(
() => this.memberRepo.addToSegments(existingMemberId, segmentIds),
this.log,
'memberService -> create -> addToSegments (conflict path)',
)
if (releaseMemberLock) {
await releaseMemberLock()
}
await this.scheduleOrphanMemberDeletion(id)
return existingMemberId
}

// insertedCount < data.identities.length but no conflicting owner found — unexpected
this.log.error(
{ memberId: id },
'Identity conflict during member creation but existing member not found — scheduling orphan deletion',
)
await this.scheduleOrphanMemberDeletion(id)
throw new ApplicationError(
`Identity conflict during member creation for member ${id}: inserted ${insertedCount} of ${data.identities.length} identities but found no conflicting owner`,
)
}

try {
Expand Down Expand Up @@ -396,7 +596,7 @@ export default class MemberService extends LoggerBase {
this.log.trace({ memberId: id }, 'Inserting new identities!')
try {
await logExecutionTimeV2(
() => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate),
() => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate, true),
this.log,
'memberService -> update -> insertIdentities',
)
Expand Down Expand Up @@ -822,6 +1022,18 @@ export default class MemberService extends LoggerBase {
return out
}

/**
 * Best-effort start of the `deleteOrphanMember` Temporal workflow for the given member.
 *
 * The workflow is started on the `entity-merging` task queue with a workflow id derived
 * from `TemporalWorkflowId.DELETE_ORPHAN_MEMBER` and the member id. Any failure while
 * starting it is logged and swallowed, so this method never rejects and callers can
 * continue with their own error handling.
 *
 * @param memberId - id of the orphan member to delete; also passed as the workflow argument
 */
private async scheduleOrphanMemberDeletion(memberId: string): Promise<void> {
  try {
    const workflows = this.temporal.workflow
    const workflowId = `${TemporalWorkflowId.DELETE_ORPHAN_MEMBER}/${memberId}`

    await workflows.start('deleteOrphanMember', {
      taskQueue: 'entity-merging',
      workflowId,
      args: [memberId],
    })
  } catch (scheduleErr) {
    // Deliberately not rethrown: scheduling the cleanup must not mask the caller's outcome.
    this.log.error(scheduleErr, { memberId }, 'Failed to schedule orphan member deletion!')
  }
}

private async startMemberBotAnalysisWithLLMWorkflow(memberId: string): Promise<void> {
await this.temporal.workflow.start('processMemberBotAnalysisWithLLM', {
taskQueue: 'profiles',
Expand Down
1 change: 1 addition & 0 deletions services/apps/entity_merging_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export {
deleteOrphanMember,
finishMemberMerging,
finishOrganizationMerging,
finishMemberUnmerging,
Expand Down
Loading
Loading