Authenticated requests¶
As mentioned earlier, requests are authenticated by the homeserver by use of the access token that was provided at login. This allows the homeserver to know what user, and which of the user’s devices, is making the request, so that it can ensure that the user is authorized to make the requests they are making, and to ensure that the correct information is returned to them.
Providing the access token can be done in one of two ways: by adding a query parameter, or by adding a request header. Using a request header is the recommended method, since some servers and proxies may log requested URLs, including query parameters, which would expose access tokens to anyone able to read the logs.
We add a method to our Client
class that will make an authenticated request.
Similar to our retry
function, it will take an
aiohttp request function to call (e.g. self.http_session.get
), and the
arguments to pass to the request function, and will call the function with
those arguments, but with the access token added. We will start with the
simple case, where the access token works as-is, and then handle some cases
where we may need to update the token somehow. That is, we will fill in the
“check if access token expired” and “get new access token if needed” sections
later.
async def authenticated(self, req_func, *args, **kwargs) -> aiohttp.ClientResponse:
"""Make an authenticated request to the homeserver.
Arguments:
``req_func``:
the function to call to make the request. e.g. to make an
authenticated ``GET`` request, this would be ``self.http_session.get``.
``*args, **kwargs``:
the arguments to pass to ``req_func``.
"""
if "access_token" not in self.storage:
raise RuntimeError("Not logged in") # FIXME: use a custom exception
{{check if access token expired}}
headers = kwargs.get("headers", {})
headers["Authorization"] = "Bearer " + self.storage["access_token"]
kwargs["headers"] = headers
resp = await req_func(*args, **kwargs)
{{get new access token if needed}}
return resp
Note that, the way this function is written, if the caller needs to add custom
headers, it must pass the headers
argument as a keyword argument rather than
as a positional argument. Given the number of arguments that the aiohttp
request functions take, this is likely to be the case.
Tests
As usual, we write tests to ensure that our code works correctly.
# {{copyright}}
import json
import pytest
import time
from matrixlib import client
from matrixlib import error
{{test authentication}}
We create a client and pre-populate the access token and other information. Only the access token is needed in the function that we are testing, but for completeness, we include other information that a client would normally have.
@pytest.mark.asyncio
async def test_basic_authenticated_requests(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{basic authentication tests}}
Now we make a simple request, check that it has added the appropriate header, and make sure that it has passed through the response.
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
body='{"displayname":"Alice"}',
headers={
"content-type": "application/json",
},
)
resp = await c.authenticated(
c.http_session.get, c.url("v3/profile/%40alice:example.org")
)
async with resp:
mock_aioresponse.assert_called_with(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
method="GET",
headers={"Authorization": "Bearer anaccesstoken"},
)
assert resp.status == 200
assert await resp.json() == {"displayname": "Alice"}
Todo
test another request type, e.g. POST
There are two situations where a new access token needs to be obtained: the access token needs to be refreshed by means of a refresh token, or the client has been soft logged out.
Refresh tokens¶
If an attacker obtains a user’s access token, they will be able to make
requests to the server impersonating the user. In order to limit the impact of
a compromised access token, the server may expire access tokens and require
them to be refreshed through the use of a refresh token. If the server expires
access tokens, the initial refresh token is given when the user logs in. When
the access token expires, the client can use the refresh token to call the
POST /refresh
endpoint,
and then a new access token and refresh token are returned.
Note
If a client does not support refresh tokens, it may indicate that by omitting
the "refresh_token": true
parameter from the login request. In
this case, the access token will not expire, and the client does not need to
implement this section. However, usage of refresh tokens is recommended for
better security, and homeservers are allowed to reject logins that do not
indicate support for refresh tokens (though they are currently unlikely to do
so).
When we obtain the access token and the refresh token, the server will tell us
when the access token will expire by telling us how long it is valid (the
expires_in_ms
property in the response to the POST /login
endpoint). Using
this, we calculated the expected expiry time, and stored it as
access_token_valid_until
. So we can check whether the expected expiry time
has been passed, and if so, refresh the access token.
When we refresh the access token, the server may return an M_UNKNOWN_TOKEN
error code, indicating that we may have been logged out. In this case, we
check if it’s a soft logout, and if so, handle it as such. (See
below for more details.)
if (
"access_token_valid_until" in self.storage
and self.storage["access_token_valid_until"] < time.time_ns()
):
try:
await self._refresh_access_token()
except:
# refresh failed, so we're logged out
self.storage.clear()
raise
async def _refresh_access_token(self) -> None:
"""Try to refresh an expired access token"""
req_body = {"refresh_token": self.storage["refresh_token"]}
async with self.http_session.post(
self.url("v3/refresh"), json=req_body
) as resp:
try:
status, resp_body = await check_response(resp)
except error.MatrixError as e:
if e.body["errcode"] == "M_UNKNOWN_TOKEN":
return await self._handle_soft_logout(e.code, e.body)
else:
raise
# FIXME: if error.NotMatrixServerError, try again a few times, because
# it may be a temporary condition
if schema.is_valid(
resp_body,
{
"access_token": str,
"expires_in_ms": schema.Optional(int),
"refresh_token": schema.Optional(str),
},
):
# refresh successful; save the new values
if "expires_in_ms" in resp_body:
if "refresh_token" not in resp_body:
raise error.InvalidResponseError()
self.storage["access_token_valid_until"] = (
time.time_ns() + resp_body["expires_in_ms"] * 1_000_000
)
self.storage["refresh_token"] = resp_body["refresh_token"]
else:
del self.storage["access_token_valid_until"]
del self.storage["refresh_token"]
self.storage["access_token"] = resp_body["access_token"]
else:
raise error.InvalidResponseError()
Note that even if we have not reached the access token’s expiry (by our calculation), we may find after making the request that we still need to refresh the access token after making the request. For example, the server may expire the access token early, the client or server’s clock may have drifted, or network delays may cause our expected expiry time to be inaccurate.
If the access token needs to be refreshed, the server will not tell us that the
access token has expired, per se, but will return an error with error code
M_UNKNOWN_TOKEN
, indicating that the given access token is invalid. We can
then try to use our refresh token (if we have one) to obtain a new access
token.
If an M_UNKNOWN_TOKEN
error was received, but we do not have a refresh
token, then we check if we were soft logged out (again, see below).
if resp.status >= 400 and resp.content_type == "application/json":
body = await resp.json()
if body.get("errcode") == "M_UNKNOWN_TOKEN":
if "refresh_token" in self.storage:
await self._refresh_access_token()
else:
await self._handle_soft_logout(resp.status, body)
# update was successful, so retry the request
return await self.authenticated(req_func, *args, **kwargs)
Tests
@pytest.mark.asyncio
async def test_refresh_token(mock_aioresponse):
{{refresh token tests}}
We first test that we refresh the access token if it has already expired.
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"refresh_token": "a_refresh_token",
"access_token_valid_until": time.time_ns() - 1000,
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/refresh",
body=json.dumps(
{
"access_token": "another_access_token",
"expires_in_ms": 60000,
"refresh_token": "another_refresh_token",
},
),
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
body='{"displayname":"Alice"}',
headers={
"content-type": "application/json",
},
)
resp = await c.authenticated(
c.http_session.get, c.url("v3/profile/%40alice:example.org")
)
async with resp:
mock_aioresponse.assert_called_with(
"https://matrix-client.example.org/_matrix/client/v3/refresh",
method="POST",
json={"refresh_token": "a_refresh_token"},
)
mock_aioresponse.assert_called_with(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
method="GET",
headers={"Authorization": "Bearer another_access_token"},
)
assert resp.status == 200
assert await resp.json() == {"displayname": "Alice"}
assert c.storage["refresh_token"] == "another_refresh_token"
And then we test the case where the access token has not expired yet, but after making a request we are informed that our access token is invalid.
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"refresh_token": "arefreshtoken",
"access_token_valid_until": time.time_ns() + 60_000_000_000,
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
status=401,
body='{"errcode":"M_UNKNOWN_TOKEN","soft_logout":true}',
headers={
"content-type": "application/json",
},
)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/refresh",
body='{"access_token":"anotheraccesstoken","expires_in_ms":60000,"refresh_token":"another_refresh_token"}',
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
body='{"displayname":"Alice"}',
headers={
"content-type": "application/json",
},
)
resp = await c.authenticated(
c.http_session.get, c.url("v3/profile/%40alice:example.org")
)
async with resp:
mock_aioresponse.assert_called_with(
"https://matrix-client.example.org/_matrix/client/v3/refresh",
method="POST",
json={"refresh_token": "arefreshtoken"},
)
mock_aioresponse.assert_called_with(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
method="GET",
headers={"Authorization": "Bearer anotheraccesstoken"},
)
assert resp.status == 200
assert await resp.json() == {"displayname": "Alice"}
assert c.storage["refresh_token"] == "another_refresh_token"
Soft logout¶
The second condition under which a new access token needs to be obtained is if the user is soft logged out. This means that the server wants the user to re-authenticate before continuing, but after they have re-authenticated, they may resume where they had left off. In contrast, a hard logout means that the server considers the session to have ended and the client should forget any of its persisted state. A soft logout could happen, for example, if a user changes their password, and the server wants the user to re-authenticate all their other sessions.
A soft logout is indicated by a "soft_logout": true
property in an
M_UNKNOWN_TOKEN
error response. In response, the client should re-log in
using the normal login mechanism, setting the device_id
to the current
session’s device ID. However, this may involve user interaction, beyond the
scope of our library, so we will use a callback to allow the application to
perform the necessary steps to re-authenticate. The callback will be passed
one argument: the client object. The callback may need to re-prompt the user
for their password (but not for their user ID or any other identifier, since
the client already knows that), or to log in again using SSO.
# called when the client needs to re-log-in after a soft logout
re_log_in: typing.Callable[[Client], typing.Awaitable[None]]
async def _handle_soft_logout(self, status, body) -> None:
if body.get("soft_logout"):
await self.callbacks["re_log_in"](self)
else:
# we've been fully logged out, so we clear all our data and
# raise an exception
# FIXME: we should also emit an event to tell the client that we're logged out
self.storage.clear()
raise error.MatrixError(status, body)
To allow the user to re-log in, we will create a new function for the
re_log_in
callback to call, rather than re-using our existing login
functions, since it will operate slightly differently. Notably, it
needs to send the device ID to the server (so that it knows what device we’re
re-logging in as), and the function doesn’t need to take the user ID or any
other identifier as parameter since we already know the user ID. Also, unlike
our other login functions, we do not want to error if we’re already logged in.
async def re_log_in(
self,
auth_parameter: typing.Union[str, dict[str, typing.Any]],
login_type: str = "password",
) -> None:
"""Re-log in after getting soft logged out
Arguments:
``auth_parameter``:
The authentication parameter (e.g. password or token) to use.
``login_type``:
The method of logging in. The default value, ``'password'``, indicates
that you are logging in with a password. A value of ``'token'``
indicates that you are logging in with a token. An unknown value will
set that as the ``type`` in the request body, and ``auth_parameter``
must be a ``dict`` whose contents will be copied to the request body.
"""
req_body = {
"identifier": {
"type": "m.id.user",
"user": self.storage["user_id"],
},
"device_id": self.storage["device_id"],
"refresh_token": True,
}
if login_type == "password":
req_body["type"] = "m.login.password"
req_body["password"] = auth_parameter
elif login_type == "token":
req_body["type"] = "m.login.token"
req_body["token"] = auth_parameter
elif type(auth_parameter) == dict:
req_body["type"] = login_type
for name in auth_parameter:
req_body[name] = auth_parameter[name]
else:
raise RuntimeError("Unsupported authentication parameter")
await self._do_log_in(req_body)
Tests
@pytest.mark.asyncio
async def test_soft_logout(mock_aioresponse):
# our re-log in callback will just log in using a hard-coded password
async def re_log_in(cli: client.Client):
await cli.re_log_in("P4ssw0rd")
{{soft logout tests}}
To test that we can handle soft logouts, we test two cases: first the case where we do not have a refresh token, and secondly the case where we do.
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
},
callbacks={"re_log_in": re_log_in},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
status=401,
body='{"errcode":"M_UNKNOWN_TOKEN","soft_logout":true}',
headers={
"content-type": "application/json",
},
)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/login",
status=200,
payload={
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"access_token": "anewaccesstoken",
},
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
body='{"displayname":"Alice"}',
headers={
"content-type": "application/json",
},
)
resp = await c.authenticated(
c.http_session.get, c.url("v3/profile/%40alice:example.org")
)
async with resp:
# make sure that the request finally succeeded
assert resp.status == 200
assert await resp.json() == {"displayname": "Alice"}
# make sure that we re-logged in and got a new access token
assert c.storage["access_token"] == "anewaccesstoken"
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"refresh_token": "arefreshtoken",
"access_token_valid_until": time.time_ns() + 60_000_000_000,
},
callbacks={"re_log_in": re_log_in},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
status=401,
body='{"errcode":"M_UNKNOWN_TOKEN","soft_logout":true}',
headers={
"content-type": "application/json",
},
)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/refresh",
status=401,
body='{"errcode":"M_UNKNOWN_TOKEN","soft_logout":true}',
headers={
"content-type": "application/json",
},
)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/login",
status=200,
payload={
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"access_token": "anewaccesstoken",
},
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/profile/%40alice:example.org",
body='{"displayname":"Alice"}',
headers={
"content-type": "application/json",
},
)
resp = await c.authenticated(
c.http_session.get, c.url("v3/profile/%40alice:example.org")
)
async with resp:
# make sure that we tried to refresh using the refresh token
mock_aioresponse.assert_called_with(
"https://matrix-client.example.org/_matrix/client/v3/refresh",
method="POST",
json={"refresh_token": "arefreshtoken"},
)
# make sure that the request finally succeeded
assert resp.status == 200
assert await resp.json() == {"displayname": "Alice"}
# make sure we re-logged in and got a new access token
assert c.storage["access_token"] == "anewaccesstoken"
Todo
add mutex so that only one thread/fibre will try to refresh/re-log in at once
Logging out¶
To show how our authenticated
method is used, we will write a function for
one of the simpler authenticated requests: logging out. Logging out calls the
POST /logout
endpoint.
Since this is a POST
request, we call self.authenticated
with
self.http_session.post
as the first argument. The POST /logout
endpoint
doesn’t take any body parameters, so our remaining arguments are the URL for
the endpoint and an empty JSON object to use as the request body.
After we log out, we clear our storage.
async def log_out(self) -> None:
"""Log out the current session and clear out the storage."""
async with await self.authenticated(
self.http_session.post,
self.url("v3/logout"),
json={},
) as resp:
await check_response(resp)
self.storage.clear()
Example: logout script¶
To complement our login script from the previous section, we create a logout script. This will log out the session that was logged in previously. Since all the necessary information was saved in the client storage, we don’t need to take any command-line arguments.
# {{copyright}}
"""Log out a logged-in session"""
import asyncio
import json
import typing
from matrixlib import client
{{json file storage}}
async def main():
async with client.Client(storage=JsonFileStorage()) as c:
await c.log_out()
print("Logged out")
asyncio.run(main())
Todo
UI auth (maybe as separate section)