Device keys and one-time keys
Since each device is a separate recipient for encrypted messages, each device must create and publish public keys so that messages can be encrypted for it. With Olm, devices have three types of keys that they need to maintain: a signing key (also referred to as a fingerprint key), an encryption key (also referred to as an identity key), and a number of one-time and/or fallback keys. Olm uses elliptic curve cryptography: the signing key is an Ed25519 key, and the encryption, one-time, and fallback keys are Curve25519 keys (sometimes referred to as X25519 in other places).
As end-to-end encryption is difficult to implement correctly, the Matrix Foundation provides two libraries that implement some of the core functionality needed for encryption. Libolm is written in C/C++ and has bindings for several languages, but is deprecated. Vodozemac is written in Rust and, since it is newer, currently has bindings for fewer languages, though more bindings are expected over time. Vodozemac should be used whenever possible; here we will use vodozemac’s Python bindings.
Note
As of the time of writing, we must use the version of the bindings from git, as the latest released version does not support all the functions that we need.
Each device creates a vodozemac Account object. This object manages the device’s keys and creates Olm sessions. We create a DeviceKeysManager class to manage this and to handle the interface with our Client class for us.
src/matrixlib/devices.py:

# {{copyright}}
"""Device-related functionality"""
import asyncio
from base64 import b64decode
from canonicaljson import encode_canonical_json
from cryptography.hazmat.primitives.asymmetric import ed25519
import sys
import typing
import vodozemac
from . import client
from . import schema
{{devices module functions}}
{{devices module classes}}
class DeviceKeysManager:
    """Manages a device's keys for end-to-end encryption"""

    {{DeviceKeysManager class methods}}
The initialization function will be passed the Client object so that it can access the client’s storage and subscribe to sync updates. Data will be stored encrypted, using vodozemac’s “pickle” functionality, so the initialization function will also take the encryption key, which must be a 32-byte binary sequence, as required by vodozemac.
Danger
Since this key allows access to encrypted messages, the application should store this key securely, for example by using the operating system’s secret storage functionality (sometimes called a keychain, keyring, or credential storage) or deriving it from a password.
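For illustration, here is a minimal sketch of deriving the pickle key from a password using only Python’s standard library. The function name and the scrypt parameters are our own choices for this sketch, not part of the library we are building:

```python
import hashlib

def derive_pickle_key(password: str, salt: bytes) -> bytes:
    """Derive a 32-byte key from a password using scrypt (stdlib).

    The salt should be generated once (e.g. with os.urandom(16)) and stored
    alongside the encrypted data; it does not need to be kept secret.
    """
    return hashlib.scrypt(
        password.encode("utf-8"),
        salt=salt,
        n=2**14,  # CPU/memory cost parameter; tune for your hardware
        r=8,
        p=1,
        dklen=32,  # vodozemac requires a 32-byte key
    )
```

The same password and salt always produce the same key, so the application only needs to persist the salt, not the key itself.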
def __init__(self, c: client.Client, key: bytes):
    """
    Arguments:

    ``c``:
        the client object
    ``key``:
        a 32-byte binary used to encrypt the objects in storage
    """
    self.client = c
    self.key = key

    {{DeviceKeysManager initialization}}
First, we will create a vodozemac Account object. If we had previously created one, we will load it from the client’s storage. Otherwise, we will create a brand new account and store it. The vodozemac methods for serializing and deserializing objects are called pickle and from_pickle, respectively. Other vodozemac objects also have pickle and from_pickle methods.
if "olm_account" in c.storage:
    self.account = vodozemac.Account.from_pickle(c.storage["olm_account"], key)
else:
    self.account = vodozemac.Account()
    c.storage["olm_account"] = self.account.pickle(key)
We add methods to retrieve the identity and fingerprint keys from the vodozemac Account.
@property
def identity_key(self) -> vodozemac.Curve25519PublicKey:
    return self.account.curve25519_key

@property
def fingerprint_key(self) -> vodozemac.Ed25519PublicKey:
    return self.account.ed25519_key
In addition to the identity and fingerprint keys, we also need a number of one-time keys and/or a fallback key. One-time keys are keys that are uploaded to the server, and “claimed” by other devices when they want to create an Olm session with us. As the name implies, one-time keys are only used once; after the key has been claimed, the server will delete the key, and no other device can claim it. This means that the number of Olm sessions that others can create with us is limited by the number of one-time keys that we have uploaded. To ensure that other devices are always able to create Olm sessions with us, we will generate and upload new one-time keys whenever the supply on the server is decreased. We will also upload a fallback key, which is used in the same way as a one-time key, but is returned by the server when it has no more one-time keys to return. Even though we try to ensure that the server always has enough one-time keys, the server can run out, for example, if many Olm sessions are created while we are offline. Olm sessions are more secure when one-time keys are used, but fallback keys ensure that Olm sessions can always be created. Clients may choose whether to upload one-time keys, fallback keys, or both; we will upload both.
We upload device keys, one-time keys, and fallback keys to the server using the POST /keys/upload endpoint. We will create an asyncio.Task to generate and upload keys, since it needs to monitor the number of one-time keys on the server and upload new ones when needed; this task will be the only part of our code that manages the keys. Our task will initially upload keys (if needed), and then wait to be told, via an asyncio.Event, to upload keys again whenever we detect that more one-time or fallback keys may need to be uploaded.
An asyncio.Event is a synchronization primitive in Python that allows one or more tasks to wait for a signal from another task before continuing. If the asyncio.Event is triggered before a task waits for it, the task will continue immediately when it tries to wait for it. In addition, the asyncio.Event can be cleared after it has been triggered, so that a task that waits on it will wait until it is triggered again. Thus our key upload task can wait for the asyncio.Event, clear it after it is triggered, upload the keys, and then wait for the asyncio.Event again.
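The set/clear semantics described above can be seen in a small standalone script, unrelated to our library code:

```python
import asyncio

async def demo() -> list:
    log = []
    event = asyncio.Event()

    async def worker():
        await event.wait()  # returns immediately: the event is already set
        log.append("first wake-up")
        event.clear()
        await event.wait()  # now blocks until set() is called again
        log.append("second wake-up")

    event.set()  # triggered before the worker ever waits
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker run up to its second wait
    log.append("signalling again")
    event.set()
    await task
    return log

print(asyncio.run(demo()))
# → ['first wake-up', 'signalling again', 'second wake-up']
```

The first wait returns without blocking because the event was set beforehand; after clear(), the second wait blocks until the event is set again.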
self.upload_task = asyncio.create_task(self._upload_keys_task())
async def _upload_keys_task(self) -> None:
    self.upload_keys_event = asyncio.Event()
    while True:
        self.upload_keys_event.clear()
        await self._upload_keys()
        await self.upload_keys_event.wait()
We will use the client storage to store information about what keys have been uploaded to the server, so that if the client gets terminated in the middle of an upload or the upload fails, we will be able to re-upload the keys that are necessary.
Our key uploading function will perform the following tasks:

retrieve the information about the keys that it has already uploaded;
construct a request body for POST /keys/upload based on what keys are on the server, and any internal state;
upload the keys;
update the stored information about the keys, and our internal state.
async def _upload_keys(self) -> None:
    keys_on_server = self.client.storage.get("olm_account.keys_on_server", {})
    upload_body = {}

    {{populate /keys/upload body}}

    if upload_body != {}:
        async with await client.retry(
            60_000,  # retry for up to one minute
            self.client.authenticated,
            self.client.http_session.post,
            self.client.url("v3/keys/upload"),
            json=upload_body,
        ) as resp:
            try:
                _, resp_body = await client.check_response(resp)
            except Exception:
                # if the key upload failed, wait a bit, then trigger another
                # key upload
                await asyncio.sleep(120)
                self.upload_keys_event.set()
                return

            {{update keys-on-server information}}

            self.client.storage["olm_account.keys_on_server"] = keys_on_server
When the client closes, we will need to stop the key upload task.
c.publisher.subscribe(client.ClientClosed, self._client_closed_subscriber)
def _client_closed_subscriber(self, _: client.ClientClosed) -> None:
    self.upload_task.cancel()
Uploading device keys
Todo
This is very Olm/Megolm-specific. We should figure out a way to enable/disable support for different algorithms.
To upload the fingerprint and identity keys, we will use the device_keys property of the request body. This property takes a JSON object with the following properties:

algorithms: a list of strings indicating the algorithms that the device supports. For Olm/Megolm, this list should contain m.olm.v1.curve25519-aes-sha2 and m.megolm.v1.aes-sha2.
user_id: the user’s ID.
device_id: the device’s ID, as given by the server when the client logged in.
keys: a JSON object giving the device’s public keys. This is a mapping from <algorithm_name>:<device_id> to the public keys. The fingerprint key uses the algorithm name ed25519 and the identity key uses the algorithm name curve25519.
signatures: signatures of the device_keys object, created using the device’s fingerprint key. The signatures property has a specific format, and we will create signatures in other places as well, so we will create a method to add this property.
We will use a boolean flag in our keys_on_server dict to indicate whether the fingerprint and identity keys have been successfully uploaded.
if not keys_on_server.get("device_keys", False):
    device_keys = {
        "algorithms": [
            "m.olm.v1.curve25519-aes-sha2",
            "m.megolm.v1.aes-sha2",
        ],
        "user_id": self.client.user_id,
        "device_id": self.client.device_id,
        "keys": {
            f"curve25519:{self.client.device_id}": self.account.curve25519_key.to_base64(),
            f"ed25519:{self.client.device_id}": self.account.ed25519_key.to_base64(),
        },
    }
    self.sign_json(device_keys)
    upload_body["device_keys"] = device_keys
    keys_on_server["device_keys"] = True
Tests
tests/test_device_keys.py:

# {{copyright}}
import asyncio
import aioresponses
import cryptography
import json
import pytest
import vodozemac
from matrixlib import client
from matrixlib import devices
{{test device keys}}
@pytest.mark.asyncio
async def test_device_keys_upload(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{device keys upload test}}
To test our code, we create a device manager, and check that it uploads device keys.
The response body for the POST /keys/upload call is there to satisfy the code that we will write later for one-time keys. For the purposes of the code that we have written so far, we can pretend that the body is an empty JSON object.
def callback(url, **kwargs):
    assert "device_keys" in kwargs["json"]
    return aioresponses.CallbackResult(
        status=200,
        body='{"one_time_key_counts":{"signed_curve25519":100}}',
        headers={
            "Content-Type": "application/json",
        },
    )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    callback=callback,
)

manager = devices.DeviceKeysManager(c, b"\x00" * 32)
await asyncio.sleep(0.1)
mock_aioresponse.assert_called()
Signing JSON
Matrix defines a method for signing JSON objects using public signing keys. Normally, signatures are only defined for byte strings. We can serialize an object into JSON and sign it, but when it later needs to be verified, the signature may or may not match, depending on how the verifier serializes the JSON. For example, the verifier may use different whitespace from the signer, which will lead to a different signature. To ensure that the signer and verifier generate the same encoding, Matrix defines a specific way of encoding JSON, called canonical JSON.
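To make this concrete, here is a rough approximation of canonical JSON using only Python’s standard library. It ignores some of Matrix’s finer rules (such as the range restrictions on numbers), so real code should use a proper implementation such as the canonicaljson package:

```python
import json

def canonical_json(value) -> bytes:
    """A minimal approximation of Matrix's canonical JSON encoding:
    keys sorted, no insignificant whitespace, UTF-8 output."""
    return json.dumps(
        value,
        sort_keys=True,          # lexicographic key order
        separators=(",", ":"),   # no spaces after "," or ":"
        ensure_ascii=False,      # emit UTF-8 rather than \uXXXX escapes
    ).encode("utf-8")

# the same object serializes identically regardless of key insertion order
a = canonical_json({"b": 1, "a": 2})
b = canonical_json({"a": 2, "b": 1})
print(a)  # b'{"a":2,"b":1}'
assert a == b
```

Because the encoding is deterministic, the signer and the verifier are guaranteed to hash and sign exactly the same bytes.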
To distribute the signature, a signatures property is added to the object, which is a mapping from the signer’s ID (in our case, our user ID), to the key ID prefixed by the signature algorithm (here, it will be ed25519:<device_id>), to the signature. This scheme allows multiple signatures to be added. Of course, this means that when signing and verifying a signed JSON object, we must remove the signatures property before encoding and signing/verifying. Matrix also provides a special property for data that does not get signed: the unsigned property (if present) also gets removed before encoding and signing/verifying. For example, this can be used by intermediaries to add information to the object without breaking the signature. You may recall that events have an unsigned property that contains some information. That is because events get signed using this same method (and hashed, using a similar method) by the originating server, but the data in unsigned is added by the recipient’s server.
The following function definition is based on code from the Matrix spec, and uses vodozemac’s built-in signing functionality. The encode_canonical_json function comes from the canonicaljson package, which implements Matrix’s Canonical JSON format. If your language of choice does not already have an implementation of Canonical JSON, you may need to implement it yourself.
def sign_json(self, json_object: dict) -> dict:
    """Sign a JSON object using the device's signing key

    The input object is modified to include the signature.
    """
    signatures = json_object.pop("signatures", {})
    unsigned = json_object.pop("unsigned", None)

    sig = self.account.sign(
        encode_canonical_json(json_object).decode("utf-8")
    ).to_base64()

    key_id = f"ed25519:{self.client.device_id}"
    signatures.setdefault(self.client.user_id, {})[key_id] = sig
    json_object["signatures"] = signatures
    if unsigned is not None:
        json_object["unsigned"] = unsigned
    return json_object
We also create a function to verify a signature.
def verify_json_ed25519(
    signature_key: vodozemac.Ed25519PublicKey,
    user_id: str,
    device_id: str,
    json_object: dict,
) -> None:
    """Verify a signed JSON object using a public key

    Arguments:

    - ``signature_key``: the public part of the key used to sign the object
    - ``user_id``: the ID of the user who signed the object
    - ``device_id``: the ID of the device used to sign the object
    - ``json_object``: the signed JSON object
    """
    to_verify = {
        key: value
        for key, value in json_object.items()
        if key not in ["signatures", "unsigned"]
    }
    canonical = encode_canonical_json(to_verify).decode("utf-8")
    signature = vodozemac.Ed25519Signature.from_base64(
        json_object["signatures"][user_id][f"ed25519:{device_id}"]
    )
    signature_key.verify_signature(canonical, signature)
Tests
We test that the signature we generate is correct, that we can verify a correct signature, and that we can detect an incorrect signature.
@pytest.mark.asyncio
async def test_sign_and_verify(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{ed25519 sign and verify test}}
device_keys = None

def callback(url, **kwargs):
    nonlocal device_keys
    device_keys = kwargs["json"]["device_keys"]
    return aioresponses.CallbackResult(
        status=200,
        body='{"one_time_key_counts":{"signed_curve25519":100}}',
        headers={
            "Content-Type": "application/json",
        },
    )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    callback=callback,
)

manager = devices.DeviceKeysManager(c, b"\x00" * 32)
await asyncio.sleep(0.1)

devices.verify_json_ed25519(
    manager.fingerprint_key,
    "@alice:example.org",
    "ABCDEFG",
    device_keys,
)

# should ignore anything in "unsigned"
device_keys["unsigned"] = {"foo": "bar"}
devices.verify_json_ed25519(
    manager.fingerprint_key,
    "@alice:example.org",
    "ABCDEFG",
    device_keys,
)

# other modifications should result in a bad signature
device_keys["new_key"] = 1
with pytest.raises(vodozemac.SignatureException):
    devices.verify_json_ed25519(
        manager.fingerprint_key,
        "@alice:example.org",
        "ABCDEFG",
        device_keys,
    )
Uploading one-time and fallback keys
As mentioned above, the client must ensure that the server has a supply of one-time keys (or OTKs) that others can claim. There are a number of considerations to take into account when doing this.
Vodozemac and libolm will only store a limited number of one-time keys, to avoid unbounded memory usage. If we generate new one-time keys when there are old keys that have not been used yet, the old keys may be overwritten. If we later receive a message that was encrypted using one of the old keys, we will not be able to decrypt the message, since we no longer have the private keys. For this reason, we need to limit the number of keys that we generate and publish.
Vodozemac provides a property on the Account object, max_number_of_one_time_keys, which gives the maximum number of one-time keys that should be maintained on the server. For example, if max_number_of_one_time_keys is 50, and the server currently has 42 one-time keys, then we should upload at most 8 one-time keys. (Libolm has a similarly-named function, but in libolm’s case, the function returns the maximum number of one-time keys that libolm stores. Clients should maintain at most half this number of one-time keys on the server.)
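The arithmetic from the example above can be captured in a small helper. The function name is our own invention for illustration; the actual upload code later computes this inline:

```python
def otks_to_upload(max_on_server: int, on_server: int, unpublished: int = 0) -> int:
    """Number of new one-time keys to generate, given the server's current
    count and any keys already generated but not yet published."""
    return max(0, max_on_server - on_server - unpublished)

print(otks_to_upload(50, 42))     # → 8, as in the example above
print(otks_to_upload(50, 60))     # → 0: the server already has enough keys
print(otks_to_upload(50, 42, 3))  # → 5: three keys generated but unpublished
```

The clamp at zero matters: the server may report more keys than our current maximum, in which case we should upload nothing rather than a negative number.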
Now that we know how many one-time keys we should maintain on the server, we need to find out how many keys the server has. Obviously, if we haven’t uploaded any one-time keys, the server will have none. When we call the POST /keys/upload endpoint, the server’s response will include the number of one-time keys that it has; it will include this information whether or not our request includes any new one-time keys. However, when someone claims a key, the number that we have will be outdated, and we do not want to constantly poll POST /keys/upload to check how many keys are left. For this, we can rely on the GET /sync response, which will include a device_one_time_keys_count property indicating the number of one-time keys that the server has. This property is a map from algorithm name to the number of keys remaining. If an algorithm name is missing, the number of keys should be taken to be 0.
"device_one_time_keys_count": schema.Optional(
    dict[str, int]
),
So to keep track of how many one-time keys the server has, we will publish a message in our sync handler with the contents of the device_one_time_keys_count. The message will also indicate whether there are any to-device messages in the sync. We will discuss to-device messages later, but in short, they are messages sent directly from one device to another device, without the use of a room. This is the way in which Olm-encrypted events are usually sent, since Olm is a one-to-one encrypted channel between devices.
The reason we want to know whether there are to-device messages is as follows: as with room events, servers may limit the number of to-device messages sent in a single sync response, to avoid the sync response becoming too large. Thus, after a sync response, we may not have all the to-device messages that were sent to us. This means that we may not want to generate new one-time keys yet, as doing so could overwrite some old keys that are needed by messages that we have not yet received. If the server does not return any to-device messages, then we know that we have all the messages that are currently available. Thus we will wait until the sync contains no to-device messages before uploading keys.
await self.publisher.publish(
    OneTimeKeysCount(
        body.get("device_one_time_keys_count", {}),
        body.get("to_device", {}).get("events", []) != [],
    )
)
class OneTimeKeysCount(typing.NamedTuple):
    """A message indicating the one-time keys count from the sync"""

    otk_count: dict[str, int]
    has_to_device: bool

OneTimeKeysCount.otk_count.__doc__ = (
    "Dict mapping algorithm name to one-time keys count"
)
OneTimeKeysCount.has_to_device.__doc__ = (
    "Whether any to-device messages were in the sync"
)
Note
Rather than include in our message whether or not there are to-device messages, the sync processor could simply not publish the message when there are to-device messages. However, it is usually better to let the subscriber make its own decision about how to handle the information, rather than having the publisher make the decision. For example, the subscriber might want to upload new one-time keys if the count gets extremely low, regardless of whether there are any to-device messages. Or there may be multiple subscribers that have different criteria for when to process the information. So the publisher just packages up the information that the subscribers need, and lets each subscriber apply its own logic.
Todo
Change sync to handle “catchup” syncs
We can now subscribe to this message in our device keys manager. We will also create a lock since we may have several functions accessing our one-time key counts concurrently.
c.publisher.subscribe(client.OneTimeKeysCount, self._otk_count_subscriber)
self.lock = asyncio.Lock()
If the OneTimeKeysCount message indicates that there are no to-device events, our subscriber will check the number of keys with algorithm signed_curve25519 (which is the algorithm name for the Olm one-time keys), and if it is smaller than max_number_of_one_time_keys, it will record the count in a member variable (so that the key uploader can determine how many keys to upload) and trigger our key upload task to upload keys.
async def _otk_count_subscriber(
    self, one_time_keys_count: client.OneTimeKeysCount
) -> None:
    if not one_time_keys_count.has_to_device:
        count = one_time_keys_count.otk_count.get("signed_curve25519", 0)
        if count < self.account.max_number_of_one_time_keys:
            async with self.lock:
                self.otk_count_from_sync = count
                self.upload_keys_event.set()
In our initialization function, we also set an initial value for the one-time keys count to None, to indicate that we don’t have an update from the sync.
self.otk_count_from_sync: typing.Optional[int] = None
We store the one-time keys count in the client storage, so that if the client restarts, it will still know whether it needs to upload keys. In our key upload function, we check if we have received a new key count from the sync, and if so, we will store that value. If not, we will get the key count from the storage. We will then determine if we need to upload new one-time keys, and how many we need to upload.
async with self.lock:
    if self.otk_count_from_sync is not None:
        keys_on_server["one_time_keys"] = self.otk_count_from_sync
        self.client.storage["olm_account.keys_on_server"] = keys_on_server
        otk_count = self.otk_count_from_sync
        self.otk_count_from_sync = None
    else:
        otk_count = keys_on_server.get("one_time_keys", 0)

otks_needed = self.account.max_number_of_one_time_keys - otk_count

if otks_needed > 0:
    {{generate one-time keys}}
To generate one-time keys, we call the account.generate_one_time_keys() method with the number of keys that we want to generate. We can then get the keys from account.one_time_keys to upload to the server. After the keys are uploaded, we call account.mark_keys_as_published(), which will mark those one-time keys as having been published to the server. Without calling mark_keys_as_published(), account.one_time_keys would still contain the keys that were uploaded; this is so that if the application fails to upload the keys for whatever reason, it can try again later. After calling mark_keys_as_published(), account.one_time_keys will be empty until new one-time keys are generated.
Since we may have previously generated one-time keys that were never uploaded, we check how many keys are already available, and reduce the number of new keys that we generate accordingly.
Generating one-time keys may take some time. Rather than generating all the needed keys at once, we generate them in smaller batches so that we don’t need to wait for all of them to be generated before uploading them. Here we will generate them in batches of 20 (FIXME: is this a good number?), but the batch size may depend on the device capabilities.
Since generate_one_time_keys() and mark_keys_as_published() change the account object, we need to re-save the account to the client storage after calling them.
keys_available = len(self.account.one_time_keys)
otks_needed = otks_needed - keys_available
if otks_needed > 0:
    self.account.generate_one_time_keys(min(20, otks_needed))
    self.client.storage["olm_account"] = self.account.pickle(self.key)
After we generate the keys, we must format them for uploading. For Olm, the one-time keys are uploaded in signed_curve25519 format, which means that, in addition to the public key itself, the key is signed using the account’s fingerprint key. This is done by signing a JSON object, as described above, with contents {"key": <public_key>}.
upload_body["one_time_keys"] = {
    f"signed_curve25519:{name}": self.sign_json(
        {"key": public_key.to_base64()}
    )
    for (name, public_key) in self.account.one_time_keys.items()
}
After the keys have been uploaded, we mark them as having been published.
self.account.mark_keys_as_published()
self.client.storage["olm_account"] = self.account.pickle(self.key)
We also need to update our record of the number of one-time keys that the server has. As mentioned above, the response to POST /keys/upload will include the server’s current count. However, there is a minor problem here: while we are making our POST /keys/upload request, we could get a GET /sync response, which will also have a one-time keys count. If the numbers in both responses are the same, then there is no problem. If the count from POST /keys/upload is lower than the count from GET /sync, then the count from POST /keys/upload must be the newer count: POST /keys/upload is the only way in which the one-time keys count can increase, and requests to that endpoint are not made anywhere else, so it is not possible that the response to GET /sync represents the counts from a later time. However, if the count from GET /sync is lower than the count from POST /keys/upload, we don’t know which is the newer count.
Fortunately, there is a relatively simple solution to this problem: we can make another POST /keys/upload request with an empty body. This won’t add any new keys, but will return the one-time keys count. We may receive yet another GET /sync response while we’re making this second request, but in this case, we know that the lower count is the newer one: since we are not adding any new keys, the count can only decrease at this point.
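This reconciliation rule can be expressed as a small pure function. The name and signature are illustrative, since our actual implementation weaves the logic into the upload code:

```python
from typing import Optional

def newer_count(count_from_upload: int, count_from_sync: Optional[int]) -> int:
    """After a /keys/upload request that added no keys, the count can only
    have decreased, so the lower of the two reports is the newer one."""
    if count_from_sync is None:
        # no sync response arrived during the request
        return count_from_upload
    return min(count_from_upload, count_from_sync)

print(newer_count(40, None))  # → 40
print(newer_count(40, 37))    # → 37: a key was claimed during our request
```

Note that this rule is only safe for the second, empty upload request; after the first request (which added keys), a lower sync count could still be stale.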
Alternatively
Another way to solve this issue is to pause the sync task while uploading keys. This could be done, for example, by uploading keys in the same task as the sync loop. Of course, this means that we will not get sync updates from the server while we are uploading keys.
schema.ensure_valid(resp_body, {"one_time_key_counts": dict[str, int]})
otk_count_from_resp = resp_body["one_time_key_counts"].get(
    "signed_curve25519", 0
)

await self.lock.acquire()
if (
    self.otk_count_from_sync is None
    or self.otk_count_from_sync >= otk_count_from_resp
):
    keys_on_server["one_time_keys"] = otk_count_from_resp
    self.otk_count_from_sync = None
    self.lock.release()
else:
    self.otk_count_from_sync = None
    self.lock.release()

    async with await client.retry(
        10_000,
        self.client.authenticated,
        self.client.http_session.post,
        self.client.url("v3/keys/upload"),
        json={},
    ) as resp2:
        _, resp2_body = await client.check_response(resp2)
        schema.ensure_valid(
            resp2_body, {"one_time_key_counts": dict[str, int]}
        )
        async with self.lock:
            otk_count_from_resp = resp2_body["one_time_key_counts"].get(
                "signed_curve25519", 0
            )
            if self.otk_count_from_sync is not None:
                keys_on_server["one_time_keys"] = min(
                    otk_count_from_resp, self.otk_count_from_sync
                )
                self.otk_count_from_sync = None
            else:
                keys_on_server["one_time_keys"] = otk_count_from_resp
Now that we have the latest one-time keys count, we check whether we still have too few keys. If so, we will trigger our upload_keys_event so that our key upload task will generate and upload more keys.
if (
    keys_on_server["one_time_keys"]
    < self.account.max_number_of_one_time_keys
):
    self.upload_keys_event.set()
Todo
fallback keys
Tests
@pytest.mark.asyncio
async def test_otk_tracking(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{otk tracking test}}
To test this, we will use a callback on the POST /keys/upload endpoint to check that one-time keys are uploaded, and to simulate a sync response that updates the one-time key count. We then make sure that the device key manager makes another call to POST /keys/upload. We also check that the device key manager continues to make calls to POST /keys/upload until the server has max_number_of_one_time_keys keys.
otk_count = 0

async def callback1(url, **kwargs):
    # manager will make an initial request to upload device keys and otks
    nonlocal otk_count
    assert "one_time_keys" in kwargs["json"]
    otk_count = len(kwargs["json"]["one_time_keys"])
    # pretend a sync response came in
    await c.publisher.publish(
        client.OneTimeKeysCount(
            {"signed_curve25519": 0},
            False,
        )
    )
    return aioresponses.CallbackResult(
        status=200,
        body=json.dumps(
            {
                "one_time_key_counts": {
                    "signed_curve25519": otk_count,
                },
            }
        ),
        headers={
            "Content-Type": "application/json",
        },
    )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    callback=callback1,
)

def callback2(url, **kwargs):
    # manager should make an empty request to get the current count
    nonlocal otk_count
    assert kwargs["json"] == {}
    otk_count = otk_count - 7
    return aioresponses.CallbackResult(
        status=200,
        body=json.dumps(
            {
                "one_time_key_counts": {
                    "signed_curve25519": otk_count,
                },
            }
        ),
        headers={
            "Content-Type": "application/json",
        },
    )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    callback=callback2,
)

max_reached_event = asyncio.Event()

async def callback3(url, **kwargs):
    # manager will upload additional keys until the max is reached
    nonlocal otk_count
    # ensure that the client has stored the correct count of one-time keys
    assert c.storage["olm_account.keys_on_server"]["one_time_keys"] == otk_count
    assert "one_time_keys" in kwargs["json"]
    otk_count = otk_count + len(kwargs["json"]["one_time_keys"])
    if otk_count == manager.account.max_number_of_one_time_keys:
        max_reached_event.set()
    return aioresponses.CallbackResult(
        status=200,
        body=json.dumps(
            {
                "one_time_key_counts": {
                    "signed_curve25519": otk_count,
                },
            }
        ),
        headers={
            "Content-Type": "application/json",
        },
    )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    callback=callback3,
    repeat=True,
)

manager = devices.DeviceKeysManager(c, b"\x00" * 32)
await max_reached_event.wait()