Base concepts

This section introduces the base concepts of Matrix. Most of the topics mentioned here will be discussed in more detail in other sections.

In Matrix, each user has an account on a server, called the user’s homeserver. Users communicate with each other primarily by sending events to rooms. Users must be in a room to send an event, and must have permission to send events; the events sent to a room are distributed to all other users who are members of the room.

There are various types of identifiers for referring to different things in Matrix. Identifiers start with a sigil to indicate what kind of thing they refer to. User IDs start with an @ symbol, and consist of a username (also called a localpart) and a server name, separated by a :, for example, @alice:example.org. Room IDs start with an ! and event IDs start with a $. In general, clients should not parse room IDs and event IDs beyond checking the sigil. Clients also often do not need to parse user IDs; one exception is splitting off the server name from the user ID to determine and locate the user’s homeserver. See the Discovery section for more information.
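As a minimal sketch of the one parsing step mentioned above, the following helper splits a user ID into its localpart and server name. The name split_user_id is hypothetical and is not part of the library developed in this book. It splits on the first : only, on the assumption that a server name may itself contain a : when it includes a port.

```python
def split_user_id(user_id: str) -> tuple[str, str]:
    """Split a user ID such as "@alice:example.org" into (localpart, server name).

    Hypothetical helper for illustration only: it checks the sigil and the
    presence of a separator, but does not validate the allowed characters.
    """
    if not user_id.startswith("@"):
        raise ValueError("user IDs must start with the @ sigil")
    # split on the first colon only; the server name may contain a port
    localpart, sep, server_name = user_id[1:].partition(":")
    if not sep or not server_name:
        raise ValueError("user IDs must contain a server name")
    return localpart, server_name


print(split_user_id("@alice:example.org"))  # ('alice', 'example.org')
print(split_user_id("@bob:example.org:8448"))  # ('bob', 'example.org:8448')
```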

Each room has a globally unique room ID, and can have one or more room aliases. Room IDs are not intended to be human-readable, so aliases are used to assign human-readable names to rooms so that they can be found more easily. Room aliases can be added to rooms, removed from rooms, or moved from one room to another, at the discretion of the homeserver. Room aliases start with a # symbol, and consist of a localpart and a server name, separated by a :. Again, clients usually do not need to parse room aliases; one exception is that in some situations, a client may wish to display aliases that are local to a user’s homeserver separately from aliases that are remote.
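The local/remote distinction mentioned above could be sketched as follows. The function name is_local_alias is hypothetical, and the sketch assumes that comparing server names as exact strings is good enough for display purposes.

```python
def is_local_alias(alias: str, homeserver_name: str) -> bool:
    """Check whether a room alias belongs to the given homeserver.

    Hypothetical helper for illustration: server names are compared as
    exact strings, with no normalisation.
    """
    if not alias.startswith("#"):
        raise ValueError("room aliases must start with the # sigil")
    # everything after the first colon is the server name
    _, sep, server_name = alias.partition(":")
    return sep == ":" and server_name == homeserver_name


aliases = ["#matrix:example.org", "#matrix:matrix.org", "#dev:example.org"]
local = [a for a in aliases if is_local_alias(a, "example.org")]
remote = [a for a in aliases if not is_local_alias(a, "example.org")]
print(local)  # ['#matrix:example.org', '#dev:example.org']
print(remote)  # ['#matrix:matrix.org']
```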

API Requests

Matrix clients talk to their homeserver by making HTTP requests. All client-server requests are prefixed by the path /_matrix/client/. Usually, this prefix is followed by a version indicator (such as v3), and then by the endpoint name. For example, the login endpoint on example.org is https://example.org/_matrix/client/v3/login. The HTTP request and response bodies (including error responses) are JSON objects, except in the case of a small number of endpoints.

API conventions

Request parameters can be passed in three places: in the path, in a query string, or in the request body. Path parameters are usually written as {paramName} when giving an endpoint name, such as /rooms/{roomId}. Note that path and query parameters may contain special characters that will need to be URL-encoded.
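To illustrate the URL-encoding point, here is a small sketch that builds a URL for the /rooms/{roomId} path used as an example above. The helper name room_url and the base URL are assumptions made for this illustration; only the standard library's urllib.parse is used.

```python
from urllib.parse import quote, urlencode


def room_url(base_url: str, room_id: str, **query: str) -> str:
    """Build a client-server URL for /rooms/{roomId} (hypothetical helper)."""
    # safe="" makes quote() encode "/" as well, so that a room ID can never
    # be mistaken for extra path segments
    path = "/_matrix/client/v3/rooms/" + quote(room_id, safe="")
    url = base_url.rstrip("/") + path
    if query:
        # urlencode() takes care of encoding the query string values
        url += "?" + urlencode(query)
    return url


print(room_url("https://example.org", "!abc:example.org"))
# https://example.org/_matrix/client/v3/rooms/%21abc%3Aexample.org
```

Encoding each path parameter separately, before it is joined into the path, avoids accidentally encoding the / characters that separate path segments.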

Time durations are usually given in milliseconds. Times are given as milliseconds since the UNIX epoch (00:00:00 UTC on 1 January 1970), also referred to as a millisecond timestamp.
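As a quick illustration, the following sketch produces and interprets millisecond timestamps using only the standard library. The function names now_ms and from_ms are hypothetical.

```python
import datetime
import time

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def now_ms() -> int:
    """Return the current time as a millisecond timestamp."""
    return time.time_ns() // 1_000_000


def from_ms(timestamp_ms: int) -> datetime.datetime:
    """Convert a millisecond timestamp to an aware UTC datetime."""
    # adding a timedelta avoids floating-point rounding
    return EPOCH + datetime.timedelta(milliseconds=timestamp_ms)


print(from_ms(0).isoformat())  # 1970-01-01T00:00:00+00:00
print(from_ms(now_ms()).isoformat())
```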

To ensure that retried requests do not cause operations to be repeated, some endpoints use a transaction ID, a unique opaque string generated by the client, to identify requests. Transaction IDs, including a method of generating them, will be discussed in the section on Sending events to a room.

At times, we will be accessing large streams of data. It would be impractical for the server to send all the data at once. Instead, several endpoints will send a chunk of data along with a token that can be used to obtain the next chunk of data.
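The general shape of such a loop can be sketched as follows. This is not a real Matrix endpoint: fetch_page is a stand-in for an HTTP request, and the token format it uses is an assumption made only for this example. A real client should treat tokens as opaque strings.

```python
import typing

Page = tuple[list[str], typing.Optional[str]]

DATA = ["a", "b", "c", "d", "e"]


def fetch_page(token: typing.Optional[str]) -> Page:
    """Stand-in for a paginated endpoint: returns a chunk and the next token."""
    # this fake server encodes an offset in the token; clients must not
    # rely on any such structure
    start = int(token) if token is not None else 0
    end = start + 2
    next_token = str(end) if end < len(DATA) else None
    return DATA[start:end], next_token


def fetch_all(fetch: typing.Callable[[typing.Optional[str]], Page]) -> list[str]:
    """Follow the tokens until the server stops returning one."""
    items: list[str] = []
    token = None
    while True:
        chunk, token = fetch(token)
        items.extend(chunk)
        if token is None:
            return items


print(fetch_all(fetch_page))  # ['a', 'b', 'c', 'd', 'e']
```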

Handling errors

Since most HTTP response bodies are JSON objects, we create a function, for use internally, that will parse a response and raise an exception if there was an error. An error could occur either due to failure to parse the response body as JSON, or if the server indicated that an error occurred. Errors in Matrix are indicated by an appropriate HTTP status code, and the response body is a JSON object with errcode (giving a machine-readable error code) and error (giving a human-readable error description) properties.

client module functions:
async def check_response(
    resp: aiohttp.ClientResponse,
) -> tuple[int, dict[str, typing.Any]]:
    """Checks whether an HTTP response is an error.

    If successful, returns the HTTP code and the response body.  Raises an
    exception on error.
    """
    try:
        if resp.status < 200 or resp.status >= 400:
            result = await resp.json()
            if "errcode" not in result:
                raise error.NotMatrixServerError()
            raise error.MatrixError(resp.status, result)
        else:
            return (resp.status, await resp.json())
    except error.MatrixError:
        raise
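    except Exception:
        # any other failure (e.g. the body could not be parsed as JSON) means
        # that the response did not look like it came from a Matrix homeserver
        raise error.NotMatrixServerError()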

The exceptions that we raise in this function either indicate that the server that responded to our request was not a Matrix homeserver (it did not respond in an expected manner), or wrap the error response from the server.

src/matrixlib/error.py:
# {{copyright}}

"""Error handling"""


{{error module classes}}

error module classes:
class NotMatrixServerError(RuntimeError):
    """The server is not a Matrix server"""

    pass


class MatrixError(RuntimeError):
    """Wraps a Matrix API error"""

    def __init__(self, code, body):
        super().__init__(code, body)
        self.code = code
        self.body = body

    def __str__(self):
        return f"HTTP {self.code}: {self.body}"

Note

If check_response indicates that the server is not a Matrix homeserver, it does not necessarily mean that the URL does not point to a Matrix homeserver. Instead, the erroneous response could merely be a temporary condition, for example, caused by a proxy server.

Tests

Following good software development practices, we write tests to ensure that our code runs correctly. The tests can also be used as examples to show how functions can be called. The tests can be run using pytest, and we use aioresponses when necessary to generate fake HTTP responses for aiohttp, so we can run our tests without needing to run an actual server anywhere.

We define a pytest fixture for aioresponses:

tests/conftest.py:
from aioresponses import aioresponses
import pytest


{{common test fixtures}}

common test fixtures:
@pytest.fixture
def mock_aioresponse():
    with aioresponses() as m:
        yield m

One downside of this approach, compared with testing against a real Matrix homeserver, is that our mocked responses could themselves be wrong, so we could inadvertently be testing the wrong thing. But this method gives us an easy way to test with minimal dependencies, gives us more control over the server response, and allows us to inspect our client’s requests. There are also other testing strategies for Matrix clients, which are discussed later on. TODO: link to section. Ideally, a Matrix library would have multiple types of tests, but for the sake of simplicity, we will only write the one type of test in this book.

Here we test our check_response function.

tests/test_base.py:
# {{copyright}}

import aiohttp
import pytest
import time
import typing
import unittest.mock as mock

from matrixlib import client
from matrixlib import error
from matrixlib import schema


{{test base}}

test base:
@pytest.mark.asyncio
async def test_check_response(mock_aioresponse):
    async with aiohttp.ClientSession() as session:
        {{check_response tests}}

First, we test that a response that looks good is accepted.

check_response tests:
mock_aioresponse.get(
    "http://example.org/ok",
    status=200,
    body="{}",
    headers={
        "content-type": "application/json",
    },
)
async with session.get("http://example.org/ok") as resp:
    assert await client.check_response(resp) == (200, {})

Next we test that a response that isn’t JSON isn’t accepted, even if the HTTP code is OK.

check_response tests:
mock_aioresponse.get(
    "http://example.org/not_json",
    status=200,
    body="OK",
    headers={
        "content-type": "text/plain",
    },
)
with pytest.raises(error.NotMatrixServerError):
    async with session.get("http://example.org/not_json") as resp:
        await client.check_response(resp)

If we get a response that looks JSON-y, but doesn’t have the right content type, we should also error.

check_response tests:
mock_aioresponse.get(
    "http://example.org/not_json_content_type",
    status=200,
    body="{}",
    headers={
        "content-type": "text/plain",
    },
)
with pytest.raises(error.NotMatrixServerError):
    async with session.get("http://example.org/not_json_content_type") as resp:
        await client.check_response(resp)

And finally, if we have a not-OK status code, but the body is a JSON object with an errcode property, we should capture that as a Matrix error.

check_response tests:
mock_aioresponse.get(
    "http://example.org/matrix_error",
    status=400,
    body='{"errcode":"M_UNKNOWN","error":"Unknown"}',
    headers={
        "content-type": "application/json",
    },
)
with pytest.raises(error.MatrixError):
    async with session.get("http://example.org/matrix_error") as resp:
        await client.check_response(resp)

Retrying requests

If we get an error response, we may wish to retry the request under certain circumstances. One situation in which we will want to retry the request is if the error response from the server indicates that the request has been rejected due to rate limiting. That is, we have made too many requests in a short period of time, and the server wants us to slow down. This is indicated by an HTTP status code of 429 (Too Many Requests) with a Retry-After HTTP header (giving a delay in seconds before the request should be retried), and/or a Matrix error code of M_LIMIT_EXCEEDED with a retry_after_ms property in the response body (giving a delay in milliseconds). If the response has both the Retry-After HTTP header and the retry_after_ms property, the Retry-After header takes precedence (though they should be roughly equivalent).
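The precedence rule described above can be sketched as a small pure function. The name rate_limit_delay is hypothetical, and the sketch assumes that headers is a plain mapping containing the exact key Retry-After (real HTTP libraries usually provide case-insensitive header lookups) and that the header holds a whole number of seconds.

```python
import re
import typing


def rate_limit_delay(
    status: int,
    headers: typing.Mapping[str, str],
    body: typing.Any,
) -> typing.Optional[float]:
    """Return the requested delay in seconds, or None if not rate limited."""
    retry_after = headers.get("Retry-After")
    # the Retry-After header (in seconds) takes precedence
    if (
        status == 429
        and retry_after is not None
        and re.fullmatch(r"[0-9]+", retry_after)
    ):
        return float(retry_after)
    # otherwise fall back to retry_after_ms (in milliseconds) in the body
    if (
        isinstance(body, dict)
        and body.get("errcode") == "M_LIMIT_EXCEEDED"
        and type(body.get("retry_after_ms")) is int
    ):
        return body["retry_after_ms"] / 1000
    return None


body = {"errcode": "M_LIMIT_EXCEEDED", "retry_after_ms": 100}
print(rate_limit_delay(429, {"Retry-After": "3"}, body))  # 3.0
print(rate_limit_delay(429, {}, body))  # 0.1
```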

Another situation in which we will want to retry is if we either get no response from the server, or a non-Matrix response, as this could indicate that the homeserver is temporarily down and that we got a response from a proxy server. In this case, we will do exponential backoff: we will wait some time before trying again, and if the request keeps failing, we will increase our delay exponentially between retries until we reach some predetermined time limit.
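To make the backoff schedule concrete, the following sketch computes the sequence of waits for a given time limit, ignoring the time taken by the requests themselves. The function name backoff_delays and the initial delay of 2 seconds are assumptions made for this illustration. When the next delay would pass the limit, the sketch waits only until the limit, which allows one last attempt.

```python
def backoff_delays(limit: float, initial: float = 2.0) -> list[float]:
    """Return the waits (in seconds) between attempts, up to a time limit."""
    delays = []
    elapsed = 0.0
    backoff = initial
    while elapsed < limit:
        # never wait past the time limit
        delay = min(backoff, limit - elapsed)
        delays.append(delay)
        elapsed += delay
        # double the delay for the next attempt
        backoff *= 2
    return delays


print(backoff_delays(10))  # [2.0, 4.0, 4.0]
```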

Since retrying requests will be a frequent operation, we create a function to do this. It will take a time limit, an aiohttp request function to call (e.g. session.get), and the arguments to pass to the request function. It will perform the request, retrying (with appropriate delays between attempts) when necessary until the time limit is reached.

client module functions:
async def retry(limit: int, req_func, *args, **kwargs) -> aiohttp.ClientResponse:
    """Retry a request until a time limit is reached.

    The request will be retried if a non-Matrix response is received, or if
    the request was rate limited.

    Note that this will not apply a timeout to the actual request: if the
    server responds slowly, then it may exceed the retry time limit.  To limit
    the time taken by the request, a `timeout` argument should be passed to
    the request function.

    Arguments:

    ``limit``:
        the time limit, in milliseconds
    ``req_func``:
        the function to call to make the request.  e.g. to make a ``GET``
        request, this could be ``session.get``, where ``session`` is an
        ``aiohttp.ClientSession``.
    ``*args, **kwargs``:
        the arguments to pass to ``req_func``.
    """
    end_time = time.monotonic_ns() + limit * 1_000_000
    backoff = 2

    while True:
        resp = await req_func(*args, **kwargs)
        # FIXME: handle no response (aiohttp.ClientConnectionError)
        if resp.status < 400:
            # not an error response, so return
            return resp

        try:
            # try to parse the error
            result = await resp.json()
        except Exception:
            if time.monotonic_ns() > end_time:
                return resp

            await resp.release()
            # doesn't look like a Matrix server -- exponential backoff
            # but if the backoff would take us past our limit, wait until our
            # time limit and make one last attempt
            # (backoff is in seconds, so convert the remaining time from ns)
            delay = min(backoff, (end_time - time.monotonic_ns()) / 1_000_000_000)
            await asyncio.sleep(delay)
            backoff = backoff * 2
            continue

        if time.monotonic_ns() > end_time:
            return resp
        elif (
            resp.status == 429  # Too Many Requests
            and "Retry-After" in resp.headers
            and re.match(r"^[\d]+$", resp.headers["Retry-After"])
        ):
            await resp.release()
            delay = int(resp.headers["Retry-After"])
            # we're rate limited -- wait for the requested amount
            # in this case, if the delay would take us past our limit, there's
            # no point in trying again because the server says it will still be
            # rejected, so just return
            if time.monotonic_ns() + delay * 1_000_000_000 > end_time:
                return resp
            await asyncio.sleep(delay)
            continue
        elif (
            isinstance(result, dict)
            and result.get("errcode") == "M_LIMIT_EXCEEDED"
            and type(result.get("retry_after_ms")) == int
        ):
            await resp.release()
            delay_ms = result["retry_after_ms"]
            if time.monotonic_ns() + delay_ms * 1_000_000 > end_time:
                return resp
            await asyncio.sleep(math.ceil(delay_ms / 1_000))
            continue
        else:
            # some other error, so return and let the application deal with it
            return resp

Tests

test base:
@pytest.mark.asyncio
async def test_retry(mock_aioresponse):
    async with aiohttp.ClientSession() as session:
        {{retry tests}}

We test our retry function under different scenarios. First, we test the situation where the server is down and we get a response from a proxy:

retry tests:
mock_aioresponse.get("http://example.org/", status=504, body="Timeout")
mock_aioresponse.get("http://example.org/", status=504, body="Timeout")
mock_aioresponse.get("http://example.org/", status=200, body="OK")
# mock asyncio.sleep so that the test doesn't actually need to wait for
# multiple seconds
with mock.patch("asyncio.sleep") as sleep:
    async with await client.retry(
        10_000, session.get, "http://example.org/"
    ) as resp:
        # check that we got the final result
        assert resp.status == 200
        assert await resp.text() == "OK"
        # check that our mocked asyncio.sleep was called and that we're
        # backing off exponentially
        assert sleep.call_args_list == [mock.call(2), mock.call(4)]

Next, we test the situations where we are rate limited. We check both the cases where the server includes a Retry-After HTTP header and a retry_after_ms property.

retry tests:
mock_aioresponse.get(
    "http://example.org/",
    status=429,
    body='{"errcode":"M_LIMIT_EXCEEDED","retry_after_ms":100}',
    headers={
        "content-type": "application/json",
    },
)
mock_aioresponse.get("http://example.org/", status=200, body="OK")
with mock.patch("asyncio.sleep") as sleep:
    resp = await client.retry(10_000, session.get, "http://example.org/")
    async with resp:
        # check that we got the final result
        assert resp.status == 200
        assert await resp.text() == "OK"
        # check that we waited some time before retrying
        assert sleep.call_args == mock.call(1)

mock_aioresponse.get(
    "http://example.org/",
    status=429,
    body='{"errcode":"M_LIMIT_EXCEEDED"}',
    headers={
        "content-type": "application/json",
        "retry-after": "1",
    },
)
mock_aioresponse.get("http://example.org/", status=200, body="OK")
with mock.patch("asyncio.sleep") as sleep:
    resp = await client.retry(10_000, session.get, "http://example.org/")
    async with resp:
        # check that we got the final result
        assert resp.status == 200
        assert await resp.text() == "OK"
        # check that we waited some time before retrying
        assert sleep.call_args == mock.call(1)

We also test the situation where we are rate limited, but where the requested retry interval takes us past our time limit. In this case, we should only make the initial request and then return the error response.

retry tests:
mock_aioresponse.get(
    "http://example.org/",
    status=429,
    body='{"errcode":"M_LIMIT_EXCEEDED","retry_after_ms":100000000000}',
    headers={
        "content-type": "application/json",
    },
)
start_time = time.monotonic_ns()
resp = await client.retry(10_000, session.get, "http://example.org/")
async with resp:
    # check that we got the final result
    assert resp.status == 429
    # check that we didn't wait (the request time should be less than 1s)
    assert time.monotonic_ns() < start_time + 1_000_000_000

Finally, we test the situations where we get an OK response or a non-rate-limit Matrix error from the server. In both cases, we should get the result immediately without retrying.

retry tests:
mock_aioresponse.get(
    "http://example.org/",
    status=200,
    body="{}",
    headers={
        "content-type": "application/json",
    },
)
start_time = time.monotonic_ns()
resp = await client.retry(10_000, session.get, "http://example.org/")
async with resp:
    # check that we got the final result
    assert resp.status == 200
    # check that we didn't wait (the request time should be less than 1s)
    assert time.monotonic_ns() < start_time + 1_000_000_000

mock_aioresponse.get(
    "http://example.org/",
    status=404,
    body='{"errcode":"M_NOT_FOUND"}',
    headers={
        "content-type": "application/json",
    },
)
start_time = time.monotonic_ns()
resp = await client.retry(10_000, session.get, "http://example.org/")
async with resp:
    # check that we got the final result
    assert resp.status == 404
    # check that we didn't wait (the request time should be less than 1s)
    assert time.monotonic_ns() < start_time + 1_000_000_000

Validating response bodies

After we have parsed the response body as JSON, we will want to retrieve the information. However, we first want to ensure that it follows the expected data format so that we do not inadvertently process corrupted data. The Matrix spec defines a schema for each endpoint, defining what properties are required or optional, and what data type each property should be. We will write a function to do some basic validation on the data.

Note

Some Matrix libraries do not have separate validation functions. Instead, they may try to validate responses while values are being retrieved. For the purposes of this book, the code seems to be clearer with the response validated first, before the values are retrieved. Either way, clients should not assume that servers will send correctly-formatted responses, and should perform some sort of validation.

src/matrixlib/schema.py:
# {{copyright}}

"""Validate JSON data according to a given schema"""

import typing
from . import error


{{schema module classes}}


{{schema module functions}}

We start with some helper classes to support more complex types.

schema module classes:
@typing.runtime_checkable
class SchemaHelper(typing.Protocol):
    """Base class for schema helper classes"""

    def is_valid(self, body: typing.Any) -> bool:
        ...  # pragma: no cover


class Optional(SchemaHelper):
    """Indicates that a parameter is optional"""

    def __init__(self, base_type: typing.Any):
        self.base_type = base_type

    def is_valid(self, body: typing.Any) -> bool:
        # if we're given a value, check against the base type
        return is_valid(body, self.base_type)


class Union(SchemaHelper):
    """Indicates that a parameter may be one of multiple types"""

    def __init__(self, *base_types: typing.Any):
        self.base_types = base_types

    def is_valid(self, body: typing.Any) -> bool:
        for t in self.base_types:
            if is_valid(body, t):
                return True
        return False


class Intersection(SchemaHelper):
    """Indicates that a parameter must satisfy all of the given types"""

    def __init__(self, *base_types: typing.Any):
        self.base_types = base_types

    def is_valid(self, body: typing.Any) -> bool:
        for t in self.base_types:
            if not is_valid(body, t):
                return False
        return True

We then write the main function for actually checking a value against a schema. The schema can be a normal Python type (int, str, etc.), an instance of one of our helper classes above, or a dict indicating that the value must be a dict with the given properties (possibly with other properties as well).

schema module functions:
def is_valid(body: typing.Any, schema: typing.Any) -> bool:
    """Check if the JSON data is valid according to the given schema

    Example:

    >>> is_valid({"foo": 1, "bar": True}, {"foo": int, "bar": bool})
    True

    """
    if schema == typing.Any:
        return True
    elif type(schema) == type:
        return type(body) == schema
    elif type(schema) == dict:
        # if schema is a dict, then body must also be a dict that contains
        # the keys given in schema (unless marked as optional), and the types
        # match
        if type(body) != dict:
            return False
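        for key, value_schema in schema.items():
            if key not in body:
                # a missing key is only acceptable if it is marked optional;
                # either way, keep checking the remaining keys
                if not isinstance(value_schema, Optional):
                    return False
                continue
            if not is_valid(body[key], value_schema):
                return False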
        return True
    elif isinstance(schema, SchemaHelper):
        return schema.is_valid(body)
    elif typing.get_origin(schema) == list:
        # if we have a list of a type, check that the body is a list, and then
        # check that each element matches the type
        if type(body) != list:
            return False
        base = typing.get_args(schema)[0]
        for item in body:
            if not is_valid(item, base):
                return False
        return True
    elif typing.get_origin(schema) == dict:
        # if we have a dict with arguments, the first argument must be str
        # (because JSON only allows string keys), and every value must match
        # the second argument
        if type(body) != dict:
            return False
        key_type, base = typing.get_args(schema)
        if key_type != str:
            return False
        for item in dict.values(body):
            if not is_valid(item, base):
                return False
        return True
    else:
        return False

Even though we can use dict and list as schemas, these cannot be used to match objects or lists that contain more complicated types. For example, we cannot do list[Union(int, bool)]. To handle this, we create helper classes for objects and lists that contain more complicated types. For these classes, we will use the JSON terms “object” and “array” rather than the Python terms “dict” and “list”.

schema module classes:
class Object(SchemaHelper):
    """Indicates an object that contains values of a given type"""

    def __init__(self, value_type: typing.Any):
        self.value_type = value_type

    def is_valid(self, body: typing.Any) -> bool:
        return type(body) == dict and all(
            (is_valid(value, self.value_type) for value in body.values())
        )


class Array(SchemaHelper):
    """Indicates an array that contains values of a given type"""

    def __init__(self, value_type: typing.Any):
        self.value_type = value_type

    def is_valid(self, body: typing.Any) -> bool:
        return type(body) == list and all(
            (is_valid(value, self.value_type) for value in body)
        )

We will also provide a convenience function that validates a response body and raises an exception if the validation fails:

schema module functions:
def ensure_valid(body: typing.Any, schema: typing.Any) -> None:
    """Throw an exception if the JSON data is not valid"""
    if not is_valid(body, schema):
        raise error.InvalidResponseError()

error module classes:
class InvalidResponseError(RuntimeError):
    """The server's response did not match the expected format"""

    pass

Tests

test base:
def test_schema():
    {{is_valid tests}}

We test some basic types.

is_valid tests:
assert schema.is_valid(1, typing.Any)
assert schema.is_valid(1, int)
assert not schema.is_valid(1, str)
assert schema.is_valid([1], list)

Next, we test that we can specify the schema of dicts.

is_valid tests:
assert schema.is_valid({"foo": "bar"}, {"foo": str})
assert not schema.is_valid({"foo": 1}, {"foo": str})
assert not schema.is_valid({}, {"foo": str})
assert not schema.is_valid(1, {})

And we test that we can specify optional values. If the property is marked as optional, and is present, it must match the given type.

is_valid tests:
assert schema.is_valid({"foo": "bar"}, {"foo": schema.Optional(str)})
assert not schema.is_valid({"foo": 1}, {"foo": schema.Optional(str)})
assert schema.is_valid({}, {"foo": schema.Optional(str)})

We test that unions accept all specified types, but no others.

is_valid tests:
assert schema.is_valid(1, schema.Union(str, int, bool))
assert schema.is_valid("foo", schema.Union(str, int, bool))
assert schema.is_valid(True, schema.Union(str, int, bool))
assert not schema.is_valid([], schema.Union(str, int, bool))
assert not schema.is_valid({}, schema.Union(str, int, bool))

We test that we can have lists and dicts of a single type.

is_valid tests:
assert schema.is_valid([1, 2], list[int])
assert not schema.is_valid([1, "foo"], list[int])
assert not schema.is_valid(1, list[int])
assert schema.is_valid({"foo": True, "bar": False}, dict[str, bool])
assert not schema.is_valid({"foo": True, "bar": 1}, dict[str, bool])
assert not schema.is_valid(1, dict[str, bool])

assert schema.is_valid([1, True], schema.Array(schema.Union(int, bool)))
assert not schema.is_valid([1, "foo"], schema.Array(schema.Union(int, bool)))
assert schema.is_valid(
    {"foo": 1, "bar": True}, schema.Object(schema.Union(int, bool))
)
assert not schema.is_valid(
    {"foo": 1, "bar": "baz"}, schema.Object(schema.Union(int, bool))
)

Note

Our validation API uses the fact that in Python, we can pass types around as data. Not all languages support this feature; in languages where this is not supported, different techniques can be used. One possibility is to simply use a helper class, similar to our schema.Union or schema.Intersection classes, for each data type. This would give us a validation function whose calls would look like, for example, schema.is_valid([1], schema.List(schema.Integer)).

Another possibility is to use strings (or atoms or symbols, in languages that support those constructs) to represent the base types, and to add several more branches to the if..elif..else block. This would yield a validation function whose calls would look like, for example, schema.is_valid([1], schema.List("integer")).

The shape of the validation API depends on the language features, the author’s preferred style, and what is idiomatic in the language.
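The string-based alternative mentioned above can be sketched, still in Python for consistency, as follows. This is only an illustration of the idea, not part of the library developed in this book: the List class, the set of type names, and the standalone is_valid function are all assumptions made for this example.

```python
import typing

# hypothetical mapping from type names to the Python types they stand for
BASE_TYPES = {"integer": int, "string": str, "boolean": bool}


class List:
    """Indicates a list whose items all match the given schema."""

    def __init__(self, item_schema: typing.Any):
        self.item_schema = item_schema


def is_valid(body: typing.Any, schema: typing.Any) -> bool:
    """Check a value against a schema that names base types with strings."""
    if isinstance(schema, str):
        expected = BASE_TYPES.get(schema)
        # unknown type names never match; bool is not accepted as an integer
        return expected is not None and type(body) is expected
    elif isinstance(schema, List):
        return type(body) is list and all(
            is_valid(item, schema.item_schema) for item in body
        )
    else:
        return False


print(is_valid([1], List("integer")))  # True
print(is_valid([1, "foo"], List("integer")))  # False
```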

Authenticated requests

Requests can either be authenticated or unauthenticated: authenticated requests are requests where the server knows which client is performing the request. Requests are authenticated through the use of an access token, which is given to the client upon login or registration. For more information, see the section on Authenticated requests.
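As a brief preview, the client-server API accepts the access token in an Authorization HTTP header using the Bearer scheme. The helper name auth_headers and the token value below are hypothetical; this is a sketch of how the header is built, not the implementation used later in this book.

```python
def auth_headers(access_token: str) -> dict[str, str]:
    """Build the HTTP headers for an authenticated request (hypothetical helper)."""
    return {"Authorization": f"Bearer {access_token}"}


# "syt_example_token" is a made-up token used only for illustration
print(auth_headers("syt_example_token"))
# {'Authorization': 'Bearer syt_example_token'}
```

The resulting dict could then be passed as the headers argument of an HTTP request function such as session.get.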