Authenticated requests

As mentioned earlier, the homeserver authenticates requests using the access token that was provided at login. The token tells the homeserver which user, and which of the user’s devices, is making the request, so that it can check that the user is authorized to make the request and return the correct information to them.

Providing the access token can be done in one of two ways: by adding a query parameter, or by adding a request header. Using a request header is the recommended method, since some servers and proxies may log requested URLs, including query parameters, which would expose access tokens to anyone able to read the logs.
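
To make the two options concrete, the following sketch builds both forms for an example token and URL. The helper names `with_bearer_header` and `with_token_query` are illustrative only and are not part of our library; `access_token` is the name of the query parameter.

```python
from urllib.parse import urlencode


def with_bearer_header(access_token, headers=None):
    """Return a copy of ``headers`` with the token in the Authorization header."""
    result = dict(headers or {})
    result["Authorization"] = "Bearer " + access_token
    return result


def with_token_query(url, access_token):
    """Return ``url`` with the token appended as a query parameter.

    Shown only for comparison: the token becomes part of the URL, so it
    can end up in server and proxy logs.
    """
    separator = "&" if "?" in url else "?"
    return url + separator + urlencode({"access_token": access_token})


url = "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org"
print(with_bearer_header("anaccesstoken"))
print(with_token_query(url, "anaccesstoken"))
```

The rest of this section uses only the header form.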

We add a method to our Client class that makes an authenticated request. Like our retry function, it takes an aiohttp request function to call (e.g. self.http_session.get) and the arguments to pass to that function, and calls the function with those arguments plus the access token. We start with the simple case, where the access token works as-is, and then handle the cases where the token needs to be updated. That is, we will fill in the “check if access token expired” and “get new access token if needed” sections later.

Client class methods:
async def authenticated(self, req_func, *args, **kwargs) -> aiohttp.ClientResponse:
    """Make an authenticated request to the homeserver.

    Arguments:

    ``req_func``:
        the function to call to make the request.  e.g. to make an
        authenticated ``GET`` request, this would be ``self.http_session.get``.
    ``*args, **kwargs``:
        the arguments to pass to ``req_func``.
    """
    if "access_token" not in self.storage:
        raise RuntimeError("Not logged in")  # FIXME: use a custom exception

    {{check if access token expired}}

    headers = kwargs.get("headers", {})
    headers["Authorization"] = "Bearer " + self.storage["access_token"]
    kwargs["headers"] = headers
    resp = await req_func(*args, **kwargs)

    {{get new access token if needed}}

    return resp

Note that, as written, this function only sees custom headers that the caller passes as a keyword argument, not as a positional argument. Given the number of arguments that the aiohttp request functions take, callers are likely to pass headers by keyword anyway.
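
The following sketch illustrates the point with stand-ins rather than the real classes. `fake_get` is a hypothetical replacement for an aiohttp request function, and the free function `authenticated` is a simplified copy of the header handling above, without storage or retries.

```python
import asyncio


async def fake_get(url, **kwargs):
    """Stand-in for an aiohttp request function; returns what it was given."""
    return {"url": url, "headers": kwargs.get("headers")}


async def authenticated(access_token, req_func, *args, **kwargs):
    """Simplified stand-in for the method above (no storage, no retries)."""
    headers = kwargs.get("headers", {})
    headers["Authorization"] = "Bearer " + access_token
    kwargs["headers"] = headers
    return await req_func(*args, **kwargs)


async def demo():
    return await authenticated(
        "anaccesstoken",
        fake_get,
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        headers={"Accept": "application/json"},  # custom header, passed by keyword
    )


result = asyncio.run(demo())
print(result["headers"])
```

The custom header and the Authorization header both reach the request function, because the headers were found in the keyword arguments.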

Tests

As usual, we write tests to ensure that our code works correctly.

tests/test_authentication.py:
# {{copyright}}

import json
import pytest
import time

from matrixlib import client
from matrixlib import error


{{test authentication}}

We create a client and pre-populate the access token and other information. Only the access token is needed in the function that we are testing, but for completeness, we include other information that a client would normally have.

test authentication:
@pytest.mark.asyncio
async def test_basic_authenticated_requests(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{basic authentication tests}}

Now we make a simple request, check that it has added the appropriate header, and make sure that it has passed through the response.

basic authentication tests:
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
    body='{"displayname":"Alice"}',
    headers={
        "content-type": "application/json",
    },
)
resp = await c.authenticated(
    c.http_session.get, c.url("v3/profile/%40alice:example.org")
)
async with resp:
    mock_aioresponse.assert_called_with(
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        method="GET",
        headers={"Authorization": "Bearer anaccesstoken"},
    )
    assert resp.status == 200
    assert await resp.json() == {"displayname": "Alice"}

Todo

test another request type, e.g. POST

There are two situations where a new access token needs to be obtained: the access token needs to be refreshed by means of a refresh token, or the client has been soft logged out.

Refresh tokens

If an attacker obtains a user’s access token, they will be able to make requests to the server impersonating the user. In order to limit the impact of a compromised access token, the server may expire access tokens and require them to be refreshed through the use of a refresh token. If the server expires access tokens, the initial refresh token is given when the user logs in. When the access token expires, the client can use the refresh token to call the POST /refresh endpoint, and then a new access token and refresh token are returned.

Note

If a client does not support refresh tokens, it may indicate that by omitting the "refresh_token": true parameter from the login request. In this case, the access token will not expire, and the client does not need to implement this section. However, usage of refresh tokens is recommended for better security, and homeservers are allowed to reject logins that do not indicate support for refresh tokens (though they are currently unlikely to do so).
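
As an illustration, a password login request body that indicates support for refresh tokens might be built as follows. The function name `password_login_body` and its `supports_refresh` parameter are hypothetical; the field names are the ones used for login requests elsewhere in this section.

```python
import json


def password_login_body(user_id, password, supports_refresh=True):
    """Build a password login body, optionally indicating refresh token support."""
    body = {
        "type": "m.login.password",
        "identifier": {"type": "m.id.user", "user": user_id},
        "password": password,
    }
    if supports_refresh:
        # omitting this property tells the server we do not support refresh tokens
        body["refresh_token"] = True
    return body


print(json.dumps(password_login_body("@alice:example.org", "P4ssw0rd"), indent=2))
```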

When we obtain the access token and the refresh token, the server tells us how long the access token is valid for (the expires_in_ms property in the response to the POST /login endpoint). From this, we calculated the expected expiry time and stored it as access_token_valid_until. We can therefore check whether the expected expiry time has passed and, if so, refresh the access token before making the request.
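
The arithmetic is small but easy to get wrong, since expires_in_ms is in milliseconds while we store the expiry as a nanosecond timestamp from time.time_ns(). A minimal sketch, using the hypothetical helper names `valid_until_ns` and `is_expired`:

```python
import time

NS_PER_MS = 1_000_000


def valid_until_ns(expires_in_ms, now_ns=None):
    """Convert a validity period in milliseconds to an absolute time in ns."""
    if now_ns is None:
        now_ns = time.time_ns()
    return now_ns + expires_in_ms * NS_PER_MS


def is_expired(valid_until, now_ns=None):
    """Return True if the stored expiry time has already passed."""
    if now_ns is None:
        now_ns = time.time_ns()
    return valid_until < now_ns


# a token valid for 60 seconds, issued at time 0
expiry = valid_until_ns(60_000, now_ns=0)
print(expiry)
print(is_expired(expiry, now_ns=59_000 * NS_PER_MS))
print(is_expired(expiry, now_ns=61_000 * NS_PER_MS))
```

The explicit `now_ns` parameter exists only so that the calculation can be checked without depending on the real clock.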

When we refresh the access token, the server may return an M_UNKNOWN_TOKEN error code, indicating that we may have been logged out. In this case, we check if it’s a soft logout, and if so, handle it as such. (See below for more details.)

check if access token expired:
if (
    "access_token_valid_until" in self.storage
    and self.storage["access_token_valid_until"] < time.time_ns()
):
    try:
        await self._refresh_access_token()
    except Exception:
        # refresh failed, so we're logged out
        self.storage.clear()
        raise

Client class methods:
async def _refresh_access_token(self) -> None:
    """Try to refresh an expired access token"""
    req_body = {"refresh_token": self.storage["refresh_token"]}
    async with self.http_session.post(
        self.url("v3/refresh"), json=req_body
    ) as resp:
        try:
            status, resp_body = await check_response(resp)
        except error.MatrixError as e:
            if e.body["errcode"] == "M_UNKNOWN_TOKEN":
                return await self._handle_soft_logout(e.code, e.body)
            else:
                raise
        # FIXME: if error.NotMatrixServerError, try again a few times, because
        # it may be a temporary condition

        if schema.is_valid(
            resp_body,
            {
                "access_token": str,
                "expires_in_ms": schema.Optional(int),
                "refresh_token": schema.Optional(str),
            },
        ):
            # refresh successful; save the new values
            if "expires_in_ms" in resp_body:
                if "refresh_token" not in resp_body:
                    raise error.InvalidResponseError()
                self.storage["access_token_valid_until"] = (
                    time.time_ns() + resp_body["expires_in_ms"] * 1_000_000
                )
                self.storage["refresh_token"] = resp_body["refresh_token"]
            else:
                # no expiry given, so forget any stale refresh data
                for key in ("access_token_valid_until", "refresh_token"):
                    if key in self.storage:
                        del self.storage[key]
            self.storage["access_token"] = resp_body["access_token"]
        else:
            raise error.InvalidResponseError()

Note that even if we have not reached the access token’s expiry (by our calculation), we may find after making the request that we still need to refresh the access token. For example, the server may expire the access token early, the client’s or server’s clock may have drifted, or network delays may make our expected expiry time inaccurate.

If the access token needs to be refreshed, the server will not tell us that the access token has expired, per se, but will return an error with error code M_UNKNOWN_TOKEN, indicating that the given access token is invalid. We can then try to use our refresh token (if we have one) to obtain a new access token.

If an M_UNKNOWN_TOKEN error was received, but we do not have a refresh token, then we check if we were soft logged out (again, see below).
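
The decision described in the last few paragraphs can be summarized as a small pure function. This is only a sketch of the logic, not code from our library: the name `action_for_error` and the returned strings are made up for illustration.

```python
def action_for_error(body, has_refresh_token):
    """Decide what to do with an error response body from the homeserver."""
    if body.get("errcode") != "M_UNKNOWN_TOKEN":
        # some other error: hand the response back to the caller unchanged
        return "pass_through"
    if has_refresh_token:
        # the access token may simply have expired
        return "refresh"
    if body.get("soft_logout"):
        # the server wants the user to re-authenticate
        return "re_log_in"
    # fully logged out
    return "hard_logout"


print(action_for_error({"errcode": "M_UNKNOWN_TOKEN"}, has_refresh_token=True))
print(action_for_error({"errcode": "M_UNKNOWN_TOKEN", "soft_logout": True}, has_refresh_token=False))
print(action_for_error({"errcode": "M_FORBIDDEN"}, has_refresh_token=True))
```

If the refresh attempt itself fails with M_UNKNOWN_TOKEN, the same soft logout check is applied to that error.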

get new access token if needed:
if resp.status >= 400 and resp.content_type == "application/json":
    body = await resp.json()
    if body.get("errcode") == "M_UNKNOWN_TOKEN":
        if "refresh_token" in self.storage:
            await self._refresh_access_token()
        else:
            await self._handle_soft_logout(resp.status, body)

        # update was successful, so retry the request
        return await self.authenticated(req_func, *args, **kwargs)

Tests

test authentication:
@pytest.mark.asyncio
async def test_refresh_token(mock_aioresponse):
    {{refresh token tests}}

We first test that we refresh the access token if it has already expired.

refresh token tests:
async with client.Client(
    storage={
        "access_token": "anaccesstoken",
        "user_id": "@alice:example.org",
        "device_id": "ABCDEFG",
        "refresh_token": "a_refresh_token",
        "access_token_valid_until": time.time_ns() - 1000,
    },
    callbacks={},
    base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
    mock_aioresponse.post(
        "https://matrix-client.example.org/_matrix/client/v3/refresh",
        body=json.dumps(
            {
                "access_token": "another_access_token",
                "expires_in_ms": 60000,
                "refresh_token": "another_refresh_token",
            },
        ),
        headers={
            "content-type": "application/json",
        },
    )
    mock_aioresponse.get(
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        body='{"displayname":"Alice"}',
        headers={
            "content-type": "application/json",
        },
    )

    resp = await c.authenticated(
        c.http_session.get, c.url("v3/profile/%40alice:example.org")
    )
    async with resp:
        mock_aioresponse.assert_called_with(
            "https://matrix-client.example.org/_matrix/client/v3/refresh",
            method="POST",
            json={"refresh_token": "a_refresh_token"},
        )
        mock_aioresponse.assert_called_with(
            "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
            method="GET",
            headers={"Authorization": "Bearer another_access_token"},
        )
        assert resp.status == 200
        assert await resp.json() == {"displayname": "Alice"}
        assert c.storage["refresh_token"] == "another_refresh_token"

And then we test the case where the access token has not expired yet, but after making a request we are informed that our access token is invalid.

refresh token tests:
async with client.Client(
    storage={
        "access_token": "anaccesstoken",
        "user_id": "@alice:example.org",
        "device_id": "ABCDEFG",
        "refresh_token": "arefreshtoken",
        "access_token_valid_until": time.time_ns() + 60_000_000_000,
    },
    callbacks={},
    base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
    mock_aioresponse.get(
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        status=401,
        body='{"errcode":"M_UNKNOWN_TOKEN","soft_logout":true}',
        headers={
            "content-type": "application/json",
        },
    )
    mock_aioresponse.post(
        "https://matrix-client.example.org/_matrix/client/v3/refresh",
        body='{"access_token":"anotheraccesstoken","expires_in_ms":60000,"refresh_token":"another_refresh_token"}',
        headers={
            "content-type": "application/json",
        },
    )
    mock_aioresponse.get(
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        body='{"displayname":"Alice"}',
        headers={
            "content-type": "application/json",
        },
    )

    resp = await c.authenticated(
        c.http_session.get, c.url("v3/profile/%40alice:example.org")
    )
    async with resp:
        mock_aioresponse.assert_called_with(
            "https://matrix-client.example.org/_matrix/client/v3/refresh",
            method="POST",
            json={"refresh_token": "arefreshtoken"},
        )
        mock_aioresponse.assert_called_with(
            "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
            method="GET",
            headers={"Authorization": "Bearer anotheraccesstoken"},
        )
        assert resp.status == 200
        assert await resp.json() == {"displayname": "Alice"}
        assert c.storage["refresh_token"] == "another_refresh_token"

Soft logout

The second condition under which a new access token needs to be obtained is if the user is soft logged out. This means that the server wants the user to re-authenticate before continuing, but after they have re-authenticated, they may resume where they had left off. In contrast, a hard logout means that the server considers the session to have ended and the client should forget any of its persisted state. A soft logout could happen, for example, if a user changes their password, and the server wants the user to re-authenticate all their other sessions.

A soft logout is indicated by a "soft_logout": true property in an M_UNKNOWN_TOKEN error response. In response, the client should re-log in using the normal login mechanism, setting the device_id to the current session’s device ID. However, this may involve user interaction, beyond the scope of our library, so we will use a callback to allow the application to perform the necessary steps to re-authenticate. The callback will be passed one argument: the client object. The callback may need to re-prompt the user for their password (but not for their user ID or any other identifier, since the client already knows that), or to log in again using SSO.

client callbacks:
# called when the client needs to re-log-in after a soft logout
re_log_in: typing.Callable[[Client], typing.Awaitable[None]]

Client class methods:
async def _handle_soft_logout(self, status, body) -> None:
    if body.get("soft_logout"):
        await self.callbacks["re_log_in"](self)
    else:
        # we've been fully logged out, so we clear all our data and
        # raise an exception
        # FIXME: we should also emit an event to tell the client that we're logged out
        self.storage.clear()
        raise error.MatrixError(status, body)
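
For illustration, an application might supply a callback along the following lines. This is a sketch under assumptions: the name `prompt_for_password` is hypothetical, and it relies on the client object offering a `re_log_in` coroutine method that accepts the password, which we write next.

```python
import asyncio
import getpass


async def prompt_for_password(cli, prompt=getpass.getpass):
    """Hypothetical ``re_log_in`` callback: ask for the password and re-log in.

    ``prompt`` is injectable so that the callback can be tested without a
    terminal.
    """
    loop = asyncio.get_running_loop()
    # getpass blocks, so run it in a worker thread to keep the event loop free
    password = await loop.run_in_executor(None, prompt, "Password: ")
    await cli.re_log_in(password)
```

An application would then pass `callbacks={"re_log_in": prompt_for_password}` when creating the client. A graphical application would show a dialog instead of using getpass.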

To allow the user to re-log in, we will create a new function for the re_log_in callback to call, rather than re-using our existing login functions, since it will operate slightly differently. Notably, it needs to send the device ID to the server (so that the server knows which device we’re re-logging in as), and it doesn’t need to take the user ID or any other identifier as a parameter, since we already know the user ID. Also, unlike our other login functions, we do not want to raise an error if we’re already logged in.

Client class methods:
async def re_log_in(
    self,
    auth_parameter: typing.Union[str, dict[str, typing.Any]],
    login_type: str = "password",
) -> None:
    """Re-log in after getting soft logged out

    Arguments:

    ``auth_parameter``:
        The authentication parameter (e.g. password or token) to use.
    ``login_type``:
        The method of logging in.  The default value, ``'password'``, indicates
        that you are logging in with a password.  A value of ``'token'``
        indicates that you are logging in with a token.  An unknown value will
        set that as the ``type`` in the request body, and ``auth_parameter``
        must be a ``dict`` whose contents will be copied to the request body.
    """
    req_body = {
        "identifier": {
            "type": "m.id.user",
            "user": self.storage["user_id"],
        },
        "device_id": self.storage["device_id"],
        "refresh_token": True,
    }

    if login_type == "password":
        req_body["type"] = "m.login.password"
        req_body["password"] = auth_parameter
    elif login_type == "token":
        req_body["type"] = "m.login.token"
        req_body["token"] = auth_parameter
    elif isinstance(auth_parameter, dict):
        req_body["type"] = login_type
        # copy the caller-supplied parameters into the request body
        req_body.update(auth_parameter)
    else:
        raise RuntimeError("Unsupported authentication parameter")

    await self._do_log_in(req_body)

Tests

test authentication:
@pytest.mark.asyncio
async def test_soft_logout(mock_aioresponse):
    # our re-log in callback will just log in using a hard-coded password
    async def re_log_in(cli: client.Client):
        await cli.re_log_in("P4ssw0rd")

    {{soft logout tests}}

To test that we can handle soft logouts, we test two cases: first, the case where we do not have a refresh token, and second, the case where we do.

soft logout tests:
async with client.Client(
    storage={
        "access_token": "anaccesstoken",
        "user_id": "@alice:example.org",
        "device_id": "ABCDEFG",
    },
    callbacks={"re_log_in": re_log_in},
    base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
    mock_aioresponse.get(
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        status=401,
        body='{"errcode":"M_UNKNOWN_TOKEN","soft_logout":true}',
        headers={
            "content-type": "application/json",
        },
    )
    mock_aioresponse.post(
        "https://matrix-client.example.org/_matrix/client/v3/login",
        status=200,
        payload={
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "access_token": "anewaccesstoken",
        },
        headers={
            "content-type": "application/json",
        },
    )
    mock_aioresponse.get(
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        body='{"displayname":"Alice"}',
        headers={
            "content-type": "application/json",
        },
    )

    resp = await c.authenticated(
        c.http_session.get, c.url("v3/profile/%40alice:example.org")
    )
    async with resp:
        # make sure that the request finally succeeded
        assert resp.status == 200
        assert await resp.json() == {"displayname": "Alice"}
        # make sure that we re-logged in and got a new access token
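        assert c.storage["access_token"] == "anewaccesstoken"

In the second case, we have a refresh token, but the refresh request itself fails with a soft logout, so the client falls back to re-logging in.

soft logout tests: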
async with client.Client(
    storage={
        "access_token": "anaccesstoken",
        "user_id": "@alice:example.org",
        "device_id": "ABCDEFG",
        "refresh_token": "arefreshtoken",
        "access_token_valid_until": time.time_ns() + 60_000_000_000,
    },
    callbacks={"re_log_in": re_log_in},
    base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
    mock_aioresponse.get(
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        status=401,
        body='{"errcode":"M_UNKNOWN_TOKEN","soft_logout":true}',
        headers={
            "content-type": "application/json",
        },
    )
    mock_aioresponse.post(
        "https://matrix-client.example.org/_matrix/client/v3/refresh",
        status=401,
        body='{"errcode":"M_UNKNOWN_TOKEN","soft_logout":true}',
        headers={
            "content-type": "application/json",
        },
    )
    mock_aioresponse.post(
        "https://matrix-client.example.org/_matrix/client/v3/login",
        status=200,
        payload={
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "access_token": "anewaccesstoken",
        },
        headers={
            "content-type": "application/json",
        },
    )
    mock_aioresponse.get(
        "https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
        body='{"displayname":"Alice"}',
        headers={
            "content-type": "application/json",
        },
    )

    resp = await c.authenticated(
        c.http_session.get, c.url("v3/profile/%40alice:example.org")
    )
    async with resp:
        # make sure that we tried to refresh using the refresh token
        mock_aioresponse.assert_called_with(
            "https://matrix-client.example.org/_matrix/client/v3/refresh",
            method="POST",
            json={"refresh_token": "arefreshtoken"},
        )

        # make sure that the request finally succeeded
        assert resp.status == 200
        assert await resp.json() == {"displayname": "Alice"}
        # make sure we re-logged in and got a new access token
        assert c.storage["access_token"] == "anewaccesstoken"

Todo

add mutex so that only one thread/fibre will try to refresh/re-log in at once
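
One possible shape for this, sketched with an asyncio.Lock, is shown below. It is not part of our library yet: the class name `RefreshGate` and its generation counter are assumptions made for the sake of the example, and `fake_refresh` stands in for the real refresh or re-log-in request.

```python
import asyncio


class RefreshGate:
    """Let only one task refresh at a time; later callers reuse the result."""

    def __init__(self, refresh_func):
        self._refresh_func = refresh_func  # coroutine function doing the real work
        self._lock = asyncio.Lock()
        self.generation = 0  # incremented after each successful refresh

    async def refresh(self, seen_generation):
        """Refresh unless another task already did so since ``seen_generation``."""
        async with self._lock:
            if self.generation == seen_generation:
                await self._refresh_func()
                self.generation += 1


async def demo():
    calls = 0

    async def fake_refresh():
        nonlocal calls
        calls += 1
        await asyncio.sleep(0)  # yield, as a real network request would

    gate = RefreshGate(fake_refresh)
    seen = gate.generation
    # five tasks notice the invalid token at the same time
    await asyncio.gather(*(gate.refresh(seen) for _ in range(5)))
    return calls


print(asyncio.run(demo()))  # prints 1
```

Each caller records the generation it saw before making its request. A caller that arrives after another task has already refreshed finds that the generation has changed and skips the redundant refresh.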

Logging out

To show how our authenticated method is used, we will write a function for one of the simpler authenticated requests: logging out. Logging out calls the POST /logout endpoint. Since this is a POST request, we call self.authenticated with self.http_session.post as the first argument. The POST /logout endpoint doesn’t take any body parameters, so our remaining arguments are the URL for the endpoint and an empty JSON object to use as the request body.

After we log out, we clear our storage.

Client class methods:
async def log_out(self) -> None:
    """Log out the current session and clear out the storage."""
    async with await self.authenticated(
        self.http_session.post,
        self.url("v3/logout"),
        json={},
    ) as resp:
        await check_response(resp)
        self.storage.clear()

Example: logout script

To complement our login script from the previous section, we create a logout script. This will log out the session that was logged in previously. Since all the necessary information was saved in the client storage, we don’t need to take any command-line arguments.

examples/logout.py:
# {{copyright}}

"""Log out a logged-in session"""

import asyncio
import json
import typing

from matrixlib import client


{{json file storage}}


async def main():
    async with client.Client(storage=JsonFileStorage()) as c:
        await c.log_out()
        print("Logged out")


asyncio.run(main())

Todo

UI auth (maybe as separate section)