Tying together the Olm/Megolm pieces¶
We now have all the pieces that we need to encrypt and decrypt events using
Olm/Megolm. In this section, we will tie all the pieces together so that
clients can easily send and receive encrypted events. We create a class
called OlmMegolmEncryptionManager
that will handle this.
class OlmMegolmEncryptionManager:
"""Manages encryption and decryption of room events"""
{{OlmMegolmEncryptionManager class methods}}
We create an OlmMegolmEncryptionManager
by passing it a Client
object, a
key for encrypting data in the storage, and optionally a
DeviceKeysManager
, a DeviceTracker
,
and a RoomStateTracker
. If they are not
provided, new ones will be created.
def __init__(
self,
c: Client,
key: bytes,
device_keys_manager: typing.Optional[devices.DeviceKeysManager] = None,
device_tracker: typing.Optional[devices.DeviceTracker] = None,
room_state_tracker: typing.Optional[rooms.RoomStateTracker] = None,
):
"""
Arguments:
``c``:
the client object
``key``:
a 32-byte binary used to encrypt the objects in storage
``device_keys_manager``:
a ``devices.DeviceKeysManager`` object to use. If none is provided, a
new one will be created.
``device_tracker``:
a ``devices.DeviceTracker`` object to use. If none is provided, a new
one will be created.
"""
self.client = c
self.key = key
self.device_keys_manager = typing.cast(
devices.DeviceKeysManager,
device_keys_manager
if device_keys_manager != None
else devices.DeviceKeysManager(c, key),
)
self.device_tracker = typing.cast(
devices.DeviceTracker,
device_tracker if device_tracker != None else devices.DeviceTracker(c),
)
self.room_state_tracker = typing.cast(
rooms.RoomStateTracker,
room_state_tracker
if room_state_tracker != None
else rooms.RoomStateTracker(c),
)
{{OlmMegolmEncryptionManager initialization}}
We will also subscribe to the necessary messages from the Client
object so
that it can receive the information that it needs from the /sync
response,
namely to-device events.
c.publisher.subscribe(client.ToDeviceEvents, self._to_device_subscriber)
And we create a lock to ensure that concurrent operations do not conflict. We use a single lock here; it may be possible to improve concurrency by using finer-grained locks.
self.lock = asyncio.Lock()
We will publish messages using our publish/subscribe mechanism, so we will have our own publisher object as a member.
self.publisher = pubsub.Publisher()
Decrypting to-device events¶
First, let us handle the to-device events. The events that we will be
interested in are m.room.encrypted
events that are Olm-encrypted. (We ignore
any Megolm-encrypted events since they should not be sent as to-device events.)
async def _to_device_subscriber(self, msg: client.ToDeviceEvents) -> None:
    """Handle a batch of to-device events published by the client.

    Only ``m.room.encrypted`` events that use the Olm algorithm and carry a
    string ``sender_key`` and a ``ciphertext`` object are decrypted; every
    other event is skipped.  The whole batch is processed while holding
    ``self.lock``.

    Arguments:

    ``msg``:
      the ``client.ToDeviceEvents`` message whose ``events`` are examined
    """
    async with self.lock:
        for todevice_event in msg.events:
            if todevice_event.type != "m.room.encrypted":
                continue

            body = todevice_event.content
            if body.get("algorithm") != olm.OLM_ALGORITHM:
                continue

            well_formed = schema.is_valid(
                body,
                {
                    "sender_key": str,
                    "ciphertext": schema.Object({"type": int, "body": str}),
                },
            )
            if not well_formed:
                continue

            await self._decrypt_and_process_todevice(todevice_event)
async def _decrypt_and_process_todevice(self, event: events.Event) -> None:
sender_key = vodozemac.Curve25519PublicKey.from_base64(
event.content["sender_key"]
)
try:
{{decrypt Olm-encrypted event}}
except:
e = sys.exc_info()[1]
logging.error(
f"Unable to decrypt to-device event from {event.sender}:{sender_key.to_base64()}",
e,
)
return
{{process decrypted Olm event}}
else:
logging.info(
f"Ignoring {decrypted['type']} event from {event.sender}:{sender_key.to_base64()}"
)
To decrypt the event, we first determine whether we already have an Olm channel established with the sender. If so, we use the channel to decrypt the event. Otherwise, we create a new channel and decrypt the event.
olm_channel = olm.OlmChannel.create_from_storage(
self.client,
self.device_keys_manager,
event.sender,
sender_key,
)
if isinstance(olm_channel, olm.OlmChannel):
decrypted = olm_channel.decrypt(event.content)
else:
# FIXME: we should probably only query the cache, and never query
# the server here, to avoid delays
sender_devices = (
(await self.device_tracker.get_device_keys([event.sender]))
.get(event.sender, devices.UserDeviceKeysResult({}))
.device_keys
)
sender_device_id, sender_fingerprint_key = None, None
for device_id, device in sender_devices.items():
if (
vodozemac.Curve25519PublicKey.from_base64(
device.get("keys", {}).get(f"curve25519:{device_id}")
)
== sender_key
):
sender_device_id = device_id
sender_fingerprint_key = device["keys"].get(
f"ed25519:{device_id}"
)
(
olm_channel,
decrypted_or_err,
) = olm.OlmChannel.create_from_encrypted_event(
self.client,
self.device_keys_manager,
event.sender,
sender_key,
event.content,
sender_device_id,
sender_fingerprint_key,
)
if isinstance(decrypted_or_err, dict):
decrypted = decrypted_or_err
else:
raise decrypted_or_err
# FIXME: Olm unwedging - on error, create new olm session and send dummy event
After the event is decrypted, we can process the plaintext event. For now, we
will only consider m.room_key
events. When we receive an m.room_key
event,
we try to create an inbound Megolm session from it. We then publish a message
indicating that the session has been received. This can be used by the client
to decrypt room events that were received before the m.room_key
event was
received, as we will explain below.
if decrypted["type"] == "m.room_key":
try:
session = InboundMegolmSession.from_room_key(
self.client,
event.sender,
sender_key,
decrypted["content"],
self.key,
)
await self.publisher.publish(session)
except:
e = sys.exc_info()[1]
logging.error(
f"Unable to process room key from {event.sender}:{sender_key.to_base64()}",
e,
)
Tests
We test that we can decrypt an Olm-encrypted event by simulating the reception of an encrypted room key event.
@pytest.mark.asyncio
async def test_decrypt_olm_encrypted_room_key(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
{{test decrypt Olm-encrypted room key}}
First, we create the device keys for our two clients. We write a utility function to create device keys for a client, since we will be doing that in several tests.
async def _create_device_keys(
    mock_aioresponse, client: Client
) -> typing.Tuple[dict, dict, devices.DeviceKeysManager]:
    """Create a ``DeviceKeysManager`` for ``client`` and capture its key upload.

    A mocked ``/keys/upload`` endpoint records the device keys and one-time
    keys from the request body.  Returns a tuple of the uploaded device keys,
    the uploaded one-time keys, and the ``DeviceKeysManager`` that was created.
    """
    captured: typing.Dict[str, dict] = {"device_keys": {}, "one_time_keys": {}}
    upload_received = asyncio.Event()

    def record_upload(url, **kwargs):
        request_body = kwargs["json"]
        captured["device_keys"] = request_body["device_keys"]
        captured["one_time_keys"] = request_body["one_time_keys"]
        upload_received.set()
        return aioresponses.CallbackResult(
            status=200,
            body='{"one_time_key_counts":{"signed_curve25519":100}}',
            headers={
                "Content-Type": "application/json",
            },
        )

    mock_aioresponse.post(
        "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
        callback=record_upload,
    )

    manager = devices.DeviceKeysManager(client, b"\x00" * 32)

    # wait until the mocked endpoint has seen the upload
    await upload_received.wait()

    return (captured["device_keys"], captured["one_time_keys"], manager)
Then we use the function to create keys for Alice and Bob.
(
alice_device_keys,
alice_otks,
alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)
(
bob_device_keys,
bob_otks,
bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)
# store the device keys so that the device tracker can get them
alice.storage["device_tracker.cache.@alice:example.org"] = bob.storage[
"device_tracker.cache.@alice:example.org"
] = {"device_keys": {"ABCDEFG": alice_device_keys}}
alice.storage["device_tracker.cache.@bob:example.org"] = bob.storage[
"device_tracker.cache.@bob:example.org"
] = {"device_keys": {"HIJKLMN": bob_device_keys}}
Next Alice creates an outbound Megolm session to share with Bob.
alice_room_state_tracker = rooms.RoomStateTracker(alice)
alice_device_tracker = devices.DeviceTracker(alice)
outbound_session = megolm.OutboundMegolmSession(
alice,
"!room_id",
alice_room_state_tracker,
alice_device_tracker,
alice_device_keys_manager,
b"\x00" * 32,
)
[
(bob_device_keys, bob_room_key)
] = await outbound_session.get_session_key_for_sending()
Alice creates an Olm channel with Bob and encrypts the Megolm session.
# use the first `signed_curve25519` one-time key
otk = [
key
for id, key in bob_otks.items()
if id.startswith("signed_curve25519:")
][0]
alice_olm_channel = olm.OlmChannel.create_outbound_channel(
alice,
alice_device_keys_manager,
bob_device_keys,
otk,
)
room_key_encrypted = alice_olm_channel.encrypt("m.room_key", bob_room_key)
Bob creates an OlmMegolmEncryptionManager
and then receives the encrypted
room key. We can check that the OlmMegolmEncryptionManager
has correctly
processed the event by checking that the inbound Megolm session is in Bob’s
storage.
bob_device_tracker = devices.DeviceTracker(bob)
bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
bob,
b"\x00" * 32,
bob_device_keys_manager,
bob_device_tracker,
)
await bob.publisher.publish(
client.ToDeviceEvents(
[
events.Event(
sender="@alice:example.org",
type="m.room.encrypted",
content=room_key_encrypted,
)
]
)
)
assert (
f"inbound_megolm_session.!room_id.@alice:example.org.{outbound_session.session_id}"
in bob.storage
)
Decrypting room events¶
Now that we can decrypt to-device events, allowing us to receive room keys, we can try to decrypt room events.
Trying to decrypt a room event is fairly straightforward. We first ensure that it actually is encrypted, and has the required information, such as the session ID. We then try to load the inbound Megolm session from storage using the room ID, the sender’s user ID, and the session ID, and if we were able to load the Megolm session, then we use it to try to decrypt the event.
Todo
allow handling withheld codes
async def decrypt_room_event(self, encrypted: events.RoomEvent) -> events.RoomEvent:
    """Decrypt a Megolm-encrypted room event

    Returns the room event that would have been received if the event was not
    encrypted.  The type, content and room ID come from the decrypted payload;
    the sender, event ID, timestamp and unsigned data are carried over from
    the encrypted event.

    Raises ``error.UnableToDecryptError`` if the event is not a Megolm-encrypted
    ``m.room.encrypted`` event with a session ID, or if no matching inbound
    Megolm session is found in storage.

    Arguments:

    ``encrypted``:
      the encrypted room event
    """
    async with self.lock:
        payload = encrypted.content
        is_megolm_event = (
            encrypted.type == "m.room.encrypted"
            and payload.get("algorithm") == MEGOLM_ALGORITHM
            and "session_id" in payload
        )
        if not is_megolm_event:
            raise error.UnableToDecryptError("Not encrypted or malformed event")

        inbound_session = InboundMegolmSession.from_storage(
            self.client,
            encrypted.room_id,
            encrypted.sender,
            payload["session_id"],
            self.key,
        )
        if not inbound_session:
            # FIXME: add the session ID
            raise error.UnableToDecryptError("Unknown session ID")

        plaintext = inbound_session.decrypt(encrypted)

        return events.RoomEvent(
            type=plaintext["type"],
            sender=encrypted.sender,
            content=plaintext["content"],
            unsigned=encrypted.unsigned,
            room_id=plaintext["room_id"],
            event_id=encrypted.event_id,
            origin_server_ts=encrypted.origin_server_ts,
        )
Tradeoff
An SDK could automatically decrypt events that arrive from the sync and send the decrypted events to the application automatically, rather than having the application call a function to decrypt the events. This is more convenient, but introduces some more complexity and raises some questions regarding how to do it. The SDK should not notify the application of both the encrypted and the decrypted versions of the event separately, as this could be confusing, so the approach that we have taken of publishing a message containing the room timeline from the sync would need to be modified somehow.
There is also the question of how to handle events that cannot be decrypted. In some cases, the to-device event might be received after the room event is received, so we receive the Megolm session after we receive the event that it decrypts. This delay might be short, so we may want to wait a bit before notifying the application about the failure to decrypt, but of course this raises the question of how long to wait.
Another question is whether the failure to decrypt one event should delay notifying the application of subsequent events. The answer to this depends on how sensitive the application is to the ordering of events. If the application depends on processing events in-order, then it should not be notified of an event until all previous events either have been decrypted or we are reasonably certain that they are undecryptable. However, if the application does not need to process events in-order, we do not need to delay notifying it about decrypted events.
Having the application call a function to decrypt an encrypted event, as we have done here, makes a bit more work for the application, but allows it to make the decision on how to handle decryption failures.
Tests
We will first test that we can successfully decrypt an event if we already have the Megolm session.
@pytest.mark.asyncio
async def test_decrypt_megolm_encrypted_event(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
{{test decrypt Megolm-encrypted event}}
We first create the device keys and create an outbound Megolm session for Alice to encrypt with. We then create an inbound Megolm session that Bob will use to decrypt.
(
alice_device_keys,
alice_otks,
alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)
(
bob_device_keys,
bob_otks,
bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)
# store the device keys so that the device tracker can get them
alice.storage["device_tracker.cache.@alice:example.org"] = bob.storage[
"device_tracker.cache.@alice:example.org"
] = {"device_keys": {"ABCDEFG": alice_device_keys}}
alice.storage["device_tracker.cache.@bob:example.org"] = bob.storage[
"device_tracker.cache.@bob:example.org"
] = {"device_keys": {"HIJKLMN": bob_device_keys}}
alice_room_state_tracker = rooms.RoomStateTracker(alice)
alice_device_tracker = devices.DeviceTracker(alice)
outbound_session = megolm.OutboundMegolmSession(
alice,
"!room_id",
alice_room_state_tracker,
alice_device_tracker,
alice_device_keys_manager,
b"\x00" * 32,
)
[
(bob_device_keys, bob_room_key)
] = await outbound_session.get_session_key_for_sending()
megolm.InboundMegolmSession.from_room_key(
    bob,
    "@alice:example.org",
    # The sender key is Alice's Curve25519 identity key, so it must be read
    # from the ``curve25519:`` entry rather than the ``ed25519:`` fingerprint
    # key.
    vodozemac.Curve25519PublicKey.from_base64(
        alice_device_keys["keys"]["curve25519:ABCDEFG"]
    ),
    bob_room_key,
    b"\x00" * 32,
)
Alice then encrypts a room event, and we check that Bob can decrypt it.
encrypted_content = outbound_session.encrypt(
"m.room.message",
{"msgtype": "m.text", "body": "Hello World!"},
)
bob_device_tracker = devices.DeviceTracker(bob)
bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
bob,
b"\x00" * 32,
bob_device_keys_manager,
bob_device_tracker,
)
decrypted = await bob_encryption_manager.decrypt_room_event(
events.RoomEvent(
sender="@alice:example.org",
type="m.room.encrypted",
room_id="!room_id",
event_id="$event_id",
content=encrypted_content,
origin_server_ts=1234567890123,
)
)
assert decrypted == events.RoomEvent(
sender="@alice:example.org",
type="m.room.message",
room_id="!room_id",
event_id="$event_id",
content={"msgtype": "m.text", "body": "Hello World!"},
origin_server_ts=1234567890123,
)
We then test that it handles certain errors as expected.
@pytest.mark.asyncio
async def test_megolm_decryption_error_handling(mock_aioresponse):
async with Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as client:
{{test Megolm decryption error handling}}
We set up an OlmMegolmEncryptionManager
to test.
device_keys_manager = devices.DeviceKeysManager(client, b"\x00" * 32)
device_tracker = devices.DeviceTracker(client)
encryption_manager = megolm.OlmMegolmEncryptionManager(
client,
b"\x00" * 32,
device_keys_manager,
device_tracker,
)
We first try to decrypt a non-encrypted event and expect to get an error.
with pytest.raises(
error.UnableToDecryptError, match="Not encrypted or malformed event"
):
await encryption_manager.decrypt_room_event(
events.RoomEvent(
sender="@alice:example.org",
type="m.room.message",
room_id="!room_id",
event_id="$event_id",
content={"body": "Hello world!", "msgtype": "m.text"},
origin_server_ts=1234567890123,
)
)
Then we try to decrypt an event encrypted with an unknown algorithm.
with pytest.raises(
error.UnableToDecryptError, match="Not encrypted or malformed event"
):
await encryption_manager.decrypt_room_event(
events.RoomEvent(
sender="@alice:example.org",
type="m.room.encrypted",
room_id="!room_id",
event_id="$event_id",
content={"algorithm": "org.example.unknown"},
origin_server_ts=1234567890123,
)
)
Finally we try to decrypt a Megolm-encrypted event that we do not have the Megolm session for.
with pytest.raises(error.UnableToDecryptError, match="Unknown session ID"):
await encryption_manager.decrypt_room_event(
events.RoomEvent(
sender="@alice:example.org",
type="m.room.encrypted",
room_id="!room_id",
event_id="$event_id",
content={
"algorithm": megolm.MEGOLM_ALGORITHM,
"session_id": "nonexistent",
"ciphertext": "nothing",
},
origin_server_ts=1234567890123,
)
)
Checking if encryption is enabled¶
Now that we can decrypt events, let us turn our attention to encrypting. Before we actually encrypt an event, we must first determine if we need to encrypt events for the room. That is, we check if the room has encryption enabled.
This is done by checking if the room has an
m.room.encryption
state event set with the algorithm
property set to m.megolm.v1.aes-sha2
.
However, the m.room.encryption
state event is slightly different from other
state events in that we are not interested in just the current value: once
encryption has been enabled in a room, it cannot be disabled. If encryption
could be disabled in a room, then users might not notice it being disabled, and
send sensitive messages under the assumption that the messages are encrypted.
It could also be problematic if the encryption algorithm is changed, as the new encryption algorithm could be weaker than the old one, or have different security properties. Although we only support Megolm for now, we will treat any attempt to change the encryption algorithm as an undesirable condition. In the future, if we support multiple encryption algorithms, we may allow some transitions, but it is safest to reject such changes by default.
Thus we need to track the m.room.encryption
state event, and keep track of
whether encryption was ever enabled in a room. We do this in the same way that
we made the RoomStateTracker
, where we
subscribe to sync changes. When we receive an m.room.encryption
state event,
we compare it with the previously set values, and we will detect different
conditions and publish messages that the application can subscribe to:
if Megolm encryption is enabled for the first time or re-enabled after having been disabled or encryption set to a different algorithm,
if encryption is disabled (that is, if encryption had previously been enabled, and we receive an
m.room.encryption
event that does not have an algorithm
), if the encryption algorithm changes, or
if the encryption algorithm is initially set to an unknown algorithm.
In addition to publishing messages, we will also create a function that will tell us whether encryption is enabled in a room, and which will raise exceptions for the above conditions (except for Megolm encryption being enabled).
Todo
We also don’t want to allow changing parameters to something that is less secure?
c.publisher.subscribe(
(client.RoomStateUpdates, client.RoomTimelineUpdates),
self._state_subscriber,
)
async def _state_subscriber(
    self, msg: typing.Union[client.RoomStateUpdates, client.RoomTimelineUpdates]
) -> None:
    """Track ``m.room.encryption`` state events for a room.

    For every ``m.room.encryption`` state event (with an empty state key) in
    ``msg``, the new algorithm is compared with the stored
    ``[current, original]`` pair for the room, one of ``EncryptionEnabled``,
    ``EncryptionDisabled``, ``UnknownRoomEncryptionAlgorithm`` or
    ``EncryptionAlgorithmChanged`` is published, and the stored pair is
    updated under the ``room_encryption_state`` storage key.

    Arguments:

    ``msg``:
      the state or timeline update for a single room
    """
    room_id = msg.room_id
    for e in msg.events:
        if not (
            isinstance(e, events.StateEvent)
            and e.type == "m.room.encryption"
            and e.state_key == ""
        ):
            continue

        # re-read on every event so that several encryption events in the
        # same batch each see the result of the previous one
        state = self.client.storage.get("room_encryption_state", {})
        [prev_alg, orig_alg] = state.get(room_id, [None, None])

        # a missing or non-string algorithm is treated as "no encryption"
        new_alg = e.content.get("algorithm")
        if not isinstance(new_alg, str):
            new_alg = None

        if prev_alg == new_alg:
            # if the encryption algorithm isn't changing, we don't need to
            # do anything for this event -- but later events in the same
            # batch must still be examined, so ``continue`` rather than
            # returning from the subscriber
            continue
        elif new_alg is None:
            await self.publisher.publish(EncryptionDisabled(room_id, orig_alg))
        elif orig_alg == new_alg:
            # encryption is being restored to the original algorithm
            await self.publisher.publish(EncryptionEnabled(room_id, new_alg))
        elif prev_alg is None:
            if orig_alg is None:
                # encryption is being enabled for the first time
                # the new encryption algorithm will be the room's original
                # encryption algorithm
                orig_alg = new_alg
                if new_alg == MEGOLM_ALGORITHM:
                    await self.publisher.publish(
                        EncryptionEnabled(room_id, new_alg)
                    )
                else:
                    await self.publisher.publish(
                        UnknownRoomEncryptionAlgorithm(
                            room_id, typing.cast(str, new_alg)
                        )
                    )
            else:
                # the encryption algorithm is being set to something
                # other than the previous algorithm, and other than the
                # original algorithm
                await self.publisher.publish(
                    EncryptionAlgorithmChanged(room_id, orig_alg, new_alg)
                )
        else:
            await self.publisher.publish(
                EncryptionAlgorithmChanged(room_id, orig_alg, new_alg)
            )

        state[room_id] = [new_alg, orig_alg]
        self.client.storage["room_encryption_state"] = state
def is_room_encrypted(self, room_id: str) -> bool:
    """
    Report whether events sent to the given room need to be encrypted.

    Returns ``False`` if no encryption algorithm has ever been recorded for
    the room, and ``True`` if the room uses Megolm and has done so since
    encryption was first enabled.

    May raise

    * ``EncryptionDisabledError`` if encryption was enabled in the room in the
      past, but is now disabled
    * ``EncryptionAlgorithmChangedError`` if the encryption algorithm for the
      room changed from its initial algorithm
    * ``UnknownEncryptionAlgorithmError`` if the encryption algorithm for
      the room is not known (that is, it is not Megolm)
    """
    tracked = self.client.storage.get("room_encryption_state", {})
    current, original = tracked.get(room_id, [None, None])

    if current is None:
        if original is None:
            # encryption was never enabled
            return False
        raise error.EncryptionDisabledError()

    if current != original:
        raise error.EncryptionAlgorithmChangedError(original, current)

    if current != MEGOLM_ALGORITHM:
        raise error.UnknownEncryptionAlgorithmError(original)

    return True
We now define the messages and exceptions that we use above.
class EncryptionEnabled(typing.NamedTuple):
    """A message indicating that encryption was enabled in a room"""

    # the ID of the room in which encryption was enabled
    room_id: str
    # the encryption algorithm that the room is now using
    algorithm: str
class EncryptionDisabled(typing.NamedTuple):
    """A message indicating that encryption was disabled in a room"""

    # the ID of the room in which encryption was disabled
    room_id: str
    # documented through the ``__doc__`` assignment below
    original_algorithm: str


# NamedTuple fields are exposed as descriptors on the class, so the field's
# documentation is attached by assigning to the descriptor's ``__doc__``.
EncryptionDisabled.original_algorithm.__doc__ = (
    "The encryption algorithm that was initially set for the room"
)
class UnknownRoomEncryptionAlgorithm(typing.NamedTuple):
    """A message indicating that the encryption algorithm for a room was
    initially set to an unknown algorithm"""

    # the ID of the room whose encryption algorithm was set
    room_id: str
    # the unknown algorithm that the room was set to
    algorithm: str
class EncryptionAlgorithmChanged(typing.NamedTuple):
    """A message indicating that the encryption algorithm for a room changed"""

    # the ID of the room whose encryption algorithm changed
    room_id: str
    # the encryption algorithm that was initially set for the room
    original_algorithm: str
    # the algorithm that the room has been changed to
    new_algorithm: str
class EncryptionDisabledError(RuntimeError):
    """Raised for a room that once had encryption enabled but no longer does"""
class EncryptionAlgorithmChangedError(RuntimeError):
    """The encryption algorithm in a room was changed from its initial
    algorithm

    Arguments:

    ``initial_algorithm``:
      the encryption algorithm that was initially set for the room
    ``new_algorithm``:
      the encryption algorithm that the room is currently set to
    """

    def __init__(self, initial_algorithm: str, new_algorithm: str):
        # A bare ``super()`` only creates a proxy object; the base class
        # initializer has to be called explicitly.  The constructor arguments
        # are passed through unchanged so that ``args`` (and therefore
        # copying/pickling) keeps working.
        super().__init__(initial_algorithm, new_algorithm)
        self.initial_algorithm = initial_algorithm
        self.new_algorithm = new_algorithm

    def __str__(self) -> str:
        # without this the message would be the tuple repr of ``args``
        return (
            "room encryption algorithm changed from "
            f"{self.initial_algorithm!r} to {self.new_algorithm!r}"
        )
class UnknownEncryptionAlgorithmError(RuntimeError):
    """The initial encryption algorithm for a room is not a known algorithm

    Arguments:

    ``algorithm``:
      the unknown encryption algorithm
    """

    def __init__(self, algorithm: str):
        # A bare ``super()`` only creates a proxy object; the base class
        # initializer has to be called explicitly.
        super().__init__(algorithm)
        self.algorithm = algorithm

    def __str__(self) -> str:
        return f"unknown room encryption algorithm: {self.algorithm!r}"
Tests
To test this, we will simulate m.room.encryption
events being sent to two
different rooms. In the first room, we will enable encryption, change it to
a different algorithm, and then disable encryption. And in the second room, we
will set the encryption algorithm to an unknown algorithm. We will subscribe
to all the messages, and ensure that we receive the expected messages at the
right times. We will also check that is_room_encrypted
returns the correct
response in each situation.
@pytest.mark.asyncio
async def test_room_encryption_event(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{room encryption event test}}
We first set up an OlmMegolmEncryptionManager
.
(
device_keys,
otks,
device_keys_manager,
) = await _create_device_keys(mock_aioresponse, c)
device_tracker = devices.DeviceTracker(c)
encryption_manager = megolm.OlmMegolmEncryptionManager(
c,
b"\x00" * 32,
device_keys_manager,
device_tracker,
)
We then subscribe to the messages that we are interested in. We use Python’s
Mock
class to
create subscriber methods so that we can check whether they were called.
subscriber = Mock()
encryption_manager.publisher.subscribe(
(
megolm.EncryptionEnabled,
megolm.UnknownRoomEncryptionAlgorithm,
megolm.EncryptionDisabled,
megolm.EncryptionAlgorithmChanged,
),
subscriber,
)
We check that our rooms are marked as unencrypted.
assert encryption_manager.is_room_encrypted("!room1") == False
assert encryption_manager.is_room_encrypted("!room2") == False
We then cause the client to publish sync events indicating changes in room encryption. First enable encryption in a room, and we check that we received a message indicating that encryption was enabled, and that the room is marked as encrypted.
await c.publisher.publish(
client.RoomTimelineUpdates(
"!room1",
[
events.StateEvent(
type="m.room.encryption",
state_key="",
sender="@bob:example.org",
content={"algorithm": megolm.MEGOLM_ALGORITHM},
event_id="$enable_encryption",
room_id="!room1",
origin_server_ts=1234567890000,
)
],
False,
None,
)
)
assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
megolm.EncryptionEnabled("!room1", megolm.MEGOLM_ALGORITHM)
)
assert encryption_manager.is_room_encrypted("!room1") == True
We then change the encryption algorithm, and check that we receive an event
indicating that the encryption algorithm is being changed, and that
is_room_encrypted
raises an error.
subscriber.reset_mock()
await c.publisher.publish(
client.RoomTimelineUpdates(
"!room1",
[
events.StateEvent(
type="m.room.encryption",
state_key="",
sender="@bob:example.org",
content={"algorithm": "a.different.algorithm"},
event_id="$enable_encryption",
room_id="!room1",
origin_server_ts=1234567890001,
)
],
False,
None,
)
)
assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
megolm.EncryptionAlgorithmChanged(
"!room1", megolm.MEGOLM_ALGORITHM, "a.different.algorithm"
),
)
with pytest.raises(error.EncryptionAlgorithmChangedError):
encryption_manager.is_room_encrypted("!room1")
Next, we send an empty m.room.encryption
event, and check that we receive an
event indicating that encryption was disabled, and that is_room_encrypted
raises an error.
subscriber.reset_mock()
await c.publisher.publish(
client.RoomTimelineUpdates(
"!room1",
[
events.StateEvent(
type="m.room.encryption",
state_key="",
sender="@bob:example.org",
content={},
event_id="$enable_encryption",
room_id="!room1",
origin_server_ts=1234567890002,
)
],
False,
None,
)
)
assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
megolm.EncryptionDisabled("!room1", megolm.MEGOLM_ALGORITHM)
)
with pytest.raises(error.EncryptionDisabledError):
encryption_manager.is_room_encrypted("!room1")
When we re-enable encryption with the correct algorithm, we should receive
another event indicating that encryption was enabled, and is_room_encrypted
should return True
.
subscriber.reset_mock()
await c.publisher.publish(
client.RoomTimelineUpdates(
"!room1",
[
events.StateEvent(
type="m.room.encryption",
state_key="",
sender="@bob:example.org",
content={"algorithm": megolm.MEGOLM_ALGORITHM},
event_id="$enable_encryption",
room_id="!room1",
origin_server_ts=1234567890003,
)
],
False,
None,
)
)
assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
megolm.EncryptionEnabled("!room1", megolm.MEGOLM_ALGORITHM)
)
assert encryption_manager.is_room_encrypted("!room1") == True
If we initially enable encryption in a room with an unknown algorithm, we
should receive an event indicating that the algorithm is unknown, and
is_room_encrypted
should raise an error.
subscriber.reset_mock()
await c.publisher.publish(
client.RoomTimelineUpdates(
"!room2",
[
events.StateEvent(
type="m.room.encryption",
state_key="",
sender="@bob:example.org",
content={"algorithm": "an.unknown.algorithm"},
event_id="$enable_encryption",
room_id="!room2",
origin_server_ts=1234567890004,
)
],
False,
None,
)
)
assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
megolm.UnknownRoomEncryptionAlgorithm("!room2", "an.unknown.algorithm")
)
with pytest.raises(error.UnknownEncryptionAlgorithmError):
encryption_manager.is_room_encrypted("!room2")
Encrypting to-device events¶
After determining that we need to encrypt events to the room, we will now actually perform the encryption. As before, we start with to-device events, and will look at room events after.
Applications will not encrypt to-device events directly; rather, this will be done by other functions, such as the function used to encrypt room events. So the function that we write here to encrypt to-device events will be an internal function.
The Matrix API allows us to send multiple to-device events, of the same type, in one call, so our function will take an event type and a list of device key objects and event contents, and will send the events to those devices. Our function will also take a batch size, as it will split the requests into batches to avoid making a request that is too big, and so that the server can start delivering the events before receiving all of them.
We must also handle errors — if we are unable to encrypt an event for a user, we will store the event to retry later.
Conceptual explanation¶
The code that we write here will be quite complicated, so before we get to the code, let us discuss how it will operate and explain why the code becomes complicated.
In the Encrypting with Olm section, we saw that creating a new Olm session to encrypt a to-device event required claiming one of the recipient’s one-time keys. If we need to create a new Olm session, encrypting an event for a single recipient is a fairly straightforward process when taken in isolation:
claim a one-time key;
create an Olm session (via our
OlmChannel
class); encrypt the event;
send the encrypted event.
However, in practice, it is not always this simple, even without taking into account error handling. When sending an encrypted room event, sending the Megolm sessions can take a significant amount of time in large rooms since we must ensure that each recipient gets the Megolm session, and we must send it to each recipient individually. Thus, for example, if there are 1000 recipient devices in a room, and none of them have the Megolm session, then we would need to go through this procedure 1000 times.
The Matrix API contains some optimizations to speed this up a bit. The endpoints for claiming one-time keys and for sending to-device events can claim from/send to multiple devices at a time, reducing the number of calls that must be made to the server. Thus if we need to encrypt to multiple recipients, and we need to create a new Olm session for all of them, the process could look something like this:
claim one-time keys for all recipients;
for each recipient:
create an Olm session;
encrypt the event;
send all the encrypted events.
If we already have Olm sessions created with some recipients, then we could modify the process to look something like this:
claim one-time keys for all recipients that we need new Olm sessions for;
for each recipient:
create an Olm session if we don’t already have one;
encrypt the event;
send all the encrypted events.
But there are some obvious ways in which this is slower than necessary. Firstly, if we already have an Olm session with some recipients, we don’t need to wait until after we have claimed the one-time keys; we can begin encrypting to those recipients while we are waiting for the key claim request to return, so that we can start encrypting earlier. Secondly, we don’t need to wait until all the events have been encrypted before sending them all off. We can batch them up and start sending some events as others are still being encrypted. This new process might look something like:
send a key claim request to the server for all the recipients that we need new Olm sessions for, but don’t block waiting for it;
for each recipient that we already have an Olm session for:
encrypt the event;
put the encrypted event in a send queue;
if the send queue is full
wait for the previous send task (if any) to finish
send all the events in the queue (don’t block waiting for the send to complete)
and empty the queue;
if the key claim request has not returned yet, but the previous send task (if any) has finished, send all the events in the send queue (if any) and empty the queue;
wait until the key claim request has returned;
for each of the recipients that we claimed a one-time key for:
create an Olm session;
encrypt the event;
put the encrypted event in the send queue;
if the send queue is full
wait for the previous send task (if any) to finish
send all the events in the queue (don’t block waiting for the send to complete)
and empty the queue;
wait for the previous send task (if any) to finish and send all the events in the send queue (if any) and empty the queue.
You can see that it’s now starting to get complicated. We also need to make sure that we handle errors (for example, re-sending events if the initial attempt fails). As well, we may need to consider the case where we concurrently send multiple events to a single recipient, for example, if we send two encrypted events in two different rooms and so need to send two different Megolm sessions. In this case, ideally we do not want to claim two one-time keys and create two Olm sessions for the same recipient as that would be wasted effort; we only want to create one Olm session for each recipient.
There are many possible variations and many different ways to structure the code, depending on programming language capabilities and tradeoffs between complexity and efficiency. Here, we will take a moderately complicated approach: we will implement the parallel process outlined above, but for error handling and concurrent sending, we will handle them in fairly simple ways.
We will explain how we handle each type of error as we get to them. With regards to concurrent sending, we will take the simple approach of simply disallowing it. When we encrypt with Olm, we will take out a lock, preventing any other Olm encryption. This is a very coarse approach, and in some cases may slow things down considerably, but it is very simple, and should work well enough in most situations. For example, in an interactive client, the user will only send events in one room at a time. If the client needs something different, other approaches could be used instead. For example, a separate lock could be used for each recipient, so that a concurrent send will only block if it has a recipient in common with another.
Implementation¶
We will now implement the process that we outlined above.
Todo
explain OTK request timeout
async def _encrypt_and_send_to_device(
self,
events: typing.Iterable[typing.Tuple[dict, str, dict]],
otk_request_timeout: typing.Optional[int] = None,
batch_size: int = 200,
) -> None:
"""Olm-encrypt events and send them to devices as to-device events
Events for devices that we already have an Olm channel with are encrypted
right away, while one-time keys are claimed concurrently for the remaining
devices. Encrypted events are sent in batches as ``m.room.encrypted``
to-device events. Invalid device keys are logged and skipped, and devices
for which no one-time key could be claimed are skipped.
Arguments:
``events``:
an iterable of ``(device_key, event_type, event_content)`` tuples, where
``device_key`` is the recipient's device key object (it must have
``user_id``, ``device_id`` and ``keys`` properties)
``otk_request_timeout``:
passed as the timeout argument to the client's ``claim_otks`` call
(NOTE(review): units are not visible here -- confirm against ``claim_otks``)
``batch_size``:
the number of queued encrypted events at which a send request is made
"""
# only one Olm encryption/send operation runs at a time
async with self.lock:
# encrypted events waiting to be sent, as (user ID, device ID, content)
send_queue: list[typing.Tuple[str, str, dict]] = []
# no-op task that stands in for "the previous send" until a real send is
# started, so that later code can always await ``send_task``
send_task: asyncio.Task[None] = asyncio.create_task(asyncio.sleep(0))
{{determine recipients without Olm channels}}
{{send keys claim request}}
{{encrypt to recipients with existing Olm sessions}}
{{send pending events while waiting for key claim request}}
{{create new Olm sessions and encrypt}}
{{flush send queue}}
First we determine which recipients we already have Olm channels with, and
which recipients we need to create channels for. If we have an existing Olm
channel for a recipient, we will put the Olm channel, the event type, and the
event contents in the events_to_encrypt
list. If we don’t have an existing
Olm channel, we store the recipient’s device keys (which we will use to create
an Olm channel), the event type, and the event contents in the
channels_to_create
list.
# Split the recipients into those we already share an Olm channel with (their
# events can be encrypted immediately) and those we must create a channel for
# (which first requires claiming a one-time key).
events_to_encrypt: list[typing.Tuple[olm.OlmChannel, str, dict]] = []
channels_to_create: list[typing.Tuple[dict, str, dict]] = []
for device_key, event_type, event_content in events:
    if not schema.is_valid(
        device_key, {"user_id": str, "device_id": str, "keys": dict}
    ):
        # the message needs a placeholder for the argument: without one the
        # offending value never makes it into the log output
        logging.error("invalid device key: %r", device_key)
        continue

    user_id = device_key["user_id"]
    device_id = device_key["device_id"]

    identity_key_id = f"curve25519:{device_id}"
    if identity_key_id not in device_key["keys"]:
        logging.error("device key does not have Olm identity key")
        continue
    identity_key = vodozemac.Curve25519PublicKey.from_base64(
        device_key["keys"][identity_key_id]
    )

    # anything other than an ``OlmChannel`` instance is treated as "no existing
    # channel for this device"
    olm_channel = olm.OlmChannel.create_from_storage(
        self.client, self.device_keys_manager, user_id, identity_key
    )
    if isinstance(olm_channel, olm.OlmChannel):
        # NOTE(review): presumably verifies/records that the stored channel
        # belongs to this device ID -- confirm in ``OlmChannel``
        olm_channel.assert_partner_device_id(device_id)
        events_to_encrypt.append((olm_channel, event_type, event_content))
    else:
        channels_to_create.append((device_key, event_type, event_content))
For the recipients that we need to create Olm channels with, we need to send a
POST /keys/claim
request to claim a one-time key. We have previously written
a function for making the POST /keys/claim
request, so we will make use of it. Since we
do not want to block, waiting for the request to finish, we will create an
asyncio.Task
to make the request concurrently with our other processing.
# `claim_otks` takes its `devices` parameter as a dict that maps each user ID
# to the list of that user's device IDs to claim a key for
claim_otks_devices: dict[str, list[str]] = {}
for device_key, _, _ in channels_to_create:
    device_ids = claim_otks_devices.setdefault(device_key["user_id"], [])
    device_ids.append(device_key["device_id"])

if claim_otks_devices:  # an empty dict is falsy in Python
    # start the request without waiting for it, so that we can encrypt for
    # existing channels in the meantime
    keys_claim_task = asyncio.create_task(
        self.client.claim_otks(
            "signed_curve25519",
            claim_otks_devices,
            otk_request_timeout,
        )
    )
else:
    # nothing to claim: use a task that immediately yields an empty result, so
    # that the code that consumes `keys_claim_task` needs no special case
    keys_claim_task = asyncio.create_task(asyncio.sleep(0, ({}, [])))
While we’re waiting for the POST /keys/claim
request, we encrypt to the
recipients that we already have Olm channels with (if any). As we encrypt, we
place the encrypted events in a queue and when the queue is full, we send them.
Again, we do not need to wait for the sending to complete before continuing to
encrypt events, so we will use another asyncio.Task
for sending. However, we
only want to have one send request in flight at a time, so we wait for the
previous send to complete (if any) before sending a new batch of encrypted
events.
Todo
How do we handle errors?
for olm_channel, event_type, event_content in events_to_encrypt:
    ciphertext = olm_channel.encrypt(event_type, event_content)
    recipient_user_id = olm_channel.partner_user_id
    recipient_device_id = typing.cast(str, olm_channel.partner_device_id)
    send_queue.append((recipient_user_id, recipient_device_id, ciphertext))

    if len(send_queue) < batch_size:
        continue

    # the queue is full: hand it to a new send task (which first waits for the
    # previous send task) and start filling a fresh queue
    send_task = asyncio.create_task(
        self._flush_to_device_send_queue(send_task, send_queue)
    )
    send_queue = []
async def _flush_to_device_send_queue(
    self,
    send_task: asyncio.Task[None],
    send_queue: list[typing.Tuple[str, str, dict]],
) -> None:
    """Send a batch of encrypted to-device events after the previous send is done

    Arguments:

    ``send_task``:
      the task performing the previous send.  It is awaited before this batch
      is sent, so that at most one send request is in flight at a time.  If it
      raised an exception, that exception propagates and this batch is not
      sent.
    ``send_queue``:
      the batch to send, as ``(user_id, device_id, encrypted_content)`` tuples.
      If it is empty, no request is made.
    """
    # regroup the flat queue into the {user_id: {device_id: content}} mapping
    # that is passed to ``send_to_device``
    to_device_contents: dict[str, dict[str, dict]] = {}
    for user_id, device_id, contents in send_queue:
        to_device_contents.setdefault(user_id, {})[device_id] = contents

    # wait until the previous send is done, so that we don't have multiple
    # in-flight
    await send_task

    if not to_device_contents:
        # nothing to send: don't make a pointless request
        return

    # we wait for the result right here, so there is nothing to gain from
    # wrapping the request in its own task
    await self.client.send_to_device("m.room.encrypted", to_device_contents)
After we have finished encrypting events to existing Olm channels, we will need
to wait for the result of the POST /keys/claim
query so that we can create
the new Olm channels. If we don’t have the result yet, and we have some events
in the send queue, we can send those events while we are waiting for the POST /keys/claim
result. But this only makes sense to do if we don’t currently
have a send task in flight. Thus we wait for the first of the two tasks to
complete, and if the keys claim task is not complete while the send task is,
then we will send the events.
if len(send_queue) > 0:
done, pending = await asyncio.wait(
[keys_claim_task, send_task], return_when=asyncio.FIRST_COMPLETED
)
if keys_claim_task in pending:
send_task = asyncio.create_task(
self._flush_to_device_send_queue(send_task, send_queue)
)
send_queue = []
Now we must wait for the keys claim task to complete before we can continue any further. When that returns, we can use the one-time keys to create new Olm channels and encrypt the events to the remaining recipients. Again, as we encrypt events, we will put them in the queue, and when the queue is full, we will send the events. Note that we may not receive a one-time key for all of the recipients that we requested. This may happen if they do not have any one-time keys available, or if they are on a remote server that did not respond in time.
Todo
explain how to handle missing OTKs.
await keys_claim_task
otk_result, otk_failures = keys_claim_task.result()
for device_key, event_type, event_content in channels_to_create:
id_and_otk = otk_result.get(device_key["user_id"], {}).get(
device_key["device_id"]
)
if id_and_otk:
id, otk = id_and_otk
olm_channel = olm.OlmChannel.create_outbound_channel(
self.client,
self.device_keys_manager,
device_key,
otk,
)
encrypted = olm_channel.encrypt(event_type, event_content)
send_queue.append(
(
device_key["user_id"],
device_key["device_id"],
encrypted,
)
)
if len(send_queue) >= batch_size:
send_task = asyncio.create_task(
self._flush_to_device_send_queue(send_task, send_queue)
)
send_queue = []
Finally, after we finish encrypting the events, we send any events that are left in the send queue.
if len(send_queue) > 0:
await self._flush_to_device_send_queue(send_task, send_queue)
Tests
To test this, we will send two sets of events. The first set will be one event sent to Bob’s device, and the second set will be another event sent to Bob’s device as well as one event sent to Carol’s device. We expect that for the second set of events, it will only try to create a new Olm session with Carol’s device, and use the existing Olm session with Bob’s device. We check this by looking at the one-time keys that it tries to claim.
@pytest.mark.asyncio
async def test_send_olm_encrypted_events(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@carol:example.org",
"device_id": "OPQRSTU",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as carol:
{{test send Olm-encrypted events}}
We first create the device keys for Alice, Bob, and Carol, and an
OlmMegolmEncryptionManager
for Alice.
(
alice_device_keys,
alice_otks,
alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)
(
bob_device_keys,
bob_otks,
bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)
(
carol_device_keys,
carol_otks,
carol_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, carol)
alice_device_tracker = devices.DeviceTracker(alice)
encryption_manager = megolm.OlmMegolmEncryptionManager(
alice,
b"\x00" * 32,
alice_device_keys_manager,
alice_device_tracker,
)
We create a callback that will capture the to-device event that Alice sends
to Bob and Carol, so that we can ensure that they are decryptable. With the
repeat=True
option passed to mock_aioresponse
, the callback will be called
for all calls to the sendToDevice
endpoint with m.room.encrypted
as the
event type.
to_device: dict[typing.Tuple[str, str], typing.Any] = {}
def send_to_device_callback(url, **kwargs):
nonlocal to_device
for user_id, device_messages in kwargs["json"]["messages"].items():
for device_id, message in device_messages.items():
to_device.setdefault((user_id, device_id), []).append(
message
)
return aioresponses.CallbackResult(
status=200,
body="{}",
headers={"Content-Type": "application/json"},
)
send_to_device_url_pattern = re.compile(
r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.encrypted/.*$"
)
mock_aioresponse.put(
send_to_device_url_pattern,
callback=send_to_device_callback,
repeat=True,
)
We add a callback to handle the first /keys/claim
call that Alice will make,
and ensure that she is requesting Bob’s one-time key. Then we encrypt and send
an event to Bob.
def keys_claim_1_callback(url, **kwargs):
nonlocal bob_otks
if kwargs["json"]["one_time_keys"] == {
"@bob:example.org": {"HIJKLMN": "signed_curve25519"}
}:
otk_id, otk = [
(id, key)
for id, key in bob_otks.items()
if id.startswith("signed_curve25519:")
][0]
return aioresponses.CallbackResult(
status=200,
body=json.dumps(
{
"one_time_keys": {
"@bob:example.org": {"HIJKLMN": {otk_id: otk}}
}
}
),
headers={"Content-Type": "application/json"},
)
else:
return aioresponses.CallbackResult(
status=400,
body="unexpected response",
)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/claim",
callback=keys_claim_1_callback,
)
await encryption_manager._encrypt_and_send_to_device(
[(bob_device_keys, "m.room.message", {"content": "Hello world!"})]
)
We then add a callback to handle the second /keys/claim
call that Alice will
make, which should only be requesting Carol’s one-time key. Then we encrypt
and send an event to Bob and an event to Carol.
def keys_claim_2_callback(url, **kwargs):
nonlocal carol_otks
if kwargs["json"]["one_time_keys"] == {
"@carol:example.org": {"OPQRSTU": "signed_curve25519"}
}:
otk_id, otk = [
(id, key)
for id, key in carol_otks.items()
if id.startswith("signed_curve25519:")
][0]
return aioresponses.CallbackResult(
status=200,
body=json.dumps(
{
"one_time_keys": {
"@carol:example.org": {"OPQRSTU": {otk_id: otk}}
}
}
),
headers={"Content-Type": "application/json"},
)
else:
return aioresponses.CallbackResult(
status=400,
body="unexpected response",
)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/claim",
callback=keys_claim_2_callback,
)
await encryption_manager._encrypt_and_send_to_device(
[
(bob_device_keys, "m.room.message", {"content": "Hello!"}),
(carol_device_keys, "m.room.message", {"content": "Hello!"}),
]
)
Finally, we check that Alice encrypted the events properly by having Bob and Carol try to decrypt them.
bob_events = to_device[("@bob:example.org", "HIJKLMN")]
assert len(bob_events) == 2
(
bob_channel,
bob_decrypted_event_0,
) = olm.OlmChannel.create_from_encrypted_event(
bob,
bob_device_keys_manager,
"@alice:example.org",
alice_device_keys_manager.identity_key,
bob_events[0],
partner_device_id="ABCDEFG",
partner_fingerprint_key=alice_device_keys_manager.fingerprint_key,
)
assert bob_decrypted_event_0["content"] == {"content": "Hello world!"}
assert bob_channel.decrypt(bob_events[1])["content"] == {
"content": "Hello!"
}
carol_events = to_device[("@carol:example.org", "OPQRSTU")]
assert len(carol_events) == 1
(
carol_channel,
carol_decrypted_event_0,
) = olm.OlmChannel.create_from_encrypted_event(
carol,
carol_device_keys_manager,
"@alice:example.org",
alice_device_keys_manager.identity_key,
carol_events[0],
partner_device_id="ABCDEFG",
partner_fingerprint_key=alice_device_keys_manager.fingerprint_key,
)
assert carol_decrypted_event_0["content"] == {"content": "Hello!"}
Todo
Test other situations
Encrypting room events¶
We now write a function that allows us to encrypt and send room events. To encrypt a room event, we must have an outbound Megolm session for that room. So we remember the current Megolm session for each room. If we don’t have a Megolm session, then we create one. Also, if the Megolm session detects that it needs to be rotated, we will create a new one.
We create a function that obtains the Megolm session for the room (either using an existing one or creating a new one), and sends it to all the other devices in the room (as necessary). We have already written functions to do most of this; we just need to call them.
async def _get_and_send_outbound_megolm_session(
self,
room_id: str,
otk_request_timeout: typing.Optional[int] = None,
) -> OutboundMegolmSession:
"""Get the outbound Megolm session for a room, sending it to devices as needed
The room's current session is loaded from storage if there is one, and a new
session is created otherwise, or when the current one reports that it has
expired. The room key is then Olm-encrypted and sent to the devices that
the session says still need it.
Arguments:
``room_id``:
the ID of the room
``otk_request_timeout``:
passed on to ``_encrypt_and_send_to_device`` for its one-time key claim
request
Returns the outbound Megolm session to use for encrypting in the room.
"""
{{get or create Megolm session for room}}
{{send Megolm sessions to recipient devices}}
return outbound_session
When we create an outbound Megolm session for a room, we remember the ID of the session. Then, when we need to encrypt for that room again, we get the session ID associated with the room, and then load the outbound session from the storage. We also need to create an inbound Megolm session from the outbound Megolm session, so that we can decrypt our own events.
Note
For better performance, we can maintain a cache of recently used outbound Megolm sessions, rather than loading the sessions from storage each time. However, that is beyond the scope of this book.
# the storage remembers, per room, the ID of the room's current outbound session
storage_key = f"outbound_megolm_session.{room_id}"
if storage_key not in self.client.storage:
    # no session for this room yet: create one ...
    outbound_session = OutboundMegolmSession(
        self.client,
        room_id,
        room_state_tracker=self.room_state_tracker,
        device_tracker=self.device_tracker,
        device_keys_manager=self.device_keys_manager,
        key=self.key,
    )
    # ... along with the matching inbound session, so that we can decrypt our
    # own events ...
    InboundMegolmSession.from_outbound_session(
        self.client,
        outbound_session,
        key=self.key,
    )
    # ... and remember it as the room's current session
    self.client.storage[storage_key] = outbound_session.session_id
else:
    current_session_id = self.client.storage[storage_key]
    outbound_session = OutboundMegolmSession(
        self.client,
        room_id,
        room_state_tracker=self.room_state_tracker,
        device_tracker=self.device_tracker,
        device_keys_manager=self.device_keys_manager,
        key=self.key,
        # passing in the session ID loads it from storage
        session_id=current_session_id,
    )
We use the Megolm session’s get_session_key_for_sending
method to get the
room key event payload to send to each device. This method may raise an
exception if it detects that the session needs rotating, in which case we
create a new Megolm session and store the session ID for the room.
try:
session_keys = await outbound_session.get_session_key_for_sending()
except SessionExpiredException:
outbound_session = OutboundMegolmSession(
self.client,
room_id,
room_state_tracker=self.room_state_tracker,
device_tracker=self.device_tracker,
device_keys_manager=self.device_keys_manager,
key=self.key,
)
InboundMegolmSession.from_outbound_session(
self.client,
outbound_session,
key=self.key,
)
self.client.storage[storage_key] = outbound_session.session_id
session_keys = await outbound_session.get_session_key_for_sending()
# remove the old outbound session from storage
del self.client.storage[
f"outbound_megolm_session.{room_id}.{outbound_session.session_id}"
]
We then encrypt and send the room key events using the function we wrote in the last section.
if len(session_keys) != 0:
keys_to_send = [
(device_key, "m.room_key", content)
for (device_key, content) in session_keys
]
await self._encrypt_and_send_to_device(keys_to_send, otk_request_timeout)
outbound_session.mark_as_sent(
[device_key for (device_key, _) in session_keys]
)
Todo
Handle errors from _encrypt_and_send_to_device
Using this function, we can create a function that takes an event, and encrypts and sends it to a room. We use the same arguments as for sending unencrypted events. Using the functions that we have already written, this function is fairly straightforward: we get the Megolm session for encrypting the event (sending the session to the recipient devices as necessary), we encrypt the event, and then we send it to the room.
async def encrypt_and_send_room_event(
    self,
    room_id: str,
    event_type: str,
    event_content: dict[str, typing.Any],
    retry_ms: int = 0,
    txn_id: typing.Optional[str] = None,
) -> None:
    """Megolm-encrypt an event and send it to a room

    The room's outbound Megolm session is obtained first (which also sends it
    to the recipient devices as needed), and the encrypted event is then sent
    to the room as an ``m.room.encrypted`` event.

    Arguments:

    ``room_id``:
      the ID of the room to send to
    ``event_type``:
      the type of the event to encrypt
    ``event_content``:
      the content of the event to encrypt
    ``retry_ms``:
      how long to retry sending, in milliseconds
    ``txn_id``:
      the transaction ID to use.  If none is specified, one is generated.
    """
    megolm_session = await self._get_and_send_outbound_megolm_session(room_id)
    ciphertext = megolm_session.encrypt(event_type, event_content)
    await self.client.send_event(
        room_id, "m.room.encrypted", ciphertext, retry_ms, txn_id
    )
Todo
Handle errors
Tests
We test that we can send an encrypted event to a room. Alice will send an
encrypted event to a room, and we will ensure that Bob can decrypt it. We will
create an OlmMegolmEncryptionManager
for each user, and ensure that Alice’s
OlmMegolmEncryptionManager
can generate events that Bob’s
OlmMegolmEncryptionManager
can decrypt.
@pytest.mark.asyncio
async def test_send_megolm_encrypted_event_to_room(mock_aioresponse):
room_state = {
"m.room.encryption": {
"": {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
"rotation_period_msgs": 2,
},
},
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
}
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": room_state,
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
"room_state_tracker": {
"!room_id": room_state,
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
{{test send Megolm-encrypted event to room}}
First we create device keys for Alice and Bob, we populate the clients’ device caches with the keys, and we create encryption managers.
(
alice_device_keys,
alice_otks,
alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)
(
bob_device_keys,
bob_otks,
bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)
bob.storage["device_tracker.cache.@alice:example.org"] = alice.storage[
"device_tracker.cache.@alice:example.org"
] = {
"device_keys": {
"ABCDEFG": alice_device_keys,
}
}
bob.storage["device_tracker.cache.@bob:example.org"] = alice.storage[
"device_tracker.cache.@bob:example.org"
] = {
"device_keys": {
"HIJKLMN": bob_device_keys,
}
}
alice_device_tracker = devices.DeviceTracker(alice)
alice_encryption_manager = megolm.OlmMegolmEncryptionManager(
alice,
b"\x00" * 32,
alice_device_keys_manager,
alice_device_tracker,
)
# Bob's device tracker must be backed by Bob's own client (whose device cache
# was populated above), not by Alice's
bob_device_tracker = devices.DeviceTracker(bob)
bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
    bob,
    b"\x00" * 32,
    bob_device_keys_manager,
    bob_device_tracker,
)
We then create request callbacks so that to-device events sent by Alice will simulate a sync response in Bob’s client, and to capture room events sent by Alice.
async def send_to_device_callback(url, **kwargs):
nonlocal bob
messages = kwargs["json"]["messages"]
to_bob = messages.get("@bob:example.org", {}).get("HIJKLMN")
if to_bob:
await bob.publisher.publish(
client.ToDeviceEvents(
[
events.Event(
sender="@alice:example.org",
type="m.room.encrypted",
content=to_bob,
)
]
)
)
return aioresponses.CallbackResult(
status=200,
body="{}",
headers={
"Content-Type": "application/json",
},
)
send_to_device_url_pattern = re.compile(
r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.encrypted/.*$"
)
mock_aioresponse.put(
send_to_device_url_pattern,
callback=send_to_device_callback,
repeat=True,
)
room_events = []
def send_event_callback(url, **kwargs):
nonlocal bob, room_events
match = re.search(
"^/_matrix/client/v3/rooms/([^/]+)/send/([^/]+)/",
url.raw_path,
)
room_id = urllib.parse.unquote(match.group(1))
event_type = urllib.parse.unquote(match.group(2))
room_events.append((room_id, event_type, kwargs["json"]))
return aioresponses.CallbackResult(
status=200,
body='{"event_id": "$event_id"}',
headers={
"Content-Type": "application/json",
},
)
send_event_url_pattern = re.compile(
r"^https://matrix-client.example.org/_matrix/client/v3/rooms/[^/]+/send/.*$"
)
mock_aioresponse.put(
send_event_url_pattern,
callback=send_event_callback,
repeat=True,
)
And we create a callback so that Alice can claim one of Bob’s one-time keys.
def keys_claim_callback(url, **kwargs):
nonlocal bob_otks
if kwargs["json"]["one_time_keys"] == {
"@bob:example.org": {"HIJKLMN": "signed_curve25519"}
}:
otk_id, otk = [
(id, key)
for id, key in bob_otks.items()
if id.startswith("signed_curve25519:")
][0]
return aioresponses.CallbackResult(
status=200,
body=json.dumps(
{
"one_time_keys": {
"@bob:example.org": {"HIJKLMN": {otk_id: otk}}
}
}
),
headers={"Content-Type": "application/json"},
)
else:
return aioresponses.CallbackResult(
status=400,
body="unexpected response",
)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/claim",
callback=keys_claim_callback,
)
Finally, Alice sends an encrypted event to the room, and we ensure that Bob can decrypt it.
await alice_encryption_manager.encrypt_and_send_room_event(
"!room_id",
"m.room.message",
{
"body": "Hello World!",
"msgtype": "m.text",
},
)
assert len(room_events) == 1
room_id, event_type, content = room_events[0]
decrypted = await bob_encryption_manager.decrypt_room_event(
events.RoomEvent(
type=event_type,
sender="@alice:example.org",
room_id=room_id,
content=content,
event_id="$event_id",
origin_server_ts=1234567890123,
)
)
assert decrypted.content == {
"body": "Hello World!",
"msgtype": "m.text",
}
Todo
test other scenarios
Pre-sending Megolm sessions¶
Since we potentially need to send the Megolm session to every recipient device in the room, sending an encrypted event in a large room may take a long time. One way to increase the perceived performance is to start sending out the Megolm sessions before the room event is ready to be encrypted and sent. In this way, when the room event is ready to be encrypted and sent, some of the devices will have already received the Megolm session, and we will need to send the Megolm session to fewer devices.
However, this must be done with some care, as this could result in wasted effort: if the Megolm session is sent out too early, then something could occur (e.g. a user leaving the room) that would cause the Megolm session to need to be rotated, meaning that we sent out a Megolm session that does not get used. If the client wants to use this technique, it must determine how to decide when to pre-send the Megolm sessions so that it is unlikely that the Megolm session that gets sent needs to be rotated before the event is ready. For example, an interactive client might start pre-sending Megolm sessions when a user starts typing in a room. This is an indication that they will likely be sending an event soon, and it’s less likely that, for example, a user will leave the room in the short time between the user starting to type, and the user sending the event. If the user takes a while to write their message, the client may pre-send Megolm sessions multiple times, in case new users or devices join, or in case the Megolm session needs to be rotated.
Here, we provide a function that a client can call to pre-send a Megolm session
in a room, which simply calls the _get_and_send_outbound_megolm_session
function that we wrote earlier.
async def pre_send_megolm_session(self, room_id: str) -> None:
    """Send out a room's Megolm session ahead of time.

    A new Megolm session is created if the room does not have an outbound
    Megolm session yet, or if the existing one needs to be rotated.

    Calling this before a room event is ready to be sent reduces the number of
    devices that the Megolm session still has to be sent to when the event is
    actually sent.

    Arguments:

    ``room_id``:
      the ID of the room
    """
    # FIXME: explain the timeout
    otk_request_timeout = 1000
    await self._get_and_send_outbound_megolm_session(room_id, otk_request_timeout)
Tests
To test this, we will have Alice pre-send a Megolm session, and then send an encrypted room event. We check that Bob receives the key when it is pre-sent, but not when the room event is sent.
@pytest.mark.asyncio
async def test_pre_send_megolm_session(mock_aioresponse):
room_state = {
"m.room.encryption": {
"": {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
"rotation_period_msgs": 2,
},
},
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
}
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": room_state,
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
"room_state_tracker": {
"!room_id": room_state,
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
{{test pre-send Megolm session}}
We create device keys, populate the clients’ device caches, and create encryption managers.
(
alice_device_keys,
alice_otks,
alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)
(
bob_device_keys,
bob_otks,
bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)
bob.storage["device_tracker.cache.@alice:example.org"] = alice.storage[
"device_tracker.cache.@alice:example.org"
] = {
"device_keys": {
"ABCDEFG": alice_device_keys,
}
}
bob.storage["device_tracker.cache.@bob:example.org"] = alice.storage[
"device_tracker.cache.@bob:example.org"
] = {
"device_keys": {
"HIJKLMN": bob_device_keys,
}
}
alice_device_tracker = devices.DeviceTracker(alice)
alice_encryption_manager = megolm.OlmMegolmEncryptionManager(
alice,
b"\x00" * 32,
alice_device_keys_manager,
alice_device_tracker,
)
# Bob's device tracker must be backed by Bob's own client (whose device cache
# was populated above), not by Alice's
bob_device_tracker = devices.DeviceTracker(bob)
bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
    bob,
    b"\x00" * 32,
    bob_device_keys_manager,
    bob_device_tracker,
)
Here, we create the same callbacks as we did in the test for sending a Megolm-encrypted event, except that in the to-device handler, we count the number of events that we have received, so that we can check that we only receive the Megolm session once.
to_device_count = 0
async def send_to_device_callback(url, **kwargs):
nonlocal bob, to_device_count
messages = kwargs["json"]["messages"]
to_bob = messages.get("@bob:example.org", {}).get("HIJKLMN")
if to_bob:
to_device_count = to_device_count + 1
await bob.publisher.publish(
client.ToDeviceEvents(
[
events.Event(
sender="@alice:example.org",
type="m.room.encrypted",
content=to_bob,
)
]
)
)
return aioresponses.CallbackResult(
status=200,
body="{}",
headers={
"Content-Type": "application/json",
},
)
send_to_device_url_pattern = re.compile(
r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.encrypted/.*$"
)
mock_aioresponse.put(
send_to_device_url_pattern,
callback=send_to_device_callback,
repeat=True,
)
room_events = []

def send_event_callback(url, **kwargs):
    """Record each room event sent as a (room ID, event type, content) tuple."""
    path_match = re.search(
        "^/_matrix/client/v3/rooms/([^/]+)/send/([^/]+)/",
        url.raw_path,
    )
    # Both captured path segments are URL-encoded in the request path.
    room_id, event_type = map(urllib.parse.unquote, path_match.groups())
    room_events.append((room_id, event_type, kwargs["json"]))
    return aioresponses.CallbackResult(
        status=200,
        body='{"event_id": "$event_id"}',
        headers={"Content-Type": "application/json"},
    )

send_event_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/rooms/[^/]+/send/.*$"
)
mock_aioresponse.put(
    send_event_url_pattern,
    callback=send_event_callback,
    repeat=True,
)
def keys_claim_callback(url, **kwargs):
    """Answer a key claim for Bob's device with one of his signed one-time keys."""
    expected_claim = {"@bob:example.org": {"HIJKLMN": "signed_curve25519"}}
    if kwargs["json"]["one_time_keys"] != expected_claim:
        # Any other claim is not something this test expects Alice to request.
        return aioresponses.CallbackResult(
            status=400,
            body="unexpected response",
        )
    signed_otks = [
        entry
        for entry in bob_otks.items()
        if entry[0].startswith("signed_curve25519:")
    ]
    otk_id, otk = signed_otks[0]
    claimed = {"one_time_keys": {"@bob:example.org": {"HIJKLMN": {otk_id: otk}}}}
    return aioresponses.CallbackResult(
        status=200,
        body=json.dumps(claimed),
        headers={"Content-Type": "application/json"},
    )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/claim",
    callback=keys_claim_callback,
)
Alice then pre-sends the Megolm session for the room, and we ensure that Bob has received one to-device event, indicating that he has received the Megolm session.
# The mocked sendToDevice handler increments to_device_count for every
# message addressed to Bob's device, so exactly one delivery is expected here.
await alice_encryption_manager.pre_send_megolm_session("!room_id")
assert to_device_count == 1
Finally, Alice encrypts and sends an event to the room, and we ensure that Bob has not received a new to-device event (indicating that Alice didn’t send the Megolm session to Bob again), and that he can decrypt the event (indicating that the pre-sent Megolm session was sent correctly).
await alice_encryption_manager.encrypt_and_send_room_event(
"!room_id",
"m.room.message",
{
"body": "Hello World!",
"msgtype": "m.text",
},
)
assert to_device_count == 1
assert len(room_events) == 1
room_id, event_type, content = room_events[0]
decrypted = await bob_encryption_manager.decrypt_room_event(
events.RoomEvent(
type=event_type,
sender="@alice:example.org",
room_id=room_id,
content=content,
event_id="$event_id",
origin_server_ts=1234567890123,
)
)
assert decrypted.content == {
"body": "Hello World!",
"msgtype": "m.text",
}
Discarding outbound sessions¶
Clients may wish to force the Megolm session in a room to rotate, apart from the times when it is automatically rotated. Some clients give users the ability to discard the current Megolm session for a room. This can be used, for example, if a device in the room is known to have been compromised, and the senders want to ensure that the device cannot read any future messages.
To force a Megolm session to rotate, we simply need to remove the outbound session for the room from the storage. The next time it tries to encrypt an event for that room, it will not be able to find an outbound Megolm session for it, and so will create a new one.
def discard_session(self, room_id: str) -> None:
    """
    Discard the outbound Megolm session for a room

    Removes both the room's pointer to its current outbound session and the
    entry stored for that session.  Does nothing if the room has no outbound
    session recorded.

    Arguments:

    ``room_id``:
      the ID of the room
    """
    storage = self.client.storage
    pointer_key = f"outbound_megolm_session.{room_id}"
    if pointer_key not in storage:
        return
    # The pointer entry holds the session ID that names the per-session entry.
    session_id = storage[pointer_key]
    del storage[pointer_key]
    del storage[f"{pointer_key}.{session_id}"]
Tests
To test this, we encrypt an event, call discard_session, encrypt another event, and ensure that the session IDs for the two events are different.
Todo
@pytest.mark.asyncio
async def test_discard_session(mock_aioresponse):
"""Check that discarding the outbound Megolm session changes the session ID"""
# Room state for an encrypted room whose members are Alice and Bob.
room_state = {
"m.room.encryption": {
"": {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
"rotation_period_msgs": 2,
},
},
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
}
# Alice's client: its room state is written inline and uses the shared
# ROOM_ENCRYPTION_EVENT fixture rather than the room_state dict above.
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
# Bob's client: seeded with the room_state dict defined above.
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
"room_state_tracker": {
"!room_id": room_state,
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
{{test discard session}}
We set up the encryption manager for Alice and Bob.
(
alice_device_keys,
alice_otks,
alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)
(
bob_device_keys,
bob_otks,
bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)
bob.storage["device_tracker.cache.@alice:example.org"] = alice.storage[
"device_tracker.cache.@alice:example.org"
] = {
"device_keys": {
"ABCDEFG": alice_device_keys,
}
}
bob.storage["device_tracker.cache.@bob:example.org"] = alice.storage[
"device_tracker.cache.@bob:example.org"
] = {
"device_keys": {
"HIJKLMN": bob_device_keys,
}
}
alice_device_tracker = devices.DeviceTracker(alice)
alice_encryption_manager = megolm.OlmMegolmEncryptionManager(
alice,
b"\x00" * 32,
alice_device_keys_manager,
alice_device_tracker,
)
bob_device_tracker = devices.DeviceTracker(alice)
bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
bob,
b"\x00" * 32,
bob_device_keys_manager,
bob_device_tracker,
)
We set up handlers to handle the HTTP requests that Alice will make. This is similar to the handlers that we defined for previous tests, so we will not say much about them.
async def send_to_device_callback(url, **kwargs):
    """Hand any to-device message for Bob's device over to Bob's client."""
    payload_for_bob = (
        kwargs["json"]["messages"].get("@bob:example.org", {}).get("HIJKLMN")
    )
    if payload_for_bob:
        incoming = events.Event(
            sender="@alice:example.org",
            type="m.room.encrypted",
            content=payload_for_bob,
        )
        await bob.publisher.publish(client.ToDeviceEvents([incoming]))
    # The request always succeeds, whether or not it was addressed to Bob.
    return aioresponses.CallbackResult(
        status=200,
        body="{}",
        headers={"Content-Type": "application/json"},
    )

send_to_device_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.encrypted/.*$"
)
mock_aioresponse.put(
    send_to_device_url_pattern,
    callback=send_to_device_callback,
    repeat=True,
)
room_events = []

def send_event_callback(url, **kwargs):
    """Record each room event sent as a (room ID, event type, content) tuple."""
    path_match = re.search(
        "^/_matrix/client/v3/rooms/([^/]+)/send/([^/]+)/",
        url.raw_path,
    )
    # Both captured path segments are URL-encoded in the request path.
    room_id, event_type = map(urllib.parse.unquote, path_match.groups())
    room_events.append((room_id, event_type, kwargs["json"]))
    return aioresponses.CallbackResult(
        status=200,
        body='{"event_id": "$event_id"}',
        headers={"Content-Type": "application/json"},
    )

send_event_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/rooms/[^/]+/send/.*$"
)
mock_aioresponse.put(
    send_event_url_pattern,
    callback=send_event_callback,
    repeat=True,
)
def keys_claim_callback(url, **kwargs):
    """Answer a key claim for Bob's device with one of his signed one-time keys."""
    expected_claim = {"@bob:example.org": {"HIJKLMN": "signed_curve25519"}}
    if kwargs["json"]["one_time_keys"] != expected_claim:
        # Any other claim is not something this test expects Alice to request.
        return aioresponses.CallbackResult(
            status=400,
            body="unexpected response",
        )
    signed_otks = [
        entry
        for entry in bob_otks.items()
        if entry[0].startswith("signed_curve25519:")
    ]
    otk_id, otk = signed_otks[0]
    claimed = {"one_time_keys": {"@bob:example.org": {"HIJKLMN": {otk_id: otk}}}}
    return aioresponses.CallbackResult(
        status=200,
        body=json.dumps(claimed),
        headers={"Content-Type": "application/json"},
    )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/claim",
    callback=keys_claim_callback,
)
Alice sends an encrypted event to the room, discards the outgoing Megolm session, then sends another event. We ensure that the two events have different session IDs.
await alice_encryption_manager.encrypt_and_send_room_event(
"!room_id",
"m.room.message",
{
"body": "Hello World!",
"msgtype": "m.text",
},
)
alice_encryption_manager.discard_session("!room_id")
await alice_encryption_manager.encrypt_and_send_room_event(
"!room_id",
"m.room.message",
{
"body": "Hello again!",
"msgtype": "m.text",
},
)
(_, _, event_content0) = room_events[0]
(_, _, event_content1) = room_events[1]
assert event_content0["session_id"] != event_content1["session_id"]
Example: encrypted echo bot¶
We can now modify the echo bot that we wrote previously to handle encrypted rooms. As before, it relies on having previously logged in with the login script.
# {{copyright}}
"""Echo events, with encryption support"""
import asyncio
from base64 import b64decode, b64encode
import json
import logging
import os
import sys
import typing
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from matrixlib import client
from matrixlib import events
from matrixlib import megolm
{{json file storage}}
async def main() -> None:
global clientref
async with client.Client(storage=JsonFileStorage()) as c:
{{encrypted echo example}}
asyncio.run(main())
Since we will be dealing with encrypted events, we create an
OlmMegolmEncryptionManager. For the key used to encrypt the storage, we
derive a key from a password. As this is just a demo, we hard-code the
password, but in a real client, the password should be entered by the user, or
the key should be stored securely in some way.
# Reuse the salt saved in storage if there is one; otherwise create and save it.
stored_salt = c.storage.get("storage_encryption_key_salt")
if stored_salt is None:
    salt = os.urandom(32)
    c.storage["storage_encryption_key_salt"] = b64encode(salt).decode()
else:
    salt = b64decode(stored_salt)
# Hard-coded for the demo only.
password = b"SecretPasswordDon'tTellAnyone"
kdf = Scrypt(
    length=32,
    salt=salt,
    n=2**20,
    r=8,
    p=1,
)
key = kdf.derive(password)
encryption_manager = megolm.OlmMegolmEncryptionManager(c, key)
As with our non-encrypted echo bot, we create a subscriber to receive events.
In our non-encrypted echo bot, we only handled m.room.message
events; here,
we have to handle both m.room.message
events and m.room.encrypted
events.
With m.room.message
events, we can simply process the event as-is, but with
m.room.encrypted
events, we need to decrypt the event and then process the
decrypted event. If we fail to decrypt the event, we will log the failure to
the console.
async def timeline_subscriber(updates: client.RoomTimelineUpdates) -> None:
    """Process the message events in a batch of room timeline updates.

    State events are skipped.  ``m.room.message`` events are processed as-is;
    ``m.room.encrypted`` events are decrypted first and processed only if the
    decrypted event is an ``m.room.message``.  Decryption failures are logged
    and the event is skipped.
    """
    for event in updates.events:
        if isinstance(event, events.StateEvent):
            continue
        elif event.type == "m.room.message":
            await process_message(event)
        elif event.type == "m.room.encrypted":
            try:
                decrypted = await encryption_manager.decrypt_room_event(event)
            except Exception as e:
                # logging %-formats the message with its arguments, so the
                # exception needs a placeholder to appear in the log output.
                logging.error("unable to decrypt event: %s", e)
                continue
            if decrypted.type == "m.room.message":
                await process_message(decrypted)
async def process_message(event: events.RoomEvent) -> None:
{{encrypted echo bot: process message}}
When we process message events, we ignore any invalid events and any events that are not text events. We also ignore events sent by our own user.
if (
"body" not in event.content
or event.content.get("msgtype") != "m.text"
or event.sender == c.user_id
):
return
As before, if we get a message saying “!quit”, we take this as a signal to exit.
# A "!quit" message stops the sync and is not echoed back.
if event.content["body"] == "!quit":
c.stop_sync()
return
Finally, we construct the echo. We check if the room is encrypted, and if so, we encrypt and send the echo event. Otherwise, we send the echo event unencrypted. If the encryption state for the room is invalid (for example, if encryption was disabled), we refuse to send events to the room, and log the error to the console.
content = {"body": event.content["body"], "msgtype": "m.notice"}
try:
is_encrypted = encryption_manager.is_room_encrypted(event.room_id)
except Exception as e:
logging.error(
"Room encryption is in an invalid state",
e,
"Refusing to send event",
)
return
if is_encrypted:
await encryption_manager.encrypt_and_send_room_event(
event.room_id, "m.room.message", content
)
else:
await c.send_event(event.room_id, "m.room.message", content)
We now register our event subscriber, start the sync, and wait until the sync task is done.
# Register the timeline subscriber, start the sync, and wait for the sync
# task to complete.
c.publisher.subscribe(client.RoomTimelineUpdates, timeline_subscriber)
c.start_sync()
await typing.cast(asyncio.Task[None], c.sync_task)
If you run this script, it should echo back any messages as the previous echo bot did, but it should also work in encrypted rooms.