The Client class
We create a Client class, which will represent a user’s connection to the
homeserver. It will be used to handle requests to the homeserver, such as
sending and receiving messages, ensuring that the requests are properly
authenticated.
class Client:
    """Represents a client connection"""

    {{Client class methods}}
Our client will need to persist some data. For example, when a user logs in, closes the client, and later re-opens the client, the client should remember what user it was logged in as. We will leave it up to the application to define the exact mechanics for persisting, such as using a flat file or a database, but we will create a protocol that the storage layer will need to follow so that we can use it. We will use a simple key-value storage system, as this will be sufficient for our purposes; a more advanced client library might want to use something more sophisticated.
Since we are using a key-value system, our storage protocol will use Python’s
standard item access notation (i.e. we will access things using storage[key]),
as well as a get method that will retrieve a stored value or return a default
if no value is found. (One consequence of this is that it will allow us to use
Python dicts as storage in our tests.) We also need a method to clear the
storage when the user is logged out; for this, we will use a method called
clear, again matching the dict interface.
For simplicity, we will assume that our storage system is able to access data immediately; a real client library would want to keep frequently-accessed data (such as the homeserver’s base URL) in memory. Also, in a real Matrix library, interacting with the storage would likely require asynchronous code, and the storage layer might offer features such as transactions to ensure consistency of the data.
Note
If the storage is made asynchronous, some of the initialization functions may need to be reorganized from what is given in this book. Several of the initialization functions load data from the storage, but many languages do not allow initialization functions/constructors to be asynchronous. Some possible solutions are:
spawning a task in the initialization function that loads the data, and making every member function wait until the data is loaded before proceeding;
loading the data the first time a member function is called;
adding another initialization function to load the data, and that must be called before calling any other member function.
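As a rough illustration of the last option, the sketch below keeps the constructor synchronous and moves the loading into a separate coroutine. The names AsyncDictStorage, AsyncClient, and load are hypothetical and are not part of the library built in this book; the in-memory dict merely stands in for a real asynchronous storage backend.

```python
import asyncio
import typing


class AsyncDictStorage:
    """Hypothetical asynchronous storage backed by an in-memory dict."""

    def __init__(self, data: typing.Optional[dict[str, typing.Any]] = None):
        self._data = dict(data or {})

    async def get(self, key: str, default: typing.Any = None) -> typing.Any:
        await asyncio.sleep(0)  # stands in for real I/O
        return self._data.get(key, default)


class AsyncClient:
    """Hypothetical client whose state is loaded by a separate async step."""

    def __init__(self, storage: AsyncDictStorage):
        # The constructor stays synchronous and performs no I/O.
        self.storage = storage
        self.base_client_url: typing.Optional[str] = None

    async def load(self) -> "AsyncClient":
        """Load persisted state; must be awaited before any other method."""
        self.base_client_url = await self.storage.get("base_client_url")
        if self.base_client_url is None:
            raise RuntimeError("No base URL available")
        return self

    def url(self, endpoint: str) -> str:
        if self.base_client_url is None:
            raise RuntimeError("load() must be awaited first")
        return self.base_client_url + endpoint


async def main() -> str:
    storage = AsyncDictStorage(
        {"base_client_url": "https://example.org/_matrix/client/"}
    )
    client = await AsyncClient(storage).load()
    return client.url("v3/login")


result = asyncio.run(main())
```

The cost of this approach is that callers must remember the extra step; the guard in url turns a forgotten load() into an immediate error rather than a confusing failure later on.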
class Storage(typing.Protocol):
    """Protocol for classes that persist data for the client"""

    def __getitem__(self, key: str) -> typing.Any:
        ...  # pragma: no cover

    def __setitem__(self, key: str, value: typing.Any) -> None:
        ...  # pragma: no cover

    def __delitem__(self, key: str) -> None:
        ...  # pragma: no cover

    def __contains__(self, key: str) -> bool:
        ...  # pragma: no cover

    def get(self, key: str, default: typing.Any = None) -> typing.Any:
        ...  # pragma: no cover

    def clear(self) -> None:
        ...  # pragma: no cover
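To make the protocol more concrete, here is a minimal sketch of a flat-file storage class that provides the same six methods. JsonFileStorage is a hypothetical name, and rewriting the whole JSON file on every change is an assumption made for brevity, not a recommendation for production use.

```python
import json
import os
import tempfile
import typing


class JsonFileStorage:
    """Hypothetical flat-file storage: one JSON object, rewritten on change."""

    def __init__(self, path: str):
        self.path = path
        self._data: dict[str, typing.Any] = {}
        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                self._data = json.load(f)

    def _save(self) -> None:
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(self._data, f)

    def __getitem__(self, key: str) -> typing.Any:
        return self._data[key]

    def __setitem__(self, key: str, value: typing.Any) -> None:
        self._data[key] = value
        self._save()

    def __delitem__(self, key: str) -> None:
        del self._data[key]
        self._save()

    def __contains__(self, key: str) -> bool:
        return key in self._data

    def get(self, key: str, default: typing.Any = None) -> typing.Any:
        return self._data.get(key, default)

    def clear(self) -> None:
        self._data.clear()
        self._save()


# Usage: a value written by one instance is visible to a later instance,
# which is what lets a re-opened client remember its previous session.
path = os.path.join(tempfile.mkdtemp(), "client.json")
first = JsonFileStorage(path)
first["base_client_url"] = "https://example.org/_matrix/client/"
second = JsonFileStorage(path)
```

Because the class has the same methods as a dict, code written against the Storage protocol should not need to know which of the two it has been given.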
Let’s create an initializer for our Client class, which will take as arguments:
a storage object;
optionally, a set of callbacks, which will be called when the client needs the application to do something (the callbacks will be defined in the places where we call them); and,
optionally, the output of our discover function.
These latter arguments should be included only when creating a new session. The initializer will store these values in the storage object, so that when the client is re-created, it will have access to the previously stored values.
The initializer will also create an aiohttp session that will be used for the
client’s requests.
def __init__(
    self,
    storage: "Storage",
    callbacks: "Callbacks" = {},
    base_client_url: typing.Optional[str] = None,
    well_known: typing.Optional[dict[str, typing.Any]] = None,
    versions: typing.Optional[dict[str, typing.Any]] = None,
):
    """
    Arguments:

    ``storage``:
      The storage object to use
    ``callbacks``:
      Functions that will be called to perform processing needed by the
      client
    ``base_client_url``:
      The base URL to use to connect to the server, including the
      ``_matrix/client/`` suffix. If omitted, the client must have
      previously logged in and stored the base URL in the storage
    ``well_known``:
      The contents of the ``.well-known`` file obtained from server
      discovery.
    ``versions``:
      The result of calling ``/_matrix/client/versions`` on the server,
      for example as returned by the ``discover`` function.
    """
    self.storage = storage

    if base_client_url:
        self.storage["base_client_url"] = base_client_url
        if well_known:
            self.storage["well_known"] = well_known
        if versions:
            self.storage["versions"] = versions
    else:
        if "base_client_url" not in storage:
            raise RuntimeError("No base URL available")

    self.callbacks = callbacks
    self.http_session = aiohttp.ClientSession()

    {{Client class initialization}}
class Callbacks(typing.TypedDict, total=False):
    """Client callback functions"""

    {{client callbacks}}
While we’re here, we’ll also write a function to clean up our resources when we’re done with the client.
async def close(self) -> None:
    """Free up resources used by the client."""
    {{client cleanup}}
    await self.http_session.close()
And we create the __aenter__ and __aexit__ methods needed to use our class
with the async with statement.
async def __aenter__(self) -> "Client":
    return self

async def __aexit__(self, *ignored: typing.Any) -> None:
    await self.close()
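The following sketch shows what these two methods buy us. It uses FakeClient, a hypothetical stand-in with the same close, __aenter__, and __aexit__ shape as our Client, so that it can run without aiohttp or a homeserver.

```python
import asyncio
import typing


class FakeClient:
    """Stand-in for Client with the same close/__aenter__/__aexit__ shape."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "FakeClient":
        return self

    async def __aexit__(self, *ignored: typing.Any) -> None:
        await self.close()


async def main() -> FakeClient:
    client = FakeClient()
    try:
        async with client:
            raise ValueError("simulated failure while using the client")
    except ValueError:
        # __aexit__ returns None, so the exception still propagates,
        # but only after close() has been awaited.
        pass
    return client


client = asyncio.run(main())
```

The point of the pattern is that close() runs whether the body of the async with block finishes normally or raises, so the application does not need its own try/finally.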
We will also write a convenience function that, given an endpoint name, returns the full URL. We will use this function whenever we need to call an endpoint.
def url(self, endpoint: str) -> str:
    """Returns the full URL of a given endpoint"""
    return urljoin(self.storage["base_client_url"], endpoint)
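Since url relies on urljoin from urllib.parse, the exact form of the base URL and of the endpoint name matters. The short example below uses a made-up example.org base URL to show the three cases; it suggests why the base URL is expected to end with a slash and why endpoint names should be written without a leading slash.

```python
from urllib.parse import urljoin

# Hypothetical base URL, ending in a slash as the initializer's docs require.
base = "https://example.org/_matrix/client/"

# Relative endpoint against a base ending in "/": appended as expected.
joined = urljoin(base, "v3/login")

# Base without the trailing slash: the last path segment is replaced.
no_trailing_slash = urljoin("https://example.org/_matrix/client", "v3/login")

# Endpoint with a leading slash: the whole path of the base is replaced.
leading_slash = urljoin(base, "/v3/login")

print(joined)             # https://example.org/_matrix/client/v3/login
print(no_trailing_slash)  # https://example.org/_matrix/v3/login
print(leading_slash)      # https://example.org/v3/login
```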
Publish/Subscribe
As the client runs, it needs a mechanism of letting the application know that
certain things have happened, such as when new events arrive in a room. To do
this, we will use the publish-subscribe pattern (or pubsub). In this pattern, a
publisher publishes messages to notify others that something has happened;
subscribers can subscribe to the publisher and receive the messages. This is
somewhat similar to the callbacks that we passed to the Client object when we
created it, except that 1) multiple subscribers can be subscribed to each
message type, whereas there is at most one callback per type; and 2)
subscribers can subscribe and unsubscribe at any time, whereas callbacks
usually cannot change. In some languages, this functionality is known as
“events”, with the publisher called an “event emitter” and the subscriber
called a “handler” or “listener”. We will avoid the “event” term here, to avoid
confusion with Matrix events.
We will create a fairly simple publisher class. Our messages will be objects, and when we subscribe, we will pass in the class of objects that we are interested in (or a tuple of classes, to match any of several classes), as well as the subscriber function itself.
In our case, we want to allow subscribers to be async functions, we want to
notify the subscribers in sequence, and we want to wait until all the
subscribers have been notified and finished their processing.
# {{copyright}}

"""Implementation of the publish-subscribe pattern"""

import inspect
import logging
import typing


class Publisher:
    """Publish messages that can be subscribed to.

    Subscribers indicate which message types they are interested in by
    specifying the class of the messages."""

    def __init__(self):
        self.subscribers = []
        self.id = 0

    def subscribe(
        self,
        message_type: typing.Union[type, typing.Tuple[type, ...]],
        subscriber: typing.Callable[
            [typing.Any], typing.Union[None, typing.Awaitable[None]]
        ],
    ) -> int:
        """Subscribe to a message type

        Returns an ID that can be used to unsubscribe
        """
        self.id += 1
        self.subscribers.append((message_type, subscriber, self.id))
        return self.id

    def unsubscribe(self, id: int) -> bool:
        """Unsubscribe from a message type

        Returns whether or not the subscriber was found.
        """
        for idx, (_, _, subscriber_id) in enumerate(self.subscribers):
            if subscriber_id == id:
                del self.subscribers[idx]
                return True
        return False

    async def publish(self, message: typing.Any) -> None:
        """Publish a message to the subscribers"""
        for message_type, subscriber, _ in self.subscribers:
            if issubclass(message.__class__, message_type):
                try:
                    res = subscriber(message)
                    if inspect.isawaitable(res):
                        await res
                except Exception as e:
                    # we don't want a misbehaving subscriber to bring down the
                    # whole system, but we need to record the error somehow.
                    # We catch Exception rather than using a bare except so
                    # that task cancellation and KeyboardInterrupt still
                    # propagate.
                    logging.error("Subscriber threw an exception: %s", e)
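As a usage sketch, the example below subscribes a plain function, an async function, and a deliberately faulty function, then publishes two messages. So that it runs on its own, it includes a condensed copy of the publisher; the condensed copy catches Exception, which is an assumption of this sketch. RoomMessage and Typing are hypothetical message classes invented for the illustration.

```python
import asyncio
import inspect
import logging
import typing


class Publisher:
    """Condensed copy of the publisher above, so the example runs alone."""

    def __init__(self) -> None:
        self.subscribers: list = []
        self.id = 0

    def subscribe(self, message_type, subscriber) -> int:
        self.id += 1
        self.subscribers.append((message_type, subscriber, self.id))
        return self.id

    def unsubscribe(self, id: int) -> bool:
        for idx, (_, _, subscriber_id) in enumerate(self.subscribers):
            if subscriber_id == id:
                del self.subscribers[idx]
                return True
        return False

    async def publish(self, message: typing.Any) -> None:
        for message_type, subscriber, _ in self.subscribers:
            if isinstance(message, message_type):
                try:
                    res = subscriber(message)
                    if inspect.isawaitable(res):
                        await res
                except Exception as e:
                    logging.error("Subscriber threw an exception: %s", e)


class RoomMessage:  # hypothetical message class
    def __init__(self, body: str):
        self.body = body


class Typing:  # hypothetical message class
    pass


received: list[str] = []


def on_message(message: RoomMessage) -> None:
    received.append("sync:" + message.body)


async def on_any(message: typing.Any) -> None:
    await asyncio.sleep(0)  # an async subscriber is awaited before moving on
    received.append("async:" + type(message).__name__)


def broken(message: typing.Any) -> None:
    raise ValueError("subscriber bug")


async def main() -> bool:
    publisher = Publisher()
    publisher.subscribe(RoomMessage, on_message)
    broken_id = publisher.subscribe(RoomMessage, broken)
    # A tuple of classes matches messages of either class.
    publisher.subscribe((RoomMessage, Typing), on_any)
    await publisher.publish(RoomMessage("hello"))  # broken() is only logged
    removed = publisher.unsubscribe(broken_id)
    await publisher.publish(Typing())
    return removed


removed = asyncio.run(main())
print(received)  # ['sync:hello', 'async:RoomMessage', 'async:Typing']
```

The subscribers run in the order in which they subscribed, and the error raised by the faulty subscriber is logged without preventing the later subscriber from receiving the message.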
Our Client class will have a publisher member that will be used for
publishing and subscribing to messages.
self.publisher = pubsub.Publisher()
We can use this mechanism to notify components when the Client object is
closed. This can be used, for example, to clean up resources or to stop tasks.
await self.publisher.publish(ClientClosed())
class ClientClosed:
    """A message indicating that the client is closed"""

    def __eq__(self, other):
        return other.__class__ == ClientClosed
To subscribe to this message, the application could create a subscriber
function to perform an action when the message is sent, and call
client.publisher.subscribe(ClientClosed, subscriber) to register the
subscriber.
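A subscriber for this message might look like the sketch below. SyncLoop is a hypothetical component that should stop when the client closes. To keep the example free of a real Client, the subscriber is called directly, the way the publisher would call it. The example also shows one effect of the __eq__ method: any two ClientClosed instances compare equal, which is convenient when asserting in tests that the message was delivered.

```python
from unittest.mock import Mock


class ClientClosed:
    """Copy of the message class above, so the example runs alone."""

    def __eq__(self, other):
        return other.__class__ == ClientClosed


class SyncLoop:
    """Hypothetical component that should stop when the client closes."""

    def __init__(self) -> None:
        self.running = True

    def on_client_closed(self, message: ClientClosed) -> None:
        self.running = False


sync_loop = SyncLoop()
# In an application, this method would be registered with
# client.publisher.subscribe(ClientClosed, sync_loop.on_client_closed).
# Here we call it directly, as the publisher would on close.
sync_loop.on_client_closed(ClientClosed())

# In a test, a Mock subscriber can be compared against a fresh instance,
# because __eq__ treats all ClientClosed objects as equal.
subscriber = Mock()
subscriber(ClientClosed())
subscriber.assert_called_once_with(ClientClosed())
```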