Tying together the Olm/Megolm pieces

We now have all the pieces that we need to encrypt and decrypt events using Olm/Megolm. In this section, we will tie all the pieces together so that clients can easily send and receive encrypted events. We create a class called OlmMegolmEncryptionManager that will handle this.

megolm module classes:
class OlmMegolmEncryptionManager:
    """Manages encryption and decryption of room events"""

    {{OlmMegolmEncryptionManager class methods}}

We create an OlmMegolmEncryptionManager by passing it a Client object, a key for encrypting data in the storage, and optionally a DeviceKeysManager, a DeviceTracker, and a RoomStateTracker. If they are not provided, new ones will be created.

OlmMegolmEncryptionManager class methods:
def __init__(
    self,
    c: Client,
    key: bytes,
    device_keys_manager: typing.Optional[devices.DeviceKeysManager] = None,
    device_tracker: typing.Optional[devices.DeviceTracker] = None,
    room_state_tracker: typing.Optional[rooms.RoomStateTracker] = None,
):
    """
    Arguments:

    ``c``:
      the client object
    ``key``:
      a 32-byte key used to encrypt the objects in storage
    ``device_keys_manager``:
      a ``devices.DeviceKeysManager`` object to use.  If none is provided, a
      new one will be created.
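    ``device_tracker``:
      a ``devices.DeviceTracker`` object to use.  If none is provided, a new
      one will be created.
    ``room_state_tracker``:
      a ``rooms.RoomStateTracker`` object to use.  If none is provided, a new
      one will be created.
    """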
    self.client = c
    self.key = key
    self.device_keys_manager = (
        device_keys_manager
        if device_keys_manager is not None
        else devices.DeviceKeysManager(c, key)
    )
    self.device_tracker = (
        device_tracker if device_tracker is not None else devices.DeviceTracker(c)
    )
    self.room_state_tracker = (
        room_state_tracker
        if room_state_tracker is not None
        else rooms.RoomStateTracker(c)
    )

    {{OlmMegolmEncryptionManager initialization}}

We will also subscribe to the necessary messages from the Client object so that the manager can receive the information that it needs from the /sync response, namely to-device events.

OlmMegolmEncryptionManager initialization:
c.publisher.subscribe(client.ToDeviceEvents, self._to_device_subscriber)

And we create a lock to ensure that concurrent operations do not conflict. We use a single lock here; it may be possible to improve concurrency by using finer-grained locks.

OlmMegolmEncryptionManager initialization:
self.lock = asyncio.Lock()
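
As a rough illustration of what finer-grained locking could look like, the following sketch keeps one lock per key (for example, per room ID). The KeyedLocks class is hypothetical and is not part of the code in this section; whether such a scheme is safe depends on which operations actually share state.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, List


class KeyedLocks:
    """Hands out one asyncio.Lock per key (hypothetical helper)."""

    def __init__(self) -> None:
        self._locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    def for_key(self, key: str) -> asyncio.Lock:
        # the same key always yields the same lock object
        return self._locks[key]


async def demo() -> List[str]:
    locks = KeyedLocks()
    order: List[str] = []

    async def work(key: str, label: str) -> None:
        async with locks.for_key(key):
            order.append(f"{label} start")
            await asyncio.sleep(0)  # yield to other tasks while holding the lock
            order.append(f"{label} end")

    # a1 and a2 share a key, so they never overlap; b uses a different key
    # and is free to run in between
    await asyncio.gather(work("a", "a1"), work("a", "a2"), work("b", "b"))
    return order
```

Operations on the same key are serialized, while operations on different keys can interleave. The single lock used in this section is simpler and avoids having to reason about which state each operation touches.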

We will publish messages using our publish/subscribe mechanism, so we will have our own publisher object as a member.

OlmMegolmEncryptionManager initialization:
self.publisher = pubsub.Publisher()
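
The pubsub module is defined elsewhere. As a mental model only, the following is a minimal sketch of a publisher with the same call shapes that this section relies on: subscribing with a message type or a tuple of types, and an awaitable publish. The name SketchPublisher and its internals are assumptions, and the real pubsub.Publisher may behave differently.

```python
import asyncio
import inspect
import typing


class SketchPublisher:
    """Minimal type-based publish/subscribe (illustrative only)."""

    def __init__(self) -> None:
        self._subscribers: typing.List[typing.Tuple[tuple, typing.Callable]] = []

    def subscribe(self, message_types, callback: typing.Callable) -> None:
        # accept either a single type or a tuple of types
        if not isinstance(message_types, tuple):
            message_types = (message_types,)
        self._subscribers.append((message_types, callback))

    async def publish(self, message) -> None:
        for message_types, callback in list(self._subscribers):
            if isinstance(message, message_types):
                result = callback(message)
                # support both plain functions and coroutine functions
                if inspect.isawaitable(result):
                    await result


class Ping(typing.NamedTuple):
    """Hypothetical message type used only for this demonstration."""

    value: int


async def demo() -> list:
    seen = []

    async def async_subscriber(msg: Ping) -> None:
        seen.append(("async", msg.value))

    publisher = SketchPublisher()
    publisher.subscribe(Ping, lambda msg: seen.append(("sync", msg.value)))
    publisher.subscribe((Ping, str), async_subscriber)
    await publisher.publish(Ping(1))
    await publisher.publish(42)  # no subscriber is registered for int
    return seen
```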

Decrypting to-device events

First, let us handle the to-device events. The events that we will be interested in are m.room.encrypted events that are Olm-encrypted. (We ignore any Megolm-encrypted events since they should not be sent as to-device events.)

OlmMegolmEncryptionManager class methods:
async def _to_device_subscriber(self, msg: client.ToDeviceEvents) -> None:
    async with self.lock:
        for event in msg.events:
            if (
                event.type == "m.room.encrypted"
                and event.content.get("algorithm") == olm.OLM_ALGORITHM
                and schema.is_valid(
                    event.content,
                    {
                        "sender_key": str,
                        "ciphertext": schema.Object({"type": int, "body": str}),
                    },
                )
            ):
                await self._decrypt_and_process_todevice(event)

OlmMegolmEncryptionManager class methods:
async def _decrypt_and_process_todevice(self, event: events.Event) -> None:
    sender_key = event.content["sender_key"]
    try:
        {{decrypt Olm-encrypted event}}
    except Exception as e:
        logging.error(
            f"Unable to decrypt to-device event from {event.sender}:{sender_key}: {e}"
        )
        return

    {{process decrypted Olm event}}
    else:
        logging.info(
            f"Ignoring {decrypted['type']} event from {event.sender}:{sender_key}"
        )

To decrypt the event, we first determine whether we already have an Olm channel established with the sender. If so, we use the channel to decrypt the event. Otherwise, we look up the sender’s device in the device tracker and create a new channel from the encrypted event, which also decrypts it.

decrypt Olm-encrypted event:
olm_channel = olm.OlmChannel.create_from_storage(
    self.client,
    self.device_keys_manager,
    event.sender,
    sender_key,
)
if isinstance(olm_channel, olm.OlmChannel):
    decrypted = olm_channel.decrypt(event.content)
else:
    # FIXME: we should probably only query the cache, and never query
    # the server here, to avoid delays
    sender_devices = (
        (await self.device_tracker.get_device_keys([event.sender]))
        .get(event.sender, devices.UserDeviceKeysResult({}))
        .device_keys
    )
    sender_device_id, sender_fingerprint_key = None, None
    for device_id, device in sender_devices.items():
        if (
            device.get("keys", {}).get(f"curve25519:{device_id}")
            == sender_key
        ):
            sender_device_id = device_id
            sender_fingerprint_key = device["keys"].get(
                f"ed25519:{device_id}"
            )

    (
        olm_channel,
        decrypted_or_err,
    ) = olm.OlmChannel.create_from_encrypted_event(
        self.client,
        self.device_keys_manager,
        event.sender,
        sender_key,
        event.content,
        sender_device_id,
        sender_fingerprint_key,
    )

    if isinstance(decrypted_or_err, dict):
        decrypted = decrypted_or_err
    else:
        raise decrypted_or_err
# FIXME: Olm unwedging - on error, create new olm session and send dummy event
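
The device lookup in the middle of this chunk can be read in isolation. The sketch below restates it as a standalone function over plain dicts. The function name and the sample key values are made up for illustration, and unlike the loop above it stops at the first matching device.

```python
import typing


def find_device_by_identity_key(
    device_keys: typing.Dict[str, dict], sender_key: str
) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Return (device_id, ed25519 fingerprint key) for the device whose
    curve25519 identity key equals ``sender_key``, or (None, None)."""
    for device_id, device in device_keys.items():
        keys = device.get("keys", {})
        if keys.get(f"curve25519:{device_id}") == sender_key:
            return device_id, keys.get(f"ed25519:{device_id}")
    return None, None


# sample data with placeholder key values
sample_devices = {
    "ABCDEFG": {
        "keys": {
            "curve25519:ABCDEFG": "alice+curve+key",
            "ed25519:ABCDEFG": "alice+ed+key",
        }
    },
    "ZYXWVUT": {"keys": {"curve25519:ZYXWVUT": "other+curve+key"}},
}
```

If no device matches, both values are None, which corresponds to the case above where create_from_encrypted_event is called without a known device ID or fingerprint key.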

After the event is decrypted, we can process the plaintext event. For now, we will only consider m.room_key events. When we receive an m.room_key event, we try to create an inbound Megolm session from it. We then publish a message indicating that the session has been received. This can be used by the client to decrypt room events that were received before the m.room_key event was received, as we will explain below.

process decrypted Olm event:
if decrypted["type"] == "m.room_key":
    try:
        session = InboundMegolmSession.from_room_key(
            self.client,
            event.sender,
            sender_key,
            decrypted["content"],
            self.key,
        )
        await self.publisher.publish(session)
    except Exception as e:
        logging.error(
            f"Unable to process room key from {event.sender}:{sender_key}: {e}"
        )

Tests

We test that we can decrypt an Olm-encrypted event by simulating the reception of an encrypted room key event.

test megolm:
@pytest.mark.asyncio
async def test_decrypt_olm_encrypted_room_key(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": ROOM_ENCRYPTION_EVENT,
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                        "@bob:example.org": BOB_ROOM_MEMBERSHIP,
                    },
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as alice:
        async with client.Client(
            storage={
                "access_token": "anaccesstoken",
                "user_id": "@bob:example.org",
                "device_id": "HIJKLMN",
            },
            callbacks={},
            base_client_url="https://matrix-client.example.org/_matrix/client/",
        ) as bob:
            {{test decrypt Olm-encrypted room key}}

First, we create the device keys for our two clients. We write a utility function to create device keys for a client, since we will be doing that in several tests.

megolm test utility functions:
async def _create_device_keys(
    mock_aioresponse, client: Client
) -> typing.Tuple[dict, dict, devices.DeviceKeysManager]:
    device_keys = {}
    otks = {}

    ev = asyncio.Event()

    def callback(url, **kwargs):
        nonlocal device_keys, otks
        device_keys = kwargs["json"]["device_keys"]
        otks = kwargs["json"]["one_time_keys"]
        ev.set()

        return aioresponses.CallbackResult(
            status=200,
            body='{"one_time_key_counts":{"signed_curve25519":100}}',
            headers={
                "Content-Type": "application/json",
            },
        )

    mock_aioresponse.post(
        "https://matrix-client.example.org/_matrix/client/v3/keys/upload",
        callback=callback,
    )
    device_keys_manager = devices.DeviceKeysManager(client, b"\x00" * 32)
    await ev.wait()

    return (device_keys, otks, device_keys_manager)
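
The utility function uses an asyncio.Event to wait until the mocked upload endpoint has been called. The following self-contained sketch shows the same pattern with a stand-in for the HTTP request; fake_upload and its payload are hypothetical.

```python
import asyncio


async def capture_upload() -> dict:
    captured: dict = {}
    done = asyncio.Event()

    def callback(payload: dict) -> None:
        # record what was "uploaded" and wake up the waiter
        captured.update(payload)
        done.set()

    async def fake_upload() -> None:
        # stands in for the background task that performs the upload
        await asyncio.sleep(0)
        callback({"device_keys": {"device_id": "ABCDEFG"}})

    task = asyncio.create_task(fake_upload())
    await done.wait()  # blocks until callback() has run
    await task
    return captured
```

Waiting on the event, rather than sleeping for a fixed time, lets the test continue as soon as the callback has captured its data.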

Then we use the function to create keys for Alice and Bob.

test decrypt Olm-encrypted room key:
(
    alice_device_keys,
    alice_otks,
    alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)

(
    bob_device_keys,
    bob_otks,
    bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)

# store the device keys so that the device tracker can get them
alice.storage["device_tracker.cache.@alice:example.org"] = bob.storage[
    "device_tracker.cache.@alice:example.org"
] = {"device_keys": {"ABCDEFG": alice_device_keys}}

alice.storage["device_tracker.cache.@bob:example.org"] = bob.storage[
    "device_tracker.cache.@bob:example.org"
] = {"device_keys": {"HIJKLMN": bob_device_keys}}

Next Alice creates an outbound Megolm session to share with Bob.

test decrypt Olm-encrypted room key:
alice_room_state_tracker = rooms.RoomStateTracker(alice)
alice_device_tracker = devices.DeviceTracker(alice)
outbound_session = megolm.OutboundMegolmSession(
    alice,
    "!room_id",
    alice_room_state_tracker,
    alice_device_tracker,
    alice_device_keys_manager,
    b"\x00" * 32,
)

[
    (bob_device_keys, bob_room_key)
] = await outbound_session.get_session_key_for_sending()

Alice creates an Olm channel with Bob and encrypts the Megolm session.

test decrypt Olm-encrypted room key:
# use the first `signed_curve25519` one-time key
otk = [
    key
    for id, key in bob_otks.items()
    if id.startswith("signed_curve25519:")
][0]

alice_olm_channel = olm.OlmChannel.create_outbound_channel(
    alice,
    alice_device_keys_manager,
    bob_device_keys,
    otk,
)

room_key_encrypted = alice_olm_channel.encrypt("m.room_key", bob_room_key)

Bob creates an OlmMegolmEncryptionManager and then receives the encrypted room key. We can check that the OlmMegolmEncryptionManager has correctly processed the event by checking that the inbound Megolm session is in Bob’s storage.

test decrypt Olm-encrypted room key:
bob_device_tracker = devices.DeviceTracker(bob)

bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
    bob,
    b"\x00" * 32,
    bob_device_keys_manager,
    bob_device_tracker,
)

await bob.publisher.publish(
    client.ToDeviceEvents(
        [
            events.Event(
                sender="@alice:example.org",
                type="m.room.encrypted",
                content=room_key_encrypted,
            )
        ]
    )
)

assert (
    f"inbound_megolm_session.!room_id.@alice:example.org.{outbound_session.session_id}"
    in bob.storage
)

Decrypting room events

Now that we can decrypt to-device events, allowing us to receive room keys, we can try to decrypt room events.

Trying to decrypt a room event is fairly straightforward. We first ensure that it actually is encrypted and has the required information, such as the session ID. We then try to load the inbound Megolm session from storage using the room ID, the sender’s user ID, and the session ID. If the session is found, we use it to decrypt the event; otherwise, we raise an error.

Todo

allow handling withheld codes

OlmMegolmEncryptionManager class methods:
async def decrypt_room_event(self, encrypted: events.RoomEvent) -> events.RoomEvent:
    """Decrypt an event

    Returns the room event that would have been received if the event was not
    encrypted.

    Arguments:

    ``encrypted``:
      the encrypted room event
    """
    async with self.lock:
        content = encrypted.content
        if (
            encrypted.type != "m.room.encrypted"
            or content.get("algorithm") != MEGOLM_ALGORITHM
            or "session_id" not in content
        ):
            raise error.UnableToDecryptError("Not encrypted or malformed event")
        session = InboundMegolmSession.from_storage(
            self.client,
            encrypted.room_id,
            encrypted.sender,
            content["session_id"],
            self.key,
        )
        if not session:
            # FIXME: add the session ID
            raise error.UnableToDecryptError("Unknown session ID")
        decrypted = session.decrypt(encrypted)
        return events.RoomEvent(
            type=decrypted["type"],
            sender=encrypted.sender,
            content=decrypted["content"],
            unsigned=encrypted.unsigned,
            room_id=decrypted["room_id"],
            event_id=encrypted.event_id,
            origin_server_ts=encrypted.origin_server_ts,
        )

Tradeoff

An SDK could automatically decrypt events that arrive from the sync and send the decrypted events to the application automatically, rather than having the application call a function to decrypt the events. This is more convenient, but introduces some more complexity and raises some questions regarding how to do it. The SDK should not notify the application of both the encrypted and the decrypted versions of the event separately, as this could be confusing, so the approach that we have taken of publishing a message containing the room timeline from the sync would need to be modified somehow.

There is also the question of how to handle events that cannot be decrypted. In some cases, the to-device event might be received after the room event is received, so we receive the Megolm session after we receive the event that it decrypts. This delay might be short, so we may want to wait a bit before notifying the application about the failure to decrypt, but of course this raises the question of how long to wait.

Another question is whether the failure to decrypt one event should delay notifying the application of subsequent events. The answer to this depends on how sensitive the application is to the ordering of events. If the application depends on processing events in-order, then it should not be notified of an event until all previous events either have been decrypted or we are reasonably certain that they are undecryptable. However, if the application does not need to process events in-order, we do not need to delay notifying it about decrypted events.

Having the application call a function to decrypt an encrypted event, as we have done here, makes a bit more work for the application, but allows it to make the decision on how to handle decryption failures.
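
One way an application might act on that decision is to park events that cannot be decrypted yet and retry them when the matching Megolm session arrives. The sketch below is only an illustration: PendingDecryptions, UnableToDecrypt, and the decrypt callable are hypothetical stand-ins, the code is synchronous whereas decrypt_room_event is a coroutine, and it ignores the ordering and timeout questions discussed above.

```python
import typing
from collections import defaultdict


class UnableToDecrypt(Exception):
    """Stand-in for the SDK's decryption error (hypothetical)."""


class PendingDecryptions:
    """Parks undecryptable events until their Megolm session arrives."""

    def __init__(self, decrypt: typing.Callable[[dict], dict]) -> None:
        self._decrypt = decrypt
        self._pending: typing.DefaultDict[tuple, list] = defaultdict(list)

    def try_decrypt(self, event: dict) -> typing.Optional[dict]:
        """Return the decrypted event, or None if it was parked for later."""
        try:
            return self._decrypt(event)
        except UnableToDecrypt:
            key = (event["room_id"], event["content"]["session_id"])
            self._pending[key].append(event)
            return None

    def session_received(self, room_id: str, session_id: str) -> typing.List[dict]:
        """Retry, in arrival order, the events that waited for this session."""
        waiting = self._pending.pop((room_id, session_id), [])
        return [self._decrypt(event) for event in waiting]
```

An application could call session_received from a subscriber to the message that is published when a room key is processed.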

Tests

We will first test that we can successfully decrypt an event if we already have the Megolm session.

test megolm:
@pytest.mark.asyncio
async def test_decrypt_megolm_encrypted_event(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": ROOM_ENCRYPTION_EVENT,
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                        "@bob:example.org": BOB_ROOM_MEMBERSHIP,
                    },
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as alice:
        async with client.Client(
            storage={
                "access_token": "anaccesstoken",
                "user_id": "@bob:example.org",
                "device_id": "HIJKLMN",
            },
            callbacks={},
            base_client_url="https://matrix-client.example.org/_matrix/client/",
        ) as bob:
            {{test decrypt Megolm-encrypted event}}

We first create the device keys and create an outbound Megolm session for Alice to encrypt with. We then create an inbound Megolm session that Bob will use to decrypt.

test decrypt Megolm-encrypted event:
(
    alice_device_keys,
    alice_otks,
    alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)

(
    bob_device_keys,
    bob_otks,
    bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)

# store the device keys so that the device tracker can get them
alice.storage["device_tracker.cache.@alice:example.org"] = bob.storage[
    "device_tracker.cache.@alice:example.org"
] = {"device_keys": {"ABCDEFG": alice_device_keys}}

alice.storage["device_tracker.cache.@bob:example.org"] = bob.storage[
    "device_tracker.cache.@bob:example.org"
] = {"device_keys": {"HIJKLMN": bob_device_keys}}

alice_room_state_tracker = rooms.RoomStateTracker(alice)
alice_device_tracker = devices.DeviceTracker(alice)
outbound_session = megolm.OutboundMegolmSession(
    alice,
    "!room_id",
    alice_room_state_tracker,
    alice_device_tracker,
    alice_device_keys_manager,
    b"\x00" * 32,
)

[
    (bob_device_keys, bob_room_key)
] = await outbound_session.get_session_key_for_sending()

megolm.InboundMegolmSession.from_room_key(
    bob,
    "@alice:example.org",
    alice_device_keys["keys"]["ed25519:ABCDEFG"],
    bob_room_key,
    b"\x00" * 32,
)

Alice then encrypts a room event, and we check that Bob can decrypt it.

test decrypt Megolm-encrypted event:
encrypted_content = outbound_session.encrypt(
    "m.room.message",
    {"msgtype": "m.text", "body": "Hello World!"},
)

bob_device_tracker = devices.DeviceTracker(bob)
bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
    bob,
    b"\x00" * 32,
    bob_device_keys_manager,
    bob_device_tracker,
)

decrypted = await bob_encryption_manager.decrypt_room_event(
    events.RoomEvent(
        sender="@alice:example.org",
        type="m.room.encrypted",
        room_id="!room_id",
        event_id="$event_id",
        content=encrypted_content,
        origin_server_ts=1234567890123,
    )
)

assert decrypted == events.RoomEvent(
    sender="@alice:example.org",
    type="m.room.message",
    room_id="!room_id",
    event_id="$event_id",
    content={"msgtype": "m.text", "body": "Hello World!"},
    origin_server_ts=1234567890123,
)

We then test that it handles certain errors as expected.

test megolm:
@pytest.mark.asyncio
async def test_megolm_decryption_error_handling(mock_aioresponse):
    async with Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as client:
        {{test Megolm decryption error handling}}

We set up an OlmMegolmEncryptionManager to test.

test Megolm decryption error handling:
device_keys_manager = devices.DeviceKeysManager(client, b"\x00" * 32)
device_tracker = devices.DeviceTracker(client)

encryption_manager = megolm.OlmMegolmEncryptionManager(
    client,
    b"\x00" * 32,
    device_keys_manager,
    device_tracker,
)

We first try to decrypt a non-encrypted event and expect to get an error.

test Megolm decryption error handling:
with pytest.raises(
    error.UnableToDecryptError, match="Not encrypted or malformed event"
):
    await encryption_manager.decrypt_room_event(
        events.RoomEvent(
            sender="@alice:example.org",
            type="m.room.message",
            room_id="!room_id",
            event_id="$event_id",
            content={"body": "Hello world!", "msgtype": "m.text"},
            origin_server_ts=1234567890123,
        )
    )

Then we try to decrypt an event encrypted with an unknown algorithm.

test Megolm decryption error handling:
with pytest.raises(
    error.UnableToDecryptError, match="Not encrypted or malformed event"
):
    await encryption_manager.decrypt_room_event(
        events.RoomEvent(
            sender="@alice:example.org",
            type="m.room.encrypted",
            room_id="!room_id",
            event_id="$event_id",
            content={"algorithm": "org.example.unknown"},
            origin_server_ts=1234567890123,
        )
    )

Finally we try to decrypt a Megolm-encrypted event that we do not have the Megolm session for.

test Megolm decryption error handling:
with pytest.raises(error.UnableToDecryptError, match="Unknown session ID"):
    await encryption_manager.decrypt_room_event(
        events.RoomEvent(
            sender="@alice:example.org",
            type="m.room.encrypted",
            room_id="!room_id",
            event_id="$event_id",
            content={
                "algorithm": megolm.MEGOLM_ALGORITHM,
                "session_id": "nonexistent",
                "ciphertext": "nothing",
            },
            origin_server_ts=1234567890123,
        )
    )

Checking if encryption is enabled

Now that we can decrypt events, let us turn our attention to encrypting. Before we actually encrypt an event, we must first determine if we need to encrypt events for the room. That is, we check if the room has encryption enabled.

This is done by checking if the room has an m.room.encryption state event set with the algorithm property set to m.megolm.v1.aes-sha2. However, the m.room.encryption state event is slightly different from other state events in that we are not interested in just the current value: once encryption has been enabled in a room, it cannot be disabled. If encryption could be disabled in a room, then users might not notice it being disabled, and send sensitive messages under the assumption that the messages are encrypted.

It could also be problematic if the encryption algorithm is changed, as the new encryption algorithm could be weaker than the old one, or have different security properties. Although we only support Megolm for now, we will treat any attempt to change the encryption algorithm as an undesirable condition. In the future, if we support multiple encryption algorithms, we may allow some transitions, but it is safest to reject them by default.

Thus we need to track the m.room.encryption state event, and keep track of whether encryption was ever enabled in a room. We do this in the same way that we made the RoomStateTracker, where we subscribe to sync changes. When we receive an m.room.encryption state event, we compare it with the previously set values, and we will detect different conditions and publish messages that the application can subscribe to:

  • if Megolm encryption is enabled for the first time, or is re-enabled after having been disabled or changed to a different algorithm,

  • if encryption is disabled (that is, if encryption had previously been enabled, and we receive an m.room.encryption event that does not have an algorithm),

  • if the encryption algorithm changes, or

  • if the encryption algorithm is initially set to an unknown algorithm.

In addition to publishing messages, we will also create a function that will tell us whether encryption is enabled in a room, and which will raise exceptions for the above conditions (except for Megolm encryption being enabled).

Todo

We also don’t want to allow changing parameters to something that is less secure?

OlmMegolmEncryptionManager initialization:
c.publisher.subscribe(
    (client.RoomStateUpdates, client.RoomTimelineUpdates),
    self._state_subscriber,
)

OlmMegolmEncryptionManager class methods:
async def _state_subscriber(
    self, msg: typing.Union[client.RoomStateUpdates, client.RoomTimelineUpdates]
) -> None:
    room_id = msg.room_id
    for e in msg.events:
        if (
            isinstance(e, events.StateEvent)
            and e.type == "m.room.encryption"
            and e.state_key == ""
        ):
            state = self.client.storage.get("room_encryption_state", {})
            [prev_alg, orig_alg] = state.get(room_id, [None, None])
            new_alg = e.content.get("algorithm")
            if not isinstance(new_alg, str):
                new_alg = None

            if prev_alg == new_alg:
                # if the encryption algorithm isn't changing, we don't need to
                # do anything for this event, but we still need to check the
                # remaining events
                continue
            elif new_alg is None:
                await self.publisher.publish(EncryptionDisabled(room_id, orig_alg))
            elif orig_alg == new_alg:
                # encryption is being restored to the original algorithm
                await self.publisher.publish(EncryptionEnabled(room_id, new_alg))
            elif prev_alg is None:
                if orig_alg is None:
                    # encryption is being enabled for the first time
                    # the new encryption algorithm will be the room's original
                    # encryption algorithm
                    orig_alg = new_alg

                    if new_alg == MEGOLM_ALGORITHM:
                        await self.publisher.publish(
                            EncryptionEnabled(room_id, new_alg)
                        )
                    else:
                        await self.publisher.publish(
                            UnknownRoomEncryptionAlgorithm(
                                room_id, typing.cast(str, new_alg)
                            )
                        )
                else:
                    # the encryption algorithm is being set to something
                    # other than the previous algorithm, and other than the
                    # original algorithm
                    await self.publisher.publish(
                        EncryptionAlgorithmChanged(room_id, orig_alg, new_alg)
                    )
            else:
                await self.publisher.publish(
                    EncryptionAlgorithmChanged(room_id, orig_alg, new_alg)
                )
            state[room_id] = [new_alg, orig_alg]
            self.client.storage["room_encryption_state"] = state

def is_room_encrypted(self, room_id: str) -> bool:
    """
    Determines whether events sent to the given room should be encrypted.

    May raise

    * ``EncryptionDisabledError`` if encryption was enabled in the room in the
      past, but is now disabled
    * ``EncryptionAlgorithmChangedError`` if the encryption algorithm for the
      room changed from its initial algorithm
    * ``UnknownEncryptionAlgorithmError`` if the encryption algorithm for
      the room is not known (that is, it is not Megolm)
    """
    state = self.client.storage.get("room_encryption_state", {})
    [curr_alg, orig_alg] = state.get(room_id, [None, None])
    if curr_alg is None:
        if orig_alg is None:
            return False
        else:
            raise error.EncryptionDisabledError()
    elif curr_alg != orig_alg:
        raise error.EncryptionAlgorithmChangedError(orig_alg, curr_alg)
    elif curr_alg == MEGOLM_ALGORITHM:
        return True
    else:
        raise error.UnknownEncryptionAlgorithmError(orig_alg)
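
The branches in _state_subscriber can be summarized as a pure function from the stored state and the new algorithm to a message label and the new stored state. The following sketch restates that logic without storage or publishing. The function name and the string labels are illustrative only, and it assumes, as the code above does, that the original algorithm is always set once any algorithm has been seen.

```python
import typing

MEGOLM_ALGORITHM = "m.megolm.v1.aes-sha2"

Alg = typing.Optional[str]


def classify_transition(
    prev_alg: Alg, orig_alg: Alg, new_alg: Alg
) -> typing.Tuple[typing.Optional[str], typing.Tuple[Alg, Alg]]:
    """Return (label, (current algorithm, original algorithm))."""
    if prev_alg == new_alg:
        # nothing changed, so nothing is published
        return None, (prev_alg, orig_alg)
    if new_alg is None:
        return "disabled", (None, orig_alg)
    if orig_alg == new_alg:
        # encryption is restored to the original algorithm
        return "enabled", (new_alg, orig_alg)
    if orig_alg is None:
        # first time an algorithm is set: it becomes the original algorithm
        label = "enabled" if new_alg == MEGOLM_ALGORITHM else "unknown"
        return label, (new_alg, new_alg)
    # anything else differs from the original algorithm
    return "changed", (new_alg, orig_alg)
```

Because the original algorithm is never overwritten once set, is_room_encrypted can distinguish a room that was never encrypted from one where encryption was later disabled or changed.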

We now define the messages and exceptions that we use above.

megolm module classes:
class EncryptionEnabled(typing.NamedTuple):
    """A message indicating that encryption was enabled in a room"""

    room_id: str
    algorithm: str


class EncryptionDisabled(typing.NamedTuple):
    """A message indicating that encryption was disabled in a room"""

    room_id: str
    original_algorithm: str


EncryptionDisabled.original_algorithm.__doc__ = (
    "The encryption algorithm that was initially set for the room"
)


class UnknownRoomEncryptionAlgorithm(typing.NamedTuple):
    """A message indicating that the encryption algorithm for a room was
    initially set to an unknown algorithm"""

    room_id: str
    algorithm: str


class EncryptionAlgorithmChanged(typing.NamedTuple):
    """A message indicating that the encryption algorithm for a room changed"""

    room_id: str
    original_algorithm: str
    new_algorithm: str

error module classes:
class EncryptionDisabledError(RuntimeError):
    """Encryption was enabled in a room but is now disabled"""

    pass


class EncryptionAlgorithmChangedError(RuntimeError):
    """The encryption algorithm in a room was changed from its initial
    algorithm"""

    def __init__(self, initial_algorithm: str, new_algorithm: str):
        super().__init__(
            f"Encryption algorithm changed from {initial_algorithm} to {new_algorithm}"
        )
        self.initial_algorithm = initial_algorithm
        self.new_algorithm = new_algorithm


class UnknownEncryptionAlgorithmError(RuntimeError):
    """The initial encryption algorithm for a room is not a known algorithm"""

    def __init__(self, algorithm: str):
        super().__init__(f"Unknown encryption algorithm: {algorithm}")
        self.algorithm = algorithm

Tests

To test this, we will simulate m.room.encryption events being sent to two different rooms. In the first room, we will enable encryption, change it to a different algorithm, and then disable encryption. And in the second room, we will set the encryption algorithm to an unknown algorithm. We will subscribe to all the messages, and ensure that we receive the expected messages at the right times. We will also check that is_room_encrypted returns the correct response in each situation.

test megolm:
@pytest.mark.asyncio
async def test_room_encryption_event(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{room encryption event test}}

We first set up an OlmMegolmEncryptionManager.

room encryption event test:
(
    device_keys,
    otks,
    device_keys_manager,
) = await _create_device_keys(mock_aioresponse, c)

device_tracker = devices.DeviceTracker(c)

encryption_manager = megolm.OlmMegolmEncryptionManager(
    c,
    b"\x00" * 32,
    device_keys_manager,
    device_tracker,
)

We then subscribe to the messages that we are interested in. We use Python’s Mock class to create subscriber methods so that we can check whether they were called.

room encryption event test:
subscriber = Mock()
encryption_manager.publisher.subscribe(
    (
        megolm.EncryptionEnabled,
        megolm.UnknownRoomEncryptionAlgorithm,
        megolm.EncryptionDisabled,
        megolm.EncryptionAlgorithmChanged,
    ),
    subscriber,
)
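
For readers unfamiliar with unittest.mock, this short sketch shows the two properties that the assertions in this test rely on: a Mock records how it was called, and named tuples compare by value, so an expected message can be built fresh for the comparison. The Ping message type is a hypothetical stand-in, and the mock is called directly here rather than through a publisher.

```python
import typing
from unittest import mock


class Ping(typing.NamedTuple):
    """Hypothetical message type used only for this demonstration."""

    room_id: str
    algorithm: str


subscriber = mock.Mock()
subscriber(Ping("!room1", "m.megolm.v1.aes-sha2"))

# the mock remembers the number of calls and the arguments of the last call
assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(Ping("!room1", "m.megolm.v1.aes-sha2"))

# reset_mock() clears the recorded calls so the next step starts fresh
subscriber.reset_mock()
assert subscriber.call_count == 0
```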

We check that our rooms are marked as unencrypted.

room encryption event test:
assert encryption_manager.is_room_encrypted("!room1") is False
assert encryption_manager.is_room_encrypted("!room2") is False

We then cause the client to publish sync events indicating changes in room encryption. First, we enable encryption in a room, and check that we receive a message indicating that encryption was enabled and that the room is marked as encrypted.

room encryption event test:
await c.publisher.publish(
    client.RoomTimelineUpdates(
        "!room1",
        [
            events.StateEvent(
                type="m.room.encryption",
                state_key="",
                sender="@bob:example.org",
                content={"algorithm": megolm.MEGOLM_ALGORITHM},
                event_id="$enable_encryption",
                room_id="!room1",
                origin_server_ts=1234567890000,
            )
        ],
        False,
        None,
    )
)

assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
    megolm.EncryptionEnabled("!room1", megolm.MEGOLM_ALGORITHM)
)
assert encryption_manager.is_room_encrypted("!room1") == True

We then change the encryption algorithm, and check that we receive an event indicating that the encryption algorithm is being changed, and that is_room_encrypted raises an error.

room encryption event test:
subscriber.reset_mock()
await c.publisher.publish(
    client.RoomTimelineUpdates(
        "!room1",
        [
            events.StateEvent(
                type="m.room.encryption",
                state_key="",
                sender="@bob:example.org",
                content={"algorithm": "a.different.algorithm"},
                event_id="$enable_encryption",
                room_id="!room1",
                origin_server_ts=1234567890001,
            )
        ],
        False,
        None,
    )
)

assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
    megolm.EncryptionAlgorithmChanged(
        "!room1", megolm.MEGOLM_ALGORITHM, "a.different.algorithm"
    ),
)
with pytest.raises(error.EncryptionAlgorithmChangedError):
    encryption_manager.is_room_encrypted("!room1")

Next, we send an empty m.room.encryption event, and check that we receive an event indicating that encryption was disabled, and that is_room_encrypted raises an error.

room encryption event test:
subscriber.reset_mock()
await c.publisher.publish(
    client.RoomTimelineUpdates(
        "!room1",
        [
            events.StateEvent(
                type="m.room.encryption",
                state_key="",
                sender="@bob:example.org",
                content={},
                event_id="$enable_encryption",
                room_id="!room1",
                origin_server_ts=1234567890002,
            )
        ],
        False,
        None,
    )
)

assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
    megolm.EncryptionDisabled("!room1", megolm.MEGOLM_ALGORITHM)
)
with pytest.raises(error.EncryptionDisabledError):
    encryption_manager.is_room_encrypted("!room1")

When we re-enable encryption with the correct algorithm, we should receive another event indicating that encryption was enabled, and is_room_encrypted should return True.

room encryption event test:
subscriber.reset_mock()
await c.publisher.publish(
    client.RoomTimelineUpdates(
        "!room1",
        [
            events.StateEvent(
                type="m.room.encryption",
                state_key="",
                sender="@bob:example.org",
                content={"algorithm": megolm.MEGOLM_ALGORITHM},
                event_id="$enable_encryption",
                room_id="!room1",
                origin_server_ts=1234567890003,
            )
        ],
        False,
        None,
    )
)

assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
    megolm.EncryptionEnabled("!room1", megolm.MEGOLM_ALGORITHM)
)
assert encryption_manager.is_room_encrypted("!room1") == True

If we initially enable encryption in a room with an unknown algorithm, we should receive an event indicating that the algorithm is unknown, and is_room_encrypted should raise an error.

room encryption event test:
subscriber.reset_mock()
await c.publisher.publish(
    client.RoomTimelineUpdates(
        "!room2",
        [
            events.StateEvent(
                type="m.room.encryption",
                state_key="",
                sender="@bob:example.org",
                content={"algorithm": "an.unknown.algorithm"},
                event_id="$enable_encryption",
                room_id="!room2",
                origin_server_ts=1234567890004,
            )
        ],
        False,
        None,
    )
)

assert subscriber.call_count == 1
assert subscriber.call_args == mock.call(
    megolm.UnknownRoomEncryptionAlgorithm("!room2", "an.unknown.algorithm")
)
with pytest.raises(error.UnknownEncryptionAlgorithmError):
    encryption_manager.is_room_encrypted("!room2")

Encrypting to-device events

After determining that we need to encrypt events to the room, we will now actually perform the encryption. As before, we start with to-device events, and will look at room events after.

Applications will not encrypt to-device events directly; rather, the encryption will be invoked by other functions, such as the function used to encrypt room events. So the function that we write here to encrypt to-device events will be an internal function.

The Matrix API allows us to send multiple to-device events, of the same type, in one call, so our function will take an event type and a list of device key objects and event contents, and will send the events to those devices. Our function will also take a batch size, as it will split the requests into batches to avoid making a request that is too big, and so that the server can start delivering the events before receiving all of them.
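
As a rough illustration of the batching described above, the following sketch splits a list of already-encrypted messages into request bodies of at most batch_size messages each. The helper name build_to_device_batches is hypothetical and is not part of the client that we have written; only the messages structure (user ID to device ID to event content) follows the sendToDevice request format.

```python
import typing


def build_to_device_batches(
    messages: typing.Sequence[typing.Tuple[str, str, dict]],
    batch_size: int = 200,
) -> list[dict]:
    """Hypothetical helper: group (user_id, device_id, content) tuples into
    sendToDevice request bodies of at most ``batch_size`` messages each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    bodies = []
    for start in range(0, len(messages), batch_size):
        by_user: dict[str, dict[str, dict]] = {}
        for user_id, device_id, content in messages[start : start + batch_size]:
            # the request body maps user ID -> device ID -> event content;
            # a repeated (user, device) pair within one batch would overwrite
            # the earlier message, so callers should avoid duplicates
            by_user.setdefault(user_id, {})[device_id] = content
        bodies.append({"messages": by_user})
    return bodies
```

With a batch size of 2, three messages would be split into two request bodies, so the server can start delivering the first two messages while the third is still being prepared.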

We must also handle errors — if we are unable to encrypt an event for a user, we will store the event to retry later.

Conceptual explanation

The code that we write here will be quite complicated, so before we get to the code, let us discuss how it will operate and explain why the code becomes complicated.

In the Encrypting with Olm section, we saw that creating a new Olm session to encrypt a to-device event required claiming one of the recipient’s one-time keys. If we need to create a new Olm session, encrypting an event for a single recipient is a fairly straightforward process when taken in isolation:

  1. claim a one-time key;

  2. create an Olm session (via our OlmChannel class);

  3. encrypt the event;

  4. send the encrypted event.

However, in practice, it is not always this simple, even without taking into account error handling. When sending an encrypted room event, sending the Megolm sessions can take a significant amount of time in large rooms since we must ensure that each recipient gets the Megolm session, and we must send it to each recipient individually. Thus, for example, if there are 1000 recipient devices in a room, and none of them have the Megolm session, then we would need to go through this procedure 1000 times.

The Matrix API contains some optimizations to speed this up a bit. The endpoints for claiming one-time keys and for sending to-device events can claim from/send to multiple devices at a time, reducing the number of calls that must be made to the server. Thus if we need to encrypt to multiple recipients, and we need to create a new Olm session for all of them, the process could look something like this:

  1. claim one-time keys for all recipients;

  2. for each recipient:

    1. create an Olm session;

    2. encrypt the event;

  3. send all the encrypted events.

If we already have Olm sessions created with some recipients, then we could modify the process to look something like this:

  1. claim one-time keys for all recipients that we need new Olm sessions for;

  2. for each recipient:

    1. create an Olm session if we don’t already have one

    2. encrypt the event;

  3. send all the encrypted events.

But there are some obvious ways in which this is slower than necessary. Firstly, if we already have an Olm session with some recipients, we don’t need to wait until after we have claimed the one-time keys; we can begin encrypting to those recipients while we are waiting for the key claim request to return, so that we can start encrypting earlier. Secondly, we don’t need to wait until all the events have been encrypted before sending them all off. We can batch them up and start sending some events as others are still being encrypted. This new process might look something like:

  1. send a key claim request to the server for all the recipients that we need new Olm sessions for, but don’t block waiting for it;

  2. for each recipient that we already have an Olm session for:

    1. encrypt the event;

    2. put the encrypted event in a send queue;

    3. if the send queue is full

      1. wait for the previous send task (if any) to finish

      2. send all the events in the queue (don’t block waiting for the send to complete)

      3. and empty the queue;

  3. if the key claim request has not returned yet, but the previous send task (if any) has finished, send all the events in the send queue (if any) and empty the queue;

  4. wait until the key claim request has returned;

  5. for each recipient for which we claimed a one-time key:

    1. create an Olm session;

    2. encrypt the event;

    3. put the encrypted event in the send queue;

    4. if the send queue is full

      1. wait for the previous send task (if any) to finish

      2. send all the events in the queue (don’t block waiting for the send to complete)

      3. and empty the queue;

  6. wait for the previous send task (if any) to finish and send all the events in the send queue (if any) and empty the queue.
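
To make the control flow of this process easier to follow, here is a minimal, self-contained sketch of it using asyncio. The claim_keys, encrypt and send arguments are hypothetical stand-ins for the real operations, and recipients are plain strings. It ignores errors entirely, and its step 3 simply checks whether the tasks are done rather than waiting for one of them.

```python
import asyncio


async def pipeline(existing, to_create, claim_keys, encrypt, send, batch_size=2):
    """Sketch only: ``claim_keys`` and ``send`` are coroutine functions and
    ``encrypt`` is a plain function, all supplied by the caller."""
    queue = []
    send_task = None  # the send that is currently in flight, if any

    async def flush():
        nonlocal queue, send_task
        if send_task is not None:
            await send_task  # allow only one send in flight at a time
        batch, queue = queue, []
        send_task = asyncio.create_task(send(batch))  # do not block on it

    # step 1: start the key claim without waiting for it
    claim_task = asyncio.create_task(claim_keys(to_create))

    # step 2: encrypt to recipients that already have a session
    for recipient in existing:
        queue.append(encrypt(recipient))
        if len(queue) >= batch_size:
            await flush()

    # step 3: if the claim is still running and no send is in flight,
    # send whatever is queued instead of idling
    if queue and not claim_task.done() and (send_task is None or send_task.done()):
        await flush()

    # steps 4 and 5: wait for the claim, then encrypt to the new recipients
    for recipient in await claim_task:
        queue.append(encrypt(recipient))
        if len(queue) >= batch_size:
            await flush()

    # step 6: send the remainder and wait for the last send to finish
    if queue:
        await flush()
    if send_task is not None:
        await send_task
```

Whatever the timing of the individual tasks, each recipient's event is sent exactly once and no batch exceeds batch_size.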

You can see that it’s now starting to get complicated. We also need to make sure that we handle errors (for example, re-sending events if the initial attempt fails). We may also need to consider the case where we concurrently send multiple events to a single recipient, for example, if we send two encrypted events in two different rooms and so need to send two different Megolm sessions. In this case, ideally we do not want to claim two one-time keys and create two Olm sessions for the same recipient as that would be wasted effort; we only want to create one Olm session for each recipient.

There are many possible variations and many different ways to structure the code, depending on programming language capabilities and tradeoffs between complexity and efficiency. Here, we will take a moderately complicated approach: we will implement the parallel process outlined above, but we will handle errors and concurrent sending in fairly simple ways.

We will explain how we handle each type of error as we get to them. With regard to concurrent sending, we will take the simple approach of disallowing it. When we encrypt with Olm, we will take out a lock, preventing any other Olm encryption. This is a very coarse approach, and in some cases may slow things down considerably, but it is very simple, and should work well enough in most situations. For example, in an interactive client, the user will only send events in one room at a time. If the client needs something different, other approaches could be used instead. For example, a separate lock could be used for each recipient, so that a concurrent send will only block if it has a recipient in common with another.
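
For readers who want to explore the per-recipient alternative, the following is a minimal sketch of how it might look. It is not what we implement in this book. The RecipientLocks class and its hold method are hypothetical names, and recipients are identified by (user ID, device ID) pairs. The locks are always acquired in sorted order, so that two sends with overlapping recipients cannot deadlock by each holding a lock that the other is waiting for.

```python
import asyncio
import contextlib
import typing


class RecipientLocks:
    """Hypothetical helper: one asyncio.Lock per (user_id, device_id)."""

    def __init__(self) -> None:
        self._locks: dict[typing.Tuple[str, str], asyncio.Lock] = {}

    @contextlib.asynccontextmanager
    async def hold(
        self, recipients: typing.Iterable[typing.Tuple[str, str]]
    ) -> typing.AsyncIterator[None]:
        # de-duplicate and sort so that every caller acquires in the same order
        keys = sorted(set(recipients))
        async with contextlib.AsyncExitStack() as stack:
            for key in keys:
                lock = self._locks.setdefault(key, asyncio.Lock())
                await stack.enter_async_context(lock)
            # all locks are held here; they are released when the stack exits
            yield
```

A send would wrap its encryption in "async with locks.hold(recipients):". Note that this sketch never removes locks from its dictionary, so a long-running client would need some way of pruning it.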

Implementation

We will now implement the process that we outlined above.

Todo

explain OTK request timeout

OlmMegolmEncryptionManager class methods:
async def _encrypt_and_send_to_device(
    self,
    events: typing.Iterable[typing.Tuple[dict, str, dict]],
    otk_request_timeout: typing.Optional[int] = None,
    batch_size: int = 200,
) -> None:
    async with self.lock:
        send_queue: list[typing.Tuple[str, str, dict]] = []
        send_task: asyncio.Task[None] = asyncio.create_task(asyncio.sleep(0))

        {{determine recipients without Olm channels}}
        {{send keys claim request}}
        {{encrypt to recipients with existing Olm sessions}}
        {{send pending events while waiting for key claim request}}
        {{create new Olm sessions and encrypt}}
        {{flush send queue}}

First we determine which recipients we already have Olm channels with, and which recipients we need to create channels for. If we have an existing Olm channel for a recipient, we will put the Olm channel, the event type, and the event contents in the events_to_encrypt list. If we don’t have an existing Olm channel, we store the recipient’s device keys (which we will use to create an Olm channel), the event type, and the event contents in the channels_to_create list.

determine recipients without Olm channels:
events_to_encrypt: list[typing.Tuple[olm.OlmChannel, str, dict]] = []
channels_to_create: list[typing.Tuple[dict, str, dict]] = []

for device_key, event_type, event_content in events:
    if not schema.is_valid(
        device_key, {"user_id": str, "device_id": str, "keys": dict}
    ):
        logging.error("invalid device key: %r", device_key)
        continue
    user_id = device_key["user_id"]
    device_id = device_key["device_id"]
    identity_key_id = f"curve25519:{device_id}"
    if identity_key_id not in device_key["keys"]:
        logging.error("device key does not have Olm identity key")
        continue
    identity_key = device_key["keys"][identity_key_id]

    olm_channel = olm.OlmChannel.create_from_storage(
        self.client, self.device_keys_manager, user_id, identity_key
    )

    if isinstance(olm_channel, olm.OlmChannel):
        olm_channel.assert_partner_device_id(device_id)
        events_to_encrypt.append((olm_channel, event_type, event_content))
    else:
        channels_to_create.append((device_key, event_type, event_content))

For the recipients that we need to create Olm channels with, we need to send a POST /keys/claim request to claim a one-time key. We have previously written a function for making the POST /keys/claim request, so we will make use of it. Since we do not want to block while waiting for the request to finish, we will create an asyncio.Task to make the request concurrently with our other processing.

send keys claim request:
# create the `devices` parameter for `claim_otks`, which is a dict mapping
# user IDs to a list of device IDs
claim_otks_devices: dict[str, list[str]] = {}
for device_key, _, _ in channels_to_create:
    claim_otks_devices.setdefault(device_key["user_id"], []).append(
        device_key["device_id"]
    )

keys_claim_task = (
    asyncio.create_task(
        self.client.claim_otks(
            "signed_curve25519",
            claim_otks_devices,
            otk_request_timeout,
        )
    )
    if claim_otks_devices  # an empty dict is falsy in Python
    else asyncio.create_task(asyncio.sleep(0, ({}, [])))
)

While we’re waiting for the POST /keys/claim request, we encrypt to the recipients that we already have Olm channels with (if any). As we encrypt, we place the encrypted events in a queue and when the queue is full, we send them. Again, we do not need to wait for the sending to complete before continuing to encrypt events, so we will use another asyncio.Task for sending. However, we only want to have one send request in flight at a time, so we wait for the previous send to complete (if any) before sending a new batch of encrypted events.

Todo

How do we handle errors?

encrypt to recipients with existing Olm sessions:
for olm_channel, event_type, event_content in events_to_encrypt:
    encrypted = olm_channel.encrypt(event_type, event_content)
    send_queue.append(
        (
            olm_channel.partner_user_id,
            typing.cast(str, olm_channel.partner_device_id),
            encrypted,
        )
    )
    if len(send_queue) >= batch_size:
        send_task = asyncio.create_task(
            self._flush_to_device_send_queue(send_task, send_queue)
        )
        send_queue = []

OlmMegolmEncryptionManager class methods:
async def _flush_to_device_send_queue(
    self,
    send_task: asyncio.Task[None],
    send_queue: list[typing.Tuple[str, str, dict]],
) -> None:
    to_device_contents: dict[str, dict[str, dict]] = {}
    for user_id, device_id, contents in send_queue:
        to_device_contents.setdefault(user_id, {})[device_id] = contents

    # wait until the previous send is done, so that we don't have multiple
    # in-flight
    await send_task

    await asyncio.create_task(
        self.client.send_to_device("m.room.encrypted", to_device_contents)
    )

After we have finished encrypting events to existing Olm channels, we will need to wait for the result of the POST /keys/claim query so that we can create the new Olm channels. If we don’t have the result yet, and we have some events in the send queue, we can send those events while we are waiting for the POST /keys/claim result. But this only makes sense to do if we don’t currently have a send task in flight. Thus we wait for the first of the two tasks to complete, and if the keys claim task is not complete while the send task is, then we will send the events.

send pending events while waiting for key claim request:
if len(send_queue) > 0:
    done, pending = await asyncio.wait(
        [keys_claim_task, send_task], return_when=asyncio.FIRST_COMPLETED
    )
    if keys_claim_task in pending:
        send_task = asyncio.create_task(
            self._flush_to_device_send_queue(send_task, send_queue)
        )
        send_queue = []
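
The behaviour of asyncio.wait with FIRST_COMPLETED that we rely on here can be seen in isolation in the following small sketch. The two tasks are stand-ins built from asyncio.sleep: the one labelled "keys claim" is deliberately slow and the one labelled "send" finishes almost immediately. The function name is made up for the illustration.

```python
import asyncio
import typing


async def which_finished_first() -> typing.Tuple[list[str], bool]:
    # stand-ins for the real tasks: a slow key claim and a fast send
    slow = asyncio.create_task(asyncio.sleep(0.05, "keys claim"))
    fast = asyncio.create_task(asyncio.sleep(0, "send"))

    # returns as soon as at least one of the tasks has completed
    done, pending = await asyncio.wait(
        [slow, fast], return_when=asyncio.FIRST_COMPLETED
    )
    first = [task.result() for task in done]
    still_running = slow in pending

    # a pending task is not cancelled; it keeps running and can be awaited later
    await slow
    return first, still_running
```

Here the slow task is still in the pending set when asyncio.wait returns, which corresponds to the case where we use the waiting time to send the queued events.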

Now we must wait for the keys claim task to complete before we can continue any further. When that returns, we can use the one-time keys to create new Olm channels and encrypt the events to the remaining recipients. Again, as we encrypt events, we will put them in the queue, and when the queue is full, we will send the events. Note that we may not receive a one-time key for all of the recipients that we requested. This may happen if they do not have any one-time keys available, or if they are on a remote server that did not respond in time.

Todo

explain how to handle missing OTKs.

create new Olm sessions and encrypt:
await keys_claim_task

otk_result, otk_failures = keys_claim_task.result()
for device_key, event_type, event_content in channels_to_create:
    id_and_otk = otk_result.get(device_key["user_id"], {}).get(
        device_key["device_id"]
    )
    if id_and_otk:
        id, otk = id_and_otk
        olm_channel = olm.OlmChannel.create_outbound_channel(
            self.client,
            self.device_keys_manager,
            device_key,
            otk,
        )
        encrypted = olm_channel.encrypt(event_type, event_content)
        send_queue.append(
            (
                device_key["user_id"],
                device_key["device_id"],
                encrypted,
            )
        )
        if len(send_queue) >= batch_size:
            send_task = asyncio.create_task(
                self._flush_to_device_send_queue(send_task, send_queue)
            )
            send_queue = []
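
The code above silently skips recipients for which no one-time key was returned. As a starting point for handling them, the following sketch separates the requested devices into those that received a key and those that did not, using the same lookup as above. The function name split_by_claimed_otk is hypothetical, and what to do with the missing devices (for example, storing the events to retry later) is left open.

```python
import typing


def split_by_claimed_otk(
    requested: typing.Iterable[typing.Tuple[str, str]],
    otk_result: dict[str, dict[str, typing.Any]],
) -> typing.Tuple[list, list]:
    """Hypothetical helper: partition (user_id, device_id) pairs by whether
    ``otk_result`` (user ID -> device ID -> claimed key) has a key for them."""
    claimed, missing = [], []
    for user_id, device_id in requested:
        if otk_result.get(user_id, {}).get(device_id):
            claimed.append((user_id, device_id))
        else:
            # no key available, or the remote server did not respond in time
            missing.append((user_id, device_id))
    return claimed, missing
```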

Finally, after we finish encrypting the events, we send any events that are left in the send queue.

flush send queue:
if len(send_queue) > 0:
    await self._flush_to_device_send_queue(send_task, send_queue)

Tests

To test this, we will send two sets of events. The first set will be one event sent to Bob’s device, and the second set will be another event sent to Bob’s device as well as one event sent to Carol’s device. We expect that for the second set of events, it will only try to create a new Olm session with Carol’s device, and use the existing Olm session with Bob’s device. We check this by looking at the one-time keys that it tries to claim.

test megolm:
@pytest.mark.asyncio
async def test_send_olm_encrypted_events(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": ROOM_ENCRYPTION_EVENT,
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                        "@bob:example.org": BOB_ROOM_MEMBERSHIP,
                    },
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as alice:
        async with client.Client(
            storage={
                "access_token": "anaccesstoken",
                "user_id": "@bob:example.org",
                "device_id": "HIJKLMN",
            },
            callbacks={},
            base_client_url="https://matrix-client.example.org/_matrix/client/",
        ) as bob:
            async with client.Client(
                storage={
                    "access_token": "anaccesstoken",
                    "user_id": "@carol:example.org",
                    "device_id": "OPQRSTU",
                },
                callbacks={},
                base_client_url="https://matrix-client.example.org/_matrix/client/",
            ) as carol:
                {{test send Olm-encrypted events}}

We first create the device keys for Alice, Bob, and Carol, and an OlmMegolmEncryptionManager for Alice.

test send Olm-encrypted events:
(
    alice_device_keys,
    alice_otks,
    alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)

(
    bob_device_keys,
    bob_otks,
    bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)

(
    carol_device_keys,
    carol_otks,
    carol_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, carol)

alice_device_tracker = devices.DeviceTracker(alice)

encryption_manager = megolm.OlmMegolmEncryptionManager(
    alice,
    b"\x00" * 32,
    alice_device_keys_manager,
    alice_device_tracker,
)

We create a callback that will capture the to-device events that Alice sends to Bob and Carol, so that we can ensure that they are decryptable. With the repeat=True option passed to mock_aioresponse, the callback will be called for all calls to the sendToDevice endpoint with m.room.encrypted as the event type.

test send Olm-encrypted events:
to_device: dict[typing.Tuple[str, str], typing.Any] = {}

def send_to_device_callback(url, **kwargs):
    nonlocal to_device

    for user_id, device_messages in kwargs["json"]["messages"].items():
        for device_id, message in device_messages.items():
            to_device.setdefault((user_id, device_id), []).append(
                message
            )

    return aioresponses.CallbackResult(
        status=200,
        body="{}",
        headers={"Content-Type": "application/json"},
    )

send_to_device_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.encrypted/.*$"
)
mock_aioresponse.put(
    send_to_device_url_pattern,
    callback=send_to_device_callback,
    repeat=True,
)

We add a callback to handle the first /keys/claim call that Alice will make, and ensure that she is requesting Bob’s one-time key. Then we encrypt and send an event to Bob.

test send Olm-encrypted events:
def keys_claim_1_callback(url, **kwargs):
    nonlocal bob_otks

    if kwargs["json"]["one_time_keys"] == {
        "@bob:example.org": {"HIJKLMN": "signed_curve25519"}
    }:
        otk_id, otk = [
            (id, key)
            for id, key in bob_otks.items()
            if id.startswith("signed_curve25519:")
        ][0]

        return aioresponses.CallbackResult(
            status=200,
            body=json.dumps(
                {
                    "one_time_keys": {
                        "@bob:example.org": {"HIJKLMN": {otk_id: otk}}
                    }
                }
            ),
            headers={"Content-Type": "application/json"},
        )
    else:
        return aioresponses.CallbackResult(
            status=400,
            body="unexpected response",
        )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/claim",
    callback=keys_claim_1_callback,
)

await encryption_manager._encrypt_and_send_to_device(
    [(bob_device_keys, "m.room.message", {"content": "Hello world!"})]
)

We then add a callback to handle the second /keys/claim call that Alice will make, which should only be requesting Carol’s one-time key. Then we encrypt and send an event to Bob and an event to Carol.

test send Olm-encrypted events:
def keys_claim_2_callback(url, **kwargs):
    nonlocal carol_otks

    if kwargs["json"]["one_time_keys"] == {
        "@carol:example.org": {"OPQRSTU": "signed_curve25519"}
    }:
        otk_id, otk = [
            (id, key)
            for id, key in carol_otks.items()
            if id.startswith("signed_curve25519:")
        ][0]

        return aioresponses.CallbackResult(
            status=200,
            body=json.dumps(
                {
                    "one_time_keys": {
                        "@carol:example.org": {"OPQRSTU": {otk_id: otk}}
                    }
                }
            ),
            headers={"Content-Type": "application/json"},
        )
    else:
        return aioresponses.CallbackResult(
            status=400,
            body="unexpected response",
        )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/claim",
    callback=keys_claim_2_callback,
)

await encryption_manager._encrypt_and_send_to_device(
    [
        (bob_device_keys, "m.room.message", {"content": "Hello!"}),
        (carol_device_keys, "m.room.message", {"content": "Hello!"}),
    ]
)

Finally, we check that Alice encrypted the events properly by having Bob and Carol try to decrypt them.

test send Olm-encrypted events:
bob_events = to_device[("@bob:example.org", "HIJKLMN")]

assert len(bob_events) == 2

(
    bob_channel,
    bob_decrypted_event_0,
) = olm.OlmChannel.create_from_encrypted_event(
    bob,
    bob_device_keys_manager,
    "@alice:example.org",
    alice_device_keys_manager.identity_key,
    bob_events[0],
    partner_device_id="ABCDEFG",
    partner_fingerprint_key=alice_device_keys_manager.fingerprint_key,
)

assert bob_decrypted_event_0["content"] == {"content": "Hello world!"}

assert bob_channel.decrypt(bob_events[1])["content"] == {
    "content": "Hello!"
}

carol_events = to_device[("@carol:example.org", "OPQRSTU")]

assert len(carol_events) == 1

(
    carol_channel,
    carol_decrypted_event_0,
) = olm.OlmChannel.create_from_encrypted_event(
    carol,
    carol_device_keys_manager,
    "@alice:example.org",
    alice_device_keys_manager.identity_key,
    carol_events[0],
    partner_device_id="ABCDEFG",
    partner_fingerprint_key=alice_device_keys_manager.fingerprint_key,
)

assert carol_decrypted_event_0["content"] == {"content": "Hello!"}

Todo

Test other situations

Encrypting room events

We now write a function that allows us to encrypt and send room events. To encrypt a room event, we must have an outbound Megolm session for that room. So we remember the current Megolm session for each room. If we don’t have a Megolm session, then we create one. Also, if the Megolm session detects that it needs to be rotated, we will create a new one.

We create a function that obtains the Megolm session for the room (either using an existing one or creating a new one), and sends it to all the other devices in the room (as necessary). We have already written functions to do most of this; we just need to call them.

OlmMegolmEncryptionManager class methods:
async def _get_and_send_outbound_megolm_session(
    self,
    room_id: str,
    otk_request_timeout: typing.Optional[int] = None,
) -> OutboundMegolmSession:
    {{get or create Megolm session for room}}
    {{send Megolm sessions to recipient devices}}

    return outbound_session

When we create an outbound Megolm session for a room, we remember the ID of the session. Then, when we need to encrypt for that room again, we get the session ID associated with the room, and then load the outbound session from the storage. We also need to create an inbound Megolm session from the outbound Megolm session, so that we can decrypt our own events.

Note

For better performance, we can maintain a cache of recently used outbound Megolm sessions, rather than loading the sessions from storage each time. However, that is beyond the scope of this book.

get or create Megolm session for room:
storage_key = f"outbound_megolm_session.{room_id}"
if storage_key in self.client.storage:
    session_id = self.client.storage[storage_key]
    outbound_session = OutboundMegolmSession(
        self.client,
        room_id,
        room_state_tracker=self.room_state_tracker,
        device_tracker=self.device_tracker,
        device_keys_manager=self.device_keys_manager,
        key=self.key,
        session_id=session_id,  # passing in the session ID loads it from storage
    )
else:
    outbound_session = OutboundMegolmSession(
        self.client,
        room_id,
        room_state_tracker=self.room_state_tracker,
        device_tracker=self.device_tracker,
        device_keys_manager=self.device_keys_manager,
        key=self.key,
    )
    InboundMegolmSession.from_outbound_session(
        self.client,
        outbound_session,
        key=self.key,
    )
    self.client.storage[storage_key] = outbound_session.session_id

We use the Megolm session’s get_session_key_for_sending method to get the room key event payload to send to each device. This method may raise an exception if it detects that the session needs rotating, in which case we create a new Megolm session and store the session ID for the room.

send Megolm sessions to recipient devices:
try:
    session_keys = await outbound_session.get_session_key_for_sending()
except SessionExpiredException:
    # remember the expired session's ID so that we can remove it below
    expired_session_id = outbound_session.session_id
    outbound_session = OutboundMegolmSession(
        self.client,
        room_id,
        room_state_tracker=self.room_state_tracker,
        device_tracker=self.device_tracker,
        device_keys_manager=self.device_keys_manager,
        key=self.key,
    )
    InboundMegolmSession.from_outbound_session(
        self.client,
        outbound_session,
        key=self.key,
    )
    self.client.storage[storage_key] = outbound_session.session_id
    session_keys = await outbound_session.get_session_key_for_sending()

    # remove the old outbound session from storage
    del self.client.storage[
        f"outbound_megolm_session.{room_id}.{expired_session_id}"
    ]
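
As a reminder of what "needs rotating" can mean, the following sketch shows one plausible form of the check, based on the rotation_period_ms and rotation_period_msgs properties of the m.room.encryption event. The function needs_rotation and its parameters are hypothetical, and this is not necessarily how our OutboundMegolmSession class decides. The default values are the ones recommended by the Matrix specification.

```python
# recommended defaults from the Matrix specification for m.room.encryption
DEFAULT_ROTATION_PERIOD_MS = 604_800_000  # one week
DEFAULT_ROTATION_PERIOD_MSGS = 100


def needs_rotation(
    encryption_content: dict,
    created_ms: int,
    messages_sent: int,
    now_ms: int,
) -> bool:
    """Hypothetical check: has the session been used for too long, or for
    too many messages, according to the room's encryption settings?"""
    max_age_ms = encryption_content.get(
        "rotation_period_ms", DEFAULT_ROTATION_PERIOD_MS
    )
    max_msgs = encryption_content.get(
        "rotation_period_msgs", DEFAULT_ROTATION_PERIOD_MSGS
    )
    return (now_ms - created_ms) >= max_age_ms or messages_sent >= max_msgs
```

For a room whose encryption event sets rotation_period_msgs to 2, this check would ask for a new session once two messages have been sent with the current one.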

We then encrypt and send the room key events using the function we wrote in the last section.

send Megolm sessions to recipient devices:
if len(session_keys) != 0:
    keys_to_send = [
        (device_key, "m.room_key", content)
        for (device_key, content) in session_keys
    ]
    await self._encrypt_and_send_to_device(keys_to_send, otk_request_timeout)

    outbound_session.mark_as_sent(
        [device_key for (device_key, _) in session_keys]
    )

Todo

Handle errors from _encrypt_and_send_to_device

Using this function, we can create a function that takes an event, and encrypts and sends it to a room. We use the same arguments as for sending unencrypted events. Using the functions that we have already written, this function is fairly straightforward: we get the Megolm session for encrypting the event (sending it to the recipient devices as necessary), we encrypt the event, and then we send it to the room.

OlmMegolmEncryptionManager class methods:
async def encrypt_and_send_room_event(
    self,
    room_id: str,
    event_type: str,
    event_content: dict[str, typing.Any],
    retry_ms: int = 0,
    txn_id: typing.Optional[str] = None,
) -> None:
    """Encrypt and send an event to a room

    Arguments:

    ``room_id``:
      the ID of the room to send to
    ``event_type``:
      the type of event to send
    ``event_content``:
      the content of the event
    ``retry_ms``:
      how long to retry sending, in milliseconds
    ``txn_id``:
      the transaction ID to use.  If none is specified, one is generated.
    """
    session = await self._get_and_send_outbound_megolm_session(room_id)
    encrypted_content = session.encrypt(event_type, event_content)
    await self.client.send_event(
        room_id,
        "m.room.encrypted",
        encrypted_content,
        retry_ms,
        txn_id,
    )

Todo

Handle errors

Tests

We test that we can send an encrypted event to a room. Alice will send an encrypted event to a room, and we will ensure that Bob can decrypt it. We will create an OlmMegolmEncryptionManager for each user, and ensure that Alice’s OlmMegolmEncryptionManager can generate events that Bob’s OlmMegolmEncryptionManager can decrypt.

test megolm:
@pytest.mark.asyncio
async def test_send_megolm_encrypted_event_to_room(mock_aioresponse):
    room_state = {
        "m.room.encryption": {
            "": {
                "room_id": "!room_id",
                "type": "m.room.encryption",
                "state_key": "",
                "sender": "@alice:example.org",
                "event_id": "$encryption_event",
                "origin_server_ts": 1234567890123,
                "content": {
                    "algorithm": megolm.MEGOLM_ALGORITHM,
                    "rotation_period_msgs": 2,
                },
            },
        },
        "m.room.member": {
            "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
            "@bob:example.org": BOB_ROOM_MEMBERSHIP,
        },
    }
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": room_state,
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as alice:
        async with client.Client(
            storage={
                "access_token": "anaccesstoken",
                "user_id": "@bob:example.org",
                "device_id": "HIJKLMN",
                "room_state_tracker": {
                    "!room_id": room_state,
                },
            },
            callbacks={},
            base_client_url="https://matrix-client.example.org/_matrix/client/",
        ) as bob:
            {{test send Megolm-encrypted event to room}}

First, we create device keys for Alice and Bob, populate the clients’ device caches with the keys, and create encryption managers.

test send Megolm-encrypted event to room:
(
    alice_device_keys,
    alice_otks,
    alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)

(
    bob_device_keys,
    bob_otks,
    bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)

bob.storage["device_tracker.cache.@alice:example.org"] = alice.storage[
    "device_tracker.cache.@alice:example.org"
] = {
    "device_keys": {
        "ABCDEFG": alice_device_keys,
    }
}

bob.storage["device_tracker.cache.@bob:example.org"] = alice.storage[
    "device_tracker.cache.@bob:example.org"
] = {
    "device_keys": {
        "HIJKLMN": bob_device_keys,
    }
}

alice_device_tracker = devices.DeviceTracker(alice)

alice_encryption_manager = megolm.OlmMegolmEncryptionManager(
    alice,
    b"\x00" * 32,
    alice_device_keys_manager,
    alice_device_tracker,
)

bob_device_tracker = devices.DeviceTracker(bob)

bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
    bob,
    b"\x00" * 32,
    bob_device_keys_manager,
    bob_device_tracker,
)

We then create request callbacks: one that turns to-device events sent by Alice into a simulated sync response in Bob’s client, and one that captures room events sent by Alice.

test send Megolm-encrypted event to room:
async def send_to_device_callback(url, **kwargs):
    nonlocal bob

    messages = kwargs["json"]["messages"]

    to_bob = messages.get("@bob:example.org", {}).get("HIJKLMN")
    if to_bob:
        await bob.publisher.publish(
            client.ToDeviceEvents(
                [
                    events.Event(
                        sender="@alice:example.org",
                        type="m.room.encrypted",
                        content=to_bob,
                    )
                ]
            )
        )

    return aioresponses.CallbackResult(
        status=200,
        body="{}",
        headers={
            "Content-Type": "application/json",
        },
    )

send_to_device_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.encrypted/.*$"
)
mock_aioresponse.put(
    send_to_device_url_pattern,
    callback=send_to_device_callback,
    repeat=True,
)

room_events = []

def send_event_callback(url, **kwargs):
    nonlocal bob, room_events

    match = re.search(
        "^/_matrix/client/v3/rooms/([^/]+)/send/([^/]+)/",
        url.raw_path,
    )

    room_id = urllib.parse.unquote(match.group(1))
    event_type = urllib.parse.unquote(match.group(2))

    room_events.append((room_id, event_type, kwargs["json"]))

    return aioresponses.CallbackResult(
        status=200,
        body='{"event_id": "$event_id"}',
        headers={
            "Content-Type": "application/json",
        },
    )

send_event_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/rooms/[^/]+/send/.*$"
)
mock_aioresponse.put(
    send_event_url_pattern,
    callback=send_event_callback,
    repeat=True,
)
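
To make the path handling in send_event_callback concrete, the following sketch runs the same regular expression and unquoting on a sample path. The helper name parse_send_path, the sample room ID, and the transaction ID are made up for illustration.

```python
import re
import urllib.parse

# Same pattern as in send_event_callback
SEND_PATH_PATTERN = re.compile(r"^/_matrix/client/v3/rooms/([^/]+)/send/([^/]+)/")


def parse_send_path(raw_path: str) -> tuple[str, str]:
    """Extract the room ID and event type from a percent-encoded send path."""
    match = SEND_PATH_PATTERN.search(raw_path)
    if match is None:
        raise ValueError(f"not a send-event path: {raw_path}")
    return (
        urllib.parse.unquote(match.group(1)),
        urllib.parse.unquote(match.group(2)),
    )


# "!" is percent-encoded as %21 and ":" as %3A in the raw path
sample_path = (
    "/_matrix/client/v3/rooms/%21room_id%3Aexample.org/send/m.room.encrypted/txn1"
)
parsed = parse_send_path(sample_path)
```

Unlike the test callback, this sketch raises a ValueError on a path that does not match, rather than failing when calling group on None.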

And we create a callback so that Alice can claim one of Bob’s one-time keys.

test send Megolm-encrypted event to room:
def keys_claim_callback(url, **kwargs):
    nonlocal bob_otks

    if kwargs["json"]["one_time_keys"] == {
        "@bob:example.org": {"HIJKLMN": "signed_curve25519"}
    }:
        otk_id, otk = [
            (id, key)
            for id, key in bob_otks.items()
            if id.startswith("signed_curve25519:")
        ][0]

        return aioresponses.CallbackResult(
            status=200,
            body=json.dumps(
                {
                    "one_time_keys": {
                        "@bob:example.org": {"HIJKLMN": {otk_id: otk}}
                    }
                }
            ),
            headers={"Content-Type": "application/json"},
        )
    else:
        return aioresponses.CallbackResult(
            status=400,
            body="unexpected response",
        )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/claim",
    callback=keys_claim_callback,
)
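
The selection step in keys_claim_callback can be read on its own. This sketch picks the first signed_curve25519 key from a dictionary of one-time keys and builds the response body. The helper name build_claim_response is hypothetical, and the key ID and key data below are placeholders rather than real keys.

```python
import json
import typing


def build_claim_response(
    user_id: str,
    device_id: str,
    otks: dict[str, typing.Any],
) -> str:
    """Build a /keys/claim response body that hands out one signed one-time key."""
    # next() stops at the first match instead of building the whole list
    otk_id, otk = next(
        (key_id, key)
        for key_id, key in otks.items()
        if key_id.startswith("signed_curve25519:")
    )
    return json.dumps({"one_time_keys": {user_id: {device_id: {otk_id: otk}}}})


# Placeholder key data for illustration only
sample_otks = {
    "signed_curve25519:AAAAAQ": {"key": "placeholder", "signatures": {}},
}
body = json.loads(build_claim_response("@bob:example.org", "HIJKLMN", sample_otks))
```

A real homeserver would also remove the claimed key so that it is never handed out twice; neither this sketch nor the mock above tracks that.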

Finally, Alice sends an encrypted event to the room, and we ensure that Bob can decrypt it.

test send Megolm-encrypted event to room:
await alice_encryption_manager.encrypt_and_send_room_event(
    "!room_id",
    "m.room.message",
    {
        "body": "Hello World!",
        "msgtype": "m.text",
    },
)

assert len(room_events) == 1

room_id, event_type, content = room_events[0]

decrypted = await bob_encryption_manager.decrypt_room_event(
    events.RoomEvent(
        type=event_type,
        sender="@alice:example.org",
        room_id=room_id,
        content=content,
        event_id="$event_id",
        origin_server_ts=1234567890123,
    )
)

assert decrypted.content == {
    "body": "Hello World!",
    "msgtype": "m.text",
}

Todo

test other scenarios

Pre-sending Megolm sessions

Since we potentially need to send the Megolm session to every recipient device in the room, sending an encrypted event in a large room may take a long time. One way to increase the perceived performance is to start sending out the Megolm sessions before the room event is ready to be encrypted and sent. In this way, when the room event is ready to be encrypted and sent, some of the devices will have already received the Megolm session, and we will need to send the Megolm session to fewer devices.

However, this must be done with some care, as it could result in wasted effort: if the Megolm session is sent out too early, then something could occur (e.g. a user leaving the room) that would cause the Megolm session to need to be rotated, meaning that we sent out a Megolm session that never gets used. A client that wants to use this technique must decide when to pre-send Megolm sessions so that the session that gets sent is unlikely to need rotating before the event is ready. For example, an interactive client might start pre-sending Megolm sessions when a user starts typing in a room. This is an indication that they will likely be sending an event soon, and it’s less likely that, for example, a user will leave the room in the short time between the user starting to type and the user sending the event. If the user takes a while to write their message, the client may pre-send Megolm sessions multiple times, in case new users or devices join, or in case the Megolm session needs to be rotated.

Here, we provide a function that a client can call to pre-send a Megolm session in a room, which simply calls the _get_and_send_outbound_megolm_session function that we wrote earlier.

OlmMegolmEncryptionManager class methods:
async def pre_send_megolm_session(self, room_id: str) -> None:
    """Pre-send the Megolm session for a room.

    If the room does not yet have an outbound Megolm session, or the Megolm
    session needs to be rotated, a new one will be created.

    This can be used before a room event is ready to be sent, to reduce the
    number of devices that we need to send the Megolm session to.

    Arguments:

    ``room_id``:
      the ID of the room
    """
    # FIXME: explain the timeout
    await self._get_and_send_outbound_megolm_session(room_id, 1000)
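
One way a client might act on the advice above is to pre-send when the user starts typing, but no more often than some interval per room. The sketch below assumes a hypothetical PreSendThrottle helper and a 30-second interval; neither is part of the library, and a suitable interval depends on the client. The fake_pre_send function stands in for pre_send_megolm_session.

```python
import asyncio
import time
import typing


class PreSendThrottle:
    """Hypothetical helper: pre-send at most once per interval for each room."""

    def __init__(
        self,
        pre_send: typing.Callable[[str], typing.Awaitable[None]],
        min_interval: float = 30.0,
    ) -> None:
        self._pre_send = pre_send
        self._min_interval = min_interval
        self._last_sent: dict[str, float] = {}

    async def on_typing(self, room_id: str) -> bool:
        """Call when the user types in a room; returns whether we pre-sent."""
        now = time.monotonic()
        last = self._last_sent.get(room_id)
        if last is not None and now - last < self._min_interval:
            return False
        # Record the time before awaiting so that overlapping calls do not
        # both trigger a pre-send
        self._last_sent[room_id] = now
        await self._pre_send(room_id)
        return True


calls: list[str] = []


async def fake_pre_send(room_id: str) -> None:
    # Stand-in for encryption_manager.pre_send_megolm_session
    calls.append(room_id)


async def demo() -> list[bool]:
    throttle = PreSendThrottle(fake_pre_send, min_interval=30.0)
    return [
        await throttle.on_typing("!room_id"),  # first keystroke: pre-send
        await throttle.on_typing("!room_id"),  # too soon: skipped
        await throttle.on_typing("!other_room"),  # different room: pre-send
    ]


results = asyncio.run(demo())
```

In a real client, the bound method encryption_manager.pre_send_megolm_session could be passed as the pre_send argument.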

Tests

To test this, we will have Alice pre-send a Megolm session, and then send an encrypted room event. We check that Bob receives the key when it is pre-sent, but not when the room event is sent.

test megolm:
@pytest.mark.asyncio
async def test_pre_send_megolm_session(mock_aioresponse):
    room_state = {
        "m.room.encryption": {
            "": {
                "room_id": "!room_id",
                "type": "m.room.encryption",
                "state_key": "",
                "sender": "@alice:example.org",
                "event_id": "$encryption_event",
                "origin_server_ts": 1234567890123,
                "content": {
                    "algorithm": megolm.MEGOLM_ALGORITHM,
                    "rotation_period_msgs": 2,
                },
            },
        },
        "m.room.member": {
            "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
            "@bob:example.org": BOB_ROOM_MEMBERSHIP,
        },
    }
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": room_state,
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as alice:
        async with client.Client(
            storage={
                "access_token": "anaccesstoken",
                "user_id": "@bob:example.org",
                "device_id": "HIJKLMN",
                "room_state_tracker": {
                    "!room_id": room_state,
                },
            },
            callbacks={},
            base_client_url="https://matrix-client.example.org/_matrix/client/",
        ) as bob:
            {{test pre-send Megolm session}}

We create device keys, populate the clients’ device caches, and create encryption managers.

test pre-send Megolm session:
(
    alice_device_keys,
    alice_otks,
    alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)

(
    bob_device_keys,
    bob_otks,
    bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)

bob.storage["device_tracker.cache.@alice:example.org"] = alice.storage[
    "device_tracker.cache.@alice:example.org"
] = {
    "device_keys": {
        "ABCDEFG": alice_device_keys,
    }
}

bob.storage["device_tracker.cache.@bob:example.org"] = alice.storage[
    "device_tracker.cache.@bob:example.org"
] = {
    "device_keys": {
        "HIJKLMN": bob_device_keys,
    }
}

alice_device_tracker = devices.DeviceTracker(alice)

alice_encryption_manager = megolm.OlmMegolmEncryptionManager(
    alice,
    b"\x00" * 32,
    alice_device_keys_manager,
    alice_device_tracker,
)

bob_device_tracker = devices.DeviceTracker(bob)

bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
    bob,
    b"\x00" * 32,
    bob_device_keys_manager,
    bob_device_tracker,
)

Here, we create the same callbacks as we did in the test for sending a Megolm-encrypted event, except that in the to-device handler, we count the number of events that Bob has received, so that we can check that he only receives the Megolm session once.

test pre-send Megolm session:
to_device_count = 0

async def send_to_device_callback(url, **kwargs):
    nonlocal bob, to_device_count

    messages = kwargs["json"]["messages"]

    to_bob = messages.get("@bob:example.org", {}).get("HIJKLMN")
    if to_bob:
        to_device_count = to_device_count + 1
        await bob.publisher.publish(
            client.ToDeviceEvents(
                [
                    events.Event(
                        sender="@alice:example.org",
                        type="m.room.encrypted",
                        content=to_bob,
                    )
                ]
            )
        )

    return aioresponses.CallbackResult(
        status=200,
        body="{}",
        headers={
            "Content-Type": "application/json",
        },
    )

send_to_device_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.encrypted/.*$"
)
mock_aioresponse.put(
    send_to_device_url_pattern,
    callback=send_to_device_callback,
    repeat=True,
)

room_events = []

def send_event_callback(url, **kwargs):
    nonlocal bob, room_events

    match = re.search(
        "^/_matrix/client/v3/rooms/([^/]+)/send/([^/]+)/",
        url.raw_path,
    )

    room_id = urllib.parse.unquote(match.group(1))
    event_type = urllib.parse.unquote(match.group(2))

    room_events.append((room_id, event_type, kwargs["json"]))

    return aioresponses.CallbackResult(
        status=200,
        body='{"event_id": "$event_id"}',
        headers={
            "Content-Type": "application/json",
        },
    )

send_event_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/rooms/[^/]+/send/.*$"
)
mock_aioresponse.put(
    send_event_url_pattern,
    callback=send_event_callback,
    repeat=True,
)

def keys_claim_callback(url, **kwargs):
    nonlocal bob_otks

    if kwargs["json"]["one_time_keys"] == {
        "@bob:example.org": {"HIJKLMN": "signed_curve25519"}
    }:
        otk_id, otk = [
            (id, key)
            for id, key in bob_otks.items()
            if id.startswith("signed_curve25519:")
        ][0]

        return aioresponses.CallbackResult(
            status=200,
            body=json.dumps(
                {
                    "one_time_keys": {
                        "@bob:example.org": {"HIJKLMN": {otk_id: otk}}
                    }
                }
            ),
            headers={"Content-Type": "application/json"},
        )
    else:
        return aioresponses.CallbackResult(
            status=400,
            body="unexpected response",
        )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/claim",
    callback=keys_claim_callback,
)

Alice then pre-sends the Megolm session for the room, and we ensure that Bob has received one to-device event, indicating that he has received the Megolm session.

test pre-send Megolm session:
await alice_encryption_manager.pre_send_megolm_session("!room_id")

assert to_device_count == 1

Finally, Alice encrypts and sends an event to the room, and we ensure that Bob has not received a new to-device event (indicating that Alice didn’t send the Megolm session to Bob again), and that he can decrypt the event (indicating that the pre-sent Megolm session was sent correctly).

test pre-send Megolm session:
await alice_encryption_manager.encrypt_and_send_room_event(
    "!room_id",
    "m.room.message",
    {
        "body": "Hello World!",
        "msgtype": "m.text",
    },
)

assert to_device_count == 1
assert len(room_events) == 1

room_id, event_type, content = room_events[0]

decrypted = await bob_encryption_manager.decrypt_room_event(
    events.RoomEvent(
        type=event_type,
        sender="@alice:example.org",
        room_id=room_id,
        content=content,
        event_id="$event_id",
        origin_server_ts=1234567890123,
    )
)

assert decrypted.content == {
    "body": "Hello World!",
    "msgtype": "m.text",
}

Discarding outbound sessions

Clients may wish to force the Megolm session in a room to rotate at times other than when it is automatically rotated. Some clients give users the ability to discard the current Megolm session for a room. This can be used, for example, if a device in the room is known to have been compromised, and the senders want to ensure that the device cannot read any future messages.

To force a Megolm session to rotate, we simply need to remove the outbound session for the room from the storage. The next time the client tries to encrypt an event for that room, it will not find an outbound Megolm session, and so will create a new one.

OlmMegolmEncryptionManager class methods:
def discard_session(self, room_id: str) -> None:
    """
    Discard the outbound Megolm session for a room

    This forces a new session to be created the next time we send an event to
    the room.

    Arguments:

    ``room_id``:
      the ID of the room
    """
    storage_key = f"outbound_megolm_session.{room_id}"
    if storage_key in self.client.storage:
        session_id = self.client.storage[storage_key]
        del self.client.storage[storage_key]
        del self.client.storage[f"outbound_megolm_session.{room_id}.{session_id}"]
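
The storage layout that discard_session relies on can be illustrated with a plain dictionary: one entry maps the room to its current session ID, and a second entry holds the session itself. In this sketch the stored values are placeholders, and the use of dict.pop with a default is a variation chosen for the sketch, not what the method above does.

```python
def discard_session(storage: dict, room_id: str) -> None:
    """Remove the outbound session pointer and session data for a room."""
    storage_key = f"outbound_megolm_session.{room_id}"
    if storage_key in storage:
        session_id = storage.pop(storage_key)
        # pop with a default tolerates session data that is already missing
        storage.pop(f"{storage_key}.{session_id}", None)


# Plain dict standing in for the client storage; values are placeholders
storage = {
    "outbound_megolm_session.!room_id": "session1",
    "outbound_megolm_session.!room_id.session1": "<pickled session>",
    "outbound_megolm_session.!other_room": "session2",
    "outbound_megolm_session.!other_room.session2": "<pickled session>",
}

discard_session(storage, "!room_id")
# Discarding again is a no-op, since the pointer is already gone
discard_session(storage, "!room_id")
remaining = sorted(storage)
```

Only the entries for the discarded room are removed; sessions for other rooms are left untouched.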

Tests

To test this, we encrypt an event, call discard_session, encrypt another event, and ensure that the session IDs for the two events are different.

test megolm:
@pytest.mark.asyncio
async def test_discard_session(mock_aioresponse):
    room_state = {
        "m.room.encryption": {
            "": {
                "room_id": "!room_id",
                "type": "m.room.encryption",
                "state_key": "",
                "sender": "@alice:example.org",
                "event_id": "$encryption_event",
                "origin_server_ts": 1234567890123,
                "content": {
                    "algorithm": megolm.MEGOLM_ALGORITHM,
                    "rotation_period_msgs": 2,
                },
            },
        },
        "m.room.member": {
            "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
            "@bob:example.org": BOB_ROOM_MEMBERSHIP,
        },
    }
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
            "room_state_tracker": {
                "!room_id": {
                    "m.room.encryption": {
                        "": ROOM_ENCRYPTION_EVENT,
                    },
                    "m.room.member": {
                        "@alice:example.org": ALICE_ROOM_MEMBERSHIP,
                        "@bob:example.org": BOB_ROOM_MEMBERSHIP,
                    },
                },
            },
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as alice:
        async with client.Client(
            storage={
                "access_token": "anaccesstoken",
                "user_id": "@bob:example.org",
                "device_id": "HIJKLMN",
                "room_state_tracker": {
                    "!room_id": room_state,
                },
            },
            callbacks={},
            base_client_url="https://matrix-client.example.org/_matrix/client/",
        ) as bob:
            {{test discard session}}

We set up the encryption managers for Alice and Bob.

test discard session:
(
    alice_device_keys,
    alice_otks,
    alice_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, alice)

(
    bob_device_keys,
    bob_otks,
    bob_device_keys_manager,
) = await _create_device_keys(mock_aioresponse, bob)

bob.storage["device_tracker.cache.@alice:example.org"] = alice.storage[
    "device_tracker.cache.@alice:example.org"
] = {
    "device_keys": {
        "ABCDEFG": alice_device_keys,
    }
}

bob.storage["device_tracker.cache.@bob:example.org"] = alice.storage[
    "device_tracker.cache.@bob:example.org"
] = {
    "device_keys": {
        "HIJKLMN": bob_device_keys,
    }
}

alice_device_tracker = devices.DeviceTracker(alice)

alice_encryption_manager = megolm.OlmMegolmEncryptionManager(
    alice,
    b"\x00" * 32,
    alice_device_keys_manager,
    alice_device_tracker,
)

bob_device_tracker = devices.DeviceTracker(bob)

bob_encryption_manager = megolm.OlmMegolmEncryptionManager(
    bob,
    b"\x00" * 32,
    bob_device_keys_manager,
    bob_device_tracker,
)

We set up handlers for the HTTP requests that Alice will make. These are similar to the handlers that we defined for previous tests, so we will not say much about them.

test discard session:
async def send_to_device_callback(url, **kwargs):
    nonlocal bob

    messages = kwargs["json"]["messages"]

    to_bob = messages.get("@bob:example.org", {}).get("HIJKLMN")
    if to_bob:
        await bob.publisher.publish(
            client.ToDeviceEvents(
                [
                    events.Event(
                        sender="@alice:example.org",
                        type="m.room.encrypted",
                        content=to_bob,
                    )
                ]
            )
        )

    return aioresponses.CallbackResult(
        status=200,
        body="{}",
        headers={
            "Content-Type": "application/json",
        },
    )

send_to_device_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.encrypted/.*$"
)
mock_aioresponse.put(
    send_to_device_url_pattern,
    callback=send_to_device_callback,
    repeat=True,
)

room_events = []

def send_event_callback(url, **kwargs):
    nonlocal bob, room_events

    match = re.search(
        "^/_matrix/client/v3/rooms/([^/]+)/send/([^/]+)/",
        url.raw_path,
    )

    room_id = urllib.parse.unquote(match.group(1))
    event_type = urllib.parse.unquote(match.group(2))

    room_events.append((room_id, event_type, kwargs["json"]))

    return aioresponses.CallbackResult(
        status=200,
        body='{"event_id": "$event_id"}',
        headers={
            "Content-Type": "application/json",
        },
    )

send_event_url_pattern = re.compile(
    r"^https://matrix-client.example.org/_matrix/client/v3/rooms/[^/]+/send/.*$"
)
mock_aioresponse.put(
    send_event_url_pattern,
    callback=send_event_callback,
    repeat=True,
)

def keys_claim_callback(url, **kwargs):
    nonlocal bob_otks

    if kwargs["json"]["one_time_keys"] == {
        "@bob:example.org": {"HIJKLMN": "signed_curve25519"}
    }:
        otk_id, otk = [
            (id, key)
            for id, key in bob_otks.items()
            if id.startswith("signed_curve25519:")
        ][0]

        return aioresponses.CallbackResult(
            status=200,
            body=json.dumps(
                {
                    "one_time_keys": {
                        "@bob:example.org": {"HIJKLMN": {otk_id: otk}}
                    }
                }
            ),
            headers={"Content-Type": "application/json"},
        )
    else:
        return aioresponses.CallbackResult(
            status=400,
            body="unexpected response",
        )

mock_aioresponse.post(
    "https://matrix-client.example.org/_matrix/client/v3/keys/claim",
    callback=keys_claim_callback,
)

Alice sends an encrypted event to the room, discards the outgoing Megolm session, then sends another event. We ensure that the two events have different session IDs.

test discard session:
await alice_encryption_manager.encrypt_and_send_room_event(
    "!room_id",
    "m.room.message",
    {
        "body": "Hello World!",
        "msgtype": "m.text",
    },
)

alice_encryption_manager.discard_session("!room_id")

await alice_encryption_manager.encrypt_and_send_room_event(
    "!room_id",
    "m.room.message",
    {
        "body": "Hello again!",
        "msgtype": "m.text",
    },
)

(_, _, event_content0) = room_events[0]
(_, _, event_content1) = room_events[1]

assert event_content0["session_id"] != event_content1["session_id"]

Example: encrypted echo bot

We can now modify the echo bot that we wrote previously to handle encrypted rooms. As before, it relies on having previously logged in with the login script.

examples/encrypted_echo.py:
# {{copyright}}

"""Echo events, with encryption support"""

import asyncio
from base64 import b64decode, b64encode
import json
import logging
import os
import sys
import typing
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt

from matrixlib import client
from matrixlib import events
from matrixlib import megolm


{{json file storage}}


async def main() -> None:
    async with client.Client(storage=JsonFileStorage()) as c:
        {{encrypted echo example}}


asyncio.run(main())

Since we will be dealing with encrypted events, we create an OlmMegolmEncryptionManager. For the key used to encrypt the storage, we derive a key from a password, using a random salt that we save in the storage so that the same key is derived on each run. As this is just a demo, we hard-code the password, but in a real client, the password should be entered by the user, or the key should be stored securely in some way.

encrypted echo example:
salt_b64 = c.storage.get("storage_encryption_key_salt")
if salt_b64 is not None:
    salt = b64decode(salt_b64)
else:
    salt = os.urandom(32)
    c.storage["storage_encryption_key_salt"] = b64encode(salt).decode()

password = b"SecretPasswordDon'tTellAnyone"
key = Scrypt(
    length=32,
    salt=salt,
    n=2**20,
    r=8,
    p=1,
).derive(password)
encryption_manager = megolm.OlmMegolmEncryptionManager(c, key)
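
The reason for persisting the salt can be seen in a small sketch: with the stored salt, the same password yields the same key on every run, while a fresh salt yields a different key. This sketch uses the standard library’s hashlib.scrypt in place of the cryptography package’s Scrypt class, a plain dict in place of the client storage, and a smaller cost parameter so that it runs quickly. The helper name derive_storage_key is hypothetical.

```python
import hashlib
import os
from base64 import b64decode, b64encode


def derive_storage_key(storage: dict, password: bytes) -> bytes:
    """Derive a 32-byte key, creating and persisting a salt on first use."""
    salt_b64 = storage.get("storage_encryption_key_salt")
    if salt_b64 is not None:
        salt = b64decode(salt_b64)
    else:
        salt = os.urandom(32)
        storage["storage_encryption_key_salt"] = b64encode(salt).decode()
    # n=2**14 keeps this sketch fast and within hashlib's default memory
    # limit; it is weaker than the n=2**20 used above
    return hashlib.scrypt(password, salt=salt, n=2**14, r=8, p=1, dklen=32)


storage: dict = {}
first_key = derive_storage_key(storage, b"password")
second_key = derive_storage_key(storage, b"password")  # reuses the stored salt
other_key = derive_storage_key({}, b"password")  # fresh storage, fresh salt
```

If the salt were regenerated on each run, data encrypted in the storage during a previous run could no longer be decrypted.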

As with our non-encrypted echo bot, we create a subscriber to receive events. In our non-encrypted echo bot, we only handled m.room.message events; here, we have to handle both m.room.message events and m.room.encrypted events. With m.room.message events, we can simply process the event as-is, but with m.room.encrypted events, we need to decrypt the event and then process the decrypted event. If we fail to decrypt the event, we will log the failure to the console.

encrypted echo example:
async def timeline_subscriber(updates: client.RoomTimelineUpdates) -> None:
    for event in updates.events:
        if isinstance(event, events.StateEvent):
            continue
        elif event.type == "m.room.message":
            await process_message(event)
        elif event.type == "m.room.encrypted":
            try:
                decrypted = await encryption_manager.decrypt_room_event(event)
            except Exception as e:
                logging.error("unable to decrypt event: %s", e)
                continue
            if decrypted.type == "m.room.message":
                await process_message(decrypted)

encrypted echo example:
async def process_message(event: events.RoomEvent) -> None:
    {{encrypted echo bot: process message}}

When we process message events, we ignore any invalid events and any events that are not text events. We also ignore events sent by our own user.

encrypted echo bot: process message:
if (
    "body" not in event.content
    or event.content.get("msgtype") != "m.text"
    or event.sender == c.user_id
):
    return

As before, if we get a message saying “!quit”, we take this as a signal to exit.

encrypted echo bot: process message:
if event.content["body"] == "!quit":
    c.stop_sync()
    return

Finally, we construct the echo. We check if the room is encrypted, and if so, we encrypt and send the echo event. Otherwise, we send the echo event unencrypted. If the encryption state for the room is invalid (for example, if encryption was disabled), we refuse to send events to the room, and log the error to the console.

encrypted echo bot: process message:
content = {"body": event.content["body"], "msgtype": "m.notice"}
try:
    is_encrypted = encryption_manager.is_room_encrypted(event.room_id)
except Exception as e:
    logging.error(
        "Room encryption is in an invalid state: %s. Refusing to send event",
        e,
    )
    return
if is_encrypted:
    await encryption_manager.encrypt_and_send_room_event(
        event.room_id, "m.room.message", content
    )
else:
    await c.send_event(event.room_id, "m.room.message", content)

We now register our event subscriber, start the sync, and wait until the sync task is done.

encrypted echo example:
c.publisher.subscribe(client.RoomTimelineUpdates, timeline_subscriber)
c.start_sync()

await typing.cast(asyncio.Task[None], c.sync_task)

If you run this script, it should echo back any messages as the previous echo bot did, but it should also work in encrypted rooms.