Receiving events in a room: the sync loop
Now that we know how to send events, we need to know how to receive them. Matrix uses a technique called long-polling for receiving events. In an ordinary polling model, the client asks the server whether there are events ready, and the server responds immediately, either saying that there are no events or returning the events. The problem with this model is that clients must choose an appropriate polling interval: if the client polls too often, it uses a lot of bandwidth and power (especially since each request to the server carries some overhead); if the client doesn’t poll often enough, there will be a delay in receiving messages.
In a long-polling model, the client queries the server, but the server waits to respond until either a timeout is reached or new events are available. In this way, a client can set a long timeout (often 30 seconds or more), giving a long polling interval when there are no events available. On the other hand, when an event arrives, the client will receive it immediately.
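The long-polling behaviour can be illustrated with a small, self-contained sketch. It does not talk to a Matrix server: the queue stands in for the server's pending events, and the names long_poll and demo are hypothetical helpers invented for this illustration.

```python
import asyncio


async def long_poll(queue: asyncio.Queue, timeout_s: float) -> list:
    """Wait up to timeout_s for an event; return [] if none arrives."""
    try:
        first = await asyncio.wait_for(queue.get(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return []
    events = [first]
    # Drain anything else that is already queued, without waiting further.
    while not queue.empty():
        events.append(queue.get_nowait())
    return events


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    # No events pending: the "server" holds the request until the timeout.
    empty = await long_poll(queue, timeout_s=0.05)
    # An event arrives while the request is pending: it is returned at once,
    # long before the 5 second timeout would have expired.
    asyncio.get_running_loop().call_later(0.01, queue.put_nowait, "hello")
    delivered = await long_poll(queue, timeout_s=5)
    return empty, delivered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first call returns an empty list only after the timeout, while the second returns as soon as the event is queued, which is the property that makes long timeouts cheap.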
Matrix clients use the GET /sync endpoint to wait for events. The term “event” here is used loosely to refer to “something that happens” rather than specifically to the Matrix events that were discussed previously. To be sure, Matrix events are received through GET /sync, but other information is included as well. Here we will focus on receiving room events, and will look at other parts of the sync response in other chapters.
The GET /sync request has several request parameters, passed in the query string. One of these parameters is timeout, which indicates how long the server should wait before responding if there are no events available. This is given in milliseconds, so to wait 30 seconds, for example, we would pass a value of “30000”. If no value is given, the server will respond immediately, giving standard polling behaviour. The timeout should not be too long, or the connection could be terminated by proxies that assume the server is not responding. A timeout of 30 seconds seems to work fairly well.
The since parameter is used to tell the server what events the client has already received. The first time that a client calls GET /sync, it will omit the since parameter. When the server responds, it will include a next_batch property. The client will then use this as the since parameter to the next GET /sync request. The server will then only include events that occur after the position marked by the token. Each time the server responds to a GET /sync request, it will include a next_batch token that the client will use for the next GET /sync request.
Without the token, the server will behave as if the client has not received any events yet. So if a client fails to include the since parameter, the server will most likely respond immediately since it thinks that it has events that the client has not received yet, and the client will receive duplicate events.
Alternatively, the GET /sync endpoint could have been designed so that the server keeps track of the events that it has sent the client, and automatically returns subsequent events each time the client calls GET /sync. However, if one response gets lost for any reason, the client will miss some events. Thus the since parameter is used to ensure both that the client does not receive duplicate events and that it does not miss events.
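The way the token is threaded from one request to the next can be sketched as follows. This is only an illustration: fake_sync is a hypothetical in-memory stand-in for a homeserver, and it uses a list index as the token, whereas real next_batch tokens are opaque strings that the client must not interpret.

```python
from typing import Optional
from urllib.parse import urlencode


def build_sync_query(since: Optional[str], timeout_ms: int = 30000) -> str:
    """Build the query string for GET /sync; omit since on the first call."""
    params = {"timeout": str(timeout_ms)}
    if since is not None:
        params["since"] = since
    return urlencode(params)


# Hypothetical stand-in for the server: the token is simply the index of the
# next event that the client has not yet seen.
EVENTS = ["e1", "e2", "e3"]


def fake_sync(since: Optional[str]) -> dict:
    start = int(since) if since is not None else 0
    return {"events": EVENTS[start:], "next_batch": str(len(EVENTS))}


first = fake_sync(None)                  # no token: everything is returned
second = fake_sync(first["next_batch"])  # with the token: nothing is repeated
repeated = fake_sync(None)               # token omitted again: duplicates
```

Because the client, not the server, remembers the position, a client whose response was lost can simply repeat the request with the same since value and receive the same events again.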
There are several other parameters, but they will be discussed later on; the timeout and since parameters are the ones that we will focus on for now.
We now create a function to perform what is called the “sync loop”: the client makes a GET /sync request, waits for the response, processes the response, and then makes a new GET /sync request. However, since this loop runs forever, we want it to run concurrently with the application’s other code. Otherwise, we would have a hard time, for example, sending events while also waiting for events. Programming languages have their own ways of allowing code to run concurrently; in Python, we use asyncio.create_task. We will create a function in our Client class to start the sync loop, and another to stop it.
self.sync_task: typing.Optional[asyncio.Task[None]] = None

def start_sync(self):
    """Start the sync loop"""
    if self.sync_task is not None:
        # sync already started, so do nothing
        return
    self.sync_task = asyncio.create_task(self._run_sync_loop())

def stop_sync(self):
    """Stop the sync loop"""
    if self.sync_task is None:
        # no sync running, so do nothing
        return
    self.sync_task.cancel()
    self.sync_task = None
When we close the client (such as by calling client.close(); see the Client section), we make sure to stop the sync loop.
self.stop_sync()
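The start/stop pattern can be tried out in isolation with a sketch such as the following. The Ticker class is a hypothetical stand-in for our Client class: it has no connection to Matrix and only counts loop iterations, but it starts and cancels its background task in the same way.

```python
import asyncio
from typing import Optional


class Ticker:
    """Hypothetical stand-in for the client: runs a loop in a background task."""

    def __init__(self) -> None:
        self.task: Optional[asyncio.Task] = None
        self.ticks = 0

    async def _run(self) -> None:
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    def start(self) -> None:
        if self.task is None:
            self.task = asyncio.create_task(self._run())

    def stop(self) -> None:
        if self.task is not None:
            self.task.cancel()
            self.task = None


async def demo() -> int:
    ticker = Ticker()
    ticker.start()
    ticker.start()  # already running, so this second call does nothing
    await asyncio.sleep(0.05)  # other work proceeds while the loop runs
    task = ticker.task
    ticker.stop()
    try:
        await task  # wait for the cancellation to take effect
    except asyncio.CancelledError:
        pass
    return ticker.ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that the task reference is saved before calling stop, since stop clears the attribute; awaiting the saved task lets the caller wait until the loop has actually finished.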
We are now ready to write the main skeleton of our sync loop. We will make a GET /sync request, and if the request fails for any reason, we will notify the application of the failure using our publish/subscribe pattern by publishing a SyncFailed message so that it can, for example, inform the user. We then wait some time before retrying, doubling the delay after each consecutive failure up to a maximum of 30 seconds. If the sync recovers, we will notify the application by publishing a SyncResumed message. On a successful request, we retrieve the next_batch property to use as the since parameter for the next request, and process the other sync data.
async def _run_sync_loop(self) -> None:
    backoff = 2
    while True:
        params = {"timeout": "30000"}
        if "sync_token" in self.storage:
            params["since"] = self.storage["sync_token"]
        # TODO: add other params
        try:
            resp = await self.authenticated(
                self.http_session.get,
                self.url("v3/sync"),
                params=params,
            )
        except asyncio.CancelledError:
            # the sync was stopped
            return
        except Exception as e:
            logging.error(
                "Sync call failed (%r). Waiting %d s and retrying.", e, backoff
            )
            await self.publisher.publish(SyncFailed(backoff))
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 30)
            continue
        async with resp:
            try:
                code, body = await check_response(resp)
                schema.ensure_valid(
                    body,
                    {
                        "next_batch": str,
                        {{sync schema}}
                    },
                )
            except asyncio.CancelledError:
                # the sync was stopped
                return
            except Exception as e:
                logging.error(
                    "Sync call failed (%r). Waiting %d s and retrying.", e, backoff
                )
                await self.publisher.publish(SyncFailed(backoff))
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, 30)
                continue
            if self.sync_task is None:
                # the sync was stopped, so we shouldn't process the result. We
                # should get a CancelledError if the sync was stopped, but check
                # anyways, just in case
                return
            if backoff != 2:
                # the previous request failed
                await self.publisher.publish(SyncResumed())
                backoff = 2
            self.storage["sync_token"] = body["next_batch"]
            {{process sync response}}
class SyncFailed(typing.NamedTuple):
    """A message indicating that a sync call has failed"""

    delay: int

SyncFailed.delay.__doc__ = "The amount of time, in seconds, that the loop will wait before retrying"

class SyncResumed:
    """A message indicating that the sync loop has resumed after a previous failure"""

    def __eq__(self, other):
        # SyncResumed carries no data, so any two instances are equal
        return isinstance(other, SyncResumed)
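The retry delays used by the sync loop above (start at 2 seconds, double after each failure, never exceed 30 seconds) can be written out as a small generator. The name backoff_delays is a hypothetical helper for illustration; the loop itself keeps the delay in a plain variable.

```python
import itertools
from typing import Iterator


def backoff_delays(initial: int = 2, maximum: int = 30) -> Iterator[int]:
    """Yield retry delays in seconds: doubling each time, capped at maximum."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, maximum)


# The delays for six consecutive failures.
first_six = list(itertools.islice(backoff_delays(), 6))
print(first_six)  # [2, 4, 8, 16, 30, 30]
```

Capping the delay keeps the client from waiting excessively long after an extended outage, while the doubling avoids hammering a server that is struggling.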
Tests
# {{copyright}}
import asyncio
import json
import pytest
import unittest.mock as mock
from matrixlib import client
from matrixlib.events import Event, RoomEvent, StateEvent
from matrixlib import rooms
{{test sync}}
We test that the sync loop can handle an error and recover from it.
@pytest.mark.asyncio
async def test_sync_retry(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{sync retry test}}
We will generate a failed sync response, and then two successful sync responses. The second successful sync response will have a next_batch token that points back to itself, so that the request can be repeated.
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
status=500,
body="Error",
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
status=200,
body='{"next_batch":"token"}',
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?since=token&timeout=30000",
status=200,
body='{"next_batch":"token"}',
headers={
"content-type": "application/json",
},
repeat=True,
)
We will have a subscriber that collects the messages that the sync loop publishes, and at the end compare the result to the expected set of messages. When the sync loop publishes a message indicating that the sync has resumed, we will terminate the sync loop.
messages = []

def subscriber(msg) -> None:
    messages.append(msg)
    if isinstance(msg, client.SyncResumed):
        c.stop_sync()

c.publisher.subscribe((client.SyncFailed, client.SyncResumed), subscriber)
We then start the sync and wait for the sync task to complete (which it will do by raising a CancelledError when the handler tells it to stop). We can then test the messages that we captured against an expected set of messages.
with mock.patch("asyncio.sleep") as sleep:
    c.start_sync()
    try:
        await c.sync_task
    except asyncio.CancelledError:
        pass
assert messages == [client.SyncFailed(2), client.SyncResumed()]
assert sleep.call_args == mock.call(2)
Receiving room events
Rather than trying to describe the entire sync response in one section, we will look at the parts of the sync response that we need in order to develop the features that we are interested in. In this section, we are only interested in receiving room events.
The sync response has a rooms property, which is a JSON object that contains information about rooms that the user is interacting with. It can have four properties within it:
invite, which gives information about rooms that the user was invited to since the last sync;
join, which gives information about rooms that the user is currently in and that have had activity since the last sync;
knock, which gives information about rooms that the user has “knocked” upon (that is, the user has requested to join the room) since the last sync; and
leave, which gives information about rooms that the user has left since the last sync.
These four properties are all maps from room ID to room data. For receiving events from rooms, we will be interested in the join and leave properties, and only in specific parts of these properties; we will discuss other parts in later chapters. The join property’s schema is a superset of the leave property’s schema.
if "rooms" in body:
    {{process sync > rooms}}

"rooms": schema.Optional(
    {
        "join": schema.Optional(
            schema.Object(
                {
                    {{sync > rooms > leave schema}}
                    # FIXME: plus other properties
                }
            )
        ),
        "leave": schema.Optional(
            schema.Object(
                {
                    {{sync > rooms > leave schema}}
                }
            )
        ),
        # FIXME: plus other properties
    }
),
This looks like, for example:
{
  "rooms": {
    "join": {
      "!roomid1": {
        // room data
      }
    },
    "leave": {
      "!roomid2": {
        // room data
      }
    }
  }
}
Note that the join property will only list rooms where some activity has taken place; it will not list all the rooms that the user is in.
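As a quick illustration of the shape described above, the following sketch lists the room IDs found under join and leave in a sync response body. The function name rooms_with_updates is a hypothetical helper, and the body is assumed to have already been parsed from JSON into a dict.

```python
def rooms_with_updates(body: dict) -> dict:
    """Return the room IDs listed under join and leave in a sync body."""
    rooms = body.get("rooms", {})
    return {
        # Each section is a map from room ID to room data; both the rooms
        # property and the individual sections may be absent.
        section: sorted(rooms.get(section, {}))
        for section in ("join", "leave")
    }


example = {
    "next_batch": "token",
    "rooms": {
        "join": {"!roomid1": {}},
        "leave": {"!roomid2": {}},
    },
}
print(rooms_with_updates(example))
```

A response with no room activity simply yields empty lists, which is why the code must not assume that either property is present.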
The room data may contain several properties; here we will only look at the state and timeline properties. The timeline property is a JSON object with three properties: events, limited, and prev_batch. The events property is a list of new events, which can be message events or state events, that were sent to the room since the previous sync. The events will not include the room_id property, since the room ID is already implied. However, not all events that were sent may be present: the server will generally limit the number of events that it sends per room in the sync response to avoid sending too much data. A limit can also be set as part of the filter query parameter passed to GET /sync (for more about filtering, see {TODO: filtering section}). If there are more events than the server is willing to send, or has been told to send, the server will send only the most recent events, and set the limited property to true. If all the events are included, then the limited property will be set to false or will not be present. The prev_batch property contains a token that can be used to obtain the previous batch of messages, if needed, by calling the GET /rooms/{roomId}/messages endpoint; this will be discussed in another chapter.
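A client that wants to know whether it has a gap in its copy of the timeline can check the limited flag, as in the following sketch. The helper name backfill_token is hypothetical; it only inspects the parsed timeline object and does not make any request.

```python
from typing import Optional


def backfill_token(timeline: dict) -> Optional[str]:
    """Return the prev_batch token if the timeline was truncated, else None."""
    # limited may be absent, which means the same as false.
    if timeline.get("limited", False):
        return timeline.get("prev_batch")
    return None


truncated = {"events": [], "limited": True, "prev_batch": "sometoken"}
complete = {"events": [], "prev_batch": "othertoken"}
```

Here backfill_token(truncated) gives the token to use for fetching earlier messages, while backfill_token(complete) gives None because nothing was left out.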
The state property within the room data is a JSON object with one property: events, which is an array of state events (again, without the room_id property since the room ID is already implied). This gives the state changes in the room that have occurred in between the last sync call and the beginning of the events listed in the timeline property. Unlike the events listed in the timeline property, these will not be limited. The purpose of this property is to inform the client of state changes that would otherwise have been missed.
"timeline": schema.Optional(
    {
        "events": schema.Array(
            events.RoomEvent.schema_without_room_id()
        ),
        "limited": schema.Optional(
            bool
        ),
        "prev_batch": schema.Optional(
            str
        ),
    }
),
"state": schema.Optional(
    {
        "events": schema.Optional(
            schema.Array(
                events.StateEvent.schema_without_room_id()
            )
        ),
    }
),
In an interactive client that displays events in a timeline to the user, the client should only display the events that are listed in the timeline property; if events from the state property are displayed, they may be in the wrong place. If a client internally tracks the room’s state, it should apply state events from both the state and the timeline properties, first applying all the events from state, and then the events from timeline.
For example, suppose that Alice is in a room with Bob. Alice sends a message, “Hello Bob!”. She then sets the room name, avatar, and topic (m.room.name, m.room.avatar, and m.room.topic state events), and invites Carol (an m.room.member state event). She then sends another message, “Hello Carol!”. Carol joins the room (another m.room.member state event), and sends “Hello Alice!”. Alice and Carol then have a long conversation and then Alice changes the room avatar again. The room timeline will then look like:
...
Alice: Hello Bob!
<Alice sets m.room.name>
<Alice sets m.room.avatar>
<Alice sets m.room.topic>
<Alice invites Carol>
Alice: Hello Carol!
<Carol joins>
Carol: Hello Alice!
... Alice and Carol converse ...
<Alice sets m.room.avatar again>
Suppose that Bob’s client was suspended during this time, and he comes back online. His client does a sync to get the recent messages. The sync response that he receives will look something like:
{
  "rooms": {
    "join": {
      "!roomid": {
        "state": {
          "events": [
            // <Alice sets m.room.name>,
            // <Alice sets m.room.avatar>,
            // <Alice sets m.room.topic>,
            // <Alice invites Carol>,
            // <Carol joins>
          ]
        },
        "timeline": {
          "events": [
            // the last few lines of Alice and Carol's conversation ...,
            // <Alice sets m.room.avatar again>
          ],
          "limited": true,
          "prev_batch": "sometoken"
        }
      }
    }
  },
  // ...
}
Bob’s client, then, will show the events from the timeline property: the last few lines of Alice and Carol’s conversation, and Alice setting the room avatar. (If needed, his client can use the prev_batch token to load previous messages to display.) Bob’s client will also apply the state events from the state property, and then the timeline property, to its local storage of the room state. In this way, it will display the new room name, avatar, and topic, and it will know that Carol is in the room. Since it applies the state events from state first, the m.room.avatar state event in the timeline property will override the one from the state property, which is what we want.
We can now write our code to extract the events and notify the application (or any other subscribers) via our publisher. There are several ways in which this can be done, some of which may be better for different uses. For example, if the application is a bot that responds to messages, it may be easiest to simply publish a message for each room event that is received. The approach we will take here is to use three different types of messages: one essentially replicating the state property, one essentially replicating the timeline property, and one that indicates that the user has left the room (when the room is in the leave property rather than the join property of the sync). This means that the application needs to do some more processing of the messages, but ensures that no information is lost, in case the application wants to make use of anything.
class RoomStateUpdates(typing.NamedTuple):
    """A message giving state updates for a room from sync"""

    room_id: str
    events: list[events.StateEvent]

    @staticmethod
    def create_from_sync(room_id: str, state_updates: dict) -> "RoomStateUpdates":
        state_events = [
            events.StateEvent(room_id=room_id, **e)
            for e in state_updates.get("events", [])
        ]
        return RoomStateUpdates(room_id, state_events)

RoomStateUpdates.room_id.__doc__ = "The room ID"
RoomStateUpdates.events.__doc__ = "The state events"

class RoomTimelineUpdates(typing.NamedTuple):
    """A message giving timeline updates for a room from sync"""

    room_id: str
    events: list[events.RoomEvent]
    limited: bool
    prev_batch: typing.Optional[str]

    @staticmethod
    def create_from_sync(room_id: str, timeline_updates: dict) -> "RoomTimelineUpdates":
        timeline_events = [
            (
                # state events are distinguished by having a state_key
                events.StateEvent(room_id=room_id, **e)
                if "state_key" in e
                else events.RoomEvent(room_id=room_id, **e)
            )
            for e in timeline_updates.get("events", [])
        ]
        return RoomTimelineUpdates(
            room_id,
            timeline_events,
            timeline_updates.get("limited", False),
            timeline_updates.get("prev_batch", None),
        )

RoomTimelineUpdates.room_id.__doc__ = "The room ID"
RoomTimelineUpdates.events.__doc__ = "The timeline events"
RoomTimelineUpdates.limited.__doc__ = "Whether the timeline update was limited"
RoomTimelineUpdates.prev_batch.__doc__ = "The token to retrieve previous messages"

class LeftRoom(typing.NamedTuple):
    """A message indicating that the user has left a room"""

    room_id: str

LeftRoom.room_id.__doc__ = "The ID of the room that the user left"
if "join" in body["rooms"]:
    for room_id, room_data in body["rooms"]["join"].items():
        {{publish room data messages}}
if "leave" in body["rooms"]:
    for room_id, room_data in body["rooms"]["leave"].items():
        {{publish room data messages}}
        await self.publisher.publish(LeftRoom(room_id))

if "state" in room_data:
    await self.publisher.publish(
        RoomStateUpdates.create_from_sync(
            room_id, room_data["state"]
        ),
    )
if "timeline" in room_data:
    await self.publisher.publish(
        RoomTimelineUpdates.create_from_sync(
            room_id, room_data["timeline"]
        ),
    )
Tests
@pytest.mark.asyncio
async def test_sync_room_events(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{room event sync test}}
Our test that we can obtain events from the sync will be structured similarly to our test above that we can handle errors. We will generate three sync responses. The first will have a room in join, which will have an event in state and an event in timeline. The timeline will be marked as limited. The second will have the same room, but in leave, and will have an event in timeline. The timeline will not be marked as limited. The third will be empty, and have the next_batch pointing back to itself.
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
status=200,
body=json.dumps(
{
"rooms": {
"join": {
"!roomid": {
"state": {
"events": [
{
"type": "m.room.name",
"state_key": "",
"sender": "@alice:example.org",
"content": {
"name": "Jabberwocky",
},
"event_id": "$event1",
"origin_server_ts": 1234567890123,
"unsigned": {
"age": 4321,
},
},
],
},
"timeline": {
"events": [
{
"type": "m.room.message",
"sender": "@alice:example.org",
"content": {
"body": "'Twas brillig, and the slithy toves",
},
"event_id": "$event2",
"origin_server_ts": 1234567892123,
"unsigned": {
"age": 2321,
},
},
],
"limited": True,
"prev_batch": "prev1",
},
},
},
},
"next_batch": "token1",
}
),
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?since=token1&timeout=30000",
status=200,
body=json.dumps(
{
"rooms": {
"leave": {
"!roomid": {
"timeline": {
"events": [
{
"type": "m.room.message",
"sender": "@alice:example.org",
"content": {
"body": "Did gyre and gimble in the wabe",
},
"event_id": "$event3",
"origin_server_ts": 1234567894123,
"unsigned": {
"age": 1234,
},
},
],
"prev_batch": "prev2",
},
},
},
},
"next_batch": "token2",
}
),
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?since=token2&timeout=30000",
status=200,
body='{"next_batch":"token2"}',
headers={
"content-type": "application/json",
},
repeat=True,
)
Our subscriber will capture the messages published by the sync task. Once the LeftRoom message is published (the last message that we will get from the sync responses), or a sync failure is detected, the subscriber will stop the sync loop.
messages = []

def subscriber(msg) -> None:
    messages.append(msg)
    if isinstance(msg, (client.LeftRoom, client.SyncFailed)):
        c.stop_sync()

c.publisher.subscribe(
    (
        client.RoomStateUpdates,
        client.RoomTimelineUpdates,
        client.LeftRoom,
        client.SyncFailed,
    ),
    subscriber,
)
We then start the sync and wait for the sync task to complete (which it will do by raising a CancelledError when the handler tells it to stop). We can then test the messages that we captured against an expected set of messages.
c.start_sync()
try:
    await c.sync_task
except asyncio.CancelledError:
    pass
assert messages == [
client.RoomStateUpdates(
"!roomid",
[
StateEvent(
room_id="!roomid",
type="m.room.name",
state_key="",
sender="@alice:example.org",
content={"name": "Jabberwocky"},
event_id="$event1",
origin_server_ts=1234567890123,
unsigned={"age": 4321},
)
],
),
client.RoomTimelineUpdates(
"!roomid",
[
RoomEvent(
room_id="!roomid",
type="m.room.message",
sender="@alice:example.org",
content={"body": "'Twas brillig, and the slithy toves"},
event_id="$event2",
origin_server_ts=1234567892123,
unsigned={"age": 2321},
)
],
True,
"prev1",
),
client.RoomTimelineUpdates(
"!roomid",
[
RoomEvent(
room_id="!roomid",
type="m.room.message",
sender="@alice:example.org",
content={"body": "Did gyre and gimble in the wabe"},
event_id="$event3",
origin_server_ts=1234567894123,
unsigned={"age": 1234},
)
],
False,
"prev2",
),
client.LeftRoom("!roomid"),
]
Todo
note that events may arrive “out of order” due to backfilling, federation delays, etc.
Todo
explain how sent events show up in the sync (transaction ID)
Todo
note on reliability, making sure we don’t lose events on crashes/termination
Example: echo bot
We now have enough to build a simple echo bot. This bot will echo whatever message is sent to a room. As usual, it relies on having previously logged in with the login script.
# {{copyright}}
"""Echo events"""
import asyncio
import json
import sys
import typing
from matrixlib import client
from matrixlib import events
{{json file storage}}
async def main() -> None:
    async with client.Client(storage=JsonFileStorage()) as c:
        {{echo example}}

asyncio.run(main())
First, we create a subscriber so that we can receive events. Since we only need message events, we only need to handle RoomTimelineUpdates messages and not RoomStateUpdates or LeftRoom messages. Our event handler will loop through the events and search for normal text message events, that is, message events of type m.room.message with a body property and with msgtype: m.text. It is also good practice for bots to skip over messages sent by the bot’s own user, so that they do not respond to themselves.
async def timeline_subscriber(updates: client.RoomTimelineUpdates) -> None:
    for event in updates.events:
        if (
            isinstance(event, events.StateEvent)
            or event.type != "m.room.message"
            or "body" not in event.content
            or event.content.get("msgtype") != "m.text"
            or event.sender == c.user_id
        ):
            continue
        {{echo bot: process message}}
If the message body is “!quit”, we will take that as a signal to exit, so we will stop the sync and stop processing. (The bot can also be stopped by pressing the interrupt key — usually Ctrl-C.)
if event.content["body"] == "!quit":
    c.stop_sync()
    return
Otherwise, we take the body from the message to create a new message event content with msgtype: m.notice. We can then send it as a new m.room.message event to the room.
content = {"body": event.content["body"], "msgtype": "m.notice"}
await c.send_event(updates.room_id, "m.room.message", content)
Now that we have completed the subscriber, we can register it and then start the sync. We will then wait until the sync task is done.
c.publisher.subscribe(client.RoomTimelineUpdates, timeline_subscriber)
c.start_sync()
await typing.cast(asyncio.Task[None], c.sync_task)
If you run this script, it should echo back any messages that are sent to any rooms that the bot user is in.
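The filtering condition used by the subscriber can also be exercised on its own. The following sketch restates it as a predicate over plain event dicts rather than the library's event objects; should_echo and the sample user IDs are hypothetical names used only for this illustration.

```python
def should_echo(event: dict, own_user_id: str) -> bool:
    """Decide whether the bot should echo this timeline event."""
    content = event.get("content", {})
    return (
        "state_key" not in event  # skip state events
        and event.get("type") == "m.room.message"
        and content.get("msgtype") == "m.text"
        and "body" in content
        and event.get("sender") != own_user_id  # skip the bot's own messages
    )


text = {
    "type": "m.room.message",
    "sender": "@alice:example.org",
    "content": {"msgtype": "m.text", "body": "hello"},
}
notice = {
    "type": "m.room.message",
    "sender": "@alice:example.org",
    "content": {"msgtype": "m.notice", "body": "hello"},
}
```

With these samples, should_echo(text, "@bot:example.org") is true, while the notice is skipped, as is any message whose sender is the bot itself. Since the bot replies with m.notice messages, two such bots in the same room would not echo each other endlessly.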
Tracking room state
Many clients will need to keep track of the room’s current state. So we will write a separate class that does this by subscribing to the sync messages from the Client class and updating the storage with the latest state.
For our library, we will separate functionality into other classes rather than putting it all in our Client class, to allow applications to decide whether or not to use the functionality. It also keeps our Client class smaller. A library in which most uses would need such functionality could make it part of, or accessible from, its Client class, rather than making it separate.
# {{copyright}}
"""Room-related functionality"""
import typing
from . import client
from . import events
{{rooms module classes}}
class RoomStateTracker:
"""Tracks room state"""
{{RoomStateTracker class methods}}
The initialization function will take a client object and will subscribe to the RoomStateUpdates and RoomTimelineUpdates messages. Note that the room state tracker should be created before the client’s sync loop is started so that it doesn’t miss any events.
def __init__(self, c: client.Client):
    self.client = c
    c.publisher.subscribe(
        (client.RoomStateUpdates, client.RoomTimelineUpdates), self._subscriber
    )
In our subscriber function, we will get the current state from the client storage, update it with the events from the message, and then save it back to the client storage.
def _subscriber(
    self, msg: typing.Union[client.RoomStateUpdates, client.RoomTimelineUpdates]
) -> None:
    state = self.client.storage.get("room_state_tracker", {})
    room_state = state.setdefault(msg.room_id, {})
    for e in msg.events:
        # loop through the state events and save them under room_state[type][state_key]
        if isinstance(e, events.StateEvent):
            room_state.setdefault(e.type, {})[e.state_key] = e.to_dict()
    self.client.storage["room_state_tracker"] = state
We then provide a method to get the current state at a given event type and state key, and to get all the current state for a given type.
def get_state(
    self,
    room_id: str,
    event_type: str,
    state_key: str = "",
) -> typing.Optional[events.StateEvent]:
    """Get the current state for the given event type and state key

    Returns ``None`` if no state is found.
    """
    state = self.client.storage.get("room_state_tracker", {})
    event = state.get(room_id, {}).get(event_type, {}).get(state_key)
    return events.StateEvent(**event) if event else None

def get_all_state_for_type(
    self,
    room_id: str,
    event_type: str,
) -> dict[str, events.StateEvent]:
    """Get all the current state for the given event type

    Returns a ``dict`` mapping from state key to state event.
    """
    state = self.client.storage.get("room_state_tracker", {})
    state_events = state.get(room_id, {}).get(event_type, {})
    return {
        name: events.StateEvent(**value) for name, value in state_events.items()
    }
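One possible use of a mapping like the one returned by get_all_state_for_type is to list the users who are currently joined to a room, based on the m.room.member events. The sketch below is self-contained, so it uses a hypothetical FakeStateEvent dataclass in place of the library's StateEvent class; the only assumption about the real class is that it exposes a content attribute, as used in the echo bot. The helper name joined_members is likewise made up for this example.

```python
from dataclasses import dataclass


@dataclass
class FakeStateEvent:
    """Hypothetical minimal stand-in for the library's StateEvent class."""

    state_key: str
    content: dict


def joined_members(member_events: dict) -> list:
    """Return the user IDs whose latest membership event is a join."""
    return sorted(
        user_id
        for user_id, event in member_events.items()
        if event.content.get("membership") == "join"
    )


# Shaped like a mapping from state key (the user ID) to the member event.
members = {
    "@alice:example.org": FakeStateEvent("@alice:example.org", {"membership": "join"}),
    "@bob:example.org": FakeStateEvent("@bob:example.org", {"membership": "leave"}),
    "@carol:example.org": FakeStateEvent("@carol:example.org", {"membership": "invite"}),
}
print(joined_members(members))  # ['@alice:example.org']
```

Keeping only the latest event per state key is what makes this lookup simple: a user who joined and later left appears once, with the leave membership.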
Tests
@pytest.mark.asyncio
async def test_room_state_tracking(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{room state tracker test}}
To test the tracker, we first create a tracker object linked to the client.
tracker = rooms.RoomStateTracker(c)
We then prepare a sync response to update the state.
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
status=200,
body=json.dumps(
{
"rooms": {
"join": {
"!roomid": {
"state": {
"events": [
{
"type": "m.room.name",
"state_key": "",
"sender": "@alice:example.org",
"content": {
"name": "Room name",
},
"event_id": "$event1",
"origin_server_ts": 1234567890123,
"unsigned": {
"age": 4321,
},
},
{
"type": "m.room.topic",
"state_key": "",
"sender": "@alice:example.org",
"content": {
"topic": "Room about stuff",
},
"event_id": "$event2",
"origin_server_ts": 1234567890123,
"unsigned": {
"age": 4321,
},
},
],
},
"timeline": {
"events": [
{
"type": "m.room.member",
"state_key": "@bob:example.org",
"sender": "@bob:example.org",
"content": {
"membership": "join",
},
"event_id": "$event3",
"origin_server_ts": 1234567892123,
"unsigned": {
"age": 2321,
},
},
{
"type": "m.room.name",
"state_key": "",
"sender": "@alice:example.org",
"content": {
"name": "New room name",
},
"event_id": "$event4",
"origin_server_ts": 1234567893123,
"unsigned": {
"age": 1321,
},
},
],
"limited": True,
"prev_batch": "prev1",
},
},
},
},
"next_batch": "token1",
}
),
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?since=token1&timeout=30000",
status=200,
body='{"next_batch":"token1"}',
headers={
"content-type": "application/json",
},
repeat=True,
)
We create a subscriber that will stop the sync when the RoomTimelineUpdates message is published, or if the sync fails, and we start the sync loop.
def subscriber(msg) -> None:
    c.stop_sync()

c.publisher.subscribe(
    (client.RoomTimelineUpdates, client.SyncFailed), subscriber
)
c.start_sync()
try:
    await c.sync_task
except asyncio.CancelledError:
    pass
We then check the state tracker and ensure that it has the correct state.
assert tracker.get_state("!roomid", "m.room.name") == StateEvent(
room_id="!roomid",
type="m.room.name",
state_key="",
sender="@alice:example.org",
content={"name": "New room name"},
event_id="$event4",
origin_server_ts=1234567893123,
unsigned={"age": 1321},
)
assert tracker.get_state("!roomid", "m.room.topic") == StateEvent(
room_id="!roomid",
type="m.room.topic",
state_key="",
sender="@alice:example.org",
content={"topic": "Room about stuff"},
event_id="$event2",
origin_server_ts=1234567890123,
unsigned={"age": 4321},
)
assert (
    tracker.get_state("!roomid", "m.room.member", "@carol:example.org") is None
)
assert tracker.get_state("!another_room", "m.room.name") is None
assert tracker.get_all_state_for_type("!roomid", "m.room.member") == {
"@bob:example.org": StateEvent(
room_id="!roomid",
type="m.room.member",
state_key="@bob:example.org",
sender="@bob:example.org",
content={"membership": "join"},
event_id="$event3",
origin_server_ts=1234567892123,
unsigned={"age": 2321},
),
}
Todo
sliding sync