Encrypting with Megolm
Now that we know what devices are in the room, we can start encrypting our event. As mentioned previously, encryption happens in two parts: we encrypt the room event using a Megolm session, and then we encrypt the Megolm session for each recipient device. We start by looking at encrypting room events with the Megolm session.
We have two types of Megolm sessions: one for encrypting events, which we refer to as an outbound session; and one for decrypting events (including events we sent ourselves), which we refer to as an inbound session. We will create classes to manage these sessions. Let us start with the outbound session.
# {{copyright}}
"""Megolm-related functionality"""
import asyncio
from base64 import b64decode, b64encode
import json
import logging
import os
import time
import typing
import sys
import vodozemac
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from .client import Client
from . import client
from . import devices
from . import error
from . import events
from . import olm
from . import pubsub
from . import rooms
from . import schema
MEGOLM_ALGORITHM = "m.megolm.v1.aes-sha2"
{{megolm module classes}}
class OutboundMegolmSession:
"""Manages a Megolm session for encrypting"""
{{OutboundMegolmSession class methods}}
Each Megolm session is tied to a room, and we will need the room ID when we encrypt a message, so we will pass it into our initialization function. Part of the behaviour of the Megolm session will be determined by the current room state, so we will also need a room state tracker. We also need a device tracker so that we can get the recipient devices, and a device keys manager so that we can get our own device keys.
We will allow for the possibility of creating a brand new session or loading an
existing one from storage, for example, after restarting the client. We
will do this by optionally specifying the ID of the Megolm session to the
initialization function, and loading the session (along with associated data)
from storage. As with the vodozemac Account, we will store the vodozemac
Megolm session encrypted in storage, so we will allow passing in a key. For
convenience, if no key is passed in, we use the same key as the key used in the
device keys manager.
We will be storing some data alongside the Megolm session. We will create the
initial values for that data when we discuss the individual items. Some of the
data will be encrypted, which we will do using AES-GCM and the same key as the
pickle key for vodozemac, so we create an object that will allow us to do that
using the cryptography package.
We will also create a lock to ensure that we don’t have conflicts when manipulating our session data.
def __init__(
self,
c: Client,
room_id: str,
room_state_tracker: rooms.RoomStateTracker,
device_tracker: devices.DeviceTracker,
device_keys_manager: devices.DeviceKeysManager,
key: typing.Optional[bytes] = None,
session_id: typing.Optional[str] = None,
):
"""
Arguments:
``c``:
the client object
``room_id``:
the ID of the room that the session belongs to
``room_state_tracker``:
a ``RoomStateTracker`` object
``device_tracker``:
a ``DeviceTracker`` object
``device_keys_manager``:
a ``DeviceKeysManager`` object
``key``:
a 32-byte binary used to encrypt the objects in storage
``session_id``:
if given, will load a session from storage. If omitted a new session
will be created
"""
self.client = c
self.room_id = room_id
self.room_state_tracker = room_state_tracker
self.device_tracker = device_tracker
self.device_keys_manager = device_keys_manager
self.key = key if key else device_keys_manager.key
if session_id:
self.session_data = c.storage[
f"outbound_megolm_session.{room_id}.{session_id}"
]
self.session = vodozemac.GroupSession.from_pickle(
self.session_data["pickle"], self.key
)
else:
self.session = vodozemac.GroupSession()
self.session_data = {
"pickle": self.session.pickle(self.key),
{{OutboundMegolmSession session data initialization}}
}
self._store_session_data()
self.aesgcm = AESGCM(self.key)
self.lock = asyncio.Lock()
def _store_session_data(self, pickle=False) -> None:
if pickle:
self.session_data["pickle"] = self.session.pickle(self.key)
name = f"outbound_megolm_session.{self.room_id}.{self.session.session_id}"
self.client.storage[name] = self.session_data
Each Megolm session has an ID that allows clients to refer to it. We allow the client to access this.
@property
def session_id(self) -> str:
"""The ID of the Megolm session"""
return self.session.session_id
Getting the session key for sending to recipients
When we encrypt a message, we will need to send the current state of the Megolm
session (referred to as the “session key”) to all the recipient devices that
have not already received this Megolm session. We create a function that will
return a list of devices that we need to send the Megolm session to, along with
the data to send to them, in the form of the contents of an m.room_key event.
The m.room_key event will be sent to the devices encrypted; we will discuss how
this is done in the section on Olm encryption. For now, we will assume that the
event is magically securely teleported to the recipient.
async def get_session_key_for_sending(self) -> list[typing.Tuple[dict, dict]]:
"""Get the devices that we need to send the Megolm session, along with the
data to send.
Returns a list of tuples. The first item in the tuple is the device key
to send to, and the second item in the tuple is the contents of an
``m.room_key`` event to send to the device. The event must be sent
encrypted.
"""
async with self.lock:
{{check if Megolm session is expired}}
{{get Megolm session key}}
{{get devices to send Megolm session to}}
A single Megolm session can be used to encrypt multiple messages, but there are limits to this. After a while, the Megolm session should be rotated (that is, replaced with a new session). This is to ensure that if an attacker somehow obtains a Megolm session, their ability to use it will be limited. To signal that the Megolm session must be rotated, this function will raise an exception.
The expiry parameters for a room are set by the m.room.encryption state event,
and Megolm sessions can expire based on time (the rotation_period_ms property,
which defaults to one week if not set), or based on the number of messages that
it has encrypted (the rotation_period_msgs property, which defaults to 100
messages). To check whether the session is expired based on time, we will need
to store the time at which we created the session. To check whether the session
is expired based on the number of messages encrypted, we can use the session’s
own message_index property, which gives the number of messages that have been
encrypted. Note that the message_index starts counting from zero, and we call
get_session_key_for_sending before we encrypt, so we raise the exception if
the message_index is greater than or equal to rotation_period_msgs, rather than
only if it is strictly greater.
Todo
allow the application to specify maximums for rotation period parameters
"creation_time": time.time_ns() / 1_000_000,  # milliseconds since the epoch
encryption_state = self.room_state_tracker.get_state(
    self.room_id, "m.room.encryption"
)
if encryption_state is None:
    raise RuntimeError("Room is not encrypted")
encryption_state = typing.cast(events.StateEvent, encryption_state)
rotation_period_ms = encryption_state.content.get(
    "rotation_period_ms", 7 * 24 * 60 * 60 * 1000
)
rotation_period_msgs = encryption_state.content.get(
    "rotation_period_msgs", 100
)
if (
    rotation_period_msgs <= self.session.message_index
    # compare in milliseconds, the unit of rotation_period_ms
    or time.time_ns() / 1_000_000
    > self.session_data["creation_time"] + rotation_period_ms
):
    raise SessionExpiredException()
class SessionExpiredException(Exception):
"""Indicates that the session has expired and must be rotated"""
pass
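To make the boundary conditions concrete, the following is a minimal standalone sketch of the same expiry rule, written as a pure function over plain values. The function name `needs_rotation` and its parameters are hypothetical and are not part of the library; all times are assumed to be in milliseconds.

```python
WEEK_MS = 7 * 24 * 60 * 60 * 1000


def needs_rotation(message_index, creation_time_ms, now_ms, encryption_content):
    """Return True if a session should be rotated.

    ``encryption_content`` is the ``content`` of the ``m.room.encryption``
    state event; missing properties fall back to the defaults described above.
    """
    rotation_period_ms = encryption_content.get("rotation_period_ms", WEEK_MS)
    rotation_period_msgs = encryption_content.get("rotation_period_msgs", 100)
    return (
        # message_index counts messages already encrypted, so reaching the
        # limit means the *next* message would exceed it
        message_index >= rotation_period_msgs
        or now_ms > creation_time_ms + rotation_period_ms
    )
```

With `rotation_period_msgs` set to 2, the function allows encryption at message indices 0 and 1 and asks for rotation once the index reaches 2.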
We construct the contents of an m.room_key event that includes the session
key from the vodozemac object, along with associated information. This will
form part of our return value, along with the devices that we are sending this
to.
room_key = {
"algorithm": MEGOLM_ALGORITHM,
"room_id": self.room_id,
"session_id": self.session.session_id,
"session_key": self.session.session_key,
}
Due to the nature of a cryptographic ratchet, we only need to send the session
key to devices that have not already received it. Devices that have already
received it will be able to decrypt new messages that are encrypted with it,
while devices that receive the current iteration of the session key will not be
able to decrypt old messages. To avoid sending the session key unnecessarily,
we will keep track of the devices that we have already sent the session to. We
will track this as a dict mapping user ID to device ID to a dict indicating
the status of the device. We will also use this to provide some measure of
fault tolerance. The device status will have the following properties:
status indicates the status of our attempt to share the session key with this device. It can be either sent, indicating that the key was, as far as we can tell, successfully sent to the device; or pending, indicating that we do not know if the key was successfully sent. A status of pending could be because we have not yet tried to send the key, or because we encountered some sort of error when trying to send it.
session_key is the session key to be sent to the device. This property is only present if status is pending. This property allows us to retry sending the key to the device if we received an error after our first attempt. Since the session key allows encrypted messages to be read, it is encrypted so that it is not stored in the clear.
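As an illustration, the tracking data for a user with one device that has received the key and one that has not might look like the sketch below. The placeholder strings stand in for the base64-encoded AES-GCM nonce and ciphertext, and the helper `pending_devices` is hypothetical, added only to show how the structure can be walked.

```python
# Shape of the tracking map: user ID -> device ID -> status dict
sent_to_devices = {
    "@bob:example.org": {
        "HIJKLMN": {"status": "sent"},
        "OPQRSTU": {
            "status": "pending",
            # [nonce, encrypted session key], both base64-encoded
            "session_key": ["<base64 nonce>", "<base64 ciphertext>"],
        },
    },
}


def pending_devices(sent_to_devices):
    """List the (user ID, device ID) pairs that still need the key."""
    return [
        (user_id, device_id)
        for user_id, user_devices in sent_to_devices.items()
        for device_id, info in user_devices.items()
        if info.get("status") == "pending"
    ]
```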
Tradeoff
Storing the session_key property will take up more space in the storage. If
there are many recipient devices, this can add up. Rather than storing the
session key, we can re-create the session key each time, but since the session
may get ratcheted to a higher index, this means that the recipient may not be
able to decrypt some messages.
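The one-way property behind this can be illustrated with a toy hash ratchet. This is only a sketch of the general idea, not Megolm's actual ratchet, which is more elaborate; the names `advance` and `key_at` are hypothetical.

```python
import hashlib


def advance(key: bytes) -> bytes:
    """Move the toy ratchet forward by one step (cannot be reversed)."""
    return hashlib.sha256(key).digest()


def key_at(start_key: bytes, start_index: int, target_index: int) -> bytes:
    """Derive the key at ``target_index`` from the key at ``start_index``."""
    if target_index < start_index:
        raise ValueError("cannot ratchet backwards")
    key = start_key
    for _ in range(target_index - start_index):
        key = advance(key)
    return key


sender_key_0 = b"\x01" * 32
# The key as shared with a device after three messages have been encrypted
shared_at_3 = key_at(sender_key_0, 0, 3)
# That device can derive the same key as the sender for later indices
later_key = key_at(shared_at_3, 3, 5)
```

A device that only holds `shared_at_3` can follow the sender to index 5, but has no way to compute the keys for indices 0 to 2. That is why a key re-created at a later index may leave some earlier messages undecryptable.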
Initially, this map of devices that we have sent the key to will be the empty
dict.
"sent_to_devices": {},
To determine the devices that we need to send to, we first get the room
membership of users. The users who are allowed to read the message are
indicated by the m.room.history_visibility state event. If it is set to
world_readable, shared, or invited, or if it is unset, users who are
invited are allowed to read the messages, so we will send the session to any
users who are joined or invited (users whose current m.room.member event has
a membership of join or invite). If it is set to joined, then only
users who are joined to the room (users whose current m.room.member event has a
membership of join) are allowed to read the messages.
Once we know which users to share the session with, we query the device tracker to find the device keys for the devices in the room.
history_visibility = self.room_state_tracker.get_state(
self.room_id,
"m.room.history_visibility",
)
if (
history_visibility
and history_visibility.content.get("history_visibility") == "joined"
):
allowed_membership = ["join"]
else:
allowed_membership = ["join", "invite"]
member_events = self.room_state_tracker.get_all_state_for_type(
self.room_id,
"m.room.member",
)
members = [
user_id
for user_id, event in member_events.items()
if event.content.get("membership") in allowed_membership
]
recipient_keys = await self.device_tracker.get_device_keys(members)
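The membership rule can also be exercised on its own, independently of the state tracker. The sketch below assumes plain dicts in place of event objects; the function names `allowed_memberships` and `recipients` are hypothetical.

```python
def allowed_memberships(history_visibility):
    """Map a history_visibility value (or None if unset) to the
    membership values whose users should receive the session."""
    if history_visibility == "joined":
        return ["join"]
    # world_readable, shared, invited, or unset
    return ["join", "invite"]


def recipients(member_contents, history_visibility):
    """``member_contents`` maps user ID to m.room.member event content."""
    allowed = allowed_memberships(history_visibility)
    return sorted(
        user_id
        for user_id, content in member_contents.items()
        if content.get("membership") in allowed
    )


members = {
    "@alice:example.org": {"membership": "join"},
    "@bob:example.org": {"membership": "invite"},
    "@carol:example.org": {"membership": "leave"},
}
```

Here `recipients(members, "joined")` yields only Alice, while any other visibility setting also includes the invited Bob. Carol, who has left, is never included.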
We remove our own device, since we do not need to send the session to
ourselves. We will need to create an inbound Megolm session to decrypt our
own messages, but we will do that directly from our outbound Megolm session,
rather than sending ourselves an m.room_key message.
own_key_info = recipient_keys.get(typing.cast(str, self.client.user_id))
if own_key_info:
own_devices = own_key_info.device_keys
own_devices.pop(typing.cast(str, self.client.device_id), None)
if own_devices == {}:
del recipient_keys[typing.cast(str, self.client.user_id)]
Todo
also need to provide app a way to filter recipient devices (via a callback), e.g. so it can block recipients, only send to verified devices, etc.
If we have already sent the session to a device that is no longer in the room, then we cannot use the session any more, as that device will be able to decrypt new messages encrypted using the session. So we check if any devices that we’ve sent the session to are not present in the set of recipient devices. Note that we need to check devices rather than users. Even if the user owning the device is still in the room, we need to treat any removed devices as potentially being compromised.
Note that for this part, we count devices that were marked as pending as if
the session was sent to them: even though the application may not yet have
succeeded in sending the session to those devices, it may send it later. So
for the purposes of ensuring the messages stay secret among the group members,
we will treat them as if they have already received the session.
If we detect that a device that we have previously sent the session to is no longer in the room, then we raise an exception to let the application know to rotate the session, just like we did if the session was active for too many messages or for too long.
for user_id, device_info in self.session_data["sent_to_devices"].items():
if user_id not in recipient_keys:
raise SessionExpiredException()
recipient_device_keys = recipient_keys.get(
user_id, devices.UserDeviceKeysResult({})
).device_keys
for device_id in device_info.keys():
if device_id not in recipient_device_keys:
raise SessionExpiredException()
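The same check can be written as a small pure function over plain data, which makes the device-level (rather than user-level) comparison easier to see. This is a sketch under the assumption that the current devices are given as a map from user ID to a collection of device IDs; the name `has_removed_device` is hypothetical.

```python
def has_removed_device(sent_to_devices, current_devices):
    """Return True if any device we shared the session with (whether
    "sent" or "pending") is no longer among the current recipients."""
    for user_id, shared_with in sent_to_devices.items():
        still_present = current_devices.get(user_id, ())
        for device_id in shared_with:
            if device_id not in still_present:
                return True
    return False


shared = {
    "@bob:example.org": {
        "HIJKLMN": {"status": "sent"},
        "OPQRSTU": {"status": "pending"},
    },
}
```

A newly added device does not trigger rotation, since it has never held the session, but a single missing device does, even if its owner is still in the room.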
We can now find any devices that are in the room, but that we have not sent the session to yet: these are the devices that we will need to send the session key to. We also mark those devices as pending.
For each device, if they were already marked as pending, we will return the session key that should have previously been sent to it. Otherwise, we send the latest session key.
devices_to_send_to = []
for user_id, key_info in recipient_keys.items():
sent_to_devices = self.session_data["sent_to_devices"].setdefault(
user_id, {}
)
for device_id, device_key in key_info.device_keys.items():
sent_info = sent_to_devices.get(device_id, {})
status = sent_info.get("status")
if status == "pending":
[nonce, encrypted_key] = sent_info["session_key"]
devices_to_send_to.append(
(
device_key,
{
"algorithm": MEGOLM_ALGORITHM,
"room_id": self.room_id,
"session_id": self.session.session_id,
"session_key": self.aesgcm.decrypt(
b64decode(nonce), b64decode(encrypted_key), None
).decode(),
},
)
)
elif status != "sent":
devices_to_send_to.append(
(
device_key,
room_key,
)
)
nonce = os.urandom(12)
sent_to_devices[device_id] = {
"status": "pending",
"session_key": [
b64encode(nonce).decode(),
b64encode(
self.aesgcm.encrypt(
nonce, room_key["session_key"].encode(), None
)
).decode(),
],
}
# Note: if storing things takes a long time, we could check if we actually made
# any changes before saving the session data
self._store_session_data()
return devices_to_send_to
The application now has messages that it can send to recipient devices. After the application has sent these to the recipients, we need to mark them as being sent so that when we send keys again, we won’t try to resend to those recipients. We create a function to mark that we have sent the keys.
def mark_as_sent(self, device_keys: typing.Iterable[dict]) -> None:
"""Indicate that the session has been sent to the given devices.
Arguments:
``device_keys``:
an iterable of device keys
"""
for device_key in device_keys:
if "user_id" in device_key and "device_id" in device_key:
sent_to_devices = self.session_data["sent_to_devices"].setdefault(
device_key["user_id"], {}
)
sent_to_devices[device_key["device_id"]] = {"status": "sent"}
self._store_session_data()
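One way an application might combine the two functions is sketched below: fetch the list of devices, try to send to each one, and mark only the successful ones as sent, so that failures remain pending and are retried on the next call. The `share_session` helper, the `send_to_device` callback, and the `StubSession` stand-in are all hypothetical; a real application would pass an `OutboundMegolmSession` and send the key over Olm.

```python
import asyncio


async def share_session(session, send_to_device):
    """Send the session key to each device that needs it; return the
    device keys for which sending succeeded."""
    delivered = []
    for device_key, room_key in await session.get_session_key_for_sending():
        try:
            await send_to_device(device_key, room_key)
        except OSError:
            # Leave the device as "pending" so that the next call retries it.
            continue
        delivered.append(device_key)
    session.mark_as_sent(delivered)
    return delivered


class StubSession:
    """Hypothetical stand-in that mimics the two methods used above."""

    def __init__(self, device_keys):
        self.pending = list(device_keys)

    async def get_session_key_for_sending(self):
        return [(d, {"session_key": "stub"}) for d in self.pending]

    def mark_as_sent(self, device_keys):
        sent_ids = {d["device_id"] for d in device_keys}
        self.pending = [d for d in self.pending if d["device_id"] not in sent_ids]


async def flaky_send(device_key, room_key):
    # Simulate a network failure for one particular device.
    if device_key["device_id"] == "OPQRSTU":
        raise OSError("simulated failure")


BOB_1 = {"user_id": "@bob:example.org", "device_id": "HIJKLMN"}
BOB_2 = {"user_id": "@bob:example.org", "device_id": "OPQRSTU"}
stub = StubSession([BOB_1, BOB_2])
delivered = asyncio.run(share_session(stub, flaky_send))
```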
Tests
# {{copyright}}
import asyncio
import aioresponses
import json
import pytest
import re
import typing
from unittest.mock import Mock
import unittest.mock as mock
import urllib
import vodozemac
from matrixlib import client
from matrixlib.client import Client
from matrixlib import devices
from matrixlib import events
from matrixlib import error
from matrixlib import megolm
from matrixlib import olm
from matrixlib import rooms
{{megolm test constants}}
{{megolm test utility functions}}
{{test megolm}}
We will use several pieces of data in several tests, so we define constants to avoid repeating the data.
ROOM_ENCRYPTION_EVENT = {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
},
}
ALICE_ROOM_MEMBERSHIP = {
"room_id": "!room_id",
"type": "m.room.member",
"state_key": "@alice:example.org",
"sender": "@alice:example.org",
"event_id": "$alice_event",
"origin_server_ts": 1234567890123,
"content": {"membership": "join"},
}
BOB_ROOM_MEMBERSHIP = {
"room_id": "!room_id",
"type": "m.room.member",
"state_key": "@bob:example.org",
"sender": "@bob:example.org",
"event_id": "$bob_event",
"origin_server_ts": 1234567890123,
"content": {"membership": "join"},
}
ALICE_DEVICE_KEY = {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "ABCDEFG",
"keys": {
"curve25519:ABCDEFG": "some+key",
},
"user_id": "@alice:example.org",
}
BOB_DEVICE_KEY = {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "HIJKLMN",
"keys": {
"curve25519:HIJKLMN": "some+other+key",
},
"user_id": "@bob:example.org",
}
BOB_DEVICE_KEY2 = {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "OPQRSTU",
"keys": {
"curve25519:OPQRSTU": "yet+another+key",
},
"user_id": "@bob:example.org",
}
Todo
add test for history_visibility
We test that we can generate a session key to send to another device. Until we get to implementing encryption and decryption, we will not be able to test that the session key works correctly, but we can test that we get the devices in the room (including when devices are added and leave), and that we can mark devices as having had the key sent to them.
@pytest.mark.asyncio
async def test_megolm_get_session_key(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
"device_tracker.cache.@bob:example.org": {
"device_keys": {
"HIJKLMN": BOB_DEVICE_KEY,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{test megolm get session key}}
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
[
(bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()
assert bob_device_key == {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "HIJKLMN",
"keys": {
"curve25519:HIJKLMN": "some+other+key",
},
"user_id": "@bob:example.org",
}
[
(bob_device_key_again, bob_room_key_again)
] = await outbound_session.get_session_key_for_sending()
assert bob_device_key_again == bob_device_key
assert bob_room_key_again == bob_room_key
outbound_session.mark_as_sent([bob_device_key])
assert await outbound_session.get_session_key_for_sending() == []
We can also test that the outbound session detects when it needs to be rotated. First we test that it detects that it needs to be rotated based on time. We do this by setting the rotation period to 5ms, creating a session, waiting for 10ms, and then trying to get the session key.
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_time(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
"rotation_period_ms": 5,
},
},
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{test megolm get session key rotation by time}}
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
await asyncio.sleep(0.01) # sleep for 10ms to make sure session has expired
with pytest.raises(megolm.SessionExpiredException):
await outbound_session.get_session_key_for_sending()
Next we test that it detects that it needs to rotate when a user or device leaves. In this situation, Bob starts with two devices. When one device logs out, the session needs to be rotated. We then create a new session. When Bob leaves the room completely, the new session will also need to be rotated.
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_membership(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
"device_tracker.cache.@bob:example.org": {
"device_keys": {
"HIJKLMN": BOB_DEVICE_KEY,
"OPQRSTU": BOB_DEVICE_KEY2,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{test megolm get session key rotation by membership}}
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
await outbound_session.get_session_key_for_sending()
# simulate Bob logging out a device
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/query",
status=200,
body=json.dumps(
{
"device_keys": {
"@bob:example.org": {
"HIJKLMN": BOB_DEVICE_KEY,
},
},
}
),
headers={
"Content-Type": "application/json",
},
)
await c.publisher.publish(client.DeviceChanges(["@bob:example.org"], []))
with pytest.raises(megolm.SessionExpiredException):
await outbound_session.get_session_key_for_sending()
outbound_session2 = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
await outbound_session2.get_session_key_for_sending()
# simulate Bob leaving
await c.publisher.publish(
client.RoomTimelineUpdates(
"!room_id",
[
events.StateEvent(
room_id="!room_id",
type="m.room.member",
state_key="@bob:example.org",
sender="@bob:example.org",
content={
"membership": "leave",
},
event_id="$bob_leave_event",
origin_server_ts=1234567890123,
)
],
False,
"",
)
)
with pytest.raises(megolm.SessionExpiredException):
await outbound_session2.get_session_key_for_sending()
Encrypting
After we have provided the recipients with the session key, we can encrypt the event using our Megolm session and send it to the room. To encrypt the event, we:
construct a dict containing the room ID, event type, and event contents;
serialize it as JSON;
encrypt it; and
include the resulting ciphertext in a new m.room.encrypted event.
The application can then send this m.room.encrypted event to the room.
Note that the sender_key and device_id properties in the m.room.encrypted
event are deprecated: we include them in events that we send, for compatibility
with older clients, but we should tolerate receiving events that do not have
them.
def encrypt(self, event_type: str, content: dict) -> dict:
"""Encrypt an event
Arguments:
``event_type``:
the type of the event (e.g. ``m.room.message``)
``content``:
the event ``content``
Returns the ``content`` of a ``m.room.encrypted`` event
"""
plaintext = json.dumps(
{
"room_id": self.room_id,
"type": event_type,
"content": content,
}
)
ciphertext = self.session.encrypt(plaintext)
self._store_session_data(True)
return {
"algorithm": MEGOLM_ALGORITHM,
"sender_key": self.device_keys_manager.identity_key.to_base64(),
"device_id": self.client.device_id,
"session_id": self.session.session_id,
"ciphertext": ciphertext,
}
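To show what surrounds the encryption step, here is a minimal sketch of the plaintext payload that gets encrypted, together with the request path an application could use to send the resulting event. The helper names are hypothetical. The path follows the Client-Server API's `PUT /_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}` endpoint, with the transaction ID chosen by the application.

```python
import json
from urllib.parse import quote


def megolm_plaintext(room_id, event_type, content):
    """Serialize the payload that is passed to the Megolm session."""
    return json.dumps(
        {"room_id": room_id, "type": event_type, "content": content}
    )


def send_path(room_id, txn_id):
    """Path for sending an m.room.encrypted event to a room."""
    return (
        f"/_matrix/client/v3/rooms/{quote(room_id, safe='')}"
        f"/send/m.room.encrypted/{quote(txn_id, safe='')}"
    )


payload = megolm_plaintext(
    "!room_id:example.org", "m.room.message", {"msgtype": "m.text", "body": "hi"}
)
path = send_path("!room_id:example.org", "txn1")
```

The dict returned by encrypt would be sent as the JSON body of a request to this path.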
Tests
Now that we can encrypt, we can test that we detect that the session needs rotating based on the number of messages that it has encrypted. We set it to require rotation after two messages, ensure that we can encrypt two messages, and ensure that we get an error when we try to get the session key for the third encryption.
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_number(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": {
"room_id": "!room_id",
"type": "m.room.encryption",
"state_key": "",
"sender": "@alice:example.org",
"event_id": "$encryption_event",
"origin_server_ts": 1234567890123,
"content": {
"algorithm": megolm.MEGOLM_ALGORITHM,
"rotation_period_msgs": 2,
},
},
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{test megolm get session key rotation by number}}
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
c,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
await outbound_session.get_session_key_for_sending()
outbound_session.encrypt("m.room.message", {"body": "one"})
await outbound_session.get_session_key_for_sending()
outbound_session.encrypt("m.room.message", {"body": "two"})
with pytest.raises(megolm.SessionExpiredException):
await outbound_session.get_session_key_for_sending()
Decrypting
We now look at what the recipients need to do to decrypt an event. We create a class representing an inbound Megolm session, which we can instantiate either from a session key that we received from the sender, or directly from an outbound Megolm session for the case where we ourselves are the sender.
class InboundMegolmSession:
"""Manages a Megolm session for decrypting"""
{{InboundMegolmSession member variables}}
{{InboundMegolmSession class methods}}
There are several ways that we can construct an inbound Megolm session, which
can take different arguments. To avoid confusion, we will not use the
initializer function, and instead create instances of this class using from_*
class methods.
def __init__(self):
"""Do not use initializer function. Use the ``from_*`` methods instead"""
raise RuntimeError("Use the from_* methods instead") # pragma: no cover
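The general pattern of alternative constructors can be seen in isolation in the sketch below, which uses a hypothetical `Record` class. `cls.__new__(cls)` allocates an instance without running the initializer, so each `from_*` method can fill in the attributes itself.

```python
class Record:
    """Toy class that can only be built through ``from_*`` methods."""

    # Declared here because no initializer assigns them
    name: str
    source: str

    def __init__(self):
        raise RuntimeError("Use the from_* methods instead")

    @classmethod
    def from_name(cls, name: str) -> "Record":
        obj = cls.__new__(cls)  # allocate without calling __init__
        obj.name = name
        obj.source = "name"
        return obj

    @classmethod
    def from_pair(cls, pair: tuple) -> "Record":
        obj = cls.__new__(cls)
        obj.name = pair[0]
        obj.source = "pair"
        return obj
```

Calling `Record()` directly raises, while `Record.from_name("a")` and `Record.from_pair(("a", 1))` both return usable instances.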
One way to create an inbound Megolm session is directly from an outbound Megolm session. This allows us to decrypt our own messages. Note: this should be done before any events are encrypted with the outbound session, otherwise we will not be able to decrypt all messages.
Like outbound sessions, inbound sessions are kept in storage; they are keyed by room ID, the sender’s user ID, and session ID. Along with the session, we store the sender’s identity key, as well as whether the session is authenticated, that is, whether we trust that the session comes from the given identity key. In this case, since we are the source of the session and we trust ourselves not to lie to ourselves, we mark it as authenticated.
@classmethod
def from_outbound_session(
cls,
c: Client,
outbound: OutboundMegolmSession,
key: typing.Optional[bytes] = None,
) -> "InboundMegolmSession":
"""Create an ``InboundMegolmSession`` from an ``OutboundMegolmSession``
Arguments:
``c``:
the client object
``outbound``:
the ``OutboundMegolmSession`` to use
``key``:
a 32-byte binary used to encrypt the objects in storage. If not
specified, uses the same key as used by ``outbound``
"""
obj = cls.__new__(cls)
obj.client = c
obj.user_id = typing.cast(str, c.user_id)
obj.room_id = outbound.room_id
obj.key = key if key else outbound.key
obj.session = vodozemac.InboundGroupSession(outbound.session.session_key)
obj.session_data = {
"pickle": obj.session.pickle(obj.key),
"sender_key": outbound.device_keys_manager.identity_key.to_base64(),
"authenticated": True,
"event_ids": {},
}
obj._store_session_data()
return obj
def _store_session_data(self, pickle=False) -> None:
if pickle:
self.session_data["pickle"] = self.session.pickle(self.key)
name = f"inbound_megolm_session.{self.room_id}.{self.user_id}.{self.session.session_id}"
self.client.storage[name] = self.session_data
Since we aren’t using the initializer function, we need to declare our member variables so that the type checker knows about them.
client: Client
user_id: str
room_id: str
key: bytes
session: vodozemac.InboundGroupSession
session_data: dict[str, typing.Any]
We also expose the data about the session so that the application can make use of it. This allows the application to determine whether to trust messages decrypted using the session, depending on the application’s definition of “trust”. We will explore this concept later on. FIXME: link to section.
@property
def sender_key(self) -> vodozemac.Curve25519PublicKey:
"""The identity key of the Megolm session's sender"""
return vodozemac.Curve25519PublicKey.from_base64(
self.session_data["sender_key"]
)
@property
def authenticated(self) -> bool:
"""Whether we know that the Megolm session comes from the associated ``sender_key``"""
return self.session_data["authenticated"]
Another way to get an inbound session is via an m.room_key event. In this
case, we need to include the sender’s identity key, which we will get from the
Olm session that we received the event from. (We will see how this happens when
we discuss Olm.) Since Olm is authenticated, we set the authenticated flag to
True.
@classmethod
def from_room_key(
cls,
c: Client,
user_id: str,
sender_key: vodozemac.Curve25519PublicKey,
room_key_content: dict,
key: bytes,
) -> "InboundMegolmSession":
"""Create an ``InboundMegolmSession`` from an ``m.room_key`` event
Arguments:
``c``:
the client object
``user_id``:
the user ID of the sender of the ``m.room_key`` event
``sender_key``:
the identity key of the sender of the ``m.room_key`` event
``room_key_content``:
the ``content`` of the ``m.room_key`` event
``key``:
a 32-byte binary used to encrypt the objects in storage
"""
obj = cls.__new__(cls)
if room_key_content["algorithm"] != MEGOLM_ALGORITHM:
raise RuntimeError("Invalid algorithm")
obj.client = c
obj.user_id = user_id
obj.room_id = room_key_content["room_id"]
obj.key = key
obj.session = vodozemac.InboundGroupSession(room_key_content["session_key"])
if obj.session.session_id != room_key_content["session_id"]:
raise RuntimeError("Mismatched session ID")
obj.session_data = {
"pickle": obj.session.pickle(obj.key),
"sender_key": sender_key.to_base64(),
"authenticated": True,
"event_ids": {},
}
obj._store_session_data()
return obj
Todo
if we already have this session in storage, we should only keep the “best” one
We can also load an inbound session from storage. In this case, we will return None if we cannot find the given session.
@classmethod
def from_storage(
cls,
c: Client,
room_id: str,
user_id: str,
session_id: str,
key: bytes,
) -> typing.Optional["InboundMegolmSession"]:
"""Load a session from storage
Arguments:
``c``:
the client object
``room_id``:
the ID of the room that the session belongs to
``user_id``:
the ID of the user that the session belongs to
``session_id``:
the ID of the Megolm session
``key``:
a 32-byte binary used to encrypt the objects in storage
"""
obj = cls.__new__(cls)
obj.client = c
obj.room_id = room_id
obj.user_id = user_id
obj.key = key
name = f"inbound_megolm_session.{room_id}.{user_id}.{session_id}"
obj.session_data = c.storage.get(name)
if obj.session_data is None:
return None
obj.session = vodozemac.InboundGroupSession.from_pickle(
obj.session_data["pickle"],
obj.key,
)
return obj
There is a fourth way that we can get an inbound session: via a key export. We will discuss this in a later section. FIXME: link to section
Now that we have an inbound Megolm session, we can use it to decrypt an encrypted event. We first check the event contents to make sure that it is in the expected format and that it is a Megolm-encrypted message. We then use the session to decrypt the ciphertext, and then check that the decrypted contents are in the expected format, and belong to the same room as the Megolm session.
You will see a code fragment referring to replay attack detection. This will be explained below; you can ignore it for now.
def decrypt(self, event: events.RoomEvent) -> dict:
"""Decrypt an ``m.room.encrypted`` event encrypted with Megolm
Arguments:
``event``:
the encrypted event
Returns the decrypted event, which will be a dict that should have ``type``
(the decrypted event type), ``content`` (the event content), and
``room_id`` (the ID of the room the event was sent to) properties.
"""
# check cleartext
schema.ensure_valid(
event.content,
{
"algorithm": str,
"sender_key": schema.Optional(str),
"device_id": schema.Optional(str),
"session_id": str,
"ciphertext": str,
},
)
if event.content["algorithm"] != MEGOLM_ALGORITHM:
raise RuntimeError("Invalid algorithm")
# decrypt ciphertext
decrypted = self.session.decrypt(event.content["ciphertext"])
plaintext = json.loads(decrypted.plaintext)
{{detect replay attacks}}
self._store_session_data(True)
# check plaintext
schema.ensure_valid(
plaintext,
{
"type": str,
"content": dict,
"room_id": str,
},
)
if plaintext["room_id"] != self.room_id:
raise RuntimeError("Mismatched room ID")
return plaintext
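The plaintext checks can also be looked at on their own. The following is a minimal sketch that uses plain isinstance tests in place of the schema module; check_plaintext is a hypothetical helper name and is not part of the implementation above. The room ID comparison matters because the room_id in the cleartext event is not protected by the encryption, so a ciphertext copied into another room would otherwise be accepted there.

```python
import json


def check_plaintext(raw: str, expected_room_id: str) -> dict:
    """Parse decrypted Megolm plaintext and apply the checks described above.

    Hypothetical stand-alone helper, for illustration only.
    """
    plaintext = json.loads(raw)
    if not isinstance(plaintext, dict):
        raise RuntimeError("Plaintext is not a JSON object")
    # each required property must be present and have the expected type
    required = (("type", str), ("content", dict), ("room_id", str))
    for name, expected_type in required:
        if not isinstance(plaintext.get(name), expected_type):
            raise RuntimeError(f"Invalid or missing property: {name}")
    # the encrypted room ID must match the room the session belongs to
    if plaintext["room_id"] != expected_room_id:
        raise RuntimeError("Mismatched room ID")
    return plaintext
```

A caller would pass the decrypted string and the room ID of the inbound session, and treat any RuntimeError as a decryption failure.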
Tradeoff
In this implementation, we store the vodozemac inbound session every time we decrypt a message (via the call to _store_session_data). Saving is not strictly necessary here (unlike for outbound sessions, which we must save every time, since otherwise they may encrypt multiple messages at the same ratchet index). We do it because vodozemac caches the ratchet state, so that decrypting the message with the next ratchet index will be faster. However, this causes some potential issues.
First of all, if accessing the storage is async (which it likely will be in most implementations), then the decrypt function will need to be async. Secondly, if saving the session takes a long time, then we may lose any benefit we got from caching.
There are several solutions to this. The simplest solutions are to either ignore these problems, as they are not critical, or to not re-save the session at all. Another solution (which only deals with the second problem and not the first) is to not save every time, and instead save the session only every 5 or 10 decryptions. A more complicated solution would be to create a new async task for saving the inbound sessions: something that runs in a separate thread, process, or co-routine, depending on your language’s concurrency functionality. When we decrypt a message, we tell the task to store the session. The task can then schedule the storage as needed. For example, it can rate-limit the storage based on time: if we decrypt several messages in the same session in a short amount of time, it can skip some and store only the latest version of the session.
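As a rough illustration of the last option, the following sketch coalesces bursts of save requests into a single write using an asyncio task. It is not part of the implementation in this document: DebouncedSaver, request_save, and flush are hypothetical names, and the save callable stands in for whatever asynchronous storage the client uses.

```python
import asyncio
import typing


class DebouncedSaver:
    """Coalesce frequent save requests into at most one write per interval.

    Hypothetical helper: ``save`` is any async callable that persists a
    snapshot, and ``interval`` is the delay in seconds before each write.
    """

    def __init__(
        self,
        save: typing.Callable[[typing.Any], typing.Awaitable[None]],
        interval: float,
    ) -> None:
        self._save = save
        self._interval = interval
        self._pending: typing.Any = None
        self._has_pending = False
        self._task: typing.Optional[asyncio.Task] = None

    def request_save(self, snapshot: typing.Any) -> None:
        """Remember the latest snapshot and start the writer task if idle."""
        self._pending = snapshot
        self._has_pending = True
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(self._run())

    async def _run(self) -> None:
        while self._has_pending:
            # wait so that several requests in a burst collapse into one write
            await asyncio.sleep(self._interval)
            snapshot, self._pending = self._pending, None
            self._has_pending = False
            await self._save(snapshot)

    async def flush(self) -> None:
        """Wait for any scheduled write to finish, e.g. before shutdown."""
        if self._task is not None:
            await self._task
```

With a helper like this, decrypt could stay synchronous and call request_save with the freshly pickled session data, while the writer task performs the slow storage access in the background. The cost is that a crash may lose the most recent cached ratchet state, which only affects speed and not correctness.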
Tests
Now that we can decrypt, we can test more functionality of the outbound session as well.
For this set of tests, we will have 3 users: Alice, Bob, and Carol. Alice will encrypt messages, and we will check which messages Bob and Carol can decrypt.
@pytest.mark.asyncio
async def test_megolm_decryption(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
"device_tracker.cache.@bob:example.org": {
"device_keys": {
"HIJKLMN": BOB_DEVICE_KEY,
},
},
"device_tracker.cache.@carol:example.org": {
"device_keys": {
"OPQRSTU": {
"algorithms": [megolm.MEGOLM_ALGORITHM],
"device_id": "OPQRSTU",
"keys": {
"curve25519:OPQRSTU": "yet+another+key",
},
"user_id": "@carol:example.org",
},
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@carol:example.org",
"device_id": "OPQRSTU",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as carol:
{{megolm decryption test}}
First, Alice and Bob are in the room. We test that Alice can encrypt a message with the outbound session, that she can decrypt it using an inbound session created directly from the outbound session, and that Bob can decrypt it using an inbound session created from a room key event.
room_state_tracker = rooms.RoomStateTracker(alice)
device_tracker = devices.DeviceTracker(alice)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(alice, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
alice,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
[
(bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()
alice_inbound_session = (
megolm.InboundMegolmSession.from_outbound_session(
alice,
outbound_session,
b"\x00" * 32,
)
)
assert alice_inbound_session.authenticated
assert (
alice_inbound_session.sender_key == device_keys_manager.identity_key
)
bob_inbound_session = megolm.InboundMegolmSession.from_room_key(
bob,
"@alice:example.org",
device_keys_manager.identity_key,
bob_room_key,
b"\x00" * 32,
)
assert bob_inbound_session.authenticated
assert (
bob_inbound_session.sender_key == device_keys_manager.identity_key
)
outbound_session.mark_as_sent([bob_device_key])
encrypted_content1 = outbound_session.encrypt(
"m.room.message",
{"body": "Hello World!", "msgtype": "m.text"},
)
encrypted1 = events.RoomEvent(
sender="@alice:example.org",
event_id="$event1",
type="m.room.encrypted",
room_id="!room_id",
content=encrypted_content1,
origin_server_ts=1234567890000,
)
assert alice_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
We can also test that Bob can decrypt an event using an inbound session loaded from storage.
bob_loaded_inbound_session = megolm.InboundMegolmSession.from_storage(
bob,
"!room_id",
"@alice:example.org",
encrypted_content1["session_id"],
b"\x00" * 32,
)
assert bob_loaded_inbound_session.authenticated
assert (
bob_loaded_inbound_session.sender_key
== device_keys_manager.identity_key
)
assert bob_loaded_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
We now test that Alice can load the outbound session from storage and encrypt a new message. Carol joins the room, so the encryption key will be shared with her, and we can test that Carol cannot decrypt the previously-sent message, but all three can decrypt the new message.
loaded_outbound_session = megolm.OutboundMegolmSession(
alice,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
outbound_session.session_id,
)
await alice.publisher.publish(
client.RoomTimelineUpdates(
"!room_id",
[
events.StateEvent(
room_id="!room_id",
type="m.room.member",
state_key="@carol:example.org",
sender="@carol:example.org",
content={
"membership": "join",
},
event_id="$carol_event",
origin_server_ts=1234567890123,
)
],
False,
"",
)
)
[
(carol_device_key, carol_room_key)
] = await outbound_session.get_session_key_for_sending()
carol_inbound_session = megolm.InboundMegolmSession.from_room_key(
carol,
"@alice:example.org",
device_keys_manager.identity_key,
carol_room_key,
b"\x00" * 32,
)
outbound_session.mark_as_sent([carol_device_key])
encrypted_content2 = outbound_session.encrypt(
"m.room.message",
{"body": "Bonjour!", "msgtype": "m.text"},
)
encrypted2 = events.RoomEvent(
sender="@alice:example.org",
event_id="$event2",
type="m.room.encrypted",
room_id="!room_id",
content=encrypted_content2,
origin_server_ts=1234567890000,
)
assert alice_inbound_session.decrypt(encrypted2) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Bonjour!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted2) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Bonjour!", "msgtype": "m.text"},
}
assert carol_inbound_session.decrypt(encrypted2) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Bonjour!", "msgtype": "m.text"},
}
with pytest.raises(vodozemac.MegolmDecryptionException):
carol_inbound_session.decrypt(encrypted1)
Detecting replay attacks¶
When we decrypt events, we must be careful to guard against replay attacks. A replay attack is an attack in which an attacker obtains a previously-sent ciphertext and replays it. Since the message was actually sent by the sender, recipients treat it as authentic, even though the context of the message may be different. The attacker does not need to encrypt a new message; they simply take an existing message and re-send it.
The way that we will detect replay attacks is by recording the event ID of the event decrypted by each ratchet index for a given Megolm session. In Megolm, each ratchet index is used to encrypt a single event, so if we see a ratchet index being used to encrypt multiple events, then we know that a replay attack has occurred (or the sender is buggy).
if decrypted.message_index in self.session_data["event_ids"]:
if (
self.session_data["event_ids"][decrypted.message_index]
!= event.event_id
):
raise RuntimeError("Replay attack detected")
else:
self.session_data["event_ids"][decrypted.message_index] = event.event_id
Note that we do not need to guard against replay attacks in Olm because with Olm, we can only decrypt each event once — after decryption, we ratchet the Olm session forwards so that it can no longer be used to re-decrypt the event. Since events that we encrypt with Olm are keys, which we store after decryption, we do not have a need to re-decrypt them. However, with Megolm, we allow re-decryption in order to allow users to re-read old messages.
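The index-to-event-ID bookkeeping can be exercised in isolation. The following is a minimal sketch rather than the class’s actual code: ReplayGuard and check are hypothetical names. It stores message indices as strings, on the assumption that the storage backend may serialize session data to JSON. JSON object keys are always strings, so an integer key written before a save would no longer match the integer index looked up after a reload.

```python
import json


class ReplayGuard:
    """Track which event ID was first seen at each Megolm message index.

    Hypothetical stand-alone helper, for illustration only.
    """

    def __init__(self, event_ids=None):
        # keys are message indices as strings, values are event IDs
        self.event_ids = dict(event_ids or {})

    def check(self, message_index: int, event_id: str) -> None:
        """Raise if ``message_index`` was already seen with another event ID."""
        key = str(message_index)
        # record the event ID on first sight, otherwise fetch the stored one
        seen = self.event_ids.setdefault(key, event_id)
        if seen != event_id:
            raise RuntimeError("Replay attack detected")


guard = ReplayGuard()
guard.check(0, "$event1")  # first sighting: recorded
guard.check(0, "$event1")  # same event decrypted again: allowed

# simulate saving to and loading from JSON-backed storage
restored = ReplayGuard(json.loads(json.dumps(guard.event_ids)))
try:
    restored.check(0, "$event2")  # same index, different event ID
    detected = False
except RuntimeError:
    detected = True
```

If the storage keeps Python objects as they are, as the in-memory dict used in the tests does, integer keys behave the same way and the string conversion is unnecessary.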
Tests
To test this, we encrypt an event and try to decrypt it three times. The first two times, we give it the same event ID and ensure that it decrypts correctly both times. The third time, we give it a different event ID and ensure that we get an error.
@pytest.mark.asyncio
async def test_replay_detection(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
"room_state_tracker": {
"!room_id": {
"m.room.encryption": {
"": ROOM_ENCRYPTION_EVENT,
},
"m.room.member": {
"@alice:example.org": ALICE_ROOM_MEMBERSHIP,
"@bob:example.org": BOB_ROOM_MEMBERSHIP,
},
},
},
"device_tracker.cache.@alice:example.org": {
"device_keys": {
"ABCDEFG": ALICE_DEVICE_KEY,
},
},
"device_tracker.cache.@bob:example.org": {
"device_keys": {
"HIJKLMN": BOB_DEVICE_KEY,
},
},
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as alice:
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@bob:example.org",
"device_id": "HIJKLMN",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as bob:
{{replay detection test}}
room_state_tracker = rooms.RoomStateTracker(alice)
device_tracker = devices.DeviceTracker(alice)
mock_aioresponse.post(
"https://matrix-client.example.org/_matrix/client/v3/keys/upload",
status=200,
body='{"one_time_key_counts":{"signed_curve25519":100}}',
headers={
"Content-Type": "application/json",
},
)
device_keys_manager = devices.DeviceKeysManager(alice, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
alice,
"!room_id",
room_state_tracker,
device_tracker,
device_keys_manager,
b"\x00" * 32,
)
[
(bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()
bob_inbound_session = megolm.InboundMegolmSession.from_room_key(
bob,
"@alice:example.org",
device_keys_manager.identity_key,
bob_room_key,
b"\x00" * 32,
)
outbound_session.mark_as_sent([bob_device_key])
encrypted_content1 = outbound_session.encrypt(
"m.room.message",
{"body": "Hello World!", "msgtype": "m.text"},
)
encrypted1 = events.RoomEvent(
sender="@alice:example.org",
event_id="$event1",
type="m.room.encrypted",
room_id="!room_id",
content=encrypted_content1,
origin_server_ts=1234567890000,
)
assert bob_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted1) == {
"room_id": "!room_id",
"type": "m.room.message",
"content": {"body": "Hello World!", "msgtype": "m.text"},
}
encrypted2 = events.RoomEvent(
sender="@alice:example.org",
event_id="$event2",
type="m.room.encrypted",
room_id="!room_id",
content=encrypted_content1,
origin_server_ts=1234567890123,
)
with pytest.raises(RuntimeError):
bob_inbound_session.decrypt(encrypted2)
Todo
explain what checks we need to do
sender key matches sending user