High-level aggregator internals
This describes how the aggregator works at a high level. It doesn't talk about concrete classes and similar details, mostly only about concepts.
1 Goals
The aggregator shall read flow reports from external sources. It then aggregates them and provides a history with the ability to query it. It also allows querying the current "live" data.
While the recent history can be relatively detailed, the older records get gradually less extensive.
Also, the aggregator should be lightweight (it'll run on a router ‒ a powerful router, but still one with limited RAM and disk space). It shouldn't write to the permanent storage too much or too often, and it should prefer appending over updating in the middle of a file, as the storage is flash.
The on-disk format needs to be extensible in some way, as we may need more features in the future.
The design aims at simplicity where possible.
2 Terminology
- A flow is communication between two endpoints, usually described by a tuple of IP addresses, ports and a protocol. It is a generalized TCP connection (flows work over UDP too).
- A live flow is one that hasn't yet ended or timed out; we expect more communication to happen on it. A flow that has ended and through which we expect no more packets to come is dead.
- A flow snapshot is a report from the external world containing information about a live or dying flow up to that time (e.g. IP addresses, relevant domain names, amount of data transferred, etc.).
- A time slice is a fixed-length interval of time. Time normally flows continuously, which poses a slight problem. For simplicity, we split the time into slices ‒ fixed-size intervals that cover the whole span of time we are interested in, but don't overlap. The length of the interval may differ between contexts (e.g. depending on what we do, we chop the time at a different granularity).
- A flow slice is the result of a similar operation applied to a flow: we cut flows where they span multiple time slices. Note that flows don't necessarily start and end at time slice boundaries, so the end pieces may be shorter than the relevant time slice.
- A flow group is a set of flow slices sharing the same time slice and a common set of tag values (e.g. all flows in time X-Y between these two IP addresses). The flow group keeps summed metrics for its flows and doesn't hold separate information about each of them. Each flow slice is implicitly a trivial flow group.
- A proto-slice is a not-yet-finished slice ‒ one that has started, but hasn't ended yet. After some time it is finished and becomes a slice; at that moment we start a new proto-slice.
- A tag is a bit of information about a flow that doesn't change during its lifetime. An example could be the IP addresses of the endpoints.
- A metric or statistic is a bit of information that does change during the flow's lifetime ‒ like the amount of data transferred or the current speed.
- A bucket is a storage for flow groups with their tags and metrics. A bucket contains only flow groups with the same interval length (it has only one granularity) and its slices cover a continuous interval of time. It may be restricted to only a subset of tag values (for example only flows with even IP addresses) and it doesn't have to keep all the tags or metrics the flows originally had, but it is otherwise complete (e.g. a bucket is specified by the interval, granularity, tag values and stored tags, and it contains all the flow groups matching such criteria).
- Aggregation is the process of taking flow groups of some granularity and tags and producing less granular (longer) flow slices, optionally with fewer tags. The metrics are summed (or averaged, or something similar, depending on what makes sense for the given metric) whenever more groups are put together. The restriction is that the new slice interval length must be a multiple of the old one. The input of aggregation is usually more than one bucket; the output may be too.
- Joining of buckets is a trivial form of aggregation that doesn't change anything; it only takes all the flow groups from multiple buckets and puts them into a single one.
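To make the terms more concrete, here is a minimal sketch of the main concepts as data structures. It is only an illustration of the terminology; the names (FlowGroup, Bucket, absorb, etc.) are made up for this document and are not the actual classes used by the aggregator.

```python
# Illustrative only: hypothetical names, not the aggregator's real classes.
from dataclasses import dataclass, field
from typing import Dict, Tuple

Tags = Tuple[Tuple[str, str], ...]   # e.g. (("src_ip", "192.0.2.1"), ("proto", "tcp"))
Metrics = Dict[str, int]             # e.g. {"bytes": 1234, "packets": 7}

@dataclass
class FlowGroup:
    """All flow slices sharing one time slice and one set of tag values."""
    tags: Tags
    metrics: Metrics = field(default_factory=dict)

    def absorb(self, more: Metrics) -> None:
        # Metrics are summed whenever more slices fall into the same group.
        for name, value in more.items():
            self.metrics[name] = self.metrics.get(name, 0) + value

@dataclass
class Bucket:
    """Flow groups of a single granularity covering a continuous interval."""
    start: int        # timestamp of the first slice
    slice_len: int    # granularity (length of one time slice), in seconds
    groups: Dict[Tuple[int, Tags], FlowGroup] = field(default_factory=dict)

    def slice_index(self, timestamp: int) -> int:
        # Chopping continuous time into fixed-size slices.
        return (timestamp - self.start) // self.slice_len

    def add(self, timestamp: int, tags: Tags, metrics: Metrics) -> None:
        key = (self.slice_index(timestamp), tags)
        self.groups.setdefault(key, FlowGroup(tags)).absorb(metrics)
```

A proto-slice could be represented by the same kind of structure; it only differs in not being finished yet.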
3 Handling of the snapshots
The aggregator keeps a cache of live flows. It also keeps the proto-slice of the current time slice.
When a snapshot arrives, the relevant flow is looked up in the cache.
- If it exists in the cache, the cached data is compared with the info in the record and the difference is added to the flow in the proto-slice.
- If it doesn't exist, it is created in the cache. If the flow has been running only for a short time (the report contains both the time of the last flow update and when it started), the whole of the flow is added to the proto-slice. If it has been running for some time, it means we missed the start (maybe we just started) and we want to ignore whatever happened up until now, so nothing is added to the proto-slice.
Furthermore, if the flow is dying (which is marked in the report), we remove it from the cache. Otherwise we update the information in the cache.
Then, we notify all the internal computations about the events and let them produce some more updates.
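A sketch of this handling follows. It assumes the snapshot carries cumulative counters and a flag saying whether the flow is dying; the names and the threshold for "we probably missed the start" are made up. One assumption is made explicit in the code: even when the past traffic of a long-running flow is ignored, the flow is still remembered in the cache so that future snapshots can be diffed against it.

```python
# A sketch only; names, fields and MAX_MISSED_AGE are illustrative assumptions.
from dataclasses import dataclass

@dataclass
class Snapshot:
    flow_key: tuple    # the identifying tuple (addresses, ports, protocol)
    started: int       # when the flow started
    seen: int          # time of the last update covered by this snapshot
    bytes_total: int   # cumulative counter since the flow started
    dying: bool        # marked in the report when the flow is ending

MAX_MISSED_AGE = 60    # assumed threshold for "we probably missed the start"

def handle_snapshot(snap, cache, proto_slice):
    cached = cache.get(snap.flow_key)
    if cached is not None:
        # Known flow: only the difference since the last snapshot goes into
        # the current proto-slice.
        delta = snap.bytes_total - cached.bytes_total
        proto_slice.add(snap.seen, snap.flow_key, {"bytes": delta})
    elif snap.seen - snap.started <= MAX_MISSED_AGE:
        # New, short flow: we saw (nearly) all of it, account for the whole thing.
        proto_slice.add(snap.seen, snap.flow_key, {"bytes": snap.bytes_total})
    else:
        # Long-running flow we haven't seen before: we missed its start, so
        # whatever happened up until now is ignored. (Assumption: the flow is
        # still cached below, so only its past is lost, not its future.)
        pass

    if snap.dying:
        cache.pop(snap.flow_key, None)   # dying flows leave the cache
    else:
        cache[snap.flow_key] = snap      # otherwise remember the latest state
```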
4 The time flow
As time goes on, we produce new time slices (each containing a bunch of flow slices) by finishing proto-slices. Such a time slice naturally forms a trivial bucket. We keep a number of the newest buckets in memory for requests about the current traffic; each time we drop only the oldest one.
Once in a while we join these buckets and put them into a file. These files form something like a journal.
Once in a longer while we take the data in the journal files and aggregate them, providing a second tier of data. We do a similar thing with the second tier to form a third one (obviously even less often).
The aggregation steps are preconfigured somehow, i.e. they are not adaptive in any way (they could be, in the future). The number of tiers is not limited by the design.
The last step is dropping the oldest buckets.
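The aggregation step at the heart of these tiers might look roughly like the sketch below. It is a simplification: everything is summed (the "averaged or similar" metrics mentioned in the terminology are left out), the input is a flat list of flow groups rather than whole buckets, and all names are made up.

```python
def aggregate(groups, old_len, new_len, kept_tags):
    """Turn flow groups of granularity old_len into groups of granularity new_len.

    groups: iterable of (slice_index, tags, metrics) tuples, where tags is a
            tuple of (name, value) pairs and metrics is a dict of counters.
    kept_tags: names of the tags to keep; the rest are dropped.
    """
    # The new slice length must be a multiple of the old one.
    assert new_len % old_len == 0
    factor = new_len // old_len
    out = {}
    for slice_idx, tags, metrics in groups:
        new_idx = slice_idx // factor                                 # coarser time slice
        new_tags = tuple((k, v) for k, v in tags if k in kept_tags)   # fewer tags
        acc = out.setdefault((new_idx, new_tags), {})
        for name, value in metrics.items():
            acc[name] = acc.get(name, 0) + value                      # summed metrics
    return out
```

Joining is then just the special case with a factor of 1 and all tags kept.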
5 Querying
A query works very similarly to aggregation. However, instead of taking all the data, the input is first filtered.
The other interesting challenge in querying is choosing the right buckets to scan. For that we build, at startup, an index of what is in which bucket file and keep it up to date. We scan the file metadata from the fine grained to the coarse and pick the ones that match our criteria. We may need multiple files for the same time interval (because they might cover different tag values we are interested in). On the other hand, we may need to omit some because they would cover the same data (e.g. if we have alternatives to compute what we need by summing either by tag1 or by tag2). Currently we just pick the first viable alternative, but in the future we may want to pick the one with the smallest total file size.
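A possible shape of the bucket selection, based on the index described above. The metadata fields and the selection rule are only a sketch; in particular, elimination of files that would cover the same data is left out, so here every viable candidate is returned and the first alternative simply wins elsewhere.

```python
# Illustrative sketch; BucketMeta mirrors the per-file metadata kept in the index.
from dataclasses import dataclass
from typing import FrozenSet, List

@dataclass(frozen=True)
class BucketMeta:
    path: str
    start: int               # first covered timestamp
    end: int                 # end of the covered interval
    slice_len: int           # granularity of this bucket
    tags: FrozenSet[str]     # tags still present in this bucket

def pick_buckets(index: List[BucketMeta], start: int, end: int,
                 needed_tags: FrozenSet[str]) -> List[BucketMeta]:
    """Pick candidate files for a query over [start, end) needing needed_tags."""
    candidates = []
    # Scan the metadata from the fine grained to the coarse.
    for meta in sorted(index, key=lambda m: m.slice_len):
        if meta.end <= start or meta.start >= end:
            continue                      # doesn't intersect the queried interval
        if not needed_tags <= meta.tags:
            continue                      # this file no longer carries a tag we need
        candidates.append(meta)
    return candidates
```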
6 Subscriptions
We simply run the query repeatedly whenever we produce a new time slice.
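In other words, a subscription is nothing more than a stored query plus a way to deliver the results. A minimal sketch (with made-up names, not the real interface) could look like this:

```python
# Illustrative only; Subscription, notify and run_query are assumptions.
class Subscription:
    def __init__(self, query, notify):
        self.query = query     # the same structure an ordinary query uses
        self.notify = notify   # callback that delivers results to the client

def on_new_slice(subscriptions, run_query):
    # Whenever a proto-slice is finished, every subscription's query is simply
    # re-run and the fresh results are pushed out.
    for sub in subscriptions:
        sub.notify(run_query(sub.query))
```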
7 File format
Each bucket has its own file. The file is first created in a staging area and moved into place atomically once done. That way we won't have partial files.
Each file starts with several headers.
The first header holds the format version and a canary magic number (so we don't get confused by files that are not ours).
The second describes the length of the slices, the timestamp of the first one and the number of them. It also describes which tags were used for aggregation and which additional tags are included.
The data follow next and span to the end of the file.
The exact format (e.g. entry framing) is not yet decided, but some ideas include (a rough sketch combining them follows the list):
- Have the file compressed (e.g. gzip)
- Use msgpack for the fields
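To illustrate how these pieces could fit together, here is a sketch of writing one bucket file, assuming the gzip and msgpack ideas above. Nothing about it is decided; the field names, the magic value and the framing are made up, and the staging-area/atomic-move step from above is omitted for brevity.

```python
# Hypothetical layout only; the real format is not decided yet.
import gzip
import msgpack   # third-party "msgpack" package

MAGIC = b"AGGR"  # made-up canary so we don't get confused by foreign files
VERSION = 1

def write_bucket(path, start, slice_len, slice_count, agg_tags, extra_tags, groups):
    # Writing into a staging area and moving into place atomically is omitted here.
    with gzip.open(path, "wb") as f:                 # whole file compressed
        # First header: format version and the magic number.
        f.write(msgpack.packb({"magic": MAGIC, "version": VERSION}))
        # Second header: slice length, timestamp of the first slice, their count
        # and the tags used for aggregation plus the additional ones included.
        f.write(msgpack.packb({
            "slice_len": slice_len,
            "start": start,
            "count": slice_count,
            "aggregated_by": agg_tags,
            "extra_tags": extra_tags,
        }))
        # The data follow and span to the end of the file.
        for group in groups:
            f.write(msgpack.packb(group))
```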
8 Scheduling
There's a communication thread that runs a message loop. That one both reads data from the guts daemon and communicates with downstream clients.
There's also a thread pool to perform long-running tasks. Both queries and aggregations are sent there (not yet, to be done). As a result, queries may be answered asynchronously and out of order.
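The split could look something like the following sketch, using a generic thread pool; the message kinds and the callbacks are illustrative assumptions, not the real interface.

```python
# Illustrative sketch of the threading split; names are made up.
import queue
from concurrent.futures import ThreadPoolExecutor

tasks = queue.Queue()                          # messages for the communication loop
workers = ThreadPoolExecutor(max_workers=2)    # pool for long-running work

def message_loop(handle_snapshot, run_query, send_response):
    while True:
        kind, payload = tasks.get()
        if kind == "snapshot":
            handle_snapshot(payload)           # cheap, handled right in the loop
        elif kind == "query":
            request_id, query = payload
            # Long-running work goes to the pool; the answer is sent whenever
            # it is ready, possibly out of order with other queries.
            future = workers.submit(run_query, query)
            future.add_done_callback(
                lambda fut, rid=request_id: send_response(rid, fut.result()))
```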
9 Communication
We use JSON-RPC 2.0 for both communication paths.
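For illustration, a request and its response could look like this. Only the JSON-RPC 2.0 envelope (jsonrpc, id, method/params, result) is given by the protocol; the method name and parameters are made-up examples, not the actual API.

```python
import json

# A made-up query request; only the JSON-RPC 2.0 envelope is authoritative.
request = json.dumps({
    "jsonrpc": "2.0",
    "id": 1,
    "method": "query",
    "params": {"from": 1500000000, "to": 1500003600, "tags": ["src_ip"]},
})

# The matching response; the id pairs it with the request, which matters
# because answers may arrive asynchronously and out of order.
response = json.dumps({
    "jsonrpc": "2.0",
    "id": 1,
    "result": [],
})
```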
10 Startup
When the daemon starts up, it scans all the available files. It creates an index of what is where, to allow planning how to perform queries. It also drops any unnecessary files (for example, if the previous run crashed while aggregating and both the results and the source buckets are present, the source ones may be dropped).
In future versions we would also schedule a migration of the file format if we find something in an older format than we support.
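A sketch of the startup scan; the directory layout and the helpers (read_metadata reading only the file headers, is_redundant consulting the aggregation plan) are assumptions for illustration.

```python
# Illustrative only; read_metadata and is_redundant are assumed helpers.
import os

def startup_scan(directory, read_metadata, is_redundant):
    """Build the index of bucket files and drop leftovers of a crashed run."""
    index = [read_metadata(os.path.join(directory, name))
             for name in sorted(os.listdir(directory))]
    keep = []
    for meta in index:
        if is_redundant(meta, index):
            os.remove(meta.path)   # e.g. a source bucket whose aggregation
        else:                      # already finished before the crash
            keep.append(meta)
    return keep
```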