Sending and receiving to-device events

Olm-encrypted events are not sent to a room. Instead, they are addressed to, and only delivered to, a specific device. These events are usually referred to as “to-device” events (after the property name in the /sync response), or “send-to-device” events (after the endpoint name used to send them).

Sending events

These events are sent using the PUT /sendToDevice/{eventType}/{txnId} endpoint. As with Sending events to a room, this endpoint takes a transaction ID do avoid sending the same events multiple times if the call is retried. This endpoint allows events to be sent to multiple recipients (one event per recipient), but they all must be of the same type. This is helpful as when we share a Megolm session key, we often send it to multiple devices, so this reduces the number of calls that we need to make.

Sending an event to the device ID * sends the event to all of a user’s devices.

Client class methods:
async def send_to_device(
    self,
    event_type: str,
    event_contents: dict[str, dict[str, dict]],
    retry_ms: int = 0,
    txn_id: typing.Optional[str] = None,
) -> None:
    """Send an events to devices

    Arguments:

    ``room_id``:
      the ID of the room to send to
    ``event_type``:
      the type of event to send
    ``event_contents``:
      a map from user ID to device ID to event content.  If "*" is used as the
      device ID, the event is sent to all of the user's devices.
    ``retry_ms``:
      how long to retry sending, in milliseconds
    """
    txn_id = txn_id or self.make_txn_id()
    url = self.url(f"v3/sendToDevice/{urlquote(event_type, '')}/{txn_id}")
    resp = await retry(
        retry_ms,
        self.authenticated,
        self.http_session.put,
        url,
        json={"messages": event_contents},
    )
    async with resp:
        status, resp_body = await check_response(resp)
Tests
test events:
@pytest.mark.asyncio
async def test_send_to_device_event(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        pattern = re.compile(
            r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.message/.*$"
        )
        mock_aioresponse.put(
            pattern,
            status=200,
            body="{}",
            headers={
                "content-type": "application/json",
            },
        )
        await c.send_to_device(
            "m.room.message",
            json.dumps({"@alice:example.org": {"HIJKLMNO": {"body": "Hello World!"}}}),
        )

Receiving events

As with room events, to-device events are received in the /sync response. They are included in the to_device property, and include the sender’s user ID, the event type, and the event content. Note that it does not include the sender’s device ID; for events where the identity of the sender’s device is required, the event content will need to include the information, either in the form of a device ID or an device key.

sync schema:
"to_device": schema.Optional(
    {
        "events": schema.Optional(
            schema.Array(events.Event.schema())
        )
    }
),

When we receive to-device events, we will publish a message that contains the events so that they can be processed.

process sync response:
if "to_device" in body and "events" in body["to_device"]:
    await self.publisher.publish(
        ToDeviceEvents(
            [events.Event(**e) for e in body["to_device"]["events"]]
        )
    )
client module classes:
class ToDeviceEvents(typing.NamedTuple):
    """A message indicating that to-device events have been received"""

    events: list[events.Event]


ToDeviceEvents.events.__doc__ = "The to-device events"

Todo

do we still need to do “catchup” syncs?

Tests
test sync:
@pytest.mark.asyncio
async def test_receive_to_device_events(mock_aioresponse):
    async with client.Client(
        storage={
            "access_token": "anaccesstoken",
            "user_id": "@alice:example.org",
            "device_id": "ABCDEFG",
        },
        callbacks={},
        base_client_url="https://matrix-client.example.org/_matrix/client/",
    ) as c:
        {{receive to-device events test}}
receive to-device events test:
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
    status=200,
    body=json.dumps(
        {
            "to_device": {
                "events": [
                    {
                        "sender": "@alice:example.org",
                        "type": "m.room.message",
                        "content": {
                            "body": "Hello world!",
                        },
                    },
                ],
            },
            "next_batch": "token1",
        }
    ),
    headers={
        "content-type": "application/json",
    },
)
mock_aioresponse.get(
    "https://matrix-client.example.org/_matrix/client/v3/sync?since=token1&timeout=30000",
    status=200,
    body='{"next_batch":"token1"}',
    headers={
        "content-type": "application/json",
    },
    repeat=True,
)

def subscriber(msg) -> None:
    assert msg.events == [
        Event(
            sender="@alice:example.org",
            type="m.room.message",
            content={"body": "Hello world!"},
        )
    ]
    c.stop_sync()

c.publisher.subscribe(client.ToDeviceEvents, subscriber)

c.start_sync()

try:
    await c.sync_task
except asyncio.CancelledError:
    pass