-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuffer.py
More file actions
54 lines (45 loc) · 1.8 KB
/
buffer.py
File metadata and controls
54 lines (45 loc) · 1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""
Reorg buffer: holds the resulting tree at the head (~30 slots) so we can determine
which branch is longer. Blocks are sorted by slot and processed in order.
Parent hash is always set (no need to handle missing).
"""
import threading
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from solana import parsed_idl_block_message_pb2
# Default ReorgBuffer capacity: how many blocks are held at the head of the tree
# for branch-length comparison before a batch is released. ~30 is enough for Solana.
REORG_BUFFER_SIZE = 30

# One buffered block, laid out as (block_hash, parent_hash, slot, tx_block).
BatchItem = tuple[
    bytes,
    bytes,
    int,
    "parsed_idl_block_message_pb2.ParsedIdlBlockMessage",
]
class ReorgBuffer:
    """Lock-guarded accumulator that hands blocks back as slot-ordered batches.

    Blocks pile up until the configured size is reached (or ``flush`` is
    called). The accumulated blocks are then returned sorted by ascending
    slot and the buffer starts over empty. Every read or write of the
    pending list happens while holding an internal ``threading.Lock``.
    """

    __slots__ = ("_size", "_items", "_lock")

    def __init__(self, size: int = REORG_BUFFER_SIZE):
        """Create an empty buffer that releases a batch once it holds *size* blocks."""
        self._lock = threading.Lock()
        self._items: list[BatchItem] = []
        self._size = size

    def _drain(self) -> list[BatchItem]:
        """Return the pending blocks ordered by slot and reset the buffer.

        The caller must already hold ``self._lock``. ``sorted`` is stable, so
        blocks sharing a slot keep the order in which they were added.
        """
        ordered = sorted(self._items, key=lambda item: item[2])
        # Rebind to a fresh list so the returned batch is never aliased by the buffer.
        self._items = []
        return ordered

    def add(
        self,
        block_hash: bytes,
        parent_hash: bytes,
        slot: int,
        tx_block: "parsed_idl_block_message_pb2.ParsedIdlBlockMessage",
    ) -> list[BatchItem] | None:
        """Buffer one block.

        Returns:
            The slot-sorted batch (and empties the buffer) when this block
            brings the buffer up to its configured size; otherwise ``None``.
        """
        entry = (block_hash, parent_hash, slot, tx_block)
        with self._lock:
            self._items.append(entry)
            if len(self._items) < self._size:
                return None
            return self._drain()

    def flush(self) -> list[BatchItem]:
        """Hand back whatever is buffered, sorted by slot, and empty the buffer.

        Returns an empty list when nothing is pending.
        """
        with self._lock:
            return self._drain() if self._items else []