Journal entries

An Atom feed of these posts is also available.

Internal flow control within message brokers

Implementing a quality message broker involves quite a bit of subtlety. Dealing with high-speed message publication, where the server, downstream consumers, or both can become overloaded, can be especially challenging.

These were problems that faced me when I was putting together the first versions of the RabbitMQ server, and the same issues crop up in servers that speak the Syndicate network protocol, too.

I recently came up with a really interesting and apparently effective approach to internal flow control while working on syndicate-server.

The idea is to associate all activity with an account that records how much outstanding work the server has left to do to fully complete processing of that activity.

  • Actors reading from e.g. network sockets call an ensure_clear_funds function on their associated Accounts. This will suspend the reader until enough of the account’s “debt” has been “cleared”. This will in turn cause the TCP socket’s read buffers to fill up and the TCP window to close, which will throttle upstream senders.

  • Every time an actor (in particular, actors triggered by data from sockets) sends an event to some other actor, a LoanedItem is constructed which “borrows” some credit from the sending actor’s nominated account.

  • Every time a LoanedItem is completely processed (in the Rust implementation, when it is dropped), its cost is “repaid” to its associated account.

  • But crucially, when an actor is responding to an event by sending more events, it charges the sent events to the account associated with the triggering event. This lets the server automatically account for fan-out of events.[1]
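
The suspension behaviour in the list above can be sketched in Python with asyncio. This is a minimal illustration and not the syndicate-server code (which is written in Rust): only the name ensure_clear_funds comes from the text, while the borrow and repay methods, the limit parameter, and the reader and slow_consumer coroutines are assumptions made for the example. A plain queue stands in for the socket and the downstream actor.

```python
import asyncio


class Account:
    """Hypothetical debt tracker; only ensure_clear_funds is named in the post."""

    def __init__(self, limit):
        self.limit = limit              # debt above this suspends the reader
        self.debt = 0
        self._cleared = asyncio.Event()  # created while the event loop is running
        self._cleared.set()

    def borrow(self, cost):
        self.debt += cost
        if self.debt > self.limit:
            self._cleared.clear()

    def repay(self, cost):
        self.debt -= cost
        if self.debt <= self.limit:
            self._cleared.set()

    async def ensure_clear_funds(self):
        # Suspends the caller until enough debt has been repaid.
        await self._cleared.wait()


async def reader(account, incoming, outbox):
    # Stands in for an actor reading from a socket.
    for message in incoming:
        await account.ensure_clear_funds()
        account.borrow(1)               # each forwarded message costs 1 unit
        await outbox.put(message)


async def slow_consumer(account, outbox, count, log):
    # Stands in for a busy downstream actor.
    for _ in range(count):
        message = await outbox.get()
        await asyncio.sleep(0)          # let other tasks run first
        log.append((message, account.debt))
        account.repay(1)                # processing finished: repay the loan


async def demo():
    account = Account(limit=2)
    outbox = asyncio.Queue()
    log = []
    await asyncio.gather(
        reader(account, range(10), outbox),
        slow_consumer(account, outbox, 10, log),
    )
    return account.debt, log


final_debt, log = asyncio.run(demo())
```

In this sketch the reader can overshoot the limit by at most one message before it is suspended, and all debt is repaid once the consumer has caught up.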

Example. Imagine an actor A receiving publications from a TCP/IP socket. If it ever “owes” more than, say, 5 units of cost on its account, it stops reading from its socket until its debt decreases. Each message it forwards on to another actor costs it 1 unit. Say a given incoming message M is routed to a dataspace actor D (thereby charging A’s account 1 unit), where it results in nine outbound events M′ to peer actors O1···O9.

Then, when D receives M, 1 unit is repaid to A’s account. When D sends M′ on to each of O1···O9, 1 unit is charged to A’s account, resulting in a total of 9 units charged. At this point in time, A’s account has had net +1−1+9 = 9 units withdrawn from it as a result of M’s processing.

Imagine now that all of O1···O9 are busy with other work. Then, next time around A’s main loop, A notices that its outstanding debt is higher than its configured threshold, and stops reading from its socket. As each of O1···O9 eventually gets around to processing its copy of M′, it repays the associated 1 unit to A’s account.[2] Eventually, A’s account drops below the threshold, A is woken up, and it resumes reading from its socket.  □
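
The arithmetic of the example can be replayed with a small Python sketch. The class and method names here (Account, LoanedItem, is_clear, complete) are illustrative assumptions rather than the server's API. The Rust implementation repays when a LoanedItem is dropped, which Python cannot express as reliably, so the sketch uses an explicit complete() call instead.

```python
class Account:
    """Hypothetical stand-in for the account attached to actor A."""

    def __init__(self, limit):
        self.limit = limit   # A stops reading when it owes more than this
        self.debt = 0

    def is_clear(self):
        return self.debt <= self.limit


class LoanedItem:
    """An event in flight: borrows on creation, repays on completion."""

    def __init__(self, account, payload, cost=1):
        self.account = account
        self.payload = payload
        self.cost = cost
        self.repaid = False
        account.debt += cost

    def complete(self):
        # Repay at most once, however often complete() is called.
        if not self.repaid:
            self.repaid = True
            self.account.debt -= self.cost


def replay_example():
    a = Account(limit=5)
    history = []

    m = LoanedItem(a, "M")                             # A forwards M to D: +1
    history.append(a.debt)
    m.complete()                                       # D receives M: -1
    history.append(a.debt)
    copies = [LoanedItem(a, "M'") for _ in range(9)]   # D fans out to O1..O9: +9
    history.append(a.debt)
    blocked = not a.is_clear()                         # 9 > 5, so A stops reading

    for item in copies[:4]:                            # four peers finish their copy
        item.complete()
    history.append(a.debt)
    resumed = a.is_clear()                             # 5 is no longer more than 5
    return history, blocked, resumed


history, blocked, resumed = replay_example()
```

The fan-out is charged to A's account even though D does the sending, which is the property the last bullet describes.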


Anecdotally, this appears to work well. In experiments with producers sending as quickly as they can, the server throttles the producers and seems to remain stable, even though its consumers cannot keep up with the unthrottled send rate of each producer.

It’ll be interesting to see how it does with other workloads!

  1. A corollary to this is that, for each event internal to the system, you can potentially identify the “ultimate cause” of the event: namely, the actor owning the associated account. 

  2. Of course, if O1, say, sends more events internally as a result of receiving M′, more units will be charged to A’s account! 

Common Syndicate protocols repository

Because there are a number of different Syndicate implementations, and they all need to interoperate, I’ve used Preserves Schema to define message formats that all the implementations can share.

You can find the common protocol definitions in a new git repository:

git clone https://git.syndicate-lang.org/syndicate-lang/syndicate-protocols

I used git-subtree to carve out a portion of the novy-syndicate source tree to form the new repository. It seems to be working well. In the projects that depend on the protocol definitions, I run the following every now and then to sync up with the syndicate-protocols repository:

git subtree pull -P protocols \
    -m 'Merge latest changes from the syndicate-protocols repository' \
    git@git.syndicate-lang.org:syndicate-lang/syndicate-protocols \
    main

Syndicated Actors for Python 3

Previously, the mini-syndicate package for Python 3 implemented an older version of the Syndicate network protocol.

A couple of weeks ago, I dusted it off, updated it to the new capability-oriented Syndicate protocol, and fleshed out its nascent Syndicated Actor Model code to be a full implementation of the model, including capabilities, object references, actors, facets, assertions and so on.

The new implementation makes heavy use of Python decorators to work around Python’s limited lambda forms and its poor support for syntactic extensibility. The result is surprisingly not terrible!
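
To illustrate the general technique (not the syndicate-py API itself), here is a minimal sketch of how a decorator can stand in for a multi-statement lambda. The Registry class and its on_message and deliver methods are hypothetical names invented for this example.

```python
class Registry:
    """Hypothetical handler table; not part of syndicate-py."""

    def __init__(self):
        self.handlers = {}

    def on_message(self, label):
        # Returns a decorator, so the function defined right below the
        # decorator line becomes the handler, much like an inline callback.
        def register(handler):
            self.handlers.setdefault(label, []).append(handler)
            return handler
        return register

    def deliver(self, label, *fields):
        return [handler(*fields) for handler in self.handlers.get(label, [])]


registry = Registry()
seen = []


@registry.on_message('Says')
def on_says(who, what):
    # Several statements, which a Python lambda expression cannot hold.
    text = what.strip()
    seen.append((who, text))
    return '%s says %r' % (who, text)


replies = registry.deliver('Says', 'user_42', ' hello \n')
```

The decorator call runs at definition time, so the handler is registered as a side effect of writing the function, which is what lets nested handlers read like blocks of code attached to an event.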

The revised codebase is different enough to the previous one that it deserves its own new git repository:

git clone https://git.syndicate-lang.org/syndicate-lang/syndicate-py

It’s also available on pypi.org, as package syndicate-py.

Updated Preserves for Python

As part of the work, I updated the Python Preserves implementation (for both Python 2 and Python 3) to include the text-based Preserves syntax as well as the binary syntax, plus implementations of Preserves Schema and Preserves Path. Version 0.7.0 or newer of the preserves package on pypi.org has the new features.

A little “chat” demo

(Based on chat.py in the repository.)

import sys
import asyncio
import random
import syndicate
from syndicate import patterns as P, actor, dataspace
from syndicate.schema import simpleChatProtocol, sturdy

Present = simpleChatProtocol.Present
Says = simpleChatProtocol.Says

ds_capability = syndicate.parse('<ref "syndicate" [] #[acowDB2/oI+6aSEC3YIxGg==]>')

@actor.run_system()
def main(turn):
    root_facet = turn._facet

    @syndicate.relay.connect(turn, '<tcp "localhost" 8001>', sturdy.SturdyRef.decode(ds_capability))
    def on_connected(turn, ds):
        me = 'user_' + str(random.randint(10, 1000))

        turn.publish(ds, Present(me))

        @dataspace.during(turn, ds, P.rec('Present', P.CAPTURE), inert_ok=True)
        def on_presence(turn, who):
            print('%s joined' % (who,))
            turn.on_stop(lambda turn: print('%s left' % (who,)))

        @dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE))
        def on_says(turn, who, what):
            print('%s says %r' % (who, what))

        @turn.linked_task()
        async def accept_input(f):
            reader = asyncio.StreamReader()
            await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
            while line := (await reader.readline()).decode('utf-8'):
                actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip())))
            actor.Turn.external(f, lambda turn: turn.stop(root_facet))

Syndicated Actors for Rust (and a new extensible server implementation)

For my system layer project, I need a fast, low-RAM-usage, flexible, extensible daemon that speaks the Syndicate network protocol and exposes dataspace and system management services to other processes.

So I built one in Rust.

git clone https://git.syndicate-lang.org/syndicate-lang/syndicate-rs

It uses the Syndicated Actor Model internally to structure its concurrent activities. The actor model implementation is split out into a crate of its own, syndicate, that can be used by other programs. There is also a crate of macros, syndicate-macros, that makes working with dataspace patterns over Preserves values a bit easier.[1]

The syndicate crate is reasonably extensively documented. The server itself is documented here.

The implementation includes:

  1. Future work for syndicate-macros is to add syntactic constructs for easily establishing handlers for responding to assertions and messages, cutting out the boilerplate, in the same way that Racket’s syndicate macros do. In particular, having a during! macro would be very useful. 

Rust Preserves v1.0.0 released

As part of my other implementation efforts, I made enough improvements to the Rust Preserves implementation to warrant releasing version 1.0.0.

This release supports the Preserves data model and the binary and text codecs. It also includes a Preserves Schema compiler for Rust (for use e.g. in build.rs) and an implementation of Preserves Path.

There are four crates:

Preserves Path: a query language inspired by XPath

At the beginning of August, I designed a query language for Preserves documents inspired by XPath. Here’s the draft specification. To give just a taste of the language, here are a couple of example selectors:

.annotations ^ Documentation . 0 /

// [.^ [= Test + = NondeterministicTest]] [. 1 rec]

They’re properly explained in the Examples section of the spec.

So far, I’ve used Preserves Path to access portions of network packets received from the server in an implementation of the Syndicate network protocol for bash.

Syndicate for Bash

🙂 Really! The Syndicate network protocol is simple, easily within reach of a shell script plus e.g. netcat or openssl.

Here’s the code so far. The heart of it is about 90 SLOC, showing how easy it can be to interoperate with a Syndicate ecosystem.

Instructions are in the README, if you’d like to try it out!

The code so far contains functions to interact with a Syndicate server along with a small demo, which implements an interactive chat program with presence notifications.

Because Syndicate’s network protocol is polyglot, and the bash demo uses generic chat assertions, the demo automatically interoperates with other implementations, such as the analogous python chat demo.

The next step would be to factor out the protocol implementation from the demo and come up with a simple make install step to make it available for system scripting.

I actually have a real use for this: it’ll be convenient for implementing simple system monitoring services as part of a Syndicate-based system layer. Little bash script services could easily publish the battery charge level, screen brightness level, WiFi connection status, etc. etc., by reading files from /sys and publishing them to the system-wide Syndicate server using this library.

Services and Service Activation

One promising application of dataspaces is dependency tracking for orderly service startup.

The problem of service startup appears at all scales. It could be services cooperating within a single program and process; services cooperating as separate processes on a single machine; containers running in a distributed system; or some combination of them all.

Syndicate programs are composed of multiple services running together, with dependencies on each other, so it makes sense to express service dependency tracking and startup within the programming language.

In the following, I’ll sketch service dependency support for cooperating modules within a single program and process. The same pattern can be used in larger systems; the only essential differences are the service names and the procedures for loading and starting services.

A scenario

Let’s imagine we have the following situation:

[Figure: service dependency graph. The top level program depends on syndicate/drivers/tcp and syndicate/drivers/timer; syndicate/drivers/tcp depends on syndicate/drivers/stream.]

A program we are writing depends on the “tcp” service, which in turn depends on the “stream” service. Separately, the top-level program depends on the “timer” service.

Describing the data and protocol

A small protocol for services and service activations describes the data involved:

RequireService = <require-service @service-name any>.
ServiceRunning = <service-running @service-name any>.

An asserted RequireService record indicates demand for a running instance of the named service; an asserted ServiceRunning record indicates presence of the same; and interest in a ServiceRunning implies assertion of a RequireService.

A library “service manager” process, started alongside the top level program, translates observed interest in ServiceRunning into RequireService, and then translates observed RequireService assertions into service startup actions and provision of matching ServiceRunning assertions.

(during (Observe (:pattern (ServiceRunning ,(DLit $service-name))) _)
  (assert (RequireService service-name)))

(during/spawn (RequireService $service-name)
  ;; ... code to load and start the named service ...
  )

Putting these pieces together, we can write a program that waits for a service called 'some-service-name to be running as follows:

(during (ServiceRunning 'some-service-name) ...)

When the service appears, the facet in the ellipsis will be started, and if the service crashes, the facet will be stopped (and restarted if the service is restarted).

Services can wait for their own dependencies, of course. This automatically gives a topologically sorted startup order.
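
A rough simulation of why the order comes out topologically sorted, written in Python rather than Syndicate/rkt. It is a simplification under stated assumptions: the dependency table is taken from the scenario above, RequireService assertions are modelled as a set, ServiceRunning as an ordered list, and the simulate_startup function is a hypothetical name. A real dataspace reacts to assertions as they appear instead of looping to a fixed point, and this sketch assumes the dependency graph has no cycles.

```python
# Dependencies from the scenario: tcp needs stream; timer and stream need nothing.
DEPENDENCIES = {
    'syndicate/drivers/tcp': ['syndicate/drivers/stream'],
    'syndicate/drivers/timer': [],
    'syndicate/drivers/stream': [],
}


def simulate_startup(root_requirements, dependencies):
    required = set(root_requirements)   # stands in for RequireService assertions
    running = []                        # stands in for ServiceRunning, in start order
    progress = True
    while progress:
        progress = False
        for service in sorted(required):
            if service in running:
                continue
            deps = dependencies.get(service, [])
            # Waiting on a dependency creates demand for it.
            newly_required = set(deps) - required
            if newly_required:
                required |= newly_required
                progress = True
            # A service only announces itself once its dependencies are running.
            if all(dep in running for dep in deps):
                running.append(service)
                progress = True
    return running


order = simulate_startup(
    ['syndicate/drivers/tcp', 'syndicate/drivers/timer'], DEPENDENCIES)
```

Nothing in the sketch sorts the graph explicitly. The stream service ends up ahead of tcp simply because tcp does not report itself as running until the service it waits for has done so.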

Modules as services, and macros for declaring dependencies

In the Syndicate/rkt implementation, a few standard macros and functions implement the necessary protocols.

First, services can be required using a with-services macro:

(with-services [syndicate/drivers/tcp
                syndicate/drivers/timer]
  ;; ... body expressions ...
  )

Second, each Racket module can offer a service named after the module by using a provide-service macro at module toplevel. For example, in the syndicate/drivers/tcp Racket module, we find the following form:

(provide-service [ds]
  (with-services [syndicate/drivers/stream]
    (at ds
      ;; ... set up tcp driver subscriptions ...
      )))

Finally, the main entry point to a Syndicate/rkt program can use a standard-actor-system macro to arrange for the startup of the “service manager” process and a few of the most frequently-used library services:

(standard-actor-system [ds]
  ;; ... code making use of a pre-made dataspace (ds) and
  ;;     preloaded standard services ...
  )

Implementing the SSH protocol in Syndicate

This past week I have been dusting off my old implementation of the SSH protocol in order to exercise the new Syndicated Actor design and implementations. You can find the new SSH code here. (You’ll need the latest Syndicate/rkt implementation to run it.)

The old SSH code used a 2014 dataspace-like language dialect called “Marketplace”, which shared some concepts with modern Syndicate but relied much more heavily on a pure-functional programming style within each actor. The new code uses mutability within each actor where it makes sense, and takes advantage of Syndicate DSL features like facets and dataflow variables that I didn’t have back in 2014 to make the code more modular and easier to read and work with.

Big changes include:

plus a couple of small Syndicate/rkt syntax changes (renaming when to on, making the notion of “event expander” actually useful, and a new once form for convenient definition of state-machine-like behaviour).

Diagram of Syndicate Features

I just found an old diagram, part of a talk on Marketplace I gave at RacketCon back in 2013, which relates a bunch of ideas that all fall under the broader Syndicate umbrella. Here it is:

[Figure: Syndicate Features]

I quite like it. I also think it’s interesting how I had so many of the core ideas already in place as far back as 2013.