Receiving events in a room: the sync loop

Now that we know how to send events, we need to know how to receive them. Matrix uses a technique called long-polling for receiving events. In an ordinary polling model, the client asks the server if there are events ready, and the server responds immediately, either saying that there are no events or returning the events. The problem is that clients must determine an appropriate polling interval: if the client polls too often, it uses a lot of bandwidth and power (especially since there is some overhead in making requests to the server); if the client doesn’t poll often enough, there will be a delay in receiving messages.

In a long-polling model, the client queries the server, but the server waits to respond until either a timeout is reached or new events are available. In this way, a client could set a long timeout (often 30 seconds or more), giving a long polling interval if there are no events available. On the other hand, when an event comes, the client will receive it immediately.
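
To make the behaviour concrete, here is a minimal sketch of the server side of long-polling, using an asyncio.Queue as a stand-in for the server's pending events. The names long_poll and demo are hypothetical and are not part of our library; a real homeserver is of course far more involved.

```python
import asyncio


async def long_poll(queue: asyncio.Queue, timeout_s: float) -> list:
    """Wait up to ``timeout_s`` seconds for an event, as a long-polling server would.

    Returns the pending events, or an empty list if the timeout is reached.
    """
    try:
        first = await asyncio.wait_for(queue.get(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return []
    events = [first]
    # also return anything else that is already waiting
    while not queue.empty():
        events.append(queue.get_nowait())
    return events


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    # nothing is queued, so this only returns once the timeout expires
    empty = await long_poll(queue, timeout_s=0.05)
    # an event arrives shortly after the request starts, so the call
    # returns as soon as it arrives rather than waiting the full 5 seconds
    asyncio.get_running_loop().call_later(0.01, queue.put_nowait, "hello")
    got = await long_poll(queue, timeout_s=5)
    return empty, got
```

Calling asyncio.run(demo()) exercises both cases: the first call times out with no events, and the second returns well before its timeout.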

Matrix clients use the GET /sync endpoint to wait for events. The term “event” here is used loosely to refer to “something that happens” rather than specifically to the Matrix events that were discussed previously. To be sure, Matrix events are received through GET /sync, but other information is included as well. Here we will focus on receiving room events, and will look at other parts of the sync response in other chapters.

The GET /sync request takes several parameters, passed in the query string. One of these is timeout, which indicates how long the server should wait before responding if there are no events available. It is given in milliseconds, so to wait 30 seconds, for example, we would pass a value of “30000”. If no value is given, the server will respond immediately, giving standard polling behaviour. The timeout should not be too long, or the connection could be terminated by proxies that conclude the server is not responding. A timeout of 30 seconds seems to work fairly well.

The since parameter is used to tell the server what events the client has already received. The first time that a client calls GET /sync, it will omit the since parameter. When the server responds, it will include a next_batch property. The client will then use this as the since parameter to the next GET /sync request. The server will then only include events that occur after the position marked by the token. Each time the server responds to a GET /sync request, it will include a next_batch token that the client will use for the next GET /sync request.
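
As a small illustration of these two parameters, the following sketch builds the query string for a GET /sync request. The helper name sync_query and the token value are made up for this example; our actual client passes the parameters to the HTTP library as a dict instead.

```python
from typing import Optional
from urllib.parse import urlencode


def sync_query(timeout_s: float, since: Optional[str] = None) -> str:
    """Build the query string for GET /sync (hypothetical helper)."""
    # the timeout is given to the server in milliseconds
    params = {"timeout": str(int(timeout_s * 1000))}
    # the first request has no token yet, so ``since`` is omitted
    if since is not None:
        params["since"] = since
    return urlencode(params)


first_request = sync_query(30)
next_request = sync_query(30, since="sometoken")
```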

Without the token, the server will behave as if the client has not received any events yet. So if a client fails to include the since parameter, the server will most likely respond immediately since it thinks that it has events that the client has not received yet, and the client will receive duplicate events. Alternatively, the GET /sync endpoint could have been designed so that the server keeps track of the events that it has sent the client, and automatically returns subsequent events each time the client calls GET /sync. However, if one response gets lost for any reason, the client will miss some events. Thus the since parameter is used to ensure both that the client does not receive duplicate events and that it does not miss events.
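
The reasoning above can be tried out with a toy model. In this sketch, FakeServer is a hypothetical stand-in for a homeserver in which the token is simply a position in a list of events; real tokens are opaque strings and should never be interpreted by the client.

```python
from typing import Optional


class FakeServer:
    """A toy server: events are a list, and tokens are positions in that list."""

    def __init__(self) -> None:
        self.events: list = []

    def sync(self, since: Optional[str] = None) -> dict:
        # without a token, behave as if the client has received nothing yet
        start = int(since) if since is not None else 0
        return {
            "events": self.events[start:],
            "next_batch": str(len(self.events)),
        }


server = FakeServer()
server.events += ["a", "b"]

first = server.sync()  # no since: everything so far
server.events.append("c")
lost = server.sync(first["next_batch"])  # pretend this response never arrived
retry = server.sync(first["next_batch"])  # same token again: "c" is not missed
server.events.append("d")
later = server.sync(retry["next_batch"])  # only what came after the retry
```

Because the client, not the server, remembers the position, repeating a request after a lost response returns the same events again rather than skipping them.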

There are several other parameters, but they will be discussed later on; the timeout and since parameters are the ones that we will focus on for now.

We now create a function to perform what is called the “sync loop”: the client makes a GET /sync request, waits for the response, processes the response, and then makes a new GET /sync request. However, since this loop runs forever, we want it to run concurrently with the application’s other code. Otherwise, we would have a hard time, for example, sending events while also waiting for events. Programming languages have their own ways of allowing code to run concurrently; in Python, we use asyncio.create_task. We will create a function in our Client class to start the sync loop, and another to stop it.
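
Before applying this to the sync loop, here is a minimal sketch of the pattern on its own: a loop that runs forever is started with asyncio.create_task, the main coroutine carries on with other work, and the task is later cancelled. The names background_loop and demo are placeholders for illustration only.

```python
import asyncio


async def background_loop(ticks: list) -> None:
    """Stand-in for a sync loop: runs until it is cancelled."""
    while True:
        ticks.append(len(ticks))
        await asyncio.sleep(0.01)


async def demo() -> tuple:
    ticks: list = []
    task = asyncio.create_task(background_loop(ticks))
    # the main coroutine is free to do other work while the loop runs
    await asyncio.sleep(0.05)
    # stop the loop; awaiting a cancelled task raises CancelledError
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(ticks), task.cancelled()
```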

Client class initialization:
self.sync_task: typing.Optional[asyncio.Task[None]] = None
Client class methods:
def start_sync(self):
    """Start the sync loop"""
    if self.sync_task is not None:
        # sync already started, so do nothing
        return
    self.sync_task = asyncio.create_task(self._run_sync_loop())

def stop_sync(self):
    """Stop the sync loop"""
    if self.sync_task is None:
        # no sync running, so do nothing
        return
    self.sync_task.cancel()
    self.sync_task = None

When we close the client (such as by calling client.close() – see the Client section), we make sure to stop the sync loop.

client cleanup:
self.stop_sync()

We are now ready to write the main skeleton of our sync loop. We will make a GET /sync request, and if the request fails for any reason, we will notify the application of the failure using our publish/subscribe pattern by publishing a SyncFailed message so that it can, for example, inform the user. We then wait some time before retrying. If the sync recovers, we will notify the application by publishing a SyncResumed message. On a successful request, we retrieve the next_batch property to use as the since parameter for the next request, and process the other sync data.

Client class methods:
async def _run_sync_loop(self) -> None:
    backoff = 2
    while True:
        params = {"timeout": "30000"}
        if "sync_token" in self.storage:
            params["since"] = self.storage["sync_token"]
        # TODO: add other params
        try:
            resp = await self.authenticated(
                self.http_session.get,
                self.url("v3/sync"),
                params=params,
            )
        except asyncio.CancelledError:
            return
        except Exception as e:
            logging.error(
                "Sync call failed (%r).  Waiting %d s and retrying.", e, backoff
            )
            await self.publisher.publish(SyncFailed(backoff))
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 30)
            continue

        async with resp:
            try:
                code, body = await check_response(resp)
                schema.ensure_valid(
                    body,
                    {
                        "next_batch": str,
                        {{sync schema}}
                    },
                )
            except asyncio.CancelledError:
                return
            except Exception as e:
                logging.error(
                    "Sync call failed (%r).  Waiting %d s and retrying.", e, backoff
                )
                await self.publisher.publish(SyncFailed(backoff))
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, 30)
                continue

            if self.sync_task is None:
                # the sync was stopped, so we shouldn't process the result.  We
                # should get a CancelledError if the sync was stopped, but check
                # anyways, just in case
                return

            if backoff != 2:
                # the previous request failed
                await self.publisher.publish(SyncResumed())
                backoff = 2

            self.storage["sync_token"] = body["next_batch"]

            {{process sync response}}
client module classes:
class SyncFailed(typing.NamedTuple):
    """A message indicating that a sync call has failed"""

    delay: int


SyncFailed.delay.__doc__ = "The amount of time that the loop will wait before retrying"


class SyncResumed:
    """A message indicating that the sync loop has resumed after a previous failure"""

    def __eq__(self, other):
        return other.__class__ == SyncResumed
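
The retry delay in the loop above starts at 2 seconds and doubles after each consecutive failure, up to a maximum of 30 seconds. As a sketch, the same schedule can be written as a generator; backoff_delays is a hypothetical helper, not something our Client class uses.

```python
import itertools
from typing import Iterator


def backoff_delays(initial: int = 2, cap: int = 30) -> Iterator[int]:
    """Yield retry delays in seconds: doubling each time, capped at ``cap``."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, cap)


# the delays for the first six consecutive failures
first_six = list(itertools.islice(backoff_delays(), 6))
# → [2, 4, 8, 16, 30, 30]
```
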
Tests
tests/test_sync.py:
# {{copyright}}

import asyncio
import json
import pytest
import unittest.mock as mock

from matrixlib import client
from matrixlib.events import Event, RoomEvent, StateEvent
from matrixlib import rooms


{{test sync}}

We test that the sync loop can handle an error and recover from it.

test sync:
@pytest.mark.asyncio
async def test_sync_retry(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{sync retry test}}

We will generate a failed sync response, and then two successful sync responses. The second successful sync response will have a next_batch token that points back to itself, so that the request can be repeated.

sync retry test:
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
    status=500,
    body="Error",
)
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
    status=200,
    body='{"next_batch":"token"}',
    headers={
        "content-type": "application/json",
    },
)
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?since=token&timeout=30000",
    status=200,
    body='{"next_batch":"token"}',
    headers={
        "content-type": "application/json",
    },
    repeat=True,
)

We will have a subscriber that collects the messages that the sync loop publishes, and at the end compare the result to the expected set of messages. When the sync loop publishes a message indicating that the sync has resumed, we will terminate the sync loop.

sync retry test:
messages = []

def subscriber(msg) -> None:
    messages.append(msg)
    if isinstance(msg, client.SyncResumed):
        c.stop_sync()

c.publisher.subscribe((client.SyncFailed, client.SyncResumed), subscriber)

We then start the sync and wait for the sync task to complete (which it will do by throwing a CancelledError when the handler tells it to stop). We can then test the messages that we captured against an expected set of messages.

sync retry test:
with mock.patch("asyncio.sleep") as sleep:
    c.start_sync()

    try:
        await c.sync_task
    except asyncio.CancelledError:
        pass

    assert messages == [client.SyncFailed(2), client.SyncResumed()]
    assert sleep.call_args == mock.call(2)
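
Patching asyncio.sleep is what keeps this test from actually waiting 2 seconds. The following standalone sketch shows the technique in isolation, assuming Python 3.8 or later, where mock.patch replaces an async function with an awaitable mock. The retry_once function is a made-up stand-in for code that sleeps before retrying.

```python
import asyncio
import unittest.mock as mock


async def retry_once(delay: int) -> str:
    """Stand-in for code that waits before retrying."""
    await asyncio.sleep(delay)
    return "retried"


async def demo() -> tuple:
    # while patched, awaiting asyncio.sleep returns immediately,
    # and the mock records the arguments it was called with
    with mock.patch("asyncio.sleep") as sleep:
        result = await retry_once(2)
    return result, sleep.call_args
```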

Receiving room events

Rather than trying to describe the entire sync response in one section, we will look at the parts of the sync response that we need to develop the features that we are interested in. In this section, we are only interested in receiving room events.

The sync response has a rooms property, which is a JSON object that contains information about rooms that the user is interacting with. It can have four properties within it:

  • invite, which gives information about rooms that the user was invited to since the last sync;

  • join, which gives information about rooms that the user is currently in and that have had activity since the last sync;

  • knock, which gives information about rooms that the user has “knocked” upon (that is, the user has requested to join the room) since the last sync; and

  • leave, which gives information about rooms that the user has left since the last sync.

These four properties are all maps from room ID to room data. For receiving events from rooms, we will be interested in the join and leave properties, and only in specific parts of these properties; we will discuss other parts in later chapters. The join property’s schema is a superset of the leave property’s schema.

process sync response:
if "rooms" in body:
    {{process sync > rooms}}
sync schema:
"rooms": schema.Optional(
    {
        "join": schema.Optional(
            schema.Object(
                {
                    {{sync > rooms > leave schema}}
                    # FIXME: plus other properties
                }
            )
        ),
        "leave": schema.Optional(
            schema.Object(
                {
                    {{sync > rooms > leave schema}}
                }
            )
        ),
        # FIXME: plus other properties
    }
),

This looks like, for example:

{
    "rooms": {
        "join": {
            "!roomid1": {
                // room data
            }
        },
        "leave": {
            "!roomid2": {
                // room data
            }
        }
    }
}

Note that the join property will only list rooms where some activity has taken place; it will not list all the rooms that the user is in.
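
As a quick illustration of this structure, the sketch below lists the room IDs that appear under each of the four properties of a sync response. The function name rooms_with_activity is hypothetical, and the sample body mirrors the example above.

```python
def rooms_with_activity(sync_body: dict) -> dict:
    """Map each section of ``rooms`` to the sorted room IDs listed in it."""
    rooms = sync_body.get("rooms", {})
    return {
        # every section is optional, and so is ``rooms`` itself
        section: sorted(rooms.get(section, {}))
        for section in ("invite", "join", "knock", "leave")
    }


body = {
    "rooms": {
        "join": {"!roomid1": {}},
        "leave": {"!roomid2": {}},
    }
}
summary = rooms_with_activity(body)
```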

The room data may contain several properties; here we will only look at the state and timeline properties. The timeline property is a JSON object with three properties: events, limited, and prev_batch. The events property is a list of new events, which can be message events or state events, that were sent to the room since the previous sync. The events will not include the room_id property, since the room ID is already implied. However, not all events that were sent may be present: the server will generally limit the number of events that it sends per room in the sync response to avoid sending too much data. A limit can also be set as part of the filter query parameter passed to GET /sync — for more about filtering, see {TODO: filtering section}. If there are more events than what the server is willing to send, or has been told to send, the server will send only the most recent events, and set the limited property to true. If all the events are included, then the limited property will be set to false or will not be present. The prev_batch property contains a token that can be used to obtain the previous batch of messages, if needed, by calling the GET /rooms/{roomId}/messages endpoint — this will be discussed in another chapter.
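
A minimal sketch of reading the timeline property from one room's data follows. The helper describe_timeline is an assumption made for illustration; it only shows how the optional properties can be handled and is not how our library exposes them.

```python
from typing import Optional


def describe_timeline(room_data: dict) -> tuple:
    """Return (events, has_gap, prev_batch) for one room's data from a sync."""
    timeline = room_data.get("timeline", {})
    events = timeline.get("events", [])
    # an absent ``limited`` property means the same as false
    has_gap = timeline.get("limited", False)
    # the token for fetching earlier events, if the server provided one
    prev_batch: Optional[str] = timeline.get("prev_batch")
    return events, has_gap, prev_batch


limited_room = {
    "timeline": {
        "events": [{"type": "m.room.message"}],
        "limited": True,
        "prev_batch": "sometoken",
    }
}
complete_room = {"timeline": {"events": []}}
```

When has_gap is true, a client that wants the missing events would pass prev_batch to GET /rooms/{roomId}/messages.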

The state property within the room data is a JSON object with one property: events, which is an array of state events (again, without the room_id property since the room ID is already implied). This gives the state changes in the room that have occurred in between the last sync call and the beginning of the events listed in the timeline property. Unlike the events listed in the timeline property, these will not be limited. The purpose for this property is to inform the client of state changes that would otherwise have been missed.

sync > rooms > leave schema:
"timeline": schema.Optional(
    {
        "events": schema.Array(
            events.RoomEvent.schema_without_room_id()
        ),
        "limited": schema.Optional(
            bool
        ),
        "prev_batch": schema.Optional(
            str
        ),
    }
),
"state": schema.Optional(
    {
        "events": schema.Optional(
            schema.Array(
                events.StateEvent.schema_without_room_id()
            )
        ),
    }
),

In an interactive client that displays events in a timeline to the user, the client should only display the events that are listed in the timeline property; if events from the state property are displayed, they may be in the wrong place. If a client internally tracks the room’s state, it should apply state events from both the state and the timeline properties, first applying all the events from state, and then the events from timeline.

For example, suppose that Alice is in a room with Bob. Alice sends a message, “Hello Bob!”. She then sets the room name, avatar, and topic (m.room.name, m.room.avatar, and m.room.topic state events), and invites Carol (an m.room.member state event). She then sends another message, “Hello Carol!”. Carol joins the room (an m.room.member state event), and sends “Hello Alice!”. Alice and Carol then have a long conversation and then Alice changes the room avatar again. The room timeline will then look like:

...
Alice: Hello Bob!
<Alice sets m.room.name>
<Alice sets m.room.avatar>
<Alice sets m.room.topic>
<Alice invites Carol>
Alice: Hello Carol!
<Carol joins>
Carol: Hello Alice!
... Alice and Carol converse ...
<Alice sets m.room.avatar again>

Suppose that Bob’s client was suspended during this time, and he comes back online. His client does a sync to get the recent messages. The sync response that he receives will look something like:

{
  "rooms": {
    "join": {
      "!roomid": {
        "state": {
          "events": [
            // <Alice sets m.room.name>,
            // <Alice sets m.room.avatar>,
            // <Alice sets m.room.topic>,
            // <Alice invites Carol>,
            // <Carol joins>
          ]
        },
        "timeline": {
          "events": [
            // the last few lines of Alice and Carol's conversation ...,
            // <Alice sets m.room.avatar again>
          ],
          "limited": true,
          "prev_batch": "sometoken"
        }
      }
    }
  },
  // ...
}

Bob’s client, then, will show the events from the timeline property: the last few lines of Alice and Carol’s conversation, and Alice setting the room avatar. (If needed, his client can use the prev_batch token to load previous messages to display.) Bob’s client will also apply the state events from the state property, and then the timeline property, to its local storage of the room state. In this way, it will display the new room name, avatar, and topic, and it will know that Carol is in the room. Since it applies the state events from state first, the m.room.avatar state event in the timeline property will override the one from the state property, which is what we want.
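
The ordering rule can be sketched with plain dicts. In this example, apply_state is a hypothetical helper, and the room name and avatar URLs are invented values standing in for the events in Bob's sync response.

```python
def apply_state(room_state: dict, room_data: dict) -> dict:
    """Apply one room's sync data to ``room_state``, keyed by (type, state_key)."""
    state_events = room_data.get("state", {}).get("events", [])
    timeline_events = room_data.get("timeline", {}).get("events", [])
    # events from ``state`` are applied first, then those from ``timeline``
    for event in state_events + timeline_events:
        if "state_key" in event:  # only state events have a state_key
            room_state[(event["type"], event["state_key"])] = event
    return room_state


room_data = {
    "state": {
        "events": [
            {"type": "m.room.name", "state_key": "", "content": {"name": "New name"}},
            {"type": "m.room.avatar", "state_key": "", "content": {"url": "mxc://example.org/first"}},
        ]
    },
    "timeline": {
        "events": [
            {"type": "m.room.message", "content": {"body": "..."}},
            {"type": "m.room.avatar", "state_key": "", "content": {"url": "mxc://example.org/second"}},
        ],
        "limited": True,
        "prev_batch": "sometoken",
    },
}
state = apply_state({}, room_data)
```

After this runs, the stored avatar is the second one, since the timeline event was applied last, and the message event is not stored at all.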

We can now write our code to extract the events and notify the application (or any other subscribers) via our publisher. There are several ways in which this can be done, some of which may be better for different uses. For example, if the application is a bot that responds to messages, it may be easiest to simply publish a message for each room event that is received. The approach we will take here is to use three different types of messages: one essentially replicating the state property, one essentially replicating the timeline property, and one that indicates that the user has left the room (when the room is in the leave property rather than the join property of the sync). This means that the application needs to do some more processing of the messages, but ensures that no information is lost, in case the application wants to make use of anything.

client module classes:
class RoomStateUpdates(typing.NamedTuple):
    """A message giving state updates for a room from sync"""

    room_id: str
    events: list[events.StateEvent]

    @staticmethod
    def create_from_sync(room_id: str, state_updates: dict) -> "RoomStateUpdates":
        state_events = [
            events.StateEvent(room_id=room_id, **e)
            for e in state_updates.get("events", [])
        ]
        return RoomStateUpdates(room_id, state_events)


RoomStateUpdates.room_id.__doc__ = "The room ID"
RoomStateUpdates.events.__doc__ = "The state events"


class RoomTimelineUpdates(typing.NamedTuple):
    """A message giving timeline updates for a room from sync"""

    room_id: str
    events: list[events.RoomEvent]
    limited: bool
    prev_batch: typing.Optional[str]

    @staticmethod
    def create_from_sync(room_id: str, timeline_updates: dict) -> "RoomTimelineUpdates":
        timeline_events = [
            (
                events.StateEvent(room_id=room_id, **e)
                if "state_key" in e
                else events.RoomEvent(room_id=room_id, **e)
            )
            for e in timeline_updates.get("events", [])
        ]
        return RoomTimelineUpdates(
            room_id,
            timeline_events,
            timeline_updates.get("limited", False),
            timeline_updates.get("prev_batch", None),
        )


RoomTimelineUpdates.room_id.__doc__ = "The room ID"
RoomTimelineUpdates.events.__doc__ = "The timeline events"
RoomTimelineUpdates.limited.__doc__ = "Whether the timeline update was limited"
RoomTimelineUpdates.prev_batch.__doc__ = "The token to retrieve previous messages"


class LeftRoom(typing.NamedTuple):
    """A message indicating that the user has left a room"""

    room_id: str


LeftRoom.room_id.__doc__ = "The ID of the room that the user left"
process sync > rooms:
if "join" in body["rooms"]:
    for room_id, room_data in body["rooms"]["join"].items():
        {{publish room data messages}}

if "leave" in body["rooms"]:
    for room_id, room_data in body["rooms"]["leave"].items():
        {{publish room data messages}}
        await self.publisher.publish(LeftRoom(room_id))
publish room data messages:
if "state" in room_data:
    await self.publisher.publish(
        RoomStateUpdates.create_from_sync(
            room_id, room_data["state"]
        ),
    )
if "timeline" in room_data:
    await self.publisher.publish(
        RoomTimelineUpdates.create_from_sync(
            room_id, room_data["timeline"]
        ),
    )
Tests
test sync:
@pytest.mark.asyncio
async def test_sync_room_events(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{room event sync test}}

Our test that we can obtain events from the sync will be structured similarly to our test above that we can handle errors. We will generate three sync responses. The first will have a room in join, which will have an event in state and an event in timeline. The timeline will be marked as limited. The second will have the same room, but in leave, and will have an event in timeline. The timeline will not be marked as limited. The third will be empty, and have the next_batch pointing back to itself.

room event sync test:
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
    status=200,
    body=json.dumps(
        {
            "rooms": {
                "join": {
                    "!roomid": {
                        "state": {
                            "events": [
                                {
                                    "type": "m.room.name",
                                    "state_key": "",
                                    "sender": "@alice:example.org",
                                    "content": {
                                        "name": "Jabberwocky",
                                    },
                                    "event_id": "$event1",
                                    "origin_server_ts": 1234567890123,
                                    "unsigned": {
                                        "age": 4321,
                                    },
                                },
                            ],
                        },
                        "timeline": {
                            "events": [
                                {
                                    "type": "m.room.message",
                                    "sender": "@alice:example.org",
                                    "content": {
                                        "body": "'Twas brillig, and the slithy toves",
                                    },
                                    "event_id": "$event2",
                                    "origin_server_ts": 1234567892123,
                                    "unsigned": {
                                        "age": 2321,
                                    },
                                },
                            ],
                            "limited": True,
                            "prev_batch": "prev1",
                        },
                    },
                },
            },
            "next_batch": "token1",
        }
    ),
    headers={
        "content-type": "application/json",
    },
)
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?since=token1&timeout=30000",
    status=200,
    body=json.dumps(
        {
            "rooms": {
                "leave": {
                    "!roomid": {
                        "timeline": {
                            "events": [
                                {
                                    "type": "m.room.message",
                                    "sender": "@alice:example.org",
                                    "content": {
                                        "body": "Did gyre and gimble in the wabe",
                                    },
                                    "event_id": "$event3",
                                    "origin_server_ts": 1234567894123,
                                    "unsigned": {
                                        "age": 1234,
                                    },
                                },
                            ],
                            "prev_batch": "prev2",
                        },
                    },
                },
            },
            "next_batch": "token2",
        }
    ),
    headers={
        "content-type": "application/json",
    },
)
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?since=token2&timeout=30000",
    status=200,
    body='{"next_batch":"token2"}',
    headers={
        "content-type": "application/json",
    },
    repeat=True,
)

Our subscriber will capture the messages published by the sync task. Once the LeftRoom message is published (the last message that we will get from the sync responses), or a sync failure is detected, the subscriber will stop the sync loop.

room event sync test:
messages = []

def subscriber(msg) -> None:
    messages.append(msg)
    if isinstance(msg, (client.LeftRoom, client.SyncFailed)):
        c.stop_sync()

c.publisher.subscribe(
    (
        client.RoomStateUpdates,
        client.RoomTimelineUpdates,
        client.LeftRoom,
        client.SyncFailed,
    ),
    subscriber,
)

We then start the sync and wait for the sync task to complete (which it will do by throwing a CancelledError when the handler tells it to stop). We can then test the messages that we captured against an expected set of messages.

room event sync test:
c.start_sync()

try:
    await c.sync_task
except asyncio.CancelledError:
    pass

assert messages == [
    client.RoomStateUpdates(
        "!roomid",
        [
            StateEvent(
                room_id="!roomid",
                type="m.room.name",
                state_key="",
                sender="@alice:example.org",
                content={"name": "Jabberwocky"},
                event_id="$event1",
                origin_server_ts=1234567890123,
                unsigned={"age": 4321},
            )
        ],
    ),
    client.RoomTimelineUpdates(
        "!roomid",
        [
            RoomEvent(
                room_id="!roomid",
                type="m.room.message",
                sender="@alice:example.org",
                content={"body": "'Twas brillig, and the slithy toves"},
                event_id="$event2",
                origin_server_ts=1234567892123,
                unsigned={"age": 2321},
            )
        ],
        True,
        "prev1",
    ),
    client.RoomTimelineUpdates(
        "!roomid",
        [
            RoomEvent(
                room_id="!roomid",
                type="m.room.message",
                sender="@alice:example.org",
                content={"body": "Did gyre and gimble in the wabe"},
                event_id="$event3",
                origin_server_ts=1234567894123,
                unsigned={"age": 1234},
            )
        ],
        False,
        "prev2",
    ),
    client.LeftRoom("!roomid"),
]

Todo

note that events may arrive “out of order” due to backfilling, federation delays, etc.

Todo

explain how sent events show up in the sync (transaction ID)

Todo

note on reliability, making sure we don’t lose events on crashes/termination

Example: echo bot

We now have enough to build a simple echo bot. This bot will echo whatever message is sent to a room. As usual, it relies on having previously logged in with the login script.

examples/echo.py:
# {{copyright}}

"""Echo events"""

import asyncio
import json
import sys
import typing

from matrixlib import client
from matrixlib import events


{{json file storage}}


async def main() -> None:
    async with client.Client(storage=JsonFileStorage()) as c:

        {{echo example}}


asyncio.run(main())

First, we create a subscriber so that we can receive events. Since we only need message events, we only need to handle RoomTimelineUpdates messages and not RoomStateUpdates or LeftRoom messages. Our event handler will loop through the events and search for normal text message events, that is, message events of type m.room.message with a body property and with msgtype: m.text. It is also good for bots to skip over messages sent by the bot’s user.

echo example:
async def timeline_subscriber(updates: client.RoomTimelineUpdates) -> None:
    for event in updates.events:
        if (
            isinstance(event, events.StateEvent)
            or event.type != "m.room.message"
            or "body" not in event.content
            or event.content.get("msgtype") != "m.text"
            or event.sender == c.user_id
        ):
            continue
        {{echo bot: process message}}

If the message body is “!quit”, we will take that as a signal to exit, so we will stop the sync and stop processing. (The bot can also be stopped by pressing the interrupt key — usually Ctrl-C.)

echo bot: process message:
if event.content["body"] == "!quit":
    c.stop_sync()
    return

Otherwise, we take the body from the message to create a new message event content with msgtype: m.notice. We can then send it as a new m.room.message event to the room.

echo bot: process message:
content = {"body": event.content["body"], "msgtype": "m.notice"}
await c.send_event(updates.room_id, "m.room.message", content)

Now that we have completed the subscriber, we can register it and then start the sync. We will then wait until the sync task is done.

echo example:
c.publisher.subscribe(client.RoomTimelineUpdates, timeline_subscriber)
c.start_sync()

await typing.cast(asyncio.Task[None], c.sync_task)

If you run this script, it should echo back any messages that are sent to any rooms that the bot user is in.

Tracking room state

Many clients will need to keep track of the room’s current state. So we will write a separate class that does this by subscribing to the sync messages from the Client class and updating the storage with the latest state.

For our library, we will separate functionality into other classes rather than putting it all in our Client class, to allow applications to decide whether or not to use the functionality. It also keeps our Client class smaller. A library in which most uses would need such functionality could make it part of, or accessible from, its Client class, rather than making it separate.

src/matrixlib/rooms.py:
# {{copyright}}

"""Room-related functionality"""

import typing

from . import client
from . import events


{{rooms module classes}}
rooms module classes:
class RoomStateTracker:
    """Tracks room state"""

    {{RoomStateTracker class methods}}

The initialization function will take a client object and will subscribe to the RoomStateUpdates and RoomTimelineUpdates messages. Note that the room state tracker should be created before the client’s sync loop is started so that it doesn’t miss any events.

RoomStateTracker class methods:
def __init__(self, c: client.Client):
    self.client = c
    c.publisher.subscribe(
        (client.RoomStateUpdates, client.RoomTimelineUpdates), self._subscriber
    )

In our subscriber function, we will get the current state from the client storage, update it with the events from the message, and then save it back to the client storage.

RoomStateTracker class methods:
def _subscriber(
    self, msg: typing.Union[client.RoomStateUpdates, client.RoomTimelineUpdates]
) -> None:
    state = self.client.storage.get("room_state_tracker", {})
    room_state = state.setdefault(msg.room_id, {})
    for e in msg.events:
        # loop through the state events and save them under room_state[type][state_key]
        if isinstance(e, events.StateEvent):
            room_state.setdefault(e.type, {})[e.state_key] = e.to_dict()
    self.client.storage["room_state_tracker"] = state
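
The effect of the subscriber on the nested dictionary can be seen in isolation with a small sketch that uses plain dictionaries instead of event objects. The apply_events function is a hypothetical helper for this example, and it treats any event with a state_key as a state event, which is an assumption standing in for the isinstance check above.

```python
from typing import Any

Event = dict[str, Any]
RoomState = dict[str, dict[str, Event]]


def apply_events(room_state: RoomState, events: list[Event]) -> RoomState:
    """Fold events into room_state[type][state_key], in order."""
    for event in events:
        # Only state events carry a state_key; other events are skipped.
        if "state_key" in event:
            room_state.setdefault(event["type"], {})[event["state_key"]] = event
    return room_state


room_state: RoomState = {}
apply_events(room_state, [
    {"type": "m.room.name", "state_key": "", "content": {"name": "Room name"}},
])
apply_events(room_state, [
    {"type": "m.room.message", "content": {"body": "hi", "msgtype": "m.text"}},
    {"type": "m.room.name", "state_key": "", "content": {"name": "New room name"}},
])

print(room_state["m.room.name"][""]["content"]["name"])  # New room name
print("m.room.message" in room_state)  # False
```

Because events are applied in order, a later event with the same type and state key replaces the earlier one, so the dictionary always holds the most recent state that was seen.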

We then provide two methods: one to get the current state for a given event type and state key, and one to get all the current state for a given event type.

RoomStateTracker class methods:
def get_state(
    self,
    room_id: str,
    event_type: str,
    state_key: str = "",
) -> typing.Optional[events.StateEvent]:
    """Get the current state for the given event type and state key

    Returns ``None`` if no state is found.
    """
    state = self.client.storage.get("room_state_tracker", {})
    event = state.get(room_id, {}).get(event_type, {}).get(state_key)
    return events.StateEvent(**event) if event else None

def get_all_state_for_type(
    self,
    room_id: str,
    event_type: str,
) -> dict[str, events.StateEvent]:
    """Get all the current state for the given event type

    Returns a ``dict`` mapping from state key to state event.
    """
    state = self.client.storage.get("room_state_tracker", {})
    state_events = state.get(room_id, {}).get(event_type, {})
    return {
        name: events.StateEvent(**value) for name, value in state_events.items()
    }
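
As an example of how an application might use the result of get_all_state_for_type, the following sketch lists the users who are currently joined to a room. FakeStateEvent and joined_members are hypothetical names introduced only for this illustration; the sketch assumes that a state event exposes its content as a dictionary in a content attribute.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeStateEvent:  # hypothetical stand-in for events.StateEvent
    state_key: str
    content: dict[str, Any]


def joined_members(member_events: dict[str, FakeStateEvent]) -> list[str]:
    """Return the sorted user IDs whose current membership is ``join``."""
    return sorted(
        user_id
        for user_id, event in member_events.items()
        if event.content.get("membership") == "join"
    )


# Shaped like the return value of get_all_state_for_type(room_id, "m.room.member")
members = {
    "@bob:example.org": FakeStateEvent("@bob:example.org", {"membership": "join"}),
    "@carol:example.org": FakeStateEvent("@carol:example.org", {"membership": "leave"}),
}
print(joined_members(members))  # ['@bob:example.org']
```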

Tests

test sync:
@pytest.mark.asyncio
async def test_room_state_tracking(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{room state tracker test}}

To test the tracker, we first create a tracker object linked to the client.

room state tracker test:
tracker = rooms.RoomStateTracker(c)

We then prepare a sync response to update the state.

room state tracker test:
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
    status=200,
    body=json.dumps(
        {
            "rooms": {
                "join": {
                    "!roomid": {
                        "state": {
                            "events": [
                                {
                                    "type": "m.room.name",
                                    "state_key": "",
                                    "sender": "@alice:example.org",
                                    "content": {
                                        "name": "Room name",
                                    },
                                    "event_id": "$event1",
                                    "origin_server_ts": 1234567890123,
                                    "unsigned": {
                                        "age": 4321,
                                    },
                                },
                                {
                                    "type": "m.room.topic",
                                    "state_key": "",
                                    "sender": "@alice:example.org",
                                    "content": {
                                        "topic": "Room about stuff",
                                    },
                                    "event_id": "$event2",
                                    "origin_server_ts": 1234567890123,
                                    "unsigned": {
                                        "age": 4321,
                                    },
                                },
                            ],
                        },
                        "timeline": {
                            "events": [
                                {
                                    "type": "m.room.member",
                                    "state_key": "@bob:example.org",
                                    "sender": "@bob:example.org",
                                    "content": {
                                        "membership": "join",
                                    },
                                    "event_id": "$event3",
                                    "origin_server_ts": 1234567892123,
                                    "unsigned": {
                                        "age": 2321,
                                    },
                                },
                                {
                                    "type": "m.room.name",
                                    "state_key": "",
                                    "sender": "@alice:example.org",
                                    "content": {
                                        "name": "New room name",
                                    },
                                    "event_id": "$event4",
                                    "origin_server_ts": 1234567893123,
                                    "unsigned": {
                                        "age": 1321,
                                    },
                                },
                            ],
                            "limited": True,
                            "prev_batch": "prev1",
                        },
                    },
                },
            },
            "next_batch": "token1",
        }
    ),
    headers={
        "content-type": "application/json",
    },
)
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?since=token1&timeout=30000",
    status=200,
    body='{"next_batch":"token1"}',
    headers={
        "content-type": "application/json",
    },
    repeat=True,
)
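
The two mocked URLs differ only in the since parameter: the first request has no token, and each later request passes the next_batch value from the previous response. Since the second mock always returns the same token and is marked as repeating, the loop keeps receiving empty responses until it is stopped. The following sketch shows one way such URLs could be built. The sync_url function is a hypothetical helper for this example and is not necessarily how our Client class constructs its requests.

```python
from typing import Optional
from urllib.parse import urlencode, urljoin


def sync_url(
    base_client_url: str, since: Optional[str] = None, timeout: int = 30000
) -> str:
    """Build a GET /sync URL, adding ``since`` only when we have a token."""
    params: dict[str, str] = {}
    if since is not None:
        params["since"] = since
    params["timeout"] = str(timeout)
    # base_client_url is assumed to end with "/" so that urljoin appends to it.
    return urljoin(base_client_url, "v3/sync") + "?" + urlencode(params)


base = "https://matrix-client.example.org/_matrix/client/"
print(sync_url(base))
print(sync_url(base, since="token1"))
```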

We create a subscriber that will stop the sync when the RoomTimelineUpdates message is received, or if the sync fails, and we start the sync loop.

room state tracker test:
def subscriber(msg) -> None:
    c.stop_sync()

c.publisher.subscribe(
    (client.RoomTimelineUpdates, client.SyncFailed), subscriber
)

c.start_sync()

try:
    await c.sync_task
except asyncio.CancelledError:
    pass
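
The try/except around the await is needed because awaiting a task that has been cancelled raises asyncio.CancelledError. The following self-contained sketch shows the same pattern without any Matrix code. It assumes that stopping the sync cancels the sync task, which is what the except clause above suggests; run_until_stopped, loop_forever and on_update are hypothetical names for this example.

```python
import asyncio


async def run_until_stopped() -> int:
    """Run a looping task that a callback cancels; return the tick count."""
    ticks = 0

    async def loop_forever() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0)  # stand-in for one /sync request
            ticks += 1
            on_update()  # stand-in for publishing a message

    def on_update() -> None:
        # Plays the role of the test's subscriber calling c.stop_sync().
        task.cancel()

    task = asyncio.create_task(loop_forever())
    try:
        await task
    except asyncio.CancelledError:
        # Awaiting a cancelled task raises, so we swallow the exception here.
        pass
    return ticks


print(asyncio.run(run_until_stopped()))  # 1
```

The cancellation takes effect at the next await inside the task, so the loop body finishes its current iteration before the task ends.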

We then check the state tracker and ensure that it has the correct state.

room state tracker test:
assert tracker.get_state("!roomid", "m.room.name") == StateEvent(
    room_id="!roomid",
    type="m.room.name",
    state_key="",
    sender="@alice:example.org",
    content={"name": "New room name"},
    event_id="$event4",
    origin_server_ts=1234567893123,
    unsigned={"age": 1321},
)

assert tracker.get_state("!roomid", "m.room.topic") == StateEvent(
    room_id="!roomid",
    type="m.room.topic",
    state_key="",
    sender="@alice:example.org",
    content={"topic": "Room about stuff"},
    event_id="$event2",
    origin_server_ts=1234567890123,
    unsigned={"age": 4321},
)

assert (
    tracker.get_state("!roomid", "m.room.member", "@carol:example.org") is None
)

assert tracker.get_state("!another_room", "m.room.name") is None

assert tracker.get_all_state_for_type("!roomid", "m.room.member") == {
    "@bob:example.org": StateEvent(
        room_id="!roomid",
        type="m.room.member",
        state_key="@bob:example.org",
        sender="@bob:example.org",
        content={"membership": "join"},
        event_id="$event3",
        origin_server_ts=1234567892123,
        unsigned={"age": 2321},
    ),
}

Todo

sliding sync