High-level aggregator internals

This document describes how the aggregator works at a high level. It doesn't discuss concrete classes or other implementation details, only concepts.

1 Goals

The aggregator shall read flow reports from external sources. It then aggregates them and provides a history with the ability to query it. It also allows querying the current "live" data.

While the recent history can be relatively detailed, the older records gradually become less detailed.

Also, the aggregator should be lightweight (it will run on a router; a powerful router, but still one with limited RAM and disk space). It shouldn't write to permanent storage too much or too often, and it should prefer appending over updating in the middle of a file, as the storage is flash.

The on-disk format needs to be extensible in some way, as we may need more features in the future.

The design aims at simplicity where possible.

2 Terminology

As used throughout the rest of this document:

- flow: one network connection tracked by the aggregator.
- snapshot: a single report about the state of a flow, coming from the external source.
- flow slice: the portion of one flow's data falling into one time interval.
- time slice: all flow slices for one time interval.
- proto-slice: the time slice currently being built from incoming snapshots.
- bucket: one or more consecutive time slices handled and stored together.
- tier: one level of aggregation; higher tiers are coarser and cover longer time spans.
- tag: a key by which flows are grouped during aggregation.

3 Handling of the snapshots

The aggregator keeps a cache of live flows. It also keeps one proto-slice, the time slice currently being built.

When a snapshot arrives, the relevant flow is looked up in the cache.

If the report marks the flow as dying, we remove it from the cache. Otherwise, we update its information in the cache.

Then we notify all the internal computations about the events and let them produce further updates.
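
To make the flow above concrete, here is a minimal sketch (in Rust, just for illustration). The FlowKey, FlowStats, Snapshot and Aggregator types and the notify_computations hook are all hypothetical names, not taken from the actual code:

    use std::collections::HashMap;

    // Hypothetical placeholder types.
    #[derive(PartialEq, Eq, Hash)]
    struct FlowKey(u64);
    struct FlowStats;

    struct Snapshot {
        key: FlowKey,
        stats: FlowStats,
        dying: bool, // the report marks flows that are about to disappear
    }

    struct Aggregator {
        live_flows: HashMap<FlowKey, FlowStats>,
        // plus the current proto-slice, omitted here
    }

    impl Aggregator {
        fn handle_snapshot(&mut self, snapshot: Snapshot) {
            if snapshot.dying {
                // A dying flow is removed from the live cache.
                self.live_flows.remove(&snapshot.key);
            } else {
                // Otherwise the cached information is refreshed.
                self.live_flows.insert(snapshot.key, snapshot.stats);
            }
            // Let the internal computations react to the event and
            // produce further updates.
            self.notify_computations();
        }

        fn notify_computations(&mut self) { /* ... */ }
    }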

4 The time flow

As time goes on, we produce new time slices (each containing a bunch of flow slices) by finishing proto-slices. Such a time slice naturally forms a trivial bucket. We keep a number of the newest buckets in memory for requests about current traffic; each time a new one arrives, we drop only the oldest one.

Once in a while we join these buckets and put them into a file. These files form something like a journal.

Once in a longer while we take the data in the journal files and aggregate them, providing a second tier of data. We do a similar thing with the second tier to form a third one (obviously even less often).

The aggregation steps are preconfigured somehow, i.e. they are not adaptive in any way (though they could be, in the future). The number of tiers is not limited by the design.

The last step is dropping the oldest buckets.
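
As a rough illustration of what the preconfigured tiers could look like, here is a hypothetical description in Rust; the field names and the concrete numbers are invented and do not reflect any real configuration:

    use std::time::Duration;

    // Hypothetical description of one aggregation tier.
    struct Tier {
        // How much time one bucket of this tier covers.
        bucket_length: Duration,
        // How many buckets of this tier are kept before they are aggregated
        // into the next tier (or, for the last tier, dropped).
        kept_buckets: usize,
    }

    fn example_tiers() -> Vec<Tier> {
        vec![
            // Fine-grained journal buckets.
            Tier { bucket_length: Duration::from_secs(60), kept_buckets: 60 },
            // Second tier, aggregated from the journal once in a longer while.
            Tier { bucket_length: Duration::from_secs(60 * 60), kept_buckets: 48 },
            // Third tier, coarser still; the design itself does not limit the
            // number of tiers.
            Tier { bucket_length: Duration::from_secs(24 * 60 * 60), kept_buckets: 30 },
        ]
    }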

5 Querying

A query works very much like aggregation. However, instead of taking all the data, the query first filters it.

The other interesting challenge about querying is choosing the right buckets to scan. For that, we build an index at startup of what is in which bucket file and keep it up to date. We scan the file metadata from the fine-grained tiers to the coarse ones and pick the files that match our criteria. We may need multiple files for the same time interval (because they might cover different tag values we are interested in). On the other hand, we may need to omit some, as they would cover the same data (e.g. if we have alternatives to compute what we need by summing either by tag1 or by tag2). Currently we just pick the first viable alternative, but in the future we may want to pick the one with the smallest total file size.
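
A simplified sketch of that selection step, with made-up BucketMeta and TimeRange types and a single needed tag; the real logic around combining and omitting alternatives is more involved, this only demonstrates the "first viable alternative" rule:

    #[derive(Clone, Copy)]
    struct TimeRange { start: u64, end: u64 }

    struct BucketMeta {
        range: TimeRange,
        tags: Vec<String>,        // tags this file was aggregated by
        path: std::path::PathBuf, // where the bucket file lives
    }

    fn overlaps(a: TimeRange, b: TimeRange) -> bool {
        a.start < b.end && b.start < a.end
    }

    // `tiers` is ordered from the fine-grained tier to the coarse ones.
    fn select_files<'a>(
        tiers: &'a [Vec<BucketMeta>],
        wanted: TimeRange,
        needed_tag: &str,
    ) -> Vec<&'a BucketMeta> {
        let mut picked: Vec<&BucketMeta> = Vec::new();
        for meta in tiers.iter().flat_map(|tier| tier.iter()) {
            if !overlaps(meta.range, wanted) {
                continue;
            }
            if !meta.tags.iter().any(|t| t == needed_tag) {
                continue;
            }
            // Skip files that would only cover data we already picked,
            // i.e. take the first viable alternative for each interval.
            if picked.iter().any(|p| overlaps(p.range, meta.range)) {
                continue;
            }
            picked.push(meta);
        }
        picked
    }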

6 Subscriptions

We simply run the query repeatedly whenever we produce a new time slice.
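
A tiny sketch of that idea, with placeholder Query, QueryResult and Client types (all hypothetical):

    struct Query;
    struct QueryResult;
    struct Client;

    struct Subscription {
        query: Query,
        client: Client,
    }

    // Called whenever a new time slice has been produced.
    fn on_new_time_slice(subscriptions: &[Subscription]) {
        for sub in subscriptions {
            // Run the stored query as usual and push the result to the subscriber.
            let result = run_query(&sub.query);
            send_to_client(&sub.client, result);
        }
    }

    fn run_query(_q: &Query) -> QueryResult { QueryResult }
    fn send_to_client(_c: &Client, _r: QueryResult) { /* ... */ }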

7 File format

Each bucket has its own file. The file is first created in a staging area and moved into place atomically once done. That way we won't have partial files.
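
A sketch of this write-then-rename step, with hypothetical paths passed in by the caller; std::fs::rename is atomic as long as the staging area and the final location are on the same filesystem:

    use std::fs;
    use std::io::{self, Write};
    use std::path::Path;

    fn write_bucket_file(staging: &Path, final_path: &Path, data: &[u8]) -> io::Result<()> {
        // Write the complete file in the staging area first...
        let mut file = fs::File::create(staging)?;
        file.write_all(data)?;
        file.sync_all()?; // make sure the content hit the flash before the rename
        drop(file);
        // ...and only then move it into place, so readers never see a partial file.
        fs::rename(staging, final_path)
    }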

Each file starts with several headers.

The first header contains the format version and a canary magic number (so we don't get confused by files that are not ours).

The second describes the length of the slices, the timestamp of the first one and their number. It also describes which tags were used for aggregation and which additional tags are included.

The data follow next and span to the end of the file.
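
A hedged sketch of how these two headers could be modelled; the concrete fields, types, sizes and the magic value are invented, and the real on-disk layout may differ:

    // First header: identifies our files and their format version.
    struct FileHeader {
        magic: [u8; 4],      // canary magic number, e.g. b"AGGR" (made up)
        format_version: u32, // bumped whenever the on-disk format changes
    }

    // Second header: describes the slices contained in this file.
    struct SlicesHeader {
        slice_length: u32,            // length of each slice (e.g. in seconds)
        first_timestamp: u64,         // timestamp of the first slice
        slice_count: u32,             // how many slices follow
        aggregated_by: Vec<String>,   // tags used for aggregation
        additional_tags: Vec<String>, // additional tags included in the data
    }
    // The data entries follow the headers and span to the end of the file.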

The exact format (e.g. entry framing) is not yet decided. But some ideas include:

8 Scheduling

There's a communication thread that runs a message loop. It both reads data from the guts daemon and communicates with downstream clients.

There's also a thread pool to perform long-running tasks. Both queries and aggregations are sent there (not yet, to be done). As a result, queries may be answered asynchronously and out of order.
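
A sketch of handing a query over to a worker so the message loop stays free; the Query and Response types are placeholders, and a plain spawned thread stands in for the real pool:

    use std::sync::mpsc::Sender;
    use std::thread;

    // Hypothetical request/response types; `id` mirrors the request id so that
    // replies can still be matched when they arrive out of order.
    struct Query { id: u64 }
    struct Response { id: u64 }

    // Hand one query over to a worker; the communication thread keeps running
    // its message loop and writes the response out whenever it arrives.
    fn schedule_query(query: Query, responses: Sender<Response>) {
        // Stand-in for submitting the task to a real thread pool.
        thread::spawn(move || {
            let result = Response { id: query.id }; // the actual query would run here
            let _ = responses.send(result);
        });
    }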

9 Communication

We use JSON-RPC 2.0 for both communication paths.
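
For illustration, an example of what a request envelope might look like, built here with the serde_json crate (an assumption, not necessarily what the daemon uses). Only the "jsonrpc", "method", "params" and "id" fields are fixed by the JSON-RPC 2.0 specification; the method name and parameters below are made up:

    use serde_json::json;

    fn example_request() -> serde_json::Value {
        json!({
            "jsonrpc": "2.0",
            "method": "query",          // hypothetical method name
            "params": {                 // hypothetical parameters
                "from": 1700000000,
                "to": 1700003600,
                "filter": { "tag": "hostname" }
            },
            "id": 1
        })
    }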

10 Startup

When the daemon starts up, it scans all the available files. It creates an index of what is where, to allow planning how to answer queries. It also drops any unnecessary files (for example, if the previous run crashed while aggregating and both the aggregated results and the source buckets are present, the source buckets may be dropped).
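
A sketch of that startup scan, assuming hypothetical helpers (is_our_bucket_file, is_covered_by_newer_tier) that are not part of the real code; the real checks would read and compare the file headers:

    use std::fs;
    use std::io;
    use std::path::{Path, PathBuf};

    struct IndexEntry { path: PathBuf /* plus the parsed header metadata */ }

    fn scan_data_dir(dir: &Path) -> io::Result<Vec<IndexEntry>> {
        let mut index = Vec::new();
        for entry in fs::read_dir(dir)? {
            let path = entry?.path();
            if !is_our_bucket_file(&path) {
                continue; // ignore foreign files; the magic number protects us here
            }
            if is_covered_by_newer_tier(&path) {
                // Leftover from a crash during aggregation: the aggregated
                // result already exists, so the source bucket can go.
                fs::remove_file(&path)?;
                continue;
            }
            index.push(IndexEntry { path });
        }
        Ok(index)
    }

    // Hypothetical helpers, stubbed out for the sketch.
    fn is_our_bucket_file(_path: &Path) -> bool { true }
    fn is_covered_by_newer_tier(_path: &Path) -> bool { false }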

In future versions, we would also schedule a migration of the file format if we find files in an older format than the currently supported one.