Base concepts
This section introduces the base concepts of Matrix. Most of the topics mentioned here are discussed in more detail in other sections.
In Matrix, each user has an account on a server, called the user’s homeserver. Users communicate with each other primarily by sending events to rooms. Users must be in a room to send an event, and must have permission to send events; the events sent to a room are distributed to all other users who are members of the room.
There are various types of identifiers for referring to different things in Matrix. Identifiers start with a sigil to indicate what kind of things they refer to. User IDs start with an @ symbol, and consist of a username (also called a localpart) and a server name, separated by a :, for example, @alice:example.org. Room IDs start with an ! and event IDs start with a $. In general, clients should not parse room IDs and event IDs beyond checking the sigil. Also, clients often do not need to parse user IDs; one exception is splitting off the server name from the user ID to determine and locate the user’s homeserver. See the Discovery section for more information.
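As a minimal sketch of the one case where a client does parse a user ID, the helper below splits off the server name. The name split_user_id is hypothetical and is not part of the library built in this book. It assumes only that the localpart cannot contain a : character, so the first : separates the two parts, while the server name may contain further colons (for example, a port number).

```python
def split_user_id(user_id: str) -> tuple[str, str]:
    """Split a user ID into (localpart, server name).

    Hypothetical helper for illustration only.
    """
    if not user_id.startswith("@"):
        raise ValueError("user IDs must start with the @ sigil")
    # Split on the first colon only: the server name may itself contain a
    # colon, for example when it includes a port number.
    localpart, sep, server_name = user_id[1:].partition(":")
    if not sep or not server_name:
        raise ValueError("user ID has no server name")
    return localpart, server_name


print(split_user_id("@alice:example.org"))  # ('alice', 'example.org')
```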
Each room has a globally unique room ID, and can have one or more room aliases. Room IDs are not intended to be human-readable, so aliases are used to assign a human-readable name to them so that they can be found more easily. Room aliases can be added to rooms, removed from rooms, or moved from one room to another, at the discretion of the homeserver. Room aliases start with a # symbol, and consist of a localpart and a server name, separated by a :. Again, clients usually do not need to parse room aliases; one exception is that in some situations, a client may wish to display aliases that are local to a user’s homeserver separately from aliases that are remote.
API Requests
Matrix clients talk to their homeserver by making HTTP requests. All client-server requests are prefixed by the path /_matrix/client/. Usually, this prefix is followed by a version indicator (such as v3), and then followed by the endpoint name. For example, https://example.org/_matrix/client/v3/login. The HTTP request and response bodies (including error responses) are JSON objects, except in the case of a small number of endpoints.
API conventions
Request parameters can be passed in three places: in the path, in a query string, or in the request body. Path parameters are usually written as {paramName} when giving an endpoint name, such as /rooms/{roomId}. Note that path and query parameters may contain special characters that will need to be URL-encoded.
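To illustrate why the encoding matters, here is a small sketch that fills in a path template using only the Python standard library. The function build_url and its parameters are hypothetical names chosen for this example, and the v3 version prefix is assumed. A room ID contains both ! and :, so it must be percent-encoded before being placed in the path.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def build_url(
    base_url: str,
    path_template: str,
    path_params: dict[str, str],
    query: Optional[dict[str, str]] = None,
) -> str:
    """Build a client-server API URL (hypothetical helper, for illustration)."""
    # safe="" makes quote() encode every reserved character, including "/"
    encoded = {name: quote(value, safe="") for name, value in path_params.items()}
    url = base_url + "/_matrix/client/v3" + path_template.format(**encoded)
    if query:
        # urlencode() takes care of encoding the query parameters
        url += "?" + urlencode(query)
    return url


print(build_url("https://example.org", "/rooms/{roomId}", {"roomId": "!abc:example.org"}))
# https://example.org/_matrix/client/v3/rooms/%21abc%3Aexample.org
```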
Time durations are usually given in milliseconds. Times are given as milliseconds since the UNIX epoch (00:00:00 UTC on 1 January 1970), also referred to as a millisecond timestamp.
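As a quick sketch of working with millisecond timestamps in Python (the helper names now_ms and from_ms are hypothetical):

```python
import time
from datetime import datetime, timezone


def now_ms() -> int:
    """Return the current time as a millisecond timestamp."""
    return time.time_ns() // 1_000_000


def from_ms(timestamp_ms: int) -> datetime:
    """Convert a millisecond timestamp to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)


print(from_ms(0))  # 1970-01-01 00:00:00+00:00
```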
To ensure that retried requests do not cause operations to be repeated, some endpoints use a transaction ID, a unique opaque string generated by the client, to identify requests. Transaction IDs, including a method of generating them, will be discussed in the section on Sending events to a room.
At times, we will be accessing large streams of data. It would be impractical for the server to send all the data at once. Instead, several endpoints will send a chunk of data along with a token that can be used to obtain the next chunk of data.
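The chunk-and-token pattern can be sketched as a simple loop. This example is deliberately generic: the fetch callback, the shape of a page, and the fake data are all assumptions made for illustration, and real endpoints each define their own parameter and property names for the token.

```python
from typing import Callable, Iterator, Optional

# A page is a chunk of items plus a token for the next chunk (None at the end).
Page = tuple[list[str], Optional[str]]


def iterate_pages(fetch: Callable[[Optional[str]], Page]) -> Iterator[str]:
    """Yield every item, requesting one chunk at a time."""
    token: Optional[str] = None
    while True:
        chunk, token = fetch(token)
        yield from chunk
        if token is None:
            # the server gave us no token, so there is no more data
            break


# A fake data source standing in for the server.
FAKE_PAGES: dict[Optional[str], Page] = {
    None: (["a", "b"], "t1"),
    "t1": (["c"], "t2"),
    "t2": (["d"], None),
}


def fake_fetch(token: Optional[str]) -> Page:
    return FAKE_PAGES[token]


print(list(iterate_pages(fake_fetch)))  # ['a', 'b', 'c', 'd']
```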
Handling errors
Since most HTTP response bodies are JSON objects, we create a function, for use internally, that will parse a response and raise an exception if there was an error. An error could occur either due to failure to parse the response body as JSON, or if the server indicated that an error occurred. Errors in Matrix are indicated by an appropriate HTTP status code, and the response body is a JSON object with errcode (giving a machine-readable error code) and error (giving a human-readable error description) properties.
async def check_response(
    resp: aiohttp.ClientResponse,
) -> tuple[int, dict[str, typing.Any]]:
    """Checks whether an HTTP response is an error.

    If successful, returns the HTTP code and the response body.  Raises an
    exception on error.
    """
    try:
        if resp.status < 200 or resp.status >= 400:
            result = await resp.json()
            if "errcode" not in result:
                raise error.NotMatrixServerError()
            raise error.MatrixError(resp.status, result)
        else:
            return (resp.status, await resp.json())
    except error.MatrixError:
        raise
    except Exception as e:
        # the body was not JSON, or had no errcode.  We catch Exception
        # rather than using a bare except so that task cancellation is not
        # swallowed.
        raise error.NotMatrixServerError() from e
The exceptions that we raise in this function either indicate that the server that responded to our request was not a Matrix homeserver (it did not respond in an expected manner), or wrap the error response from the server.
# {{copyright}}
"""Error handling"""

{{error module classes}}

class NotMatrixServerError(RuntimeError):
    """The server is not a Matrix server"""

    pass


class MatrixError(RuntimeError):
    """Wraps a Matrix API error"""

    def __init__(self, code, body):
        self.code = code
        self.body = body

    # FIXME: add a __str__ function
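One possible shape for the missing __str__ method is sketched below. This is only a suggestion and not the book's own implementation: the message format is an assumption, and the class is repeated here so that the example is self-contained.

```python
class MatrixError(RuntimeError):
    """Wraps a Matrix API error (illustrative variant with __str__)"""

    def __init__(self, code, body):
        super().__init__(code, body)
        self.code = code
        self.body = body

    def __str__(self):
        # be defensive: fall back to placeholders if a property is missing
        body = self.body if isinstance(self.body, dict) else {}
        errcode = body.get("errcode", "(no errcode)")
        message = body.get("error", "(no description)")
        return f"HTTP {self.code}: {errcode}: {message}"


print(MatrixError(400, {"errcode": "M_UNKNOWN", "error": "Unknown"}))
# HTTP 400: M_UNKNOWN: Unknown
```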
Note
If check_response indicates that the server is not a Matrix homeserver, it does not necessarily mean that the URL does not point to a Matrix homeserver. Instead, the erroneous response could merely be a temporary condition, for example, caused by a proxy server.
Tests
Following good software development practices, we write tests to ensure that our code runs correctly. The tests can also be used as examples to show how functions can be called. The tests can be run using pytest, and we use aioresponses when necessary to generate fake HTTP responses for aiohttp, so we can run our tests without needing to run an actual server anywhere.
We define a pytest fixture for aioresponses:
from aioresponses import aioresponses
import pytest

{{common test fixtures}}

@pytest.fixture
def mock_aioresponse():
    with aioresponses() as m:
        yield m
One downside of this approach, compared with testing against a real Matrix homeserver, is that our tests could themselves contain errors, so we could inadvertently be testing incorrectly. But this method gives us an easy way to test with minimal dependencies, gives us more control over the server response, and allows us to inspect our client’s requests. There are also other testing strategies for Matrix clients, which are discussed later on. TODO: link to section. Ideally, a Matrix library would have multiple types of tests, but for the sake of simplicity, we will only write the one type of test in this book.
Here we test our check_response function.
# {{copyright}}
import aiohttp
import pytest
import time
import typing
import unittest.mock as mock
from matrixlib import client
from matrixlib import error
from matrixlib import schema
{{test base}}
@pytest.mark.asyncio
async def test_check_response(mock_aioresponse):
    async with aiohttp.ClientSession() as session:
        {{check_response tests}}
First, we test that a response that looks good is accepted.
mock_aioresponse.get(
    "http://example.org/ok",
    status=200,
    body="{}",
    headers={
        "content-type": "application/json",
    },
)
async with session.get("http://example.org/ok") as resp:
    assert await client.check_response(resp) == (200, {})
Next we test that a response that isn’t JSON isn’t accepted, even if the HTTP code is OK.
mock_aioresponse.get(
    "http://example.org/not_json",
    status=200,
    body="OK",
    headers={
        "content-type": "text/plain",
    },
)
with pytest.raises(error.NotMatrixServerError):
    async with session.get("http://example.org/not_json") as resp:
        await client.check_response(resp)
If we get a response that looks JSON-y, but doesn’t have the right content type, we should also error.
mock_aioresponse.get(
    "http://example.org/not_json_content_type",
    status=200,
    body="{}",
    headers={
        "content-type": "text/plain",
    },
)
with pytest.raises(error.NotMatrixServerError):
    async with session.get("http://example.org/not_json_content_type") as resp:
        await client.check_response(resp)
And finally, if we have a not-OK status code, but the body is a JSON object with an errcode property, we should capture that as a Matrix error.
mock_aioresponse.get(
    "http://example.org/matrix_error",
    status=400,
    body='{"errcode":"M_UNKNOWN","error":"Unknown"}',
    headers={
        "content-type": "application/json",
    },
)
with pytest.raises(error.MatrixError):
    async with session.get("http://example.org/matrix_error") as resp:
        await client.check_response(resp)
Retrying requests
If we get an error response, we may wish to retry the request under certain circumstances. One situation in which we will want to retry the request is if the error response from the server indicates that the request has been rejected due to rate limiting. That is, we have made too many requests in a short period of time, and the server wants us to slow down. This is indicated by an HTTP status code of 429 (Too Many Requests) with a Retry-After HTTP header (giving a delay in seconds before the request should be retried), and/or a Matrix error code of M_LIMIT_EXCEEDED with a retry_after_ms property in the response body (giving a delay in milliseconds). If the response has both the Retry-After HTTP header and the retry_after_ms property, the Retry-After header takes precedence (though they should be roughly equivalent).
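The precedence rule can be sketched as a small pure function. The name rate_limit_delay is hypothetical, and the headers are assumed to be a plain mapping keyed exactly by Retry-After (real HTTP header names are case-insensitive). Like the rest of this section, the sketch only handles a Retry-After value given as a whole number of seconds, not one given as an HTTP date.

```python
import re
from typing import Any, Mapping, Optional


def rate_limit_delay(
    status: int, headers: Mapping[str, str], body: Any
) -> Optional[float]:
    """Return the requested delay in seconds, or None if not rate limited."""
    retry_after = headers.get("Retry-After")
    # the Retry-After header (in seconds) takes precedence
    if (
        status == 429
        and retry_after is not None
        and re.fullmatch(r"[0-9]+", retry_after)
    ):
        return float(retry_after)
    # otherwise fall back to retry_after_ms (in milliseconds) from the body
    if (
        isinstance(body, dict)
        and body.get("errcode") == "M_LIMIT_EXCEEDED"
        and type(body.get("retry_after_ms")) is int
    ):
        return body["retry_after_ms"] / 1000
    return None


body = {"errcode": "M_LIMIT_EXCEEDED", "retry_after_ms": 5000}
print(rate_limit_delay(429, {"Retry-After": "3"}, body))  # 3.0
print(rate_limit_delay(429, {}, body))  # 5.0
```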
Another situation in which we will want to retry is if we either get no response from the server, or a non-Matrix response, as this could indicate that the homeserver is temporarily down and that we got a response from a proxy server. In this case, we will do exponential backoff: we will wait some time before trying again, and if the request keeps failing, we will increase our delay exponentially between retries until we reach some predetermined time limit.
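To make the backoff schedule concrete, the following sketch generates the sequence of delays for a given time limit. The function name and its default values are assumptions for illustration, and it ignores the time taken by the requests themselves, so it shows the idea rather than exact timing.

```python
from typing import Iterator


def backoff_delays(
    limit: float, initial: float = 2.0, factor: float = 2.0
) -> Iterator[float]:
    """Yield delays (in seconds) whose total never exceeds ``limit``."""
    elapsed = 0.0
    delay = initial
    while elapsed < limit:
        # if the next delay would take us past the limit, wait only until
        # the limit so that one last attempt can be made
        wait = min(delay, limit - elapsed)
        yield wait
        elapsed += wait
        delay *= factor


print(list(backoff_delays(10)))  # [2.0, 4.0, 4.0]
```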
Since retrying requests will be a frequent operation, we create a function to do this. It will take a time limit, an aiohttp request function to call (e.g. session.get), and the arguments to pass to the request function. It will perform the request, retrying (with appropriate delays between attempts) when necessary until the time limit is reached.
async def retry(limit: int, req_func, *args, **kwargs) -> aiohttp.ClientResponse:
    """Retry a request until a time limit is reached.

    The request will be retried if a non-Matrix response is received, or if
    the request was rate limited.

    Note that this will not apply a timeout to the actual request: if the
    server responds slowly, then it may exceed the retry time limit.  To limit
    the time taken by the request, a `timeout` argument should be passed to
    the request function.

    Arguments:

    ``limit``:
      the time limit, in milliseconds
    ``req_func``:
      the function to call to make the request.  e.g. to make a ``GET``
      request, this could be ``session.get``, where ``session`` is an
      ``aiohttp.ClientSession``.
    ``*args, **kwargs``:
      the arguments to pass to ``req_func``.
    """
    # deadlines are tracked in nanoseconds; limit is given in milliseconds
    end_time = time.monotonic_ns() + limit * 1_000_000
    backoff = 2  # seconds
    while True:
        resp = await req_func(*args, **kwargs)
        # FIXME: handle no response (aiohttp.ClientConnectionError)
        if resp.status < 400:
            # not an error response, so return
            return resp

        try:
            # try to parse the error
            result = await resp.json()
        except Exception:
            if time.monotonic_ns() > end_time:
                return resp
            await resp.release()
            # doesn't look like a Matrix server -- exponential backoff
            # but if the backoff would take us past our limit, wait until our
            # time limit and make one last attempt
            # (convert the remaining time from nanoseconds to seconds so
            # that both values are in the units that asyncio.sleep expects)
            delay = min(backoff, (end_time - time.monotonic_ns()) / 1_000_000_000)
            await asyncio.sleep(delay)
            backoff = backoff * 2
            continue

        if time.monotonic_ns() > end_time:
            return resp
        elif (
            resp.status == 429  # Too Many Requests
            and "Retry-After" in resp.headers
            and re.match(r"^[\d]+$", resp.headers["Retry-After"])
        ):
            await resp.release()
            delay = int(resp.headers["Retry-After"])  # seconds
            # we're rate limited -- wait for the requested amount
            # in this case, if the delay would take us past our limit, there's
            # no point in trying again because the server says it will still be
            # rejected, so just return
            if time.monotonic_ns() + delay * 1_000_000_000 > end_time:
                return resp
            await asyncio.sleep(delay)
            continue
        elif (
            "errcode" in result
            and result["errcode"] == "M_LIMIT_EXCEEDED"
            and type(result.get("retry_after_ms")) == int
        ):
            await resp.release()
            delay_ms = result["retry_after_ms"]
            if time.monotonic_ns() + delay_ms * 1_000_000 > end_time:
                return resp
            # round up to a whole number of seconds
            await asyncio.sleep(math.ceil(delay_ms / 1_000))
            continue
        else:
            # some other error, so return and let the application deal with it
            return resp
Tests
@pytest.mark.asyncio
async def test_retry(mock_aioresponse):
    async with aiohttp.ClientSession() as session:
        {{retry tests}}
We test our retry function under different scenarios. First, we test the situation where the server is down and we get a response from a proxy:
mock_aioresponse.get("http://example.org/", status=504, body="Timeout")
mock_aioresponse.get("http://example.org/", status=504, body="Timeout")
mock_aioresponse.get("http://example.org/", status=200, body="OK")
# mock asyncio.sleep so that the test doesn't actually need to wait for
# multiple seconds
with mock.patch("asyncio.sleep") as sleep:
    async with await client.retry(
        10_000, session.get, "http://example.org/"
    ) as resp:
        # check that we got the final result
        assert resp.status == 200
        assert await resp.text() == "OK"
    # check that our mocked asyncio.sleep was called and that we're
    # backing off exponentially
    assert sleep.call_args_list == [mock.call(2), mock.call(4)]
Next, we test the situations where we are rate limited. We check both the cases where the server includes a Retry-After HTTP header and a retry_after_ms property.
mock_aioresponse.get(
    "http://example.org/",
    status=429,
    body='{"errcode":"M_LIMIT_EXCEEDED","retry_after_ms":100}',
    headers={
        "content-type": "application/json",
    },
)
mock_aioresponse.get("http://example.org/", status=200, body="OK")
with mock.patch("asyncio.sleep") as sleep:
    resp = await client.retry(10_000, session.get, "http://example.org/")
    async with resp:
        # check that we got the final result
        assert resp.status == 200
        assert await resp.text() == "OK"
    # check that we waited some time before retrying
    assert sleep.call_args == mock.call(1)
mock_aioresponse.get(
    "http://example.org/",
    status=429,
    body='{"errcode":"M_LIMIT_EXCEEDED"}',
    headers={
        "content-type": "application/json",
        "retry-after": "1",
    },
)
mock_aioresponse.get("http://example.org/", status=200, body="OK")
with mock.patch("asyncio.sleep") as sleep:
    resp = await client.retry(10_000, session.get, "http://example.org/")
    async with resp:
        # check that we got the final result
        assert resp.status == 200
        assert await resp.text() == "OK"
    # check that we waited some time before retrying
    assert sleep.call_args == mock.call(1)
We also test the situation where we are rate limited, but where the requested retry interval takes us past our time limit. In this case, we should only make the initial request and then return the error response.
mock_aioresponse.get(
    "http://example.org/",
    status=429,
    body='{"errcode":"M_LIMIT_EXCEEDED","retry_after_ms":100000000000}',
    headers={
        "content-type": "application/json",
    },
)
start_time = time.monotonic_ns()
resp = await client.retry(10_000, session.get, "http://example.org/")
async with resp:
    # check that we got the final result
    assert resp.status == 429
    # check that we didn't wait (the request time should be less than 1s)
    assert time.monotonic_ns() < start_time + 1_000_000_000
Finally, we test the situations where we get an OK response and a Matrix error from the server, in which cases, we should get the result immediately without retrying.
mock_aioresponse.get(
    "http://example.org/",
    status=200,
    body="{}",
    headers={
        "content-type": "application/json",
    },
)
start_time = time.monotonic_ns()
resp = await client.retry(10_000, session.get, "http://example.org/")
async with resp:
    # check that we got the final result
    assert resp.status == 200
    # check that we didn't wait (the request time should be less than 1s)
    assert time.monotonic_ns() < start_time + 1_000_000_000
mock_aioresponse.get(
    "http://example.org/",
    status=404,
    body='{"errcode":"M_NOT_FOUND"}',
    headers={
        "content-type": "application/json",
    },
)
start_time = time.monotonic_ns()
resp = await client.retry(10_000, session.get, "http://example.org/")
async with resp:
    # check that we got the final result
    assert resp.status == 404
    # check that we didn't wait (the request time should be less than 1s)
    assert time.monotonic_ns() < start_time + 1_000_000_000
Validating response bodies
After we have parsed the response body as JSON, we will want to retrieve the information. However, we first want to ensure that it follows the expected data format so that we do not inadvertently process corrupted data. The Matrix spec defines a schema for each endpoint, defining what properties are required or optional, and what data type each property should be. We will write a function to do some basic validation on the data.
Note
Some Matrix libraries do not have separate validation functions. Instead, they may try to validate responses while values are being retrieved. For the purposes of this book, the code seems to be clearer with the response validated first, before the values are retrieved. Either way, clients should not assume that servers will send correctly-formatted responses, and should perform some sort of validation.
# {{copyright}}
"""Validate JSON data according to a given schema"""
import typing
from . import error
{{schema module classes}}
{{schema module functions}}
We start with some helper classes to support more complex types.
@typing.runtime_checkable
class SchemaHelper(typing.Protocol):
    """Base class for schema helper classes"""

    def is_valid(self, body: typing.Any) -> bool:
        ...  # pragma: no cover


class Optional(SchemaHelper):
    """Indicates that a parameter is optional"""

    def __init__(self, base_type: typing.Any):
        self.base_type = base_type

    def is_valid(self, body: typing.Any) -> bool:
        # if we're given a value, check against the base type
        return is_valid(body, self.base_type)


class Union(SchemaHelper):
    """Indicates that a parameter may be one of multiple types"""

    # the annotation on *base_types describes each individual argument
    def __init__(self, *base_types: typing.Any):
        self.base_types = base_types

    def is_valid(self, body: typing.Any) -> bool:
        for t in self.base_types:
            if is_valid(body, t):
                return True
        return False


class Intersection(SchemaHelper):
    """Indicates that a parameter must satisfy all of the given types"""

    def __init__(self, *base_types: typing.Any):
        self.base_types = base_types

    def is_valid(self, body: typing.Any) -> bool:
        for t in self.base_types:
            if not is_valid(body, t):
                return False
        return True
We then write the main function for actually checking a value against a schema. The schema can be a normal Python type (int, str, etc.), an instance of one of our helper classes above, or a dict indicating that the value must be a dict with the given properties (possibly with other properties as well).
def is_valid(body: typing.Any, schema: typing.Any) -> bool:
    """Check if the JSON data is valid according to the given schema

    Example:

    >>> is_valid({"foo": 1, "bar": True}, {"foo": int, "bar": bool})
    True
    """
    if schema == typing.Any:
        return True
    elif type(schema) == type:
        # compare exact types, so that a bool is not accepted as an int
        return type(body) == schema
    elif type(schema) == dict:
        # if schema is a dict, then body must also be a dict that contains
        # the keys given in schema (unless marked as optional), and the types
        # match
        if type(body) != dict:
            return False
        for key, value_schema in schema.items():
            if key not in body:
                # a missing key is only acceptable if it is optional; in that
                # case, carry on and check the remaining keys
                if isinstance(value_schema, Optional):
                    continue
                return False
            if not is_valid(body[key], value_schema):
                return False
        return True
    elif isinstance(schema, SchemaHelper):
        return schema.is_valid(body)
    elif typing.get_origin(schema) == list:
        # if we have a list of a type, check that the body is a list, and then
        # check that each element matches the type
        if type(body) != list:
            return False
        base = typing.get_args(schema)[0]
        for item in body:
            if not is_valid(item, base):
                return False
        return True
    elif typing.get_origin(schema) == dict:
        # if we have a dict with arguments, the first argument must be str
        # (because JSON only allows string keys), and every value must match
        # the second argument
        if type(body) != dict:
            return False
        key_type, base = typing.get_args(schema)
        if key_type != str:
            return False
        for item in body.values():
            if not is_valid(item, base):
                return False
        return True
    else:
        return False
Even though we can use dict and list as schemas, these cannot be used to match objects or lists that contain more complicated types. For example, we cannot do list[Union(int, bool)]. To handle this, we create helper classes for objects and lists that contain more complicated types. For these classes, we will use the JSON terms “object” and “array” rather than the Python terms “dict” and “list”.
class Object(SchemaHelper):
    """Indicates an object that contains values of a given type"""

    def __init__(self, value_type: typing.Any):
        self.value_type = value_type

    def is_valid(self, body: typing.Any) -> bool:
        return type(body) == dict and all(
            (is_valid(value, self.value_type) for value in body.values())
        )


class Array(SchemaHelper):
    """Indicates an array that contains values of a given type"""

    def __init__(self, value_type: typing.Any):
        self.value_type = value_type

    def is_valid(self, body: typing.Any) -> bool:
        return type(body) == list and all(
            (is_valid(value, self.value_type) for value in body)
        )
We will also provide a convenience function that validates a response body and raises an exception if the validation fails:
def ensure_valid(body: typing.Any, schema: typing.Any) -> None:
    """Throw an exception if the JSON data is not valid"""
    if not is_valid(body, schema):
        raise error.InvalidResponseError()

class InvalidResponseError(RuntimeError):
    """The server's response did not match the expected format"""

    pass
Tests
def test_schema():
    {{is_valid tests}}
We test some basic types.
assert schema.is_valid(1, typing.Any)
assert schema.is_valid(1, int)
assert not schema.is_valid(1, str)
assert schema.is_valid([1], list)
Next, we test that we can specify the schema of dicts.
assert schema.is_valid({"foo": "bar"}, {"foo": str})
assert not schema.is_valid({"foo": 1}, {"foo": str})
assert not schema.is_valid({}, {"foo": str})
assert not schema.is_valid(1, {})
And we test that we can specify optional values. If the property is marked as optional, and is present, it must match the given type.
assert schema.is_valid({"foo": "bar"}, {"foo": schema.Optional(str)})
assert not schema.is_valid({"foo": 1}, {"foo": schema.Optional(str)})
assert schema.is_valid({}, {"foo": schema.Optional(str)})
We test that unions accept all specified types, but no others.
assert schema.is_valid(1, schema.Union(str, int, bool))
assert schema.is_valid("foo", schema.Union(str, int, bool))
assert schema.is_valid(True, schema.Union(str, int, bool))
assert not schema.is_valid([], schema.Union(str, int, bool))
assert not schema.is_valid({}, schema.Union(str, int, bool))
We test that we can have lists and dicts of a single type.
assert schema.is_valid([1, 2], list[int])
assert not schema.is_valid([1, "foo"], list[int])
assert not schema.is_valid(1, list[int])
assert schema.is_valid({"foo": True, "bar": False}, dict[str, bool])
assert not schema.is_valid({"foo": True, "bar": 1}, dict[str, bool])
assert not schema.is_valid(1, dict[str, bool])
assert schema.is_valid([1, True], schema.Array(schema.Union(int, bool)))
assert not schema.is_valid([1, "foo"], schema.Array(schema.Union(int, bool)))
assert schema.is_valid(
    {"foo": 1, "bar": True}, schema.Object(schema.Union(int, bool))
)
assert not schema.is_valid(
    {"foo": 1, "bar": "baz"}, schema.Object(schema.Union(int, bool))
)
Note
Our validation API uses the fact that in Python, we can pass types around as data. Not all languages support this feature; in other languages where this is not supported, different techniques can be used. One possibility is to simply use a helper class, similar to our schema.Union or schema.Intersection classes, for each data type. This would give us a validation function that would look like, for example, schema.is_valid([1], schema.List(schema.Integer)).
Another possibility is to use strings (or atoms or symbols, in languages that support those constructs) to represent the base types, and to add several more branches to the if..elif..else block. This would yield a validation function that would look like, for example, schema.is_valid([1], schema.List("integer")).
The shape of the validation API depends on the language features, the author’s preferred style, and what is idiomatic in the language.
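To make the string-based alternative concrete, here is a rough sketch of it, written in Python only so that it can be compared with the code above. The BASE_TYPES table, the List class, and the type names are assumptions made for this illustration; they are not part of our schema module.

```python
from typing import Any

# Map the string names of base types to the Python types they stand for.
BASE_TYPES = {"integer": int, "string": str, "boolean": bool}


class List:
    """Indicates a list whose items all match the given schema"""

    def __init__(self, item_schema: Any):
        self.item_schema = item_schema


def is_valid(body: Any, schema: Any) -> bool:
    if isinstance(schema, str):
        # base types are named by strings rather than passed as types
        expected = BASE_TYPES.get(schema)
        # compare exact types, so that a bool is not accepted as an integer
        return expected is not None and type(body) is expected
    elif isinstance(schema, List):
        return type(body) is list and all(
            is_valid(item, schema.item_schema) for item in body
        )
    else:
        return False


print(is_valid([1], List("integer")))  # True
print(is_valid([1, "foo"], List("integer")))  # False
```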
Authenticated requests
Requests can either be authenticated or unauthenticated: authenticated requests are requests where the server knows which client is performing the request. Requests are authenticated through the use of an access token, which is given to the client upon login or registration. For more information, see the section on Authenticated requests.
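As a brief preview, the access token is normally sent in an Authorization HTTP header using the Bearer scheme. The helper name auth_headers and the token value below are placeholders for illustration only.

```python
def auth_headers(access_token: str) -> dict[str, str]:
    """Build the HTTP headers that authenticate a request."""
    return {"Authorization": f"Bearer {access_token}"}


# "example_token" is a placeholder; a real token is obtained at login
print(auth_headers("example_token"))
# {'Authorization': 'Bearer example_token'}
```

The resulting dict could be passed as the headers argument of an aiohttp request function such as session.get.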