Encrypting with Megolm

Now that we know which devices are in the room, we can start encrypting our event. As mentioned previously, encryption happens in two parts: we encrypt the room event using a Megolm session, and then we encrypt the Megolm session for each recipient device. We start with encrypting room events using the Megolm session.

We have two types of Megolm sessions: one for encrypting events, which we refer to as an outbound session; and one for decrypting events (including events we sent ourselves), which we refer to as an inbound session. We will create classes to manage these sessions. Let us start with the outbound session.

src/matrixlib/megolm.py:
# {{copyright}}

"""Megolm-related functionality"""

import asyncio
from base64 import b64decode, b64encode
import json
import logging
import os
import time
import typing
import sys
import vodozemac
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

from .client import Client
from . import client
from . import devices
from . import error
from . import events
from . import olm
from . import pubsub
from . import rooms
from . import schema


MEGOLM_ALGORITHM = "m.megolm.v1.aes-sha2"


{{megolm module classes}}
megolm module classes:
class OutboundMegolmSession:
    """Manages a Megolm session for encrypting"""

    {{OutboundMegolmSession class methods}}

Each Megolm session is tied to a room, and we will need the room ID when we encrypt a message, so we will pass it into our initialization function. Part of the behaviour of the Megolm session will be determined by the current room state, so we will also need a room state tracker. We also need a device tracker so that we can get the recipient devices, and a device keys manager so that we can get our own device keys.

We allow for the possibility of creating a brand new session or loading an existing one from storage, for example after restarting the client. We do this by optionally passing the ID of the Megolm session to the initialization function, which then loads the session (along with associated data) from storage. As with the vodozemac Account, we store the vodozemac Megolm session encrypted, so we allow passing in a key. For convenience, if no key is passed in, we use the device keys manager's key.
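The load-or-create behaviour can be illustrated with a minimal sketch, assuming a plain dict as the storage and a hypothetical `FakeGroupSession` stand-in for `vodozemac.GroupSession`. The fake's `pickle`/`from_pickle` just serialize to JSON and do not encrypt anything; the sketch only shows how the storage name is derived from the room ID and session ID, and when a session is created versus loaded.

```python
import json
import secrets
import typing


class FakeGroupSession:
    """Hypothetical stand-in for vodozemac.GroupSession (no real cryptography)."""

    def __init__(self, session_id: typing.Optional[str] = None):
        self.session_id = session_id or secrets.token_hex(8)

    def pickle(self, key: bytes) -> str:
        # A real pickle is encrypted with ``key``; this fake ignores the key.
        return json.dumps({"session_id": self.session_id})

    @classmethod
    def from_pickle(cls, pickled: str, key: bytes) -> "FakeGroupSession":
        return cls(json.loads(pickled)["session_id"])


def storage_name(room_id: str, session_id: str) -> str:
    """Storage entry name used for an outbound session."""
    return f"outbound_megolm_session.{room_id}.{session_id}"


def load_or_create(
    storage: dict,
    room_id: str,
    key: bytes,
    session_id: typing.Optional[str] = None,
) -> FakeGroupSession:
    """Load the session named by ``session_id``, or create and store a new one."""
    if session_id:
        # e.g. after a restart: the session must already be in storage
        data = storage[storage_name(room_id, session_id)]
        return FakeGroupSession.from_pickle(data["pickle"], key)
    session = FakeGroupSession()
    storage[storage_name(room_id, session.session_id)] = {
        "pickle": session.pickle(key)
    }
    return session
```

A new session is saved as soon as it is created, so passing its ID back in later (for example, after a restart) finds the same entry.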

We will be storing some data alongside the Megolm session; we give the initial values for that data when we discuss the individual items. Some of the data will be encrypted, which we do using AES-GCM with the same key that we use to pickle the vodozemac session, so we create an AESGCM object from the cryptography package for this.

As well, we will create a lock to ensure that we don’t have conflicts when manipulating our session data.

OutboundMegolmSession class methods:
def __init__(
    self,
    c: Client,
    room_id: str,
    room_state_tracker: rooms.RoomStateTracker,
    device_tracker: devices.DeviceTracker,
    device_keys_manager: devices.DeviceKeysManager,
    key: typing.Optional[bytes] = None,
    session_id: typing.Optional[str] = None,
):
    """
    Arguments:

    ``c``:
      the client object
    ``room_id``:
      the ID of the room that the session belongs to
    ``room_state_tracker``:
      a ``RoomStateTracker`` object
    ``device_tracker``:
      a ``DeviceTracker`` object
    ``device_keys_manager``:
      a ``DeviceKeysManager`` object
    ``key``:
      a 32-byte key used to encrypt the objects in storage.  If omitted, the
      device keys manager's key is used
    ``session_id``:
      if given, will load a session from storage.  If omitted, a new session
      will be created
    """
    self.client = c
    self.room_id = room_id
    self.room_state_tracker = room_state_tracker
    self.device_tracker = device_tracker
    self.device_keys_manager = device_keys_manager
    self.key = key if key else device_keys_manager.key
    if session_id:
        self.session_data = c.storage[
            f"outbound_megolm_session.{room_id}.{session_id}"
        ]
        self.session = vodozemac.GroupSession.from_pickle(
            self.session_data["pickle"], self.key
        )
    else:
        self.session = vodozemac.GroupSession()
        self.session_data = {
            "pickle": self.session.pickle(self.key),
            {{OutboundMegolmSession session data initialization}}
        }
        self._store_session_data()
    self.aesgcm = AESGCM(self.key)
    self.lock = asyncio.Lock()
OutboundMegolmSession class methods:
def _store_session_data(self, pickle=False) -> None:
    if pickle:
        self.session_data["pickle"] = self.session.pickle(self.key)
    name = f"outbound_megolm_session.{self.room_id}.{self.session.session_id}"
    self.client.storage[name] = self.session_data

Each Megolm session has an ID that allows clients to refer to it. We allow the client to access this.

OutboundMegolmSession class methods:
@property
def session_id(self) -> str:
    """The ID of the Megolm session"""
    return self.session.session_id

Getting the session key for sending to recipients

When we encrypt a message, we need to send the current state of the Megolm session (referred to as the “session key”) to all the recipient devices that have not already received this Megolm session. We create a function that returns a list of devices that we need to send the Megolm session to, along with the data to send to them, in the form of the contents of an m.room_key event. The m.room_key event is sent to the devices encrypted; we will discuss how this is done in the section on Olm encryption. For now, we will assume that the event is magically and securely teleported to the recipient.

OutboundMegolmSession class methods:
async def get_session_key_for_sending(self) -> list[typing.Tuple[dict, dict]]:
    """Get the devices that we need to send the Megolm session, along with the
    data to send.

    Returns a list of tuples.  The first item in the tuple is the device key
    to send to, and the second item in the tuple is the contents of an
    ``m.room_key`` event to send to the device.  The event must be sent
    encrypted.
    """
    async with self.lock:
        {{check if Megolm session is expired}}
        {{get Megolm session key}}
        {{get devices to send Megolm session to}}

A single Megolm session can be used to encrypt multiple messages, but there are limits to this. After a while, the Megolm session should be rotated (that is, replaced with a new session). This is to ensure that if an attacker somehow obtains a Megolm session, their ability to use it will be limited. To signal that the Megolm session must be rotated, this function will raise an exception.

The expiry parameters for a room are set by the m.room.encryption state event. Megolm sessions can expire based on time (the rotation_period_ms property, which defaults to one week if not set) or based on the number of messages that they have encrypted (the rotation_period_msgs property, which defaults to 100 messages). To check whether the session has expired based on time, we need to store the time at which we created the session. To check whether it has expired based on the number of messages encrypted, we can use the session’s own message_index property, which gives the number of messages that have been encrypted so far. Since message_index starts counting from zero and we call get_session_key_for_sending before we encrypt, we raise the exception if message_index is greater than or equal to rotation_period_msgs, rather than strictly greater than.
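The expiry rule can be written as a small pure function, which makes the boundary cases easy to check. This is a sketch: `is_expired` is a hypothetical helper rather than part of the library, and it assumes all times are integer milliseconds since the epoch (for example, `time.time_ns() // 1_000_000`), matching the units of rotation_period_ms.

```python
WEEK_MS = 7 * 24 * 60 * 60 * 1000  # default rotation_period_ms: one week
DEFAULT_ROTATION_MSGS = 100  # default rotation_period_msgs


def is_expired(
    encryption_content: dict,
    message_index: int,
    creation_time_ms: int,
    now_ms: int,
) -> bool:
    """Whether an outbound session must be rotated before the next encryption.

    ``encryption_content`` is the content of the room's m.room.encryption
    event; all times are integer milliseconds since the epoch.
    """
    period_ms = encryption_content.get("rotation_period_ms", WEEK_MS)
    period_msgs = encryption_content.get(
        "rotation_period_msgs", DEFAULT_ROTATION_MSGS
    )
    # message_index is the number of messages already encrypted (it starts at
    # 0), and this runs *before* encrypting, so ">=" means "no room for one more".
    too_many = message_index >= period_msgs
    too_old = now_ms > creation_time_ms + period_ms
    return too_many or too_old
```

With rotation_period_msgs set to 2, the check passes at indexes 0 and 1 and fails at index 2, so exactly two messages get encrypted.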

Todo

allow the application to specify maximums for rotation period parameters

OutboundMegolmSession session data initialization:
"creation_time": time.time_ns() // 1_000_000,  # milliseconds since the epoch
check if Megolm session is expired:
encryption_state = self.room_state_tracker.get_state(
    self.room_id, "m.room.encryption"
)

if encryption_state is None:
    raise RuntimeError("Room is not encrypted")

encryption_state = typing.cast(events.StateEvent, encryption_state)

rotation_period_ms = encryption_state.content.get(
    "rotation_period_ms", 7 * 24 * 60 * 60 * 1000
)
rotation_period_msgs = encryption_state.content.get(
    "rotation_period_msgs", 100
)

# creation_time and the current time are both in milliseconds, matching the
# units of rotation_period_ms
if (
    rotation_period_msgs <= self.session.message_index
    or time.time_ns() // 1_000_000
    > self.session_data["creation_time"] + rotation_period_ms
):
    raise SessionExpiredException()
megolm module classes:
class SessionExpiredException(Exception):
    """Indicates that the session has expired and must be rotated"""

    pass

We construct the contents of an m.room_key event that includes the session key from the vodozemac object, along with associated information. This will form part of our return value, along with the devices that we are sending this to.

get Megolm session key:
room_key = {
    "algorithm": MEGOLM_ALGORITHM,
    "room_id": self.room_id,
    "session_id": self.session.session_id,
    "session_key": self.session.session_key,
}

Due to the nature of a cryptographic ratchet, we only need to send the session key to devices that have not already received it. Devices that have already received it will be able to decrypt new messages encrypted with the session, while a device that receives the session key at its current index will not be able to decrypt messages encrypted before that index. To avoid sending the session key unnecessarily, we keep track of the devices that we have already sent the session to. We track this as a dict mapping user ID to a dict mapping device ID to the status of that device. We also use this to provide some measure of fault tolerance. The device status has the following properties:

  • status indicates the status of our attempt to share the session key with this device. It can be either sent, indicating that the key was, as far as we can tell, successfully sent to the device; or pending, indicating that we do not know if the key was successfully sent. A status of pending could be because we have not yet tried to send the key, or we encountered some sort of error when trying to send it.

  • session_key is the session key to be sent to the device. This property is only present if status is pending. This property allows us to re-try sending the key to the device if we had received an error after our first attempt. Since this allows encrypted messages to be read, this will be encrypted so that it is not stored in the clear.
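For illustration, here is a hypothetical example of the structure described above, along with a small helper that lists the devices whose key share is still unconfirmed. The nonce and ciphertext values are placeholder strings, not real encrypted data, and `pending_devices` is not part of the library.

```python
# user ID -> device ID -> status dict.  Pending entries also carry the
# encrypted session key as a [nonce, ciphertext] pair of base64 strings
# (placeholders here).
sent_to_devices = {
    "@bob:example.org": {
        "HIJKLMN": {"status": "sent"},
        "OPQRSTU": {
            "status": "pending",
            "session_key": ["<base64 nonce>", "<base64 ciphertext>"],
        },
    },
}


def pending_devices(sent: dict) -> list[tuple[str, str]]:
    """List (user_id, device_id) pairs whose key share is still unconfirmed."""
    return [
        (user_id, device_id)
        for user_id, user_devices in sent.items()
        for device_id, info in user_devices.items()
        if info.get("status") == "pending"
    ]
```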

Tradeoff

Storing the session_key property takes up more space in storage, and if there are many recipient devices, this can add up. Rather than storing the session key, we could re-create it each time we retry, but since the session may have been ratcheted to a higher index in the meantime, the recipient would not be able to decrypt messages encrypted before that index.

Initially, this map of devices that we have sent the key to will be the empty dict.

OutboundMegolmSession session data initialization:
"sent_to_devices": {},

To determine the devices that we need to send to, we first get the room membership of users. The users who are allowed to read the message are indicated by the m.room.history_visibility state event. If it is set to world_readable, shared, or invited, or if it is unset, invited users are allowed to read the messages, so we send the session to any users who are joined or invited (users whose current m.room.member event has a membership of join or invite). If it is set to joined, then only users who are joined to the room (users whose current m.room.member event has a membership of join) are allowed to read the messages.
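The membership rule can be sketched on plain dicts, assuming the member events have already been reduced to a mapping from user ID to event content. The helpers `allowed_memberships` and `users_to_share_with` are hypothetical. Note that the history visibility value is the string `joined`, while the membership value it allows is `join`.

```python
import typing


def allowed_memberships(history_visibility: typing.Optional[str]) -> set[str]:
    """Membership values whose users should receive the session.

    ``history_visibility`` is the ``history_visibility`` value from the room's
    m.room.history_visibility event, or None if the room has no such event.
    """
    if history_visibility == "joined":
        return {"join"}
    # world_readable, shared, invited, or unset: invited users may read too
    return {"join", "invite"}


def users_to_share_with(
    member_contents: dict, history_visibility: typing.Optional[str]
) -> list[str]:
    """Filter a user ID -> m.room.member content mapping by membership."""
    allowed = allowed_memberships(history_visibility)
    return sorted(
        user_id
        for user_id, content in member_contents.items()
        if content.get("membership") in allowed
    )
```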

Once we know which users to share the session with, we query the device tracker to find the device keys for the devices in the room.

get devices to send Megolm session to:
history_visibility = self.room_state_tracker.get_state(
    self.room_id,
    "m.room.history_visibility",
)
if (
    history_visibility
    and history_visibility.content.get("history_visibility") == "joined"
):
    allowed_membership = ["join"]
else:
    allowed_membership = ["join", "invite"]

member_events = self.room_state_tracker.get_all_state_for_type(
    self.room_id,
    "m.room.member",
)
members = [
    user_id
    for user_id, event in member_events.items()
    if event.content.get("membership") in allowed_membership
]

recipient_keys = await self.device_tracker.get_device_keys(members)

We remove our own device, since we do not need to send the session to ourselves. We will need to create an inbound Megolm session to decrypt our own messages, but we will do that directly from our outbound Megolm session, rather than sending ourselves an m.room_key message.
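Removing our own device can be sketched on a plain user ID to device ID to device key mapping. The hypothetical `without_own_device` helper works on a deep copy. This is a design choice in the sketch so that it never mutates data the caller might be caching; whether the real device tracker returns shared, cached dicts is an assumption we have not checked here.

```python
import copy


def without_own_device(
    recipients: dict, own_user_id: str, own_device_id: str
) -> dict:
    """Return a copy of ``recipients`` (user ID -> device ID -> device key)
    without our own device, dropping our user if no devices remain."""
    result = copy.deepcopy(recipients)  # leave the caller's data untouched
    own_devices = result.get(own_user_id)
    if own_devices is not None:
        own_devices.pop(own_device_id, None)
        if not own_devices:
            del result[own_user_id]
    return result
```

Our other devices, if any, stay in the result because they still need the session key to read our messages.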

get devices to send Megolm session to:
own_key_info = recipient_keys.get(typing.cast(str, self.client.user_id))
if own_key_info:
    own_devices = own_key_info.device_keys
    own_devices.pop(typing.cast(str, self.client.device_id), None)
    if own_devices == {}:
        del recipient_keys[typing.cast(str, self.client.user_id)]

Todo

also need to provide app a way to filter recipient devices (via a callback), e.g. so it can block recipients, only send to verified devices, etc.

If we have already sent the session to a device that is no longer in the room, then we cannot use the session any more, as that device would be able to decrypt new messages encrypted using the session. So we check whether any devices that we’ve sent the session to are missing from the set of recipient devices. Note that we need to check devices rather than users: even if the user owning a device is still in the room, we need to treat any removed devices as potentially compromised.

Note that for this check, we count devices that are marked as pending as if the session had been sent to them: even though the application may not yet have succeeded in sending the session to those devices, it may send it later. So, to ensure that messages stay secret among the group members, we treat them as if they have already received the session.

If we detect that a device that we have previously sent the session to is no longer in the room, we raise an exception to tell the application to rotate the session, just as we do when the session has been used for too many messages or for too long.
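The departure check can be sketched as a predicate over plain dicts. The hypothetical `someone_left` helper looks only at individual devices, counting pending devices as if they had received the key, so a user entry with no recorded devices never forces a rotation. New devices that appear in the room do not matter here; they simply get the key later.

```python
def someone_left(sent_to_devices: dict, recipients: dict) -> bool:
    """True if any device that has (or may have) received the session key is
    no longer among ``recipients`` (user ID -> device ID -> device key).

    Pending entries count as sent, because the application may still deliver
    them.
    """
    for user_id, user_devices in sent_to_devices.items():
        current = recipients.get(user_id, {})
        for device_id in user_devices:
            if device_id not in current:
                return True
    return False
```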

get devices to send Megolm session to:
for user_id, device_info in self.session_data["sent_to_devices"].items():
    if user_id not in recipient_keys:
        raise SessionExpiredException()
    recipient_device_keys = recipient_keys.get(
        user_id, devices.UserDeviceKeysResult({})
    ).device_keys
    for device_id in device_info.keys():
        if device_id not in recipient_device_keys:
            raise SessionExpiredException()

We can now find any devices that are in the room, but that we have not sent the session to yet: these are the devices that we will need to send the session key to. We also mark those devices as pending.

For each device that was already marked as pending, we return the session key that was recorded for it previously; for each new device, we send the latest session key.
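The bookkeeping can be sketched without the encryption step. In the hypothetical `plan_key_shares` below, pending entries store the session key in the clear purely for brevity; the library encrypts it with AES-GCM before storing it. The sketch returns (user ID, device ID, session key) triples instead of device keys and m.room_key contents.

```python
def plan_key_shares(
    sent_to_devices: dict, recipients: dict, current_session_key: str
) -> list[tuple[str, str, str]]:
    """Return (user_id, device_id, session_key) triples to send, updating
    ``sent_to_devices`` in place.

    NOTE: this sketch stores the session key unencrypted for brevity.
    """
    to_send = []
    for user_id, user_devices in recipients.items():
        sent = sent_to_devices.setdefault(user_id, {})
        for device_id in user_devices:
            info = sent.get(device_id, {})
            status = info.get("status")
            if status == "pending":
                # resend the key recorded earlier, not the (possibly
                # ratcheted) current one
                to_send.append((user_id, device_id, info["session_key"]))
            elif status != "sent":
                # new device: give it the current key and mark it pending
                sent[device_id] = {
                    "status": "pending",
                    "session_key": current_session_key,
                }
                to_send.append((user_id, device_id, current_session_key))
    return to_send
```

Resending the recorded key to a pending device means the device can still decrypt from the index it was originally meant to receive, even if the session has since been ratcheted.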

get devices to send Megolm session to:
devices_to_send_to = []

for user_id, key_info in recipient_keys.items():
    sent_to_devices = self.session_data["sent_to_devices"].setdefault(
        user_id, {}
    )
    for device_id, device_key in key_info.device_keys.items():
        sent_info = sent_to_devices.get(device_id, {})
        status = sent_info.get("status")
        if status == "pending":
            [nonce, encrypted_key] = sent_info["session_key"]
            devices_to_send_to.append(
                (
                    device_key,
                    {
                        "algorithm": MEGOLM_ALGORITHM,
                        "room_id": self.room_id,
                        "session_id": self.session.session_id,
                        "session_key": self.aesgcm.decrypt(
                            b64decode(nonce), b64decode(encrypted_key), None
                        ).decode(),
                    },
                )
            )
        elif status != "sent":
            devices_to_send_to.append(
                (
                    device_key,
                    room_key,
                )
            )
            nonce = os.urandom(12)
            sent_to_devices[device_id] = {
                "status": "pending",
                "session_key": [
                    b64encode(nonce).decode(),
                    b64encode(
                        self.aesgcm.encrypt(
                            nonce, room_key["session_key"].encode(), None
                        )
                    ).decode(),
                ],
            }

# Note: if storing things takes a long time, we could check if we actually made
# any changes before saving the session data
self._store_session_data()

return devices_to_send_to

The application now has messages that it can send to recipient devices. After the application has sent these to the recipients, it needs to mark them as sent, so that the next time we get the session key for sending, we won’t try to resend it to those recipients. We create a function to mark that we have sent the keys.

OutboundMegolmSession class methods:
def mark_as_sent(self, device_keys: typing.Iterable[dict]) -> None:
    """Indicate that the session has been sent to the given devices.

    Arguments:

    ``device_keys``:
      an iterable of device keys
    """
    for device_key in device_keys:
        if "user_id" in device_key and "device_id" in device_key:
            sent_to_devices = self.session_data["sent_to_devices"].setdefault(
                device_key["user_id"], {}
            )
            sent_to_devices[device_key["device_id"]] = {"status": "sent"}

    self._store_session_data()
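Here is one way an application might drive these methods, as a sketch under stated assumptions. The three callables are hypothetical application hooks: `get_session` returns the current outbound session, `new_session` creates and records a replacement, and `send_to_device` delivers an m.room_key event (encrypted) to one device. The `SessionExpiredException` defined here is a local stand-in for `megolm.SessionExpiredException`, so that the block is self-contained.

```python
import typing


class SessionExpiredException(Exception):
    """Local stand-in for megolm.SessionExpiredException."""


async def encrypt_for_room(
    get_session: typing.Callable[[], typing.Any],
    new_session: typing.Callable[[], typing.Any],
    send_to_device: typing.Callable[[dict, dict], typing.Awaitable[None]],
    event_type: str,
    content: dict,
) -> dict:
    """Share the room key with any devices that still need it, then encrypt.

    Returns the content of an m.room.encrypted event.
    """
    session = get_session()
    try:
        to_send = await session.get_session_key_for_sending()
    except SessionExpiredException:
        # Rotate: start a fresh session and share its key instead
        session = new_session()
        to_send = await session.get_session_key_for_sending()
    for device_key, room_key in to_send:
        await send_to_device(device_key, room_key)
        # Record each success so that a later call does not resend to it
        session.mark_as_sent([device_key])
    return session.encrypt(event_type, content)
```

Marking each device right after it has been sent to means that if sending fails partway through, only the remaining devices stay pending, and the next call retries just those.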
Tests
tests/test_megolm.py:
# {{copyright}}

import asyncio
import aioresponses
import json
import pytest
import re
import typing
from unittest.mock import Mock
import unittest.mock as mock
import urllib
import vodozemac

from matrixlib import client
from matrixlib.client import Client
from matrixlib import devices
from matrixlib import events
from matrixlib import error
from matrixlib import megolm
from matrixlib import olm
from matrixlib import rooms


{{megolm test constants}}


{{megolm test utility functions}}


{{test megolm}}

We use several pieces of data in multiple tests, so we define constants to avoid repeating the data.

megolm test constants:
ROOM_ENCRYPTION_EVENT = {
    "room_id": "!room_id",
    "type": "m.room.encryption",
    "state_key": "",
    "sender": "@alice:example.org",
    "event_id": "$encryption_event",
    "origin_server_ts": 1234567890123,
    "content": {
        "algorithm": megolm.MEGOLM_ALGORITHM,
    },
}

ALICE_ROOM_MEMBERSHIP = {
    "room_id": "!room_id",
    "type": "m.room.member",
    "state_key": "@alice:example.org",
    "sender": "@alice:example.org",
    "event_id": "$alice_event",
    "origin_server_ts": 1234567890123,
    "content": {"membership": "join"},
}

BOB_ROOM_MEMBERSHIP = {
    "room_id": "!room_id",
    "type": "m.room.member",
    "state_key": "@bob:example.org",
    "sender": "@bob:example.org",
    "event_id": "$bob_event",
    "origin_server_ts": 1234567890123,
    "content": {"membership": "join"},
}

ALICE_DEVICE_KEY = {
    "algorithms": [megolm.MEGOLM_ALGORITHM],
    "device_id": "ABCDEFG",
    "keys": {
        "curve25519:ABCDEFG": "some+key",
    },
    "user_id": "@alice:example.org",
}

BOB_DEVICE_KEY = {
    "algorithms": [megolm.MEGOLM_ALGORITHM],
    "device_id": "HIJKLMN",
    "keys": {
        "curve25519:HIJKLMN": "some+other+key",
    },
    "user_id": "@bob:example.org",
}

BOB_DEVICE_KEY2 = {
    "algorithms": [megolm.MEGOLM_ALGORITHM],
    "device_id": "OPQRSTU",
    "keys": {
        "curve25519:OPQRSTU": "yet+another+key",
    },
    "user_id": "@bob:example.org",
}

Todo

add test for history_visibility

We test that we can generate a session key to send to another device. Until we implement encryption and decryption, we will not be able to test that the session key works correctly, but we can test that we get the devices in the room (including when devices are added or removed), and that we can mark devices as having had the key sent to them.

test megolm:
@pytest.mark.asyncio
async def test_megolm_get_session_key(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": ROOM_ENCRYPTION_EVENT,
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                        "@bob:example.org": BOB_ROOM_MEMBERSHIP,
                    },
                },
            },
            "device_tracker.cache.@alice:example.org": {
                "device_keys": {
                    "ABCDEFG": ALICE_DEVICE_KEY,
                },
            },
            "device_tracker.cache.@bob:example.org": {
                "device_keys": {
                    "HIJKLMN": BOB_DEVICE_KEY,
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{test megolm get session key}}
test megolm get session key:
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    status=200,
    body='{"one_time_key_counts":{"signed_curve25519":100}}',
    headers={
        "Content-Type": "application/json",
    },
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
    c,
    "!room_id",
    room_state_tracker,
    device_tracker,
    device_keys_manager,
    b"\x00" * 32,
)

[
    (bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()

assert bob_device_key == {
    "algorithms": [megolm.MEGOLM_ALGORITHM],
    "device_id": "HIJKLMN",
    "keys": {
        "curve25519:HIJKLMN": "some+other+key",
    },
    "user_id": "@bob:example.org",
}

[
    (bob_device_key_again, bob_room_key_again)
] = await outbound_session.get_session_key_for_sending()

assert bob_device_key_again == bob_device_key
assert bob_room_key_again == bob_room_key

outbound_session.mark_as_sent([bob_device_key])

assert await outbound_session.get_session_key_for_sending() == []

We can also test that the outbound session detects when it needs to be rotated. First we test that it detects that it needs to be rotated based on time. We do this by setting the rotation period to 5ms, creating a session, waiting for 10ms, and then trying to get the session key.

test megolm:
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_time(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": {
                            "room_id": "!room_id",
                            "type": "m.room.encryption",
                            "state_key": "",
                            "sender": "@alice:example.org",
                            "event_id": "$encryption_event",
                            "origin_server_ts": 1234567890123,
                            "content": {
                                "algorithm": megolm.MEGOLM_ALGORITHM,
                                "rotation_period_ms": 5,
                            },
                        },
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                    },
                },
            },
            "device_tracker.cache.@alice:example.org": {
                "device_keys": {
                    "ABCDEFG": ALICE_DEVICE_KEY,
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{test megolm get session key rotation by time}}
test megolm get session key rotation by time:
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    status=200,
    body='{"one_time_key_counts":{"signed_curve25519":100}}',
    headers={
        "Content-Type": "application/json",
    },
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
    c,
    "!room_id",
    room_state_tracker,
    device_tracker,
    device_keys_manager,
    b"\x00" * 32,
)

await asyncio.sleep(0.01)  # sleep for 10ms to make sure session has expired

with pytest.raises(megolm.SessionExpiredException):
    await outbound_session.get_session_key_for_sending()

Next we test that it detects that it needs to rotate when a user or device leaves. In this situation, Bob starts with two devices. When one device logs out, the session needs to be rotated. We then create a new session. When Bob leaves the room completely, the new session will also need to be rotated.

test megolm:
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_membership(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": ROOM_ENCRYPTION_EVENT,
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                        "@bob:example.org": BOB_ROOM_MEMBERSHIP,
                    },
                },
            },
            "device_tracker.cache.@alice:example.org": {
                "device_keys": {
                    "ABCDEFG": ALICE_DEVICE_KEY,
                },
            },
            "device_tracker.cache.@bob:example.org": {
                "device_keys": {
                    "HIJKLMN": BOB_DEVICE_KEY,
                    "OPQRSTU": BOB_DEVICE_KEY2,
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{test megolm get session key rotation by membership}}
test megolm get session key rotation by membership:
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    status=200,
    body='{"one_time_key_counts":{"signed_curve25519":100}}',
    headers={
        "Content-Type": "application/json",
    },
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
    c,
    "!room_id",
    room_state_tracker,
    device_tracker,
    device_keys_manager,
    b"\x00" * 32,
)

await outbound_session.get_session_key_for_sending()

# simulate Bob logging out a device
mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/query",
    status=200,
    body=json.dumps(
        {
            "device_keys": {
                "@bob:example.org": {
                    "HIJKLMN": BOB_DEVICE_KEY,
                },
            },
        }
    ),
    headers={
        "Content-Type": "application/json",
    },
)
await c.publisher.publish(client.DeviceChanges(["@bob:example.org"], []))

with pytest.raises(megolm.SessionExpiredException):
    await outbound_session.get_session_key_for_sending()

outbound_session2 = megolm.OutboundMegolmSession(
    c,
    "!room_id",
    room_state_tracker,
    device_tracker,
    device_keys_manager,
    b"\x00" * 32,
)

await outbound_session2.get_session_key_for_sending()

# simulate Bob leaving
await c.publisher.publish(
    client.RoomTimelineUpdates(
        "!room_id",
        [
            events.StateEvent(
                room_id="!room_id",
                type="m.room.member",
                state_key="@bob:example.org",
                sender="@bob:example.org",
                content={
                    "membership": "leave",
                },
                event_id="$bob_leave_event",
                origin_server_ts=1234567890123,
            )
        ],
        False,
        "",
    )
)

with pytest.raises(megolm.SessionExpiredException):
    await outbound_session2.get_session_key_for_sending()

Encrypting

After we have provided the recipients with the session key, we can encrypt the event using our Megolm session and send it to the room. To encrypt the event, we:

  • construct a dict containing the room ID, event type, and event contents;

  • serialize it as JSON;

  • encrypt it; and

  • include the resulting ciphertext in a new m.room.encrypted event.

The application can then send this m.room.encrypted event to the room.

Note that the sender_key and device_id properties in the m.room.encrypted event are deprecated: we include them in events that we send, for compatibility with older clients, but we should tolerate receiving events that do not have them.
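The two pieces of data involved can be sketched with plain functions: the JSON plaintext that is fed to the Megolm session, and the content of the resulting m.room.encrypted event. The ciphertext itself comes from the Megolm session and is not computed here, so the helpers `megolm_plaintext` and `encrypted_content` (both hypothetical) take it as an argument.

```python
import json


def megolm_plaintext(room_id: str, event_type: str, content: dict) -> str:
    """The JSON string that is encrypted with the Megolm session."""
    return json.dumps({"room_id": room_id, "type": event_type, "content": content})


def encrypted_content(
    ciphertext: str, session_id: str, sender_key: str, device_id: str
) -> dict:
    """Content of the m.room.encrypted event wrapping ``ciphertext``."""
    return {
        "algorithm": "m.megolm.v1.aes-sha2",
        "sender_key": sender_key,  # deprecated; included for older clients
        "device_id": device_id,  # deprecated; included for older clients
        "session_id": session_id,
        "ciphertext": ciphertext,
    }
```

Including the room ID inside the encrypted plaintext lets a recipient check, after decrypting, that the event really was meant for the room it arrived in.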

OutboundMegolmSession class methods:
def encrypt(self, event_type: str, content: dict) -> dict:
    """Encrypt an event

    Arguments:

    ``event_type``:
      the type of the event (e.g. ``m.room.message``)
    ``content``:
      the event ``content``

    Returns the ``content`` of a ``m.room.encrypted`` event
    """
    plaintext = json.dumps(
        {
            "room_id": self.room_id,
            "type": event_type,
            "content": content,
        }
    )
    ciphertext = self.session.encrypt(plaintext)
    self._store_session_data(True)
    return {
        "algorithm": MEGOLM_ALGORITHM,
        "sender_key": self.device_keys_manager.identity_key.to_base64(),
        "device_id": self.client.device_id,
        "session_id": self.session.session_id,
        "ciphertext": ciphertext,
    }
Tests

Now that we can encrypt, we can test that we detect that the session needs rotating based on the number of messages that it has encrypted. We set it to require rotation after two messages, ensure that we can encrypt two messages, and ensure that we get an error when we try to get the session key for the third encryption.

test megolm:
@pytest.mark.asyncio
async def test_megolm_get_session_key_rotation_by_number(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": {
                            "room_id": "!room_id",
                            "type": "m.room.encryption",
                            "state_key": "",
                            "sender": "@alice:example.org",
                            "event_id": "$encryption_event",
                            "origin_server_ts": 1234567890123,
                            "content": {
                                "algorithm": megolm.MEGOLM_ALGORITHM,
                                "rotation_period_msgs": 2,
                            },
                        },
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                    },
                },
            },
            "device_tracker.cache.@alice:example.org": {
                "device_keys": {
                    "ABCDEFG": ALICE_DEVICE_KEY,
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{test megolm get session key rotation by number}}
test megolm get session key rotation by number:
room_state_tracker = rooms.RoomStateTracker(c)
device_tracker = devices.DeviceTracker(c)
mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    status=200,
    body='{"one_time_key_counts":{"signed_curve25519":100}}',
    headers={
        "Content-Type": "application/json",
    },
)
device_keys_manager = devices.DeviceKeysManager(c, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
    c,
    "!room_id",
    room_state_tracker,
    device_tracker,
    device_keys_manager,
    b"\x00" * 32,
)

await outbound_session.get_session_key_for_sending()
outbound_session.encrypt("m.room.message", {"body": "one"})

await outbound_session.get_session_key_for_sending()
outbound_session.encrypt("m.room.message", {"body": "two"})

with pytest.raises(megolm.SessionExpiredException):
    await outbound_session.get_session_key_for_sending()
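
The count-based rule that this test exercises can also be written as a standalone predicate. This is a sketch, not the OutboundMegolmSession code: the function name needs_rotation and its arguments are hypothetical, and it assumes that the session tracks how many events it has encrypted and when it was created. The defaults (100 messages, one week) are the ones the Matrix specification gives for m.room.encryption.

```python
import time
from typing import Optional

# Defaults given by the Matrix spec for the m.room.encryption state event
DEFAULT_ROTATION_PERIOD_MS = 604_800_000  # one week
DEFAULT_ROTATION_PERIOD_MSGS = 100


def needs_rotation(
    encryption_content: dict,
    messages_encrypted: int,
    created_at_ms: int,
    now_ms: Optional[int] = None,
) -> bool:
    """Hypothetical helper: should the outbound session be replaced before its next use?

    ``messages_encrypted`` is how many events the session has already encrypted,
    and ``created_at_ms`` is when it was created (milliseconds since the epoch).
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    max_msgs = encryption_content.get(
        "rotation_period_msgs", DEFAULT_ROTATION_PERIOD_MSGS
    )
    max_age_ms = encryption_content.get(
        "rotation_period_ms", DEFAULT_ROTATION_PERIOD_MS
    )
    # Rotate once either limit has been reached
    return messages_encrypted >= max_msgs or now_ms - created_at_ms >= max_age_ms
```

With "rotation_period_msgs": 2, as in the test, the predicate is false after zero or one encryptions and true after two, which is why the third call to get_session_key_for_sending fails.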

Decrypting

We now look at what the recipients need to do to decrypt an event. We create a class representing an inbound Megolm session, which we can instantiate either from a session key that we received from the sender, or directly from an outbound Megolm session for the case where we ourselves are the sender.

megolm module classes:
class InboundMegolmSession:
    """Manages a Megolm session for decrypting"""

    {{InboundMegolmSession member variables}}

    {{InboundMegolmSession class methods}}

There are several ways to construct an inbound Megolm session, each taking different arguments. To avoid confusion, we do not use the initializer function; instead, we create instances of this class using from_* class methods.

InboundMegolmSession class methods:
def __init__(self):
    """Do not use initializer function.  Use the ``from_*`` methods instead"""
    raise RuntimeError("Use the from_* methods instead")  # pragma: no cover

One way to create an inbound Megolm session is directly from an outbound Megolm session. This allows us to decrypt our own messages. Note: this should be done before any events are encrypted with the outbound session. The inbound session starts at the outbound session's current ratchet index, so it cannot decrypt any messages that were encrypted before it was created.
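
Why the ordering matters can be seen with a deliberately simplified hash ratchet. ToyRatchet below is hypothetical and is not Megolm's actual ratchet (the real one has four parts and can skip ahead efficiently), but it shares the property that matters here: a key exported at index n can derive the keys for n, n+1, and so on, but never for earlier indices.

```python
import hashlib
import hmac


class ToyRatchet:
    """A single one-way hash chain; NOT the real Megolm ratchet."""

    def __init__(self, seed: bytes, index: int = 0) -> None:
        self.value = seed
        self.index = index

    def advance(self) -> None:
        # One-way step: the previous value cannot be recovered from the new one
        self.value = hashlib.sha256(b"\x01" + self.value).digest()
        self.index += 1

    def message_key(self) -> bytes:
        # Key used to encrypt the message at the current index
        return hmac.new(self.value, b"message key", hashlib.sha256).digest()

    def export(self) -> "ToyRatchet":
        # Analogous to sharing a session key: the copy starts at our current index
        return ToyRatchet(self.value, self.index)

    def key_for_index(self, index: int) -> bytes:
        if index < self.index:
            raise ValueError(f"cannot go back from index {self.index} to {index}")
        copy = ToyRatchet(self.value, self.index)
        while copy.index < index:
            copy.advance()
        return copy.message_key()
```

An export taken before the first message can derive that message's key; an export taken afterwards cannot, which mirrors why from_outbound_session should be called before encrypting anything.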

As with outbound sessions, we store inbound sessions by room ID and session ID; for inbound sessions, the storage key also includes the sender's user ID. Along with the session, we store the sender's identity key, as well as whether the session is authenticated, that is, whether we trust that the session comes from the given identity key. In this case, since we are the source of the session and we trust ourselves not to lie to ourselves, we mark it as authenticated.

InboundMegolmSession class methods:
@classmethod
def from_outbound_session(
    cls,
    c: Client,
    outbound: OutboundMegolmSession,
    key: typing.Optional[bytes] = None,
) -> "InboundMegolmSession":
    """Create an ``InboundMegolmSession`` from an ``OutboundMegolmSession``

    Arguments:

    ``c``:
      the client object
    ``outbound``:
      the ``OutboundMegolmSession`` to use
    ``key``:
      a 32-byte binary used to encrypt the objects in storage.  If not
      specified, uses the same key as used by ``outbound``
    """
    obj = cls.__new__(cls)

    obj.client = c
    obj.user_id = typing.cast(str, c.user_id)
    obj.room_id = outbound.room_id
    obj.key = key if key else outbound.key
    obj.session = vodozemac.InboundGroupSession(outbound.session.session_key)
    obj.session_data = {
        "pickle": obj.session.pickle(obj.key),
        "sender_key": outbound.device_keys_manager.identity_key.to_base64(),
        "authenticated": True,
        "event_ids": {},
    }
    obj._store_session_data()

    return obj
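
We store the session data using a helper function. When pickle is True, it first re-pickles the vodozemac session, since the session's internal state may change as we use it (for example, when decrypting).

InboundMegolmSession class methods:
def _store_session_data(self, pickle: bool = False) -> None: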
    if pickle:
        self.session_data["pickle"] = self.session.pickle(self.key)
    name = f"inbound_megolm_session.{self.room_id}.{self.user_id}.{self.session.session_id}"
    self.client.storage[name] = self.session_data

Since we aren’t using the initializer function, we need to declare our member variables so that the type checker knows about them.

InboundMegolmSession member variables:
client: Client
user_id: str
room_id: str
key: bytes
session: vodozemac.InboundGroupSession
session_data: dict[str, typing.Any]
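
The construction pattern used by InboundMegolmSession (an initializer that refuses to run, alternative constructors that allocate with cls.__new__, and class-level annotations so that type checkers know the attributes) can be seen in isolation in the toy class below. Widget and its methods are hypothetical and exist only to illustrate the pattern.

```python
from typing import Optional


class Widget:
    """Toy class using the same construction pattern as InboundMegolmSession."""

    # Class-level annotations declare the attributes for type checkers,
    # even though no __init__ ever assigns them.
    name: str
    size: int

    def __init__(self) -> None:
        raise RuntimeError("Use the from_* methods instead")

    @classmethod
    def from_name(cls, name: str) -> "Widget":
        obj = cls.__new__(cls)  # allocate an instance without calling __init__
        obj.name = name
        obj.size = len(name)
        return obj

    @classmethod
    def from_store(cls, store: dict, key: str) -> Optional["Widget"]:
        data = store.get(key)
        if data is None:
            return None  # like from_storage, report a missing entry with None
        obj = cls.__new__(cls)
        obj.name = data["name"]
        obj.size = data["size"]
        return obj
```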

We also expose the data about the session so that the application can make use of it. This allows the application to determine whether to trust messages decrypted using the session, depending on the application’s definition of “trust”. We will explore this concept later on. FIXME: link to section.

InboundMegolmSession class methods:
@property
def sender_key(self) -> vodozemac.Curve25519PublicKey:
    """The identity key of the Megolm session's sender"""
    return vodozemac.Curve25519PublicKey.from_base64(
        self.session_data["sender_key"]
    )

@property
def authenticated(self) -> bool:
    """Whether we know that the Megolm session comes from the associated ``sender_key``"""
    return self.session_data["authenticated"]

Another way to get an inbound session is via an m.room_key event. In this case, we need to include the sender’s identity key, which we will get from the Olm session that we received the event from. (We will see how this happens when we discuss Olm.) Since Olm is authenticated, we set the authenticated flag to True.

InboundMegolmSession class methods:
@classmethod
def from_room_key(
    cls,
    c: Client,
    user_id: str,
    sender_key: vodozemac.Curve25519PublicKey,
    room_key_content: dict,
    key: bytes,
) -> "InboundMegolmSession":
    """Create an ``InboundMegolmSession`` from an ``m.room_key`` event

    Arguments:

    ``c``:
      the client object
    ``user_id``:
      the user ID of the sender of the ``m.room_key`` event
    ``sender_key``:
      the identity key of the sender of the ``m.room_key`` event
    ``room_key_content``:
      the ``content`` of the ``m.room_key`` event
    ``key``:
      a 32-byte binary used to encrypt the objects in storage
    """
    obj = cls.__new__(cls)

    if room_key_content["algorithm"] != MEGOLM_ALGORITHM:
        raise RuntimeError("Invalid algorithm")
    obj.client = c
    obj.user_id = user_id
    obj.room_id = room_key_content["room_id"]
    obj.key = key
    obj.session = vodozemac.InboundGroupSession(room_key_content["session_key"])
    if obj.session.session_id != room_key_content["session_id"]:
        raise RuntimeError("Mismatched session ID")
    obj.session_data = {
        "pickle": obj.session.pickle(obj.key),
        "sender_key": sender_key.to_base64(),
        "authenticated": True,
        "event_ids": {},
    }
    obj._store_session_data()

    return obj
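
from_room_key reads four fields from the m.room_key content (algorithm, room_id, session_id, and session_key, the fields the Matrix specification defines for this event). It checks the algorithm and the session ID, but a malformed event that is missing a field would surface as a KeyError. A stdlib-only pre-check along the following lines could be run first; check_room_key_content is a hypothetical helper, and the session ID comparison still needs vodozemac, so it is not repeated here.

```python
MEGOLM_ALGORITHM = "m.megolm.v1.aes-sha2"

# Fields that an m.room_key event for Megolm must carry, all as strings
REQUIRED_STRING_FIELDS = ("algorithm", "room_id", "session_id", "session_key")


def check_room_key_content(content: dict) -> None:
    """Raise ValueError unless ``content`` looks like a Megolm m.room_key payload."""
    for field in REQUIRED_STRING_FIELDS:
        if not isinstance(content.get(field), str):
            raise ValueError(f"missing or non-string field: {field}")
    if content["algorithm"] != MEGOLM_ALGORITHM:
        raise ValueError("Invalid algorithm")
```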

Todo

if we already have this session in storage, we should only keep the “best” one

We can also load an inbound session from storage. In this case, we will return None if we cannot find the given session.

InboundMegolmSession class methods:
@classmethod
def from_storage(
    cls,
    c: Client,
    room_id: str,
    user_id: str,
    session_id: str,
    key: bytes,
) -> typing.Optional["InboundMegolmSession"]:
    """Load a session from storage

    Arguments:

    ``c``:
      the client object
    ``room_id``:
      the ID of the room that the session belongs to
    ``user_id``:
      the ID of the user that the session belongs to
    ``session_id``:
      the ID of the Megolm session
    ``key``:
      a 32-byte binary used to encrypt the objects in storage
    """
    obj = cls.__new__(cls)

    obj.client = c
    obj.room_id = room_id
    obj.user_id = user_id
    obj.key = key
    name = f"inbound_megolm_session.{room_id}.{user_id}.{session_id}"
    obj.session_data = c.storage.get(name)

    if obj.session_data is None:
        return None

    obj.session = vodozemac.InboundGroupSession.from_pickle(
        obj.session_data["pickle"],
        obj.key,
    )

    return obj

There is a fourth way that we can get an inbound session: via a key export. We will discuss this in a later section. FIXME: link to section

Now that we have an inbound Megolm session, we can use it to decrypt an encrypted event. We first check the event contents to make sure that it is in the expected format and that it is a Megolm-encrypted message. We then use the session to decrypt the ciphertext, and then check that the decrypted contents are in the expected format, and belong to the same room as the Megolm session.

You will see some code referring to replay attack detection. This is explained below; you can ignore it for now.

InboundMegolmSession class methods:
def decrypt(self, event: events.RoomEvent) -> dict:
    """Decrypt an ``m.room.encrypted`` event encrypted with Megolm

    Arguments:

    ``event``:
      the encrypted event

    Returns the decrypted event, which will be a dict that should have ``type``
    (the decrypted event type), ``content`` (the event content), and
    ``room_id`` (the ID of the room the event was sent to) properties.
    """
    # check cleartext
    schema.ensure_valid(
        event.content,
        {
            "algorithm": str,
            "sender_key": schema.Optional(str),
            "device_id": schema.Optional(str),
            "session_id": str,
            "ciphertext": str,
        },
    )
    if event.content["algorithm"] != MEGOLM_ALGORITHM:
        raise RuntimeError("Invalid algorithm")

    # decrypt ciphertext
    decrypted = self.session.decrypt(event.content["ciphertext"])
    plaintext = json.loads(decrypted.plaintext)
    {{detect replay attacks}}
    self._store_session_data(True)

    # check plaintext
    schema.ensure_valid(
        plaintext,
        {
            "type": str,
            "content": dict,
            "room_id": str,
        },
    )
    if plaintext["room_id"] != self.room_id:
        raise RuntimeError("Mismatched room ID")

    return plaintext
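
The checks that decrypt applies after decryption do not depend on vodozemac, so they can be sketched with the standard library alone, which is handy for testing them without real ciphertext. check_megolm_plaintext is a hypothetical helper that mirrors the schema and room ID checks above; it takes the already-decrypted JSON string.

```python
import json


def check_megolm_plaintext(plaintext: str, expected_room_id: str) -> dict:
    """Parse a decrypted Megolm payload and apply the same checks as ``decrypt``."""
    payload = json.loads(plaintext)
    if not isinstance(payload, dict):
        raise ValueError("payload is not a JSON object")
    expected = {"type": str, "content": dict, "room_id": str}
    for field, field_type in expected.items():
        if not isinstance(payload.get(field), field_type):
            raise ValueError(f"missing or invalid field: {field}")
    # Reject payloads that claim a different room from the one the session belongs to
    if payload["room_id"] != expected_room_id:
        raise ValueError("Mismatched room ID")
    return payload
```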

Tradeoff

In this implementation, we re-pickle and store the vodozemac inbound session every time we decrypt a message (via the call to _store_session_data). This is not strictly necessary, unlike for outbound sessions, which we must save every time or risk encrypting multiple messages at the same ratchet index. It is still useful, however: vodozemac caches the ratchet state, so storing the updated session makes decrypting the message with the next ratchet index faster. But this causes some potential issues. First, if accessing the storage is async (which it likely will be in most implementations), then the decrypt function will need to be async. Second, if saving the session takes a long time, then we may lose any benefit we got from caching.

There are several solutions to this. The simplest are to either ignore these problems, as they are not critical, or to not re-save the session at all (although the event IDs recorded for replay attack detection, described below, would still need to be saved). Another solution, which only deals with the second problem and not the first, is to not save every time: for example, save the session every 5 or 10 decryptions. A more complicated solution is to create a separate task for saving the inbound sessions: something that runs in a separate thread, process, or coroutine, depending on your language's concurrency functionality. When we decrypt a message, we tell the task to store the session, and the task schedules the storage as needed. For example, it can rate-limit the storage based on time: if we decrypt several messages in the same session in a short amount of time, it can skip some and store only the latest version of the session.
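
The time-based rate limiting described above can be sketched with asyncio. DebouncedSaver is hypothetical and not part of this library: it assumes an async save callable taking a storage key and the session data, and it writes each key at most once per delay, keeping only the latest pending data. A flush method writes whatever is still pending, for example at shutdown.

```python
import asyncio
from typing import Awaitable, Callable, Dict


class DebouncedSaver:
    """Coalesce save requests so each key is written at most once per ``delay`` seconds."""

    def __init__(
        self, save: Callable[[str, dict], Awaitable[None]], delay: float
    ) -> None:
        self._save = save
        self._delay = delay
        self._pending: Dict[str, dict] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    def request_save(self, key: str, data: dict) -> None:
        # Replace any older pending value: only the newest version matters
        self._pending[key] = data
        if key not in self._tasks:
            self._tasks[key] = asyncio.create_task(self._flush_later(key))

    async def _flush_later(self, key: str) -> None:
        await asyncio.sleep(self._delay)
        self._tasks.pop(key, None)
        data = self._pending.pop(key, None)
        if data is not None:
            await self._save(key, data)

    async def flush(self) -> None:
        """Cancel pending timers and write everything still pending (e.g. on shutdown)."""
        for task in self._tasks.values():
            task.cancel()
        self._tasks.clear()
        pending, self._pending = self._pending, {}
        for key, data in pending.items():
            await self._save(key, data)
```

With this design, decrypt would call request_save instead of writing to storage directly, which would also let decrypt stay synchronous even when storage is async, as long as it runs inside an event loop.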

Tests

Now that we can decrypt, we can test more functionality of the outbound session as well.

For this set of tests, we will have 3 users: Alice, Bob, and Carol. Alice will encrypt messages, and we will check which messages Bob and Carol can decrypt.

test megolm:
@pytest.mark.asyncio
async def test_megolm_decryption(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": ROOM_ENCRYPTION_EVENT,
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                        "@bob:example.org": BOB_ROOM_MEMBERSHIP,
                    },
                },
            },
            "device_tracker.cache.@alice:example.org": {
                "device_keys": {
                    "ABCDEFG": ALICE_DEVICE_KEY,
                },
            },
            "device_tracker.cache.@bob:example.org": {
                "device_keys": {
                    "HIJKLMN": BOB_DEVICE_KEY,
                },
            },
            "device_tracker.cache.@carol:example.org": {
                "device_keys": {
                    "OPQRSTU": {
                        "algorithms": [megolm.MEGOLM_ALGORITHM],
                        "device_id": "OPQRSTU",
                        "keys": {
                            "curve25519:OPQRSTU": "yet+another+key",
                        },
                        "user_id": "@carol:example.org",
                    },
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as alice:
        async with client.Client(
            storage={
                "access_token": "anaccesstoken",
                "user_id": "@bob:example.org",
                "device_id": "HIJKLMN",
            },
            callbacks={},
            base_client_url="https://matrix-client.example.org/_matrix/client/",
        ) as bob:
            async with client.Client(
                storage={
                    "access_token": "anaccesstoken",
                    "user_id": "@carol:example.org",
                    "device_id": "OPQRSTU",
                },
                callbacks={},
                base_client_url="https://matrix-client.example.org/_matrix/client/",
            ) as carol:
                {{megolm decryption test}}

First, Alice and Bob are in the room. We test that Alice can encrypt a message with the outbound session, that she can decrypt it using an inbound session created directly from the outbound session, and that Bob can decrypt it using an inbound session created from a room key event.

megolm decryption test:
room_state_tracker = rooms.RoomStateTracker(alice)
device_tracker = devices.DeviceTracker(alice)
mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    status=200,
    body='{"one_time_key_counts":{"signed_curve25519":100}}',
    headers={
        "Content-Type": "application/json",
    },
)
device_keys_manager = devices.DeviceKeysManager(alice, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
    alice,
    "!room_id",
    room_state_tracker,
    device_tracker,
    device_keys_manager,
    b"\x00" * 32,
)
[
    (bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()

alice_inbound_session = (
    megolm.InboundMegolmSession.from_outbound_session(
        alice,
        outbound_session,
        b"\x00" * 32,
    )
)
assert alice_inbound_session.authenticated
assert (
    alice_inbound_session.sender_key == device_keys_manager.identity_key
)

bob_inbound_session = megolm.InboundMegolmSession.from_room_key(
    bob,
    "@alice:example.org",
    device_keys_manager.identity_key,
    bob_room_key,
    b"\x00" * 32,
)
assert bob_inbound_session.authenticated
assert (
    bob_inbound_session.sender_key == device_keys_manager.identity_key
)

outbound_session.mark_as_sent([bob_device_key])

encrypted_content1 = outbound_session.encrypt(
    "m.room.message",
    {"body": "Hello World!", "msgtype": "m.text"},
)
encrypted1 = events.RoomEvent(
    sender="@alice:example.org",
    event_id="$event1",
    type="m.room.encrypted",
    room_id="!room_id",
    content=encrypted_content1,
    origin_server_ts=1234567890000,
)

assert alice_inbound_session.decrypt(encrypted1) == {
    "room_id": "!room_id",
    "type": "m.room.message",
    "content": {"body": "Hello World!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted1) == {
    "room_id": "!room_id",
    "type": "m.room.message",
    "content": {"body": "Hello World!", "msgtype": "m.text"},
}

We can also test that Bob can decrypt an event using an inbound session loaded from storage.

megolm decryption test:
bob_loaded_inbound_session = megolm.InboundMegolmSession.from_storage(
    bob,
    "!room_id",
    "@alice:example.org",
    encrypted_content1["session_id"],
    b"\x00" * 32,
)
assert bob_loaded_inbound_session.authenticated
assert (
    bob_loaded_inbound_session.sender_key
    == device_keys_manager.identity_key
)
assert bob_loaded_inbound_session.decrypt(encrypted1) == {
    "room_id": "!room_id",
    "type": "m.room.message",
    "content": {"body": "Hello World!", "msgtype": "m.text"},
}

We now test that Alice can load the outbound session from storage and encrypt a new message with it. Carol joins the room, so the session key will also be shared with her. We test that Carol cannot decrypt the previously sent message, but that all three users can decrypt the new message.

megolm decryption test:
loaded_outbound_session = megolm.OutboundMegolmSession(
    alice,
    "!room_id",
    room_state_tracker,
    device_tracker,
    device_keys_manager,
    b"\x00" * 32,
    outbound_session.session_id,
)

await alice.publisher.publish(
    client.RoomTimelineUpdates(
        "!room_id",
        [
            events.StateEvent(
                room_id="!room_id",
                type="m.room.member",
                state_key="@carol:example.org",
                sender="@carol:example.org",
                content={
                    "membership": "join",
                },
                event_id="$carol_event",
                origin_server_ts=1234567890123,
            )
        ],
        False,
        "",
    )
)

[
    (carol_device_key, carol_room_key)
] = await loaded_outbound_session.get_session_key_for_sending()
carol_inbound_session = megolm.InboundMegolmSession.from_room_key(
    carol,
    "@alice:example.org",
    device_keys_manager.identity_key,
    carol_room_key,
    b"\x00" * 32,
)

loaded_outbound_session.mark_as_sent([carol_device_key])

encrypted_content2 = loaded_outbound_session.encrypt(
    "m.room.message",
    {"body": "Bonjour!", "msgtype": "m.text"},
)
encrypted2 = events.RoomEvent(
    sender="@alice:example.org",
    event_id="$event2",
    type="m.room.encrypted",
    room_id="!room_id",
    content=encrypted_content2,
    origin_server_ts=1234567890000,
)

assert alice_inbound_session.decrypt(encrypted2) == {
    "room_id": "!room_id",
    "type": "m.room.message",
    "content": {"body": "Bonjour!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted2) == {
    "room_id": "!room_id",
    "type": "m.room.message",
    "content": {"body": "Bonjour!", "msgtype": "m.text"},
}
assert carol_inbound_session.decrypt(encrypted2) == {
    "room_id": "!room_id",
    "type": "m.room.message",
    "content": {"body": "Bonjour!", "msgtype": "m.text"},
}

with pytest.raises(vodozemac.MegolmDecryptionException):
    carol_inbound_session.decrypt(encrypted1)

Detecting replay attacks

When we decrypt events, we must be careful to guard against replay attacks. A replay attack is an attack in which an attacker obtains a previously-sent ciphertext and replays it. Since the message was actually sent by the sender, recipients treat it as authentic, even though the context of the message may be different. The attacker does not need to encrypt a new message; they simply take an existing message and re-send it.

We detect replay attacks by recording, for a given Megolm session, the event ID of the event decrypted at each ratchet index. In Megolm, each ratchet index is used to encrypt a single event, so if we see a ratchet index being used to encrypt multiple events, then we know that a replay attack has occurred (or the sender is buggy).
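
The bookkeeping just described can be sketched in isolation. ReplayDetector is a hypothetical stand-alone class holding the same index-to-event-ID mapping that the session keeps in session_data["event_ids"]; it uses string keys so that the mapping is unchanged if the data is serialized to JSON, where object keys are always strings.

```python
from typing import Dict


class ReplayDetector:
    """Remember which event ID was decrypted at each ratchet index of one session."""

    def __init__(self) -> None:
        # String keys so the mapping survives a JSON round trip unchanged
        self.event_ids: Dict[str, str] = {}

    def check(self, message_index: int, event_id: str) -> None:
        key = str(message_index)
        seen = self.event_ids.get(key)
        if seen is None:
            # First time we see this ratchet index: record which event used it
            self.event_ids[key] = event_id
        elif seen != event_id:
            # The same ratchet index produced two different events
            raise RuntimeError("Replay attack detected")
```

Re-decrypting the same event is allowed, since users re-read old messages; only a second, different event at an already-seen index is rejected.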

detect replay attacks:
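# Use a string key, so that the mapping also survives storage that serializes
# session data to JSON (where object keys are always strings)
message_index = str(decrypted.message_index)
if message_index in self.session_data["event_ids"]:
    if self.session_data["event_ids"][message_index] != event.event_id:
        # the same ratchet index was used for a different event
        raise RuntimeError("Replay attack detected")
else:
    self.session_data["event_ids"][message_index] = event.event_id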

Note that we do not need to guard against replay attacks in Olm, because with Olm we can only decrypt each event once: after decryption, we ratchet the Olm session forwards so that it can no longer be used to re-decrypt the event. Since the events that we encrypt with Olm carry keys, which we store after decryption, we have no need to re-decrypt them. With Megolm, however, we allow re-decryption so that users can re-read old messages.

Tests

To test this, we encrypt an event and try to decrypt it three times. The first two times, we give it the same event ID and ensure that it decrypts correctly both times. The third time, we give it a different event ID and ensure that we get an error.

test megolm:
@pytest.mark.asyncio
async def test_replay_detection(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": ROOM_ENCRYPTION_EVENT,
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                        "@bob:example.org": BOB_ROOM_MEMBERSHIP,
                    },
                },
            },
            "device_tracker.cache.@alice:example.org": {
                "device_keys": {
                    "ABCDEFG": ALICE_DEVICE_KEY,
                },
            },
            "device_tracker.cache.@bob:example.org": {
                "device_keys": {
                    "HIJKLMN": BOB_DEVICE_KEY,
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as alice:
        async with client.Client(
            storage={
                "access_token": "anaccesstoken",
                "user_id": "@bob:example.org",
                "device_id": "HIJKLMN",
            },
            callbacks={},
            base_client_url="https://matrix-client.example.org/_matrix/client/",
        ) as bob:
            {{replay detection test}}
replay detection test:
room_state_tracker = rooms.RoomStateTracker(alice)
device_tracker = devices.DeviceTracker(alice)
mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
    status=200,
    body='{"one_time_key_counts":{"signed_curve25519":100}}',
    headers={
        "Content-Type": "application/json",
    },
)
device_keys_manager = devices.DeviceKeysManager(alice, b"\x00" * 32)
outbound_session = megolm.OutboundMegolmSession(
    alice,
    "!room_id",
    room_state_tracker,
    device_tracker,
    device_keys_manager,
    b"\x00" * 32,
)
[
    (bob_device_key, bob_room_key)
] = await outbound_session.get_session_key_for_sending()

bob_inbound_session = megolm.InboundMegolmSession.from_room_key(
    bob,
    "@alice:example.org",
    device_keys_manager.identity_key,
    bob_room_key,
    b"\x00" * 32,
)

outbound_session.mark_as_sent([bob_device_key])

encrypted_content1 = outbound_session.encrypt(
    "m.room.message",
    {"body": "Hello World!", "msgtype": "m.text"},
)
encrypted1 = events.RoomEvent(
    sender="@alice:example.org",
    event_id="$event1",
    type="m.room.encrypted",
    room_id="!room_id",
    content=encrypted_content1,
    origin_server_ts=1234567890000,
)

assert bob_inbound_session.decrypt(encrypted1) == {
    "room_id": "!room_id",
    "type": "m.room.message",
    "content": {"body": "Hello World!", "msgtype": "m.text"},
}
assert bob_inbound_session.decrypt(encrypted1) == {
    "room_id": "!room_id",
    "type": "m.room.message",
    "content": {"body": "Hello World!", "msgtype": "m.text"},
}

encrypted2 = events.RoomEvent(
    sender="@alice:example.org",
    event_id="$event2",
    type="m.room.encrypted",
    room_id="!room_id",
    content=encrypted_content1,
    origin_server_ts=1234567890123,
)
with pytest.raises(RuntimeError):
    bob_inbound_session.decrypt(encrypted2)

Todo

explain what checks we need to do

  • sender key matches sending user