Feature Description
Implement Operational Transformation (OT) or Conflict-free Replicated Data Types (CRDTs) to handle concurrent editing conflicts gracefully and ensure eventual consistency across all connected clients.
Problem Statement
Current concurrency issues:
- Race conditions when multiple users edit simultaneously
- Last-write-wins approach can lose data
- No conflict detection or resolution
- Undo/redo breaks when others are editing
- Strokes can appear in wrong order
- No guarantee of consistency across clients
Operational Transformation Basics
OT ensures that concurrent operations converge to the same state by transforming operations against each other.
Example Conflict:
- User A: Adds stroke at position 5
- User B: Deletes stroke at position 3 (simultaneously)
Without OT, User A's stroke might reference a position that no longer exists after User B's deletion.
1. OT Core Implementation
// frontend/src/services/OperationalTransformation.js
export class OperationTransformer {
/**
* Transform operation A against operation B
* Returns transformed version of A that accounts for B's changes
*/
transform(opA, opB) {
const type = `${opA.type}_${opB.type}`;
switch (type) {
case 'insert_insert':
return this.transformInsertInsert(opA, opB);
case 'insert_delete':
return this.transformInsertDelete(opA, opB);
case 'delete_insert':
return this.transformDeleteInsert(opA, opB);
case 'delete_delete':
return this.transformDeleteDelete(opA, opB);
default:
return opA;
}
}
transformInsertInsert(opA, opB) {
// If B inserts before A's position, shift A's position
if (opB.position <= opA.position) {
return { ...opA, position: opA.position + 1 };
}
return opA;
}
transformInsertDelete(opA, opB) {
// If B deletes before A's position, shift A's position down
if (opB.position < opA.position) {
return { ...opA, position: opA.position - 1 };
}
return opA;
}
transformDeleteInsert(opA, opB) {
// If B inserts before A's delete position, shift A up
if (opB.position <= opA.position) {
return { ...opA, position: opA.position + 1 };
}
return opA;
}
transformDeleteDelete(opA, opB) {
// Both trying to delete same position - B wins
if (opA.position === opB.position) {
return null; // Cancel A's operation
}
// B deleted before A's position
if (opB.position < opA.position) {
return { ...opA, position: opA.position - 1 };
}
return opA;
}
}
2. Vector Clock for Causality
// frontend/src/services/VectorClock.js
export class VectorClock {
constructor(clientId) {
this.clientId = clientId;
this.clock = { [clientId]: 0 };
}
increment() {
this.clock[this.clientId]++;
return this.getCopy();
}
update(otherClock) {
for (const [clientId, timestamp] of Object.entries(otherClock)) {
this.clock[clientId] = Math.max(
this.clock[clientId] || 0,
timestamp
);
}
}
getCopy() {
return { ...this.clock };
}
happensBefore(otherClock) {
let strictlyLess = false;
for (const clientId in this.clock) {
if (this.clock[clientId] > (otherClock[clientId] || 0)) {
return false;
}
if (this.clock[clientId] < (otherClock[clientId] || 0)) {
strictlyLess = true;
}
}
return strictlyLess;
}
concurrent(otherClock) {
}
happensAfter(otherClock) {
return new VectorClock(this.clientId).setClock(otherClock).happensBefore(this.clock);
}
setClock(clock) {
this.clock = { ...clock };
return this;
}
}
3. Operation History Buffer
// frontend/src/services/OperationHistory.js
export class OperationHistory {
constructor(clientId) {
this.clientId = clientId;
this.history = [];
this.vectorClock = new VectorClock(clientId);
this.transformer = new OperationTransformer();
}
addLocalOperation(operation) {
const op = {
...operation,
clientId: this.clientId,
vectorClock: this.vectorClock.increment(),
id: generateOperationId()
};
this.history.push(op);
return op;
}
addRemoteOperation(operation) {
// Update our vector clock
this.vectorClock.update(operation.vectorClock);
// Transform against concurrent local operations
let transformedOp = operation;
for (const localOp of this.getLocalOperationsSince(operation.vectorClock)) {
transformedOp = this.transformer.transform(transformedOp, localOp);
}
this.history.push(transformedOp);
return transformedOp;
}
getLocalOperationsSince(vectorClock) {
return this.history.filter(op =>
op.clientId === this.clientId &&
);
}
pruneHistory(beforeTimestamp) {
this.history = this.history.filter(op =>
op.timestamp > beforeTimestamp
);
}
}
4. Canvas Operation Types
// frontend/src/models/operations.js
export const OperationType = {
INSERT_STROKE: 'insert_stroke',
DELETE_STROKE: 'delete_stroke',
MODIFY_STROKE: 'modify_stroke',
MOVE_STROKE: 'move_stroke',
CLEAR_CANVAS: 'clear_canvas'
};
export class Operation {
static insertStroke(stroke, position) {
return {
type: OperationType.INSERT_STROKE,
stroke: stroke,
position: position,
timestamp: Date.now()
};
}
static deleteStroke(strokeId, position) {
return {
type: OperationType.DELETE_STROKE,
strokeId: strokeId,
position: position,
timestamp: Date.now()
};
}
static modifyStroke(strokeId, changes) {
return {
type: OperationType.MODIFY_STROKE,
strokeId: strokeId,
changes: changes,
timestamp: Date.now()
};
}
}
5. Collaborative Canvas Manager
// frontend/src/services/CollaborativeCanvas.js
export class CollaborativeCanvas {
constructor(roomId, userId, socket) {
this.roomId = roomId;
this.userId = userId;
this.socket = socket;
this.operationHistory = new OperationHistory(userId);
this.pendingOperations = [];
}
localStrokeAdded(stroke) {
const operation = Operation.insertStroke(stroke, this.getStrokeCount());
const op = this.operationHistory.addLocalOperation(operation);
// Send to server
this.socket.emit('operation', {
roomId: this.roomId,
operation: op
});
// Apply locally
this.applyOperation(op);
}
remoteOperationReceived(operation) {
const transformedOp = this.operationHistory.addRemoteOperation(operation);
if (transformedOp) {
this.applyOperation(transformedOp);
}
}
applyOperation(operation) {
switch (operation.type) {
case OperationType.INSERT_STROKE:
this.canvas.insertStrokeAt(operation.stroke, operation.position);
break;
case OperationType.DELETE_STROKE:
this.canvas.deleteStrokeAt(operation.position);
break;
case OperationType.MODIFY_STROKE:
this.canvas.modifyStroke(operation.strokeId, operation.changes);
break;
}
}
undo() {
// Get last local operation
const lastOp = this.operationHistory.getLastLocalOperation();
// Create inverse operation
const inverseOp = this.createInverseOperation(lastOp);
const op = this.operationHistory.addLocalOperation(inverseOp);
// Broadcast
this.socket.emit('operation', {
roomId: this.roomId,
operation: op
});
this.applyOperation(op);
}
createInverseOperation(operation) {
switch (operation.type) {
case OperationType.INSERT_STROKE:
return Operation.deleteStroke(operation.stroke.id, operation.position);
case OperationType.DELETE_STROKE:
return Operation.insertStroke(operation.stroke, operation.position);
default:
return null;
}
}
}
6. Backend Operation Server
# backend/services/operation_server.py
class OperationServer:
def __init__(self):
self.room_operations = {} # room_id -> list of operations
def receive_operation(self, room_id, operation):
if room_id not in self.room_operations:
self.room_operations[room_id] = []
# Validate operation
if not self.validate_operation(operation):
return {'error': 'Invalid operation'}
# Store operation
self.room_operations[room_id].append(operation)
# Broadcast to other clients
return {'success': True, 'operation': operation}
def validate_operation(self, operation):
# Check required fields
required = ['type', 'clientId', 'vectorClock', 'timestamp']
return all(field in operation for field in required)
def get_operations_since(self, room_id, vector_clock):
if room_id not in self.room_operations:
return []
# Return operations that happened after the given vector clock
return [
op for op in self.room_operations[room_id]
if not self.happened_before(op['vectorClock'], vector_clock)
]
def happened_before(self, clock_a, clock_b):
# Vector clock comparison
for client_id in clock_a:
if clock_a[client_id] > clock_b.get(client_id, 0):
return False
return True
7. Socket.IO Integration
# backend/routes/socketio_handlers.py
from services.operation_server import OperationServer
operation_server = OperationServer()
@socketio.on('operation')
@require_auth_socketio
def handle_operation(data):
user = get_current_user()
room_id = data.get('roomId')
operation = data.get('operation')
# Process operation
result = operation_server.receive_operation(room_id, operation)
if 'error' in result:
emit('operation_error', result)
return
# Broadcast to room (except sender)
emit('operation', {
'operation': operation
}, room=room_id, skip_sid=request.sid)
Files to Create/Modify
Frontend:
frontend/src/services/OperationalTransformation.js ⭐ (NEW)
frontend/src/services/VectorClock.js ⭐ (NEW)
frontend/src/services/OperationHistory.js ⭐ (NEW)
frontend/src/services/CollaborativeCanvas.js ⭐ (NEW)
frontend/src/models/operations.js ⭐ (NEW)
frontend/src/components/Canvas.js (MODIFY)
Backend:
backend/services/operation_server.py ⭐ (NEW)
backend/routes/socketio_handlers.py (MODIFY)
Benefits
- Guaranteed eventual consistency
- No lost edits due to conflicts
- Proper concurrent undo/redo
- Resilient to network delays
- Scales to many simultaneous users
- Professional collaborative editing like Google Docs
Testing Requirements
- Simulate concurrent operations
- Test network partition scenarios
- Verify convergence with 3+ clients
- Stress test with rapid operations
- Test undo/redo during conflicts
Feature Description
Implement Operational Transformation (OT) or Conflict-free Replicated Data Types (CRDTs) to handle concurrent editing conflicts gracefully and ensure eventual consistency across all connected clients.
Problem Statement
Current concurrency issues:
Operational Transformation Basics
OT ensures that concurrent operations converge to the same state by transforming operations against each other.
Example Conflict:
Without OT, User A's stroke might reference a position that no longer exists after User B's deletion.
1. OT Core Implementation
2. Vector Clock for Causality
3. Operation History Buffer
4. Canvas Operation Types
5. Collaborative Canvas Manager
6. Backend Operation Server
7. Socket.IO Integration
Files to Create/Modify
Frontend:
frontend/src/services/OperationalTransformation.js⭐ (NEW)frontend/src/services/VectorClock.js⭐ (NEW)frontend/src/services/OperationHistory.js⭐ (NEW)frontend/src/services/CollaborativeCanvas.js⭐ (NEW)frontend/src/models/operations.js⭐ (NEW)frontend/src/components/Canvas.js(MODIFY)Backend:
backend/services/operation_server.py⭐ (NEW)backend/routes/socketio_handlers.py(MODIFY)Benefits
Testing Requirements