Skip to content

Commit b3f8338

Browse files
authored
Merge pull request #1140 from ably/feature/server-provided-tombstone-serial-refactored
[ECO-5447][LiveObjects] Use server-provided timestamp to sweep old tombstones
2 parents e8bba9e + efe730e commit b3f8338

16 files changed

Lines changed: 529 additions & 80 deletions

File tree

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ internal data class ObjectMapEntry(
114114
*/
115115
val timeserial: String? = null,
116116

117+
/**
118+
* A timestamp from the [timeserial] field. Only present if [tombstone] is `true`
119+
* Spec: OME2d
120+
*/
121+
val serialTimestamp: Long? = null,
122+
117123
/**
118124
* The data that represents the value of the map entry.
119125
* Spec: OME2c
@@ -325,6 +331,12 @@ internal data class ObjectMessage(
325331
*/
326332
val serial: String? = null,
327333

334+
/**
335+
* A timestamp from the [serial] field.
336+
* Spec: OM2j
337+
*/
338+
val serialTimestamp: Long? = null,
339+
328340
/**
329341
* An opaque string used as a key to update the map of serial values on an object.
330342
* Spec: OM2i

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
1515
/**
1616
* @spec RTO5 - Sync objects data pool for collecting sync messages
1717
*/
18-
private val syncObjectsDataPool = mutableMapOf<String, ObjectState>()
18+
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
1919
private var currentSyncId: String? = null
2020
/**
2121
* @spec RTO7 - Buffered object operations during sync
@@ -130,19 +130,20 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
130130
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, LiveObjectUpdate>>()
131131

132132
// RTO5c1
133-
for ((objectId, objectState) in syncObjectsDataPool) {
133+
for ((objectId, objectMessage) in syncObjectsDataPool) {
134+
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
134135
receivedObjectIds.add(objectId)
135136
val existingObject = liveObjects.objectsPool.get(objectId)
136137

137138
// RTO5c1a
138139
if (existingObject != null) {
139140
// Update existing object
140-
val update = existingObject.applyObjectSync(objectState) // RTO5c1a1
141+
val update = existingObject.applyObjectSync(objectMessage) // RTO5c1a1
141142
existingObjectUpdates.add(Pair(existingObject, update))
142143
} else { // RTO5c1b
143144
// RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool
144145
val newObject = createObjectFromState(objectState)
145-
newObject.applyObjectSync(objectState)
146+
newObject.applyObjectSync(objectMessage)
146147
liveObjects.objectsPool.set(objectId, newObject)
147148
}
148149
}
@@ -201,7 +202,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
201202

202203
val objectState: ObjectState = objectMessage.objectState
203204
if (objectState.counter != null || objectState.map != null) {
204-
syncObjectsDataPool[objectState.objectId] = objectState
205+
syncObjectsDataPool[objectState.objectId] = objectMessage
205206
} else {
206207
// RTO5c1b1c - object state must contain either counter or map data
207208
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")

live-objects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) {
3737
if (operation != null) fieldCount++
3838
if (objectState != null) fieldCount++
3939
if (serial != null) fieldCount++
40+
if (serialTimestamp != null) fieldCount++
4041
if (siteCode != null) fieldCount++
4142

4243
packer.packMapHeader(fieldCount)
@@ -81,6 +82,11 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) {
8182
packer.packString(serial)
8283
}
8384

85+
if (serialTimestamp != null) {
86+
packer.packString("serialTimestamp")
87+
packer.packLong(serialTimestamp)
88+
}
89+
8490
if (siteCode != null) {
8591
packer.packString("siteCode")
8692
packer.packString(siteCode)
@@ -106,6 +112,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
106112
var operation: ObjectOperation? = null
107113
var objectState: ObjectState? = null
108114
var serial: String? = null
115+
var serialTimestamp: Long? = null
109116
var siteCode: String? = null
110117

111118
for (i in 0 until fieldCount) {
@@ -126,6 +133,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
126133
"operation" -> operation = readObjectOperation(unpacker)
127134
"object" -> objectState = readObjectState(unpacker)
128135
"serial" -> serial = unpacker.unpackString()
136+
"serialTimestamp" -> serialTimestamp = unpacker.unpackLong()
129137
"siteCode" -> siteCode = unpacker.unpackString()
130138
else -> unpacker.skipValue()
131139
}
@@ -140,6 +148,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
140148
operation = operation,
141149
objectState = objectState,
142150
serial = serial,
151+
serialTimestamp = serialTimestamp,
143152
siteCode = siteCode
144153
)
145154
}
@@ -557,6 +566,7 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) {
557566

558567
if (tombstone != null) fieldCount++
559568
if (timeserial != null) fieldCount++
569+
if (serialTimestamp != null) fieldCount++
560570
if (data != null) fieldCount++
561571

562572
packer.packMapHeader(fieldCount)
@@ -571,6 +581,11 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) {
571581
packer.packString(timeserial)
572582
}
573583

584+
if (serialTimestamp != null) {
585+
packer.packString("serialTimestamp")
586+
packer.packLong(serialTimestamp)
587+
}
588+
574589
if (data != null) {
575590
packer.packString("data")
576591
data.writeMsgpack(packer)
@@ -585,6 +600,7 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry {
585600

586601
var tombstone: Boolean? = null
587602
var timeserial: String? = null
603+
var serialTimestamp: Long? = null
588604
var data: ObjectData? = null
589605

590606
for (i in 0 until fieldCount) {
@@ -599,12 +615,13 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry {
599615
when (fieldName) {
600616
"tombstone" -> tombstone = unpacker.unpackBoolean()
601617
"timeserial" -> timeserial = unpacker.unpackString()
618+
"serialTimestamp" -> serialTimestamp = unpacker.unpackLong()
602619
"data" -> data = readObjectData(unpacker)
603620
else -> unpacker.skipValue()
604621
}
605622
}
606623

607-
return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, data = data)
624+
return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, serialTimestamp = serialTimestamp, data = data)
608625
}
609626

610627
/**

live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ internal abstract class BaseLiveObject(
4747
*
4848
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
4949
*/
50-
internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate {
50+
internal fun applyObjectSync(objectMessage: ObjectMessage): LiveObjectUpdate {
51+
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
5152
validate(objectState)
5253
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
5354
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.
@@ -61,7 +62,7 @@ internal abstract class BaseLiveObject(
6162
}
6263
return noOpCounterUpdate
6364
}
64-
return applyObjectState(objectState) // RTLM6, RTLC6
65+
return applyObjectState(objectState, objectMessage) // RTLM6, RTLC6
6566
}
6667

6768
/**
@@ -122,11 +123,14 @@ internal abstract class BaseLiveObject(
122123
/**
123124
* Marks the object as tombstoned.
124125
*/
125-
internal fun tombstone(): LiveObjectUpdate {
126+
internal fun tombstone(serialTimestamp: Long?): LiveObjectUpdate {
127+
if (serialTimestamp == null) {
128+
Log.w(tag, "Tombstoning object $objectId without serial timestamp, using local timestamp instead")
129+
}
126130
isTombstoned = true
127-
tombstonedAt = System.currentTimeMillis()
131+
tombstonedAt = serialTimestamp?: System.currentTimeMillis()
128132
val update = clearData()
129-
// TODO: Emit lifecycle events
133+
// TODO: Emit BaseLiveObject lifecycle events
130134
return update
131135
}
132136

@@ -153,7 +157,7 @@ internal abstract class BaseLiveObject(
153157
* @return A map describing the changes made to the object's data
154158
*
155159
*/
156-
abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate
160+
abstract fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveObjectUpdate
157161

158162
/**
159163
* Applies an operation to this live object.

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,16 @@ internal class DefaultLiveCounter private constructor(
7171

7272
override fun validate(state: ObjectState) = liveCounterManager.validate(state)
7373

74-
override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate {
75-
return liveCounterManager.applyState(objectState)
74+
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveCounterUpdate {
75+
return liveCounterManager.applyState(objectState, message.serialTimestamp)
7676
}
7777

7878
override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
79-
liveCounterManager.applyOperation(operation)
79+
liveCounterManager.applyOperation(operation, message.serialTimestamp)
8080
}
8181

8282
override fun clearData(): LiveCounterUpdate {
83-
return LiveCounterUpdate(data.get()).apply { data.set(0.0) }
83+
return liveCounterManager.calculateUpdateFromDataDiff(data.get(), 0.0).apply { data.set(0.0) }
8484
}
8585

8686
override fun notifyUpdated(update: LiveObjectUpdate) {

live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
1717
/**
1818
* @spec RTLC6 - Overrides counter data with state from sync
1919
*/
20-
internal fun applyState(objectState: ObjectState): LiveCounterUpdate {
20+
internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveCounterUpdate {
2121
val previousData = liveCounter.data.get()
2222

2323
if (objectState.tombstone) {
24-
liveCounter.tombstone()
24+
liveCounter.tombstone(serialTimestamp)
2525
} else {
2626
// override data for this object with data from the object state
2727
liveCounter.createOperationIsMerged = false // RTLC6b
@@ -33,13 +33,13 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
3333
}
3434
}
3535

36-
return LiveCounterUpdate(liveCounter.data.get() - previousData)
36+
return calculateUpdateFromDataDiff(previousData, liveCounter.data.get())
3737
}
3838

3939
/**
4040
* @spec RTLC7 - Applies operations to LiveCounter
4141
*/
42-
internal fun applyOperation(operation: ObjectOperation) {
42+
internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?) {
4343
val update = when (operation.action) {
4444
ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1
4545
ObjectOperationAction.CounterInc -> {
@@ -49,7 +49,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
4949
throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}")
5050
}
5151
}
52-
ObjectOperationAction.ObjectDelete -> liveCounter.tombstone()
52+
ObjectOperationAction.ObjectDelete -> liveCounter.tombstone(serialTimestamp)
5353
else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
5454
}
5555

@@ -85,6 +85,10 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
8585
return LiveCounterUpdate(amount)
8686
}
8787

88+
internal fun calculateUpdateFromDataDiff(prevData: Double, newData: Double): LiveCounterUpdate {
89+
return LiveCounterUpdate(newData - prevData)
90+
}
91+
8892
/**
8993
* @spec RTLC10 - Merges initial data from create operation
9094
*/

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,12 @@ internal class DefaultLiveMap private constructor(
118118

119119
override fun unsubscribeAll() = liveMapManager.unsubscribeAll()
120120

121-
override fun applyObjectState(objectState: ObjectState): LiveMapUpdate {
122-
return liveMapManager.applyState(objectState)
121+
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveMapUpdate {
122+
return liveMapManager.applyState(objectState, message.serialTimestamp)
123123
}
124124

125125
override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
126-
liveMapManager.applyOperation(operation, message.serial)
126+
liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp)
127127
}
128128

129129
override fun clearData(): LiveMapUpdate {

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
2020
/**
2121
* @spec RTLM6 - Overrides object data with state from sync
2222
*/
23-
internal fun applyState(objectState: ObjectState): LiveMapUpdate {
23+
internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveMapUpdate {
2424
val previousData = liveMap.data.toMap()
2525

2626
if (objectState.tombstone) {
27-
liveMap.tombstone()
27+
liveMap.tombstone(serialTimestamp)
2828
} else {
2929
// override data for this object with data from the object state
3030
liveMap.createOperationIsMerged = false // RTLM6b
@@ -33,7 +33,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
3333
objectState.map?.entries?.forEach { (key, entry) ->
3434
liveMap.data[key] = LiveMapEntry(
3535
isTombstoned = entry.tombstone ?: false,
36-
tombstonedAt = if (entry.tombstone == true) System.currentTimeMillis() else null,
36+
tombstonedAt = if (entry.tombstone == true) entry.serialTimestamp ?: System.currentTimeMillis() else null,
3737
timeserial = entry.timeserial,
3838
data = entry.data
3939
)
@@ -51,24 +51,24 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
5151
/**
5252
* @spec RTLM15 - Applies operations to LiveMap
5353
*/
54-
internal fun applyOperation(operation: ObjectOperation, messageTimeserial: String?) {
54+
internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?) {
5555
val update = when (operation.action) {
5656
ObjectOperationAction.MapCreate -> applyMapCreate(operation) // RTLM15d1
5757
ObjectOperationAction.MapSet -> {
5858
if (operation.mapOp != null) {
59-
applyMapSet(operation.mapOp, messageTimeserial) // RTLM15d2
59+
applyMapSet(operation.mapOp, serial) // RTLM15d2
6060
} else {
6161
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
6262
}
6363
}
6464
ObjectOperationAction.MapRemove -> {
6565
if (operation.mapOp != null) {
66-
applyMapRemove(operation.mapOp, messageTimeserial) // RTLM15d3
66+
applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3
6767
} else {
6868
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
6969
}
7070
}
71-
ObjectOperationAction.ObjectDelete -> liveMap.tombstone()
71+
ObjectOperationAction.ObjectDelete -> liveMap.tombstone(serialTimestamp)
7272
else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
7373
}
7474

@@ -132,7 +132,6 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
132132
// RTLM7a2 - Replace existing entry with new one instead of mutating
133133
liveMap.data[mapOp.key] = LiveMapEntry(
134134
isTombstoned = false, // RTLM7a2c
135-
tombstonedAt = null,
136135
timeserial = timeSerial, // RTLM7a2b
137136
data = mapOp.data // RTLM7a2a
138137
)
@@ -154,6 +153,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
154153
private fun applyMapRemove(
155154
mapOp: ObjectMapOp, // RTLM8c1
156155
timeSerial: String?, // RTLM8c2
156+
timeStamp: Long?, // RTLM8c3
157157
): LiveMapUpdate {
158158
val existingEntry = liveMap.data[mapOp.key]
159159

@@ -168,19 +168,28 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
168168
return noOpMapUpdate
169169
}
170170

171+
val tombstonedAt = if (timeStamp != null) timeStamp else {
172+
Log.w(
173+
tag,
174+
"No timestamp provided for MAP_REMOVE op on key=\"${mapOp.key}\"; using current time as tombstone time; " +
175+
"objectId=${objectId}"
176+
)
177+
System.currentTimeMillis()
178+
}
179+
171180
if (existingEntry != null) {
172181
// RTLM8a2 - Replace existing entry with new one instead of mutating
173182
liveMap.data[mapOp.key] = LiveMapEntry(
174183
isTombstoned = true, // RTLM8a2c
175-
tombstonedAt = System.currentTimeMillis(),
184+
tombstonedAt = tombstonedAt,
176185
timeserial = timeSerial, // RTLM8a2b
177186
data = null // RTLM8a2a
178187
)
179188
} else {
180189
// RTLM8b, RTLM8b1
181190
liveMap.data[mapOp.key] = LiveMapEntry(
182191
isTombstoned = true, // RTLM8b2
183-
tombstonedAt = System.currentTimeMillis(),
192+
tombstonedAt = tombstonedAt,
184193
timeserial = timeSerial
185194
)
186195
}
@@ -224,7 +233,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
224233
val opTimeserial = entry.timeserial
225234
val update = if (entry.tombstone == true) {
226235
// RTLM17a2 - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op
227-
applyMapRemove(ObjectMapOp(key), opTimeserial)
236+
applyMapRemove(ObjectMapOp(key), opTimeserial, entry.serialTimestamp)
228237
} else {
229238
// RTLM17a1 - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op
230239
applyMapSet(ObjectMapOp(key, entry.data), opTimeserial)

0 commit comments

Comments
 (0)