Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19640.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a `FilteredEvent` class that saves us copying events.
11 changes: 6 additions & 5 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.events import EventBase
from synapse.events.utils import FilteredEvent
from synapse.handlers.admin import ExfiltrationWriter
from synapse.server import HomeServer
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
Expand Down Expand Up @@ -150,14 +151,14 @@ def __init__(self, user_id: str, directory: str | None = None):
if list(os.listdir(self.base_directory)):
raise Exception("Directory must be empty")

def write_events(self, room_id: str, events: list[EventBase]) -> None:
def write_events(self, room_id: str, filtered_events: list[FilteredEvent]) -> None:
room_directory = os.path.join(self.base_directory, "rooms", room_id)
os.makedirs(room_directory, exist_ok=True)
events_file = os.path.join(room_directory, "events")

with open(events_file, "a") as f:
for event in events:
json.dump(event.get_pdu_json(), fp=f)
for filtered_event in filtered_events:
json.dump(filtered_event.event.get_pdu_json(), fp=f)

def write_state(
self, room_id: str, event_id: str, state: StateMap[EventBase]
Expand All @@ -175,7 +176,7 @@ def write_state(
def write_invite(
self, room_id: str, event: EventBase, state: StateMap[EventBase]
) -> None:
self.write_events(room_id, [event])
self.write_events(room_id, [FilteredEvent.state(event)])

# We write the invite state somewhere else as they aren't full events
# and are only a subset of the state at the event.
Expand All @@ -191,7 +192,7 @@ def write_invite(
def write_knock(
self, room_id: str, event: EventBase, state: StateMap[EventBase]
) -> None:
self.write_events(room_id, [event])
self.write_events(room_id, [FilteredEvent.state(event)])

# We write the knock state somewhere else as they aren't full events
# and are only a subset of the state at the event.
Expand Down
4 changes: 2 additions & 2 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
TransactionUnusedFallbackKeys,
)
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig
from synapse.events.utils import FilteredEvent, SerializeEventConfig
from synapse.http.client import SimpleHttpClient, is_unknown_endpoint
from synapse.logging import opentracing
from synapse.metrics import SERVER_NAME_LABEL
Expand Down Expand Up @@ -545,7 +545,7 @@ async def _serialize(
) -> list[JsonDict]:
time_now = self.clock.time_msec()
return await self._event_serializer.serialize_events(
list(events),
[FilteredEvent(event=e, membership=None) for e in events],
time_now,
config=SerializeEventConfig(
as_client_event=True,
Expand Down
107 changes: 92 additions & 15 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
MAX_PDU_SIZE,
EventContentFields,
EventTypes,
EventUnsignedContentFields,
RelationTypes,
)
from synapse.api.errors import Codes, SynapseError
Expand Down Expand Up @@ -416,6 +417,50 @@ def format_event_for_client_v2_without_room_id(d: JsonDict) -> JsonDict:
return d


@attr.s(slots=True, frozen=True, auto_attribs=True)
class FilteredEvent:
"""An event annotated with per-user data for client serialization.

Produced by filter_and_transform_events_for_client. Carries the user's
membership at the time of the event so serialization can inject it into
unsigned.membership (MSC4115) without cloning the underlying event.
"""

event: "EventBase"
"""The event to be serialized."""

membership: str | None
Comment thread
erikjohnston marked this conversation as resolved.
"""The user whose requesting the event's membership at the time of the
event was sent.

This is None if we didn't compute the membership. In Synapse this happens a)
when returning state events to state endpoints, or b) when the event is
returned to an admin.

According to the spec we don't have to include the membership for any events
if we don't want to, especially if its expensive to compute. In practice
clients really only care about events in the room timeline so that in
encrypted room they can determine if they should be able to decrypt the
event or not.
"""

@classmethod
def state(cls, event: "EventBase") -> "FilteredEvent":
"""Wrap a state event with no per-user membership annotation.

The event must be a state event (i.e. have a state_key).
"""
assert event.is_state(), (
f"FilteredEvent.state() called with non-state event {event.event_id}"
)
return cls(event=event, membership=None)

@classmethod
def admin_override(cls, event: "EventBase") -> "FilteredEvent":
"""Wrap an event that bypasses visibility filtering due to admin privileges."""
return cls(event=event, membership=None)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SerializeEventConfig:
as_client_event: bool = True
Expand All @@ -435,6 +480,9 @@ class SerializeEventConfig:
# only server admins can see through other configuration. For example,
# whether an event was soft failed by the server.
include_admin_metadata: bool = False
# Whether MSC4354 (sticky events) is enabled. When True, the sticky TTL
# will be computed and included in the unsigned section of sticky events.
msc4354_enabled: bool = False

@only_event_fields.validator
def _validate_only_event_fields(
Expand All @@ -461,13 +509,16 @@ def _serialize_event(
time_now_ms: int,
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
membership: str | None = None,
) -> JsonDict:
"""Serialize event for clients

Args:
e
time_now_ms
config: Event serialization config
membership: The requesting user's membership at the time of the event,
to be injected into unsigned.membership (MSC4115).

Returns:
The serialized event dictionary.
Expand Down Expand Up @@ -564,6 +615,23 @@ def _serialize_event(
if e.internal_metadata.policy_server_spammy:
d["unsigned"]["io.element.synapse.policy_server_spammy"] = True

if config.msc4354_enabled:
sticky_duration = e.sticky_duration()
if sticky_duration:
expires_at = (
# min() ensures that the origin server can't lie about the time and
# send the event 'in the future', as that would allow them to exceed
# the 1 hour limit on stickiness duration.
min(e.origin_server_ts, time_now_ms) + sticky_duration.as_millis()
)
if expires_at > time_now_ms:
d["unsigned"][EventUnsignedContentFields.STICKY_TTL] = (
expires_at - time_now_ms
)

if membership is not None:
d["unsigned"][EventUnsignedContentFields.MEMBERSHIP] = membership

return d


Expand All @@ -577,13 +645,15 @@ class EventClientSerializer:
def __init__(self, hs: "HomeServer") -> None:
self._store = hs.get_datastores().main
self._auth = hs.get_auth()
self._config = hs.config
self._clock = hs.get_clock()
self._add_extra_fields_to_unsigned_client_event_callbacks: list[
ADD_EXTRA_FIELDS_TO_UNSIGNED_CLIENT_EVENT_CALLBACK
] = []

async def serialize_event(
self,
event: JsonDict | EventBase,
event: JsonDict | FilteredEvent,
time_now: int,
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
Expand All @@ -605,7 +675,7 @@ async def serialize_event(
The serialized event
"""
# To handle the case of presence events and the like
if not isinstance(event, EventBase):
if not isinstance(event, FilteredEvent):
return event

# Force-enable server admin metadata because the only time an event with
Expand All @@ -617,11 +687,16 @@ async def serialize_event(
):
config = make_config_for_admin(config)

serialized_event = _serialize_event(event, time_now, config=config)
if self._config.experimental.msc4354_enabled:
config = attr.evolve(config, msc4354_enabled=True)

serialized_event = _serialize_event(
event.event, time_now, config=config, membership=event.membership
)

# If the event was redacted, fetch the redaction event from the database
# and include it in the serialized event's unsigned section.
redacted_by: str | None = event.internal_metadata.redacted_by
redacted_by: str | None = event.event.internal_metadata.redacted_by
if redacted_by is not None:
serialized_event.setdefault("unsigned", {})["redacted_by"] = redacted_by
if redaction_map is not None:
Expand All @@ -648,7 +723,7 @@ async def serialize_event(

new_unsigned = {}
for callback in self._add_extra_fields_to_unsigned_client_event_callbacks:
u = await callback(event)
u = await callback(event.event)
new_unsigned.update(u)

if new_unsigned:
Expand All @@ -666,9 +741,9 @@ async def serialize_event(

# Check if there are any bundled aggregations to include with the event.
if bundle_aggregations:
if event.event_id in bundle_aggregations:
if event.event.event_id in bundle_aggregations:
await self._inject_bundled_aggregations(
event,
event.event,
time_now,
config,
bundle_aggregations,
Expand Down Expand Up @@ -720,7 +795,7 @@ async def _inject_bundled_aggregations(
# `sender` of the edit; however MSC3925 proposes extending it to the whole
# of the edit, which is what we do here.
serialized_aggregations[RelationTypes.REPLACE] = await self.serialize_event(
event_aggregations.replace,
FilteredEvent(event=event_aggregations.replace, membership=None),
time_now,
config=config,
)
Expand All @@ -730,7 +805,7 @@ async def _inject_bundled_aggregations(
thread = event_aggregations.thread

serialized_latest_event = await self.serialize_event(
thread.latest_event,
FilteredEvent(event=thread.latest_event, membership=None),
time_now,
config=config,
bundle_aggregations=bundled_aggregations,
Expand All @@ -755,7 +830,7 @@ async def _inject_bundled_aggregations(
@trace
async def serialize_events(
self,
events: Collection[JsonDict | EventBase],
events: Collection[JsonDict | FilteredEvent],
time_now: int,
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
Expand All @@ -780,11 +855,13 @@ async def serialize_events(
)

# Batch-fetch all redaction events in one go rather than one per event.
redaction_ids = {
e.internal_metadata.redacted_by
for e in events
if isinstance(e, EventBase) and e.internal_metadata.redacted_by is not None
}
redaction_ids: set[str] = set()
for e in events:
base = e.event if isinstance(e, FilteredEvent) else e
if isinstance(base, EventBase):
redacted_by = base.internal_metadata.redacted_by
if redacted_by is not None:
redaction_ids.add(redacted_by)
redaction_map = (
await self._store.get_events(redaction_ids) if redaction_ids else {}
)
Expand Down
29 changes: 19 additions & 10 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.events.utils import FilteredEvent
from synapse.types import (
JsonMapping,
Requester,
Expand Down Expand Up @@ -251,32 +252,40 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
topological=last_event.depth,
)

events = await filter_and_transform_events_for_client(
filtered_events = await filter_and_transform_events_for_client(
self._storage_controllers,
user_id,
events,
)

writer.write_events(room_id, events)
writer.write_events(room_id, filtered_events)

# Update the extremity tracking dicts
for event in events:
for filtered_event in filtered_events:
# Check if we have any prev events that haven't been
# processed yet, and add those to the appropriate dicts.
unseen_events = set(event.prev_event_ids()) - written_events
unseen_events = (
set(filtered_event.event.prev_event_ids()) - written_events
)
if unseen_events:
event_to_unseen_prevs[event.event_id] = unseen_events
event_to_unseen_prevs[filtered_event.event.event_id] = (
unseen_events
)
for unseen in unseen_events:
unseen_to_child_events.setdefault(unseen, set()).add(
event.event_id
filtered_event.event.event_id
)

# Now check if this event is an unseen prev event, if so
# then we remove this event from the appropriate dicts.
for child_id in unseen_to_child_events.pop(event.event_id, []):
event_to_unseen_prevs[child_id].discard(event.event_id)
for child_id in unseen_to_child_events.pop(
filtered_event.event.event_id, []
):
event_to_unseen_prevs[child_id].discard(
filtered_event.event.event_id
)

written_events.add(event.event_id)
written_events.add(filtered_event.event.event_id)

logger.info(
"Written %d events in room %s", len(written_events), room_id
Expand Down Expand Up @@ -511,7 +520,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
"""Interface used to specify how to write exported data."""

@abc.abstractmethod
def write_events(self, room_id: str, events: list[EventBase]) -> None:
def write_events(self, room_id: str, events: list[FilteredEvent]) -> None:
"""Write a batch of events for a room."""
raise NotImplementedError()

Expand Down
17 changes: 8 additions & 9 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@

from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig
from synapse.events.utils import FilteredEvent, SerializeEventConfig
from synapse.handlers.presence import format_user_presence_state
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.streams.config import PaginationConfig
Expand Down Expand Up @@ -102,19 +101,19 @@ async def get_stream(
# joined room, we need to send down presence for those users.
to_add: list[JsonDict] = []
for event in events:
if not isinstance(event, EventBase):
if not isinstance(event, FilteredEvent):
continue
if event.type == EventTypes.Member:
if event.membership != Membership.JOIN:
if event.event.type == EventTypes.Member:
if event.event.membership != Membership.JOIN:
continue
# Send down presence.
if event.state_key == requester.user.to_string():
if event.event.state_key == requester.user.to_string():
# Send down presence for everyone in the room.
users: Iterable[str] = await self.store.get_users_in_room(
event.room_id
event.event.room_id
)
else:
users = [event.state_key]
users = [event.event.state_key]

states = await presence_handler.get_states(users)
to_add.extend(
Expand Down Expand Up @@ -155,7 +154,7 @@ async def get_event(
room_id: str | None,
event_id: str,
show_redacted: bool = False,
) -> EventBase | None:
) -> FilteredEvent | None:
"""Retrieve a single specified event on behalf of a user.
The event will be transformed in a user-specific and time-specific way,
e.g. having unsigned metadata added or being erased depending on who is accessing.
Expand Down
Loading
Loading