The Client class

We create a Client class, which will represent a user’s connection to the homeserver. It will be used to handle requests to the homeserver, such as sending and receiving messages, ensuring that the requests are properly authenticated.

client module classes:
class Client:
    """Represents a client connection"""

    {{Client class methods}}

Our client will need to persist some data. For example, when a user logs in, closes the client, and later re-opens the client, the client should remember what user it was logged in as. We will leave it up to the application to define the exact mechanics for persisting, such as using a flat file or a database, but we will create a protocol that the storage layer will need to follow so that we can use it. We will use a simple key-value storage system, as this will be sufficient for our purposes; a more advanced client library might want to use something more sophisticated.

Since we are using a key-value system, our storage protocol will use Python’s standard item access notation (i.e. we will access things using storage[key]), as well as a get method that will retrieve a stored value or return a default if no value is found. (One consequence of this is that it will allow us to use Python dicts as storage in our tests.) We also need a method to clear the storage when the user is logged out; for this, we will use a method called clear, again matching the dict interface.

For simplicity, we will assume that our storage system is able to access data immediately; a real client library would want to keep frequently-accessed data (such as the homeserver’s base URL) in memory. Also, in a real Matrix library, interacting with the storage would likely require asynchronous code, and the storage layer may need features such as transactions to ensure consistency of the data.

Note

If the storage is made asynchronous, some of the initialization functions may need to be reorganized from what is given in this book. Several of the initialization functions load data from the storage, but many languages do not allow initialization functions/constructors to be asynchronous. Some possible solutions are:

  • spawning a task in the initialization function that loads the data, and making every member function wait until the data is loaded before proceeding;

  • loading the data the first time a member function is called;

  • adding another initialization function to load the data, and that must be called before calling any other member function.
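
As an illustration of the third option, the following is a minimal sketch rather than part of the library. The names AsyncDictStorage, AsyncClient, create, and load are hypothetical, and the storage is a plain dict wrapped in an async get method to stand in for real asynchronous I/O. The constructor only records its arguments, and a separate async factory method loads the data before the object is handed to the caller.

```python
import asyncio
import typing


class AsyncDictStorage:
    """Hypothetical asynchronous storage, backed by a dict for illustration."""

    def __init__(self, data: typing.Optional[dict[str, typing.Any]] = None):
        self._data = dict(data or {})

    async def get(self, key: str, default: typing.Any = None) -> typing.Any:
        await asyncio.sleep(0)  # stand-in for real asynchronous I/O
        return self._data.get(key, default)


class AsyncClient:
    """Hypothetical client whose initial data must be loaded asynchronously."""

    def __init__(self, storage: AsyncDictStorage):
        # The constructor cannot await, so it only records its arguments.
        self.storage = storage
        self.base_client_url: typing.Optional[str] = None

    @classmethod
    async def create(cls, storage: AsyncDictStorage) -> "AsyncClient":
        """Build a client and load its data before returning it."""
        client = cls(storage)
        await client.load()
        return client

    async def load(self) -> None:
        """Second initialization step: must run before any other method."""
        self.base_client_url = await self.storage.get("base_client_url")
        if self.base_client_url is None:
            raise RuntimeError("No base URL available")


async def main() -> typing.Optional[str]:
    storage = AsyncDictStorage(
        {"base_client_url": "https://matrix.example.org/_matrix/client/"}
    )
    client = await AsyncClient.create(storage)
    return client.base_client_url
```

Wrapping the two steps in a single factory method means that callers cannot forget the second step, at the cost of no longer constructing the object directly.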

client module classes:
class Storage(typing.Protocol):
    """Protocol for classes that persist data for the client"""

    def __getitem__(self, key: str) -> typing.Any:
        ...  # pragma: no cover

    def __setitem__(self, key: str, value: typing.Any) -> None:
        ...  # pragma: no cover

    def __delitem__(self, key: str) -> None:
        ...  # pragma: no cover

    def __contains__(self, key: str) -> bool:
        ...  # pragma: no cover

    def get(self, key: str, default: typing.Any = None) -> typing.Any:
        ...  # pragma: no cover

    def clear(self) -> None:
        ...  # pragma: no cover
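
To make the protocol more concrete, here is one possible flat-file implementation. It is a sketch under several assumptions: the class name JsonFileStorage and the demo function are hypothetical, all values are assumed to be JSON-serializable, and the whole file is rewritten on every change, which would be too slow and too fragile for a real client.

```python
import json
import os
import tempfile
import typing


class JsonFileStorage:
    """Hypothetical storage that keeps all values in a single JSON file."""

    def __init__(self, path: str):
        self.path = path
        self._data: dict[str, typing.Any] = {}
        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                self._data = json.load(f)

    def _save(self) -> None:
        # Rewrite the whole file; simple, but neither atomic nor efficient.
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(self._data, f)

    def __getitem__(self, key: str) -> typing.Any:
        return self._data[key]

    def __setitem__(self, key: str, value: typing.Any) -> None:
        self._data[key] = value
        self._save()

    def __delitem__(self, key: str) -> None:
        del self._data[key]
        self._save()

    def __contains__(self, key: str) -> bool:
        return key in self._data

    def get(self, key: str, default: typing.Any = None) -> typing.Any:
        return self._data.get(key, default)

    def clear(self) -> None:
        self._data.clear()
        self._save()


def demo() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "client.json")
        storage = JsonFileStorage(path)
        storage["base_client_url"] = "https://matrix.example.org/_matrix/client/"
        # A second instance, as after restarting the client, sees the value.
        reopened = JsonFileStorage(path)
        found = reopened["base_client_url"]
        # Clearing (as on logout) is also persisted to the file.
        reopened.clear()
        return found, "base_client_url" in JsonFileStorage(path)
```

Because the class provides the same methods as a dict, code written against the protocol should not need to know which of the two it has been given.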

Let’s create an initializer for our Client class, which will take as arguments:

  • a storage object;

  • optionally, a set of callbacks, which will be called when the client needs the application to do something (the callbacks will be defined in the places where we call them); and,

  • optionally, the output of our discover function.

These latter arguments should be included only when creating a new session. The initializer will store these values in the storage object, so when the client is re-created it will have access to the previously-stored values.

The initializer will also create an aiohttp session that will be used for the client’s requests.

Client class methods:
def __init__(
    self,
    storage: "Storage",
    callbacks: "Callbacks" = {},
    base_client_url: typing.Optional[str] = None,
    well_known: typing.Optional[dict[str, typing.Any]] = None,
    versions: typing.Optional[dict[str, typing.Any]] = None,
):
    """
    Arguments:

    ``storage``:
        The storage object to use
    ``callbacks``:
        Functions that will be called to perform processing needed by the
        client
    ``base_client_url``:
        The base URL to use to connect to the server, including the
        ``_matrix/client/`` suffix.  If omitted, the client must have
        previously logged in and stored the base URL in the storage
    ``well_known``:
        The contents of the ``.well-known`` file obtained from server
        discovery.
    ``versions``:
        The result of calling ``/_matrix/client/versions`` on the server,
        for example as returned by the ``discover`` function.
    """
    self.storage = storage
    if base_client_url:
        self.storage["base_client_url"] = base_client_url
        if well_known:
            self.storage["well_known"] = well_known
        if versions:
            self.storage["versions"] = versions
    elif "base_client_url" not in self.storage:
        raise RuntimeError("No base URL available")

    self.callbacks = callbacks

    self.http_session = aiohttp.ClientSession()

    {{Client class initialization}}
client module classes:
class Callbacks(typing.TypedDict, total=False):
    """Client callback functions"""

    {{client callbacks}}

While we’re here, we’ll also write a function to clean up our resources when we’re done with the client.

Client class methods:
async def close(self) -> None:
    """Free up resources used by the client."""
    {{client cleanup}}
client cleanup:
await self.http_session.close()

And we create the __aenter__ and __aexit__ methods needed to use our class with the async with statement.

Client class methods:
async def __aenter__(self) -> "Client":
    return self

async def __aexit__(self, *ignored: typing.Any) -> None:
    await self.close()
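
The effect of these two methods can be seen in a small sketch. FakeClient is a hypothetical stand-in that has the same close, __aenter__, and __aexit__ methods as our class but only records whether it was closed, so that the example can run without a homeserver.

```python
import asyncio
import typing


class FakeClient:
    """Hypothetical stand-in for Client; only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "FakeClient":
        return self

    async def __aexit__(self, *ignored: typing.Any) -> None:
        await self.close()


async def main() -> bool:
    client = FakeClient()
    try:
        async with client:
            raise ValueError("something went wrong while using the client")
    except ValueError:
        pass
    # __aexit__ ran before the exception left the block, so close() was called.
    return client.closed
```

Since __aexit__ returns None, the exception is not suppressed; it still propagates to the caller after the client has been closed.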

We will also write a convenience function that, given an endpoint name, returns the full URL. We will use this function whenever we need to call an endpoint.

Client class methods:
def url(self, endpoint: str) -> str:
    """Returns the full URL of a given endpoint"""
    return urljoin(self.storage["base_client_url"], endpoint)
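
The result of urljoin depends on where the slashes are, so it is worth seeing how it behaves. The sketch below uses a made-up homeserver name, matrix.example.org, and the v3/login endpoint as an example. It suggests that the stored base URL should end with a slash and that endpoints should be given without a leading slash.

```python
from urllib.parse import urljoin

base = "https://matrix.example.org/_matrix/client/"

# Base ends with a slash, endpoint is relative: the endpoint is appended.
joined = urljoin(base, "v3/login")

# Without the trailing slash, the last path segment ("client") is replaced.
no_slash = urljoin("https://matrix.example.org/_matrix/client", "v3/login")

# A leading slash makes the endpoint absolute, discarding the base path.
leading_slash = urljoin(base, "/v3/login")

print(joined)         # https://matrix.example.org/_matrix/client/v3/login
print(no_slash)       # https://matrix.example.org/_matrix/v3/login
print(leading_slash)  # https://matrix.example.org/v3/login
```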

Publish/Subscribe

As the client runs, it needs a mechanism for letting the application know that certain things have happened, such as when new events arrive in a room. To do this, we will use the publish-subscribe pattern (or pubsub). In this pattern, a publisher publishes messages to notify others that something has happened; subscribers can subscribe to the publisher and receive the messages. This is somewhat similar to the callbacks that we passed to the Client object when we created it, except that 1) multiple subscribers can be subscribed to each message type, whereas there is at most one callback per type; and 2) subscribers can subscribe and unsubscribe at any time, whereas callbacks usually cannot change. In some languages, this functionality is known as “events”, with the publisher called an “event emitter” and the subscriber called a “handler” or “listener”. We will avoid the term “event” here, so that it is not confused with Matrix events.

We will create a fairly simple publisher class. Our messages will be objects, and when we subscribe, we will pass in the class of objects that we are interested in (or a tuple of classes, to match any of several classes), as well as the subscriber function itself.

In our case, we want to allow subscribers to be async functions, we want to notify the subscribers in sequence, and we want to wait until all the subscribers have been notified and finished their processing.

src/matrixlib/pubsub.py:
# {{copyright}}

"""Implementation of the publish-subscribe pattern"""

import inspect
import logging
import sys
import typing


class Publisher:
    """Publish messages that can be subscribed to.

    Subscribers indicate which message types they are interested in by
    specifying the class of the messages."""

    def __init__(self):
        self.subscribers = []
        self.id = 0

    def subscribe(
        self,
        message_type: typing.Union[type, typing.Tuple[type, ...]],
        subscriber: typing.Callable[
            [typing.Any], typing.Union[None, typing.Awaitable[None]]
        ],
    ) -> int:
        """Subscribe to a message type

        Returns an ID that can be used to unsubscribe
        """
        self.id += 1
        self.subscribers.append((message_type, subscriber, self.id))
        return self.id

    def unsubscribe(self, id: int) -> bool:
        """Unsubscribe from a message type

        Returns whether or not the subscriber was found.
        """
        for idx, (_, _, subscriber_id) in enumerate(self.subscribers):
            if subscriber_id == id:
                # safe to delete here since we stop iterating immediately
                del self.subscribers[idx]
                return True
        return False

    async def publish(self, message: typing.Any) -> None:
        """Publish a message to the subscribers"""
        for message_type, subscriber, _ in self.subscribers:
            if isinstance(message, message_type):
                try:
                    res = subscriber(message)
                    if inspect.isawaitable(res):
                        await res
                except Exception as e:
                    # we don't want a misbehaving subscriber to bring down the
                    # whole system, but we need to record the error somehow.
                    # We catch Exception rather than using a bare except so
                    # that task cancellation and KeyboardInterrupt propagate
                    logging.error("Subscriber threw an exception: %s", e)
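
The following sketch shows how subscribing by class, and by tuple of classes, might look in practice. So that it can run on its own, it uses MiniPublisher, a condensed stand-in for the Publisher class above that omits IDs, unsubscribing, and error handling. The message classes RoomJoined and RoomLeft are hypothetical and exist only for this example.

```python
import asyncio
import inspect
import typing


class MiniPublisher:
    """Condensed stand-in for Publisher: no IDs, unsubscribe, or error handling."""

    def __init__(self) -> None:
        self.subscribers: list = []

    def subscribe(self, message_type, subscriber) -> None:
        self.subscribers.append((message_type, subscriber))

    async def publish(self, message: typing.Any) -> None:
        # Notify matching subscribers one at a time, in subscription order.
        for message_type, subscriber in self.subscribers:
            if isinstance(message, message_type):
                res = subscriber(message)
                if inspect.isawaitable(res):
                    await res


class RoomJoined:
    """Hypothetical message class"""


class RoomLeft:
    """Hypothetical message class"""


async def main() -> list:
    log = []
    publisher = MiniPublisher()

    def on_join(message):
        # A plain function subscriber
        log.append("sync: " + type(message).__name__)

    async def on_any(message):
        # An async subscriber; publish() waits for it to finish
        await asyncio.sleep(0)
        log.append("async: " + type(message).__name__)

    publisher.subscribe(RoomJoined, on_join)
    publisher.subscribe((RoomJoined, RoomLeft), on_any)

    await publisher.publish(RoomJoined())
    await publisher.publish(RoomLeft())
    return log
```

When main is run, on_join sees only the RoomJoined message, while on_any sees both messages, and each call to publish returns only after every matching subscriber has finished.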

Our Client class will have a publisher member that will be used for publishing and subscribing to messages.

Client class initialization:
self.publisher = pubsub.Publisher()

We can use this mechanism to notify components when the Client object is closed. This can be used, for example, to clean up resources or to stop tasks.

client cleanup:
await self.publisher.publish(ClientClosed())
client module classes:
class ClientClosed:
    """A message indicating that the client is closed"""

    def __eq__(self, other):
        return other.__class__ == ClientClosed

To subscribe to this message, the application could create a subscriber function to perform an action when the message is sent, and call

client.publisher.subscribe(ClientClosed, subscriber)

to register the subscriber.