Sending and receiving to-device events¶
Olm-encrypted events are not sent to a room. Instead, they are addressed to,
and only delivered to, a specific device. These events are usually referred to
as “to-device” events (after the property name in the /sync
response), or
“send-to-device” events (after the endpoint name used to send them).
Sending events¶
These events are sent using the PUT /sendToDevice/{eventType}/{txnId}
endpoint. As with Sending events to a room, this endpoint takes a transaction ID do
avoid sending the same events multiple times if the call is retried. This
endpoint allows events to be sent to multiple recipients (one event per
recipient), but they all must be of the same type. This is helpful as when we
share a Megolm session key, we often send it to multiple devices, so this
reduces the number of calls that we need to make.
Sending an event to the device ID *
sends the event to all of a user’s devices.
async def send_to_device(
self,
event_type: str,
event_contents: dict[str, dict[str, dict]],
retry_ms: int = 0,
txn_id: typing.Optional[str] = None,
) -> None:
"""Send an events to devices
Arguments:
``room_id``:
the ID of the room to send to
``event_type``:
the type of event to send
``event_contents``:
a map from user ID to device ID to event content. If "*" is used as the
device ID, the event is sent to all of the user's devices.
``retry_ms``:
how long to retry sending, in milliseconds
"""
txn_id = txn_id or self.make_txn_id()
url = self.url(f"v3/sendToDevice/{urlquote(event_type, '')}/{txn_id}")
resp = await retry(
retry_ms,
self.authenticated,
self.http_session.put,
url,
json={"messages": event_contents},
)
async with resp:
status, resp_body = await check_response(resp)
Tests
@pytest.mark.asyncio
async def test_send_to_device_event(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
pattern = re.compile(
r"^https://matrix-client.example.org/_matrix/client/v3/sendToDevice/m.room.message/.*$"
)
mock_aioresponse.put(
pattern,
status=200,
body="{}",
headers={
"content-type": "application/json",
},
)
await c.send_to_device(
"m.room.message",
json.dumps({"@alice:example.org": {"HIJKLMNO": {"body": "Hello World!"}}}),
)
Receiving events¶
As with room events, to-device events are received in the /sync
response.
They are included in the to_device
property, and include the sender’s user
ID, the event type, and the event content. Note that it does not include the
sender’s device ID; for events where the identity of the sender’s device is
required, the event content will need to include the information, either in the
form of a device ID or an device key.
"to_device": schema.Optional(
{
"events": schema.Optional(
schema.Array(events.Event.schema())
)
}
),
When we receive to-device events, we will publish a message that contains the events so that they can be processed.
if "to_device" in body and "events" in body["to_device"]:
await self.publisher.publish(
ToDeviceEvents(
[events.Event(**e) for e in body["to_device"]["events"]]
)
)
class ToDeviceEvents(typing.NamedTuple):
"""A message indicating that to-device events have been received"""
events: list[events.Event]
ToDeviceEvents.events.__doc__ = "The to-device events"
Todo
do we still need to do “catchup” syncs?
Tests
@pytest.mark.asyncio
async def test_receive_to_device_events(mock_aioresponse):
async with client.Client(
storage={
"access_token": "anaccesstoken",
"user_id": "@alice:example.org",
"device_id": "ABCDEFG",
},
callbacks={},
base_client_url="https://matrix-client.example.org/_matrix/client/",
) as c:
{{receive to-device events test}}
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?timeout=30000",
status=200,
body=json.dumps(
{
"to_device": {
"events": [
{
"sender": "@alice:example.org",
"type": "m.room.message",
"content": {
"body": "Hello world!",
},
},
],
},
"next_batch": "token1",
}
),
headers={
"content-type": "application/json",
},
)
mock_aioresponse.get(
"https://matrix-client.example.org/_matrix/client/v3/sync?since=token1&timeout=30000",
status=200,
body='{"next_batch":"token1"}',
headers={
"content-type": "application/json",
},
repeat=True,
)
def subscriber(msg) -> None:
assert msg.events == [
Event(
sender="@alice:example.org",
type="m.room.message",
content={"body": "Hello world!"},
)
]
c.stop_sync()
c.publisher.subscribe(client.ToDeviceEvents, subscriber)
c.start_sync()
try:
await c.sync_task
except asyncio.CancelledError:
pass