Support for extending the aggregator

The core of the aggregator is designed to work in a somewhat generic way. Some of the other parts can be exchanged, or other implementations added, without significant changes to the core or to other areas.

In general, these exchangeable parts are enabled or disabled during compilation via feature flags.
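As a rough illustration of what such a feature flag looks like from the code's side, here is a minimal sketch. The feature names `guts` and `suricata` and the `compiled_sources` function are made up for this example; the real flags may be named differently.

```rust
/// Lists the data sources compiled into this build.
/// The feature names below are hypothetical, chosen only for illustration.
fn compiled_sources() -> Vec<&'static str> {
    let mut sources = Vec::new();
    // `cfg!` evaluates to a constant `true` or `false` at compile time.
    if cfg!(feature = "guts") {
        sources.push("guts");
    }
    if cfg!(feature = "suricata") {
        sources.push("suricata");
    }
    sources
}
```

A build without either feature gets an empty list. For leaving out whole modules, the `#[cfg(feature = "...")]` attribute is the usual tool.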

Don't worry if you don't know some of the types or parts mentioned here. Other parts of the documentation make them clearer, and once the whole thing is read, it should make sense. You can also look them up in the API documentation.

At the time of writing, these are the extensible places:

1 Columns or tags

Each flow can be tagged with different kinds of data, like IP addresses, domain names or a version of TLS encryption. These kinds are not baked in.

An extension that produces some tags brings its own types (or reuses some shared ones), implements the libdata::column::Type trait for them and registers them with libdata::column::register (or register_endpoint for types that attach to each end of a connection) upon its initialization.

From then on, the data type can be attached to a flow and it participates in normal user queries, aggregation and so on.

There are some technical tricks to achieve this, but on the most basic level, each data type is wrapped in a wrapper type that implements the needed functionality in an object-safe way (see the documentation for Rust's trait objects). On the outside, these object-safe methods are used to re-implement a convenient interface. Therefore, all the calls pass through dynamic dispatch.
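The wrapping trick can be sketched roughly like this. It is not the actual libdata code: `ColumnType`, `DynValue`, `Value`, `Domain` and `TlsVersion` are hypothetical names invented for the example, and the real trait certainly has more methods.

```rust
use std::any::Any;
use std::fmt::Debug;

/// Hypothetical stand-in for a column type trait. It is not object safe,
/// because it needs `Clone` and `PartialEq` and has a method without `self`.
trait ColumnType: Clone + PartialEq + Debug + 'static {
    fn name() -> &'static str;
}

/// Object-safe counterpart, implemented for every `ColumnType` below.
trait DynValue: Debug {
    fn dyn_name(&self) -> &'static str;
    fn dyn_clone(&self) -> Box<dyn DynValue>;
    fn dyn_eq(&self, other: &dyn DynValue) -> bool;
    fn as_any(&self) -> &dyn Any;
}

impl<T: ColumnType> DynValue for T {
    fn dyn_name(&self) -> &'static str {
        T::name()
    }
    fn dyn_clone(&self) -> Box<dyn DynValue> {
        Box::new(self.clone())
    }
    fn dyn_eq(&self, other: &dyn DynValue) -> bool {
        // Values of different concrete types are never equal.
        other.as_any().downcast_ref::<T>().map_or(false, |o| o == self)
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Outer wrapper: re-implements the convenient interface on top of the
/// object-safe methods, so every call goes through dynamic dispatch.
#[derive(Debug)]
struct Value(Box<dyn DynValue>);

impl Value {
    fn new<T: ColumnType>(value: T) -> Self {
        Value(Box::new(value))
    }
    fn name(&self) -> &'static str {
        (*self.0).dyn_name()
    }
    /// Gives back the concrete value if the type matches.
    fn get<T: ColumnType>(&self) -> Option<&T> {
        (*self.0).as_any().downcast_ref::<T>()
    }
}

impl Clone for Value {
    fn clone(&self) -> Self {
        Value((*self.0).dyn_clone())
    }
}

impl PartialEq for Value {
    fn eq(&self, other: &Self) -> bool {
        (*self.0).dyn_eq(&*other.0)
    }
}

// Two example column types an extension might bring.
#[derive(Clone, PartialEq, Debug)]
struct Domain(String);
impl ColumnType for Domain {
    fn name() -> &'static str {
        "domain"
    }
}

#[derive(Clone, PartialEq, Debug)]
struct TlsVersion(u8);
impl ColumnType for TlsVersion {
    fn name() -> &'static str {
        "tls-version"
    }
}
```

The point of the sketch is that the core can store and compare `Value`s without knowing about `Domain` or `TlsVersion` at compile time.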

2 Data sources

A data source is something that takes data from the outside world and brings it into the aggregator.

A data source is created in two phases. First, a factory object is built. Any parameters that are specific to the given data source are passed to it during the factory creation. This happens in the application's main. The factory object is then passed to the Reactor (the top-level event loop). Inside, it is provided with further parameters (common to all the data sources, like a logger or a channel to send updates). The columns provided by the data source should be registered during either of these phases.
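The two phases can be sketched as follows. All the types here (`Update`, `Common`, `Factory`, `DataSource`, `SocketFactory`, `SocketSource`, and this toy `Reactor`) are simplified stand-ins invented for the example, not the real interfaces. The logger is left out to keep it short.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical, heavily simplified update: a flow identifier and one tag.
#[derive(Debug, PartialEq)]
struct Update {
    flow: u64,
    tag: String,
}

/// Parameters common to all data sources, filled in by the reactor.
struct Common {
    updates: Sender<Update>,
}

/// A fully constructed data source (both phases done).
trait DataSource {
    fn describe(&self) -> String;
    /// Sends one update through the channel obtained from the reactor.
    fn report(&self, flow: u64, tag: &str);
}

/// First phase: holds only the parameters specific to one data source.
trait Factory {
    fn create(self: Box<Self>, common: Common) -> Box<dyn DataSource>;
}

struct SocketFactory {
    path: String,
}

struct SocketSource {
    path: String,
    updates: Sender<Update>,
}

impl Factory for SocketFactory {
    fn create(self: Box<Self>, common: Common) -> Box<dyn DataSource> {
        let factory = *self;
        Box::new(SocketSource {
            path: factory.path,
            updates: common.updates,
        })
    }
}

impl DataSource for SocketSource {
    fn describe(&self) -> String {
        format!("socket source at {}", self.path)
    }
    fn report(&self, flow: u64, tag: &str) {
        // The error is ignored; it only means the reactor is already gone.
        let _ = self.updates.send(Update { flow, tag: tag.to_owned() });
    }
}

/// Toy reactor: owns the update channel and the finished data sources.
struct Reactor {
    sender: Sender<Update>,
    receiver: Receiver<Update>,
    sources: Vec<Box<dyn DataSource>>,
}

impl Reactor {
    fn new() -> Self {
        let (sender, receiver) = channel();
        Reactor { sender, receiver, sources: Vec::new() }
    }
    /// Second phase: the reactor supplies the common parameters.
    fn add_source(&mut self, factory: Box<dyn Factory>) {
        let common = Common { updates: self.sender.clone() };
        self.sources.push(factory.create(common));
    }
}
```

In this sketch, main would build `SocketFactory { path: ... }` and hand it over with `reactor.add_source(Box::new(factory))`. The factory never needs to know how the update channel is created.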

The primary way to provide data is through the libflow::update::Update type. It describes which flow the data is related to, any new tags to add to it and optionally some new statistics. The aggregator takes updates from all its active data sources and combines them together.

The updates can be sent in two modes. One is push mode, where the data source actively provides updates as they become available (for example by reading the data from some kind of socket). The other is pull mode, where the aggregator regularly asks the data source to provide the newest snapshot of data just before the current time slice is closed and a new one is started. It is up to the data source to choose which mode (or combination) it wants to support.
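To make the difference between the two modes more concrete, here is a small sketch. The names (`Update`, `PushSource`, `Pull`, `CounterSource`, `close_slice`) are hypothetical and the update carries only a byte counter. The real libflow::update::Update is richer.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical simplified update: a flow identifier and a byte counter.
#[derive(Debug, Clone, PartialEq)]
struct Update {
    flow: u64,
    bytes: u64,
}

/// Push mode: the source sends updates whenever it has something new.
struct PushSource {
    updates: Sender<Update>,
}

impl PushSource {
    /// Called by the source itself, e.g. after reading from a socket.
    fn on_data(&self, flow: u64, bytes: u64) {
        // The error is ignored; it only means the receiver is gone.
        let _ = self.updates.send(Update { flow, bytes });
    }
}

/// Pull mode: the aggregator asks for a snapshot.
trait Pull {
    fn snapshot(&mut self) -> Vec<Update>;
}

/// Example pull source that keeps per-flow counters.
#[derive(Default)]
struct CounterSource {
    counters: BTreeMap<u64, u64>,
}

impl Pull for CounterSource {
    fn snapshot(&mut self) -> Vec<Update> {
        self.counters
            .iter()
            .map(|(&flow, &bytes)| Update { flow, bytes })
            .collect()
    }
}

/// Just before a time slice is closed: collect whatever was pushed so far
/// and then ask every pull source for its newest snapshot.
fn close_slice(pushed: &Receiver<Update>, pulled: &mut [Box<dyn Pull>]) -> Vec<Update> {
    let mut all: Vec<Update> = pushed.try_iter().collect();
    for source in pulled.iter_mut() {
        all.extend(source.snapshot());
    }
    all
}
```

A source that wants both modes would simply hold a `Sender` and implement `Pull` at the same time.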

2.1 Guts data source

This connects to the pakon-guts daemon and provides the updates as they come in (i.e. it is push-only). It provides both the tags and the statistics.

It is our historical (and currently only) data source, but we plan to replace it with a combination of Suricata and conntrack. It probably makes sense to keep the data source around but not include it in the compilation of the final distribution.

2.2 Suricata data source

Once written (i.e. it is only planned now), it will provide tags in push-only mode. It is rather information-rich (e.g. it provides things like URLs in HTTP connections and common names of TLS certificates). However, it is unable to provide us with timely statistics: it provides statistics only after the whole flow has ended (while we want them every slice), and only if Suricata doesn't use the bypass optimisation. Therefore, the plan is to combine its data with the conntrack data source.

2.3 Conntrack data source

Conntrack in the Linux kernel optionally provides accounting information for flows, which is basically what the aggregator considers to be statistics. Therefore, this data source will (once written) connect to the netlink socket and act as a hybrid (both push and pull) data source providing only the statistics. The conntrack data can be both dumped (to get a snapshot at the end of each time slice) and live-streamed (to get the ends of connections). This will supply the other part of the information for the Suricata data source.

3 Internal computations

Data sources provide data that arrives from the outside world. However, some further information may be computed internally, derived from other information (e.g. by cross-referencing different connections or combining multiple tags into something aggregated). The internal computations play this role.

An internal computation is informed about events, mostly changes to the live flows currently in the aggregator. As a reaction to an event, it may peek into other live flows and their tags and issue updates (both for the flow causing the event and for other flows).

Their creation is very similar to that of the data sources.
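The following sketch shows the general shape of such a reaction. Everything in it is hypothetical: the `Event`, `Flow`, `Update` and `Computation` types are invented for the example, and so is the `ShareDomain` computation. It copies a known domain name to other live flows that talk to the same remote address.

```rust
use std::collections::BTreeMap;

type FlowId = u64;

/// Hypothetical live flow with just two pieces of information.
#[derive(Debug, Clone)]
struct Flow {
    remote_ip: String,
    domain: Option<String>,
}

/// Hypothetical events delivered to a computation.
enum Event {
    Started(FlowId),
    Tagged(FlowId),
}

/// Hypothetical update: attach a domain name to a flow.
#[derive(Debug, PartialEq)]
struct Update {
    flow: FlowId,
    domain: String,
}

trait Computation {
    /// Reacts to one event; may look at all live flows and issue updates.
    fn on_event(&mut self, event: &Event, live: &BTreeMap<FlowId, Flow>) -> Vec<Update>;
}

/// Copies a freshly learned domain to other flows with the same remote IP.
struct ShareDomain;

impl Computation for ShareDomain {
    fn on_event(&mut self, event: &Event, live: &BTreeMap<FlowId, Flow>) -> Vec<Update> {
        // Only a flow gaining a tag is interesting here.
        let id = match event {
            Event::Tagged(id) => *id,
            Event::Started(_) => return Vec::new(),
        };
        let source = match live.get(&id) {
            Some(flow) => flow,
            None => return Vec::new(),
        };
        let domain = match &source.domain {
            Some(domain) => domain,
            None => return Vec::new(),
        };
        // Peek into the other live flows and update the matching ones.
        live.iter()
            .filter(|(other, flow)| {
                **other != id && flow.remote_ip == source.remote_ip && flow.domain.is_none()
            })
            .map(|(other, _)| Update { flow: *other, domain: domain.clone() })
            .collect()
    }
}
```

The computation only returns updates and never modifies the flows directly. Whether the real interface works that way is an assumption of this sketch.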

3.1 The best domain name computation

There may be multiple domain names corresponding to a single IP address. Therefore, this computation guesses which domain name is the primary one (the one the user expects to see). The current heuristic is very simple (i.e. the shortest one wins), but there might be better ones in the future (like tracking what the user actually requested through DNS, what the CNAMEs are, …).
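The shortest-name heuristic fits into a few lines. The function name `best_domain` is made up for this sketch, and so is the tie-breaking rule (alphabetical order among equally long names), which the real computation may handle differently.

```rust
/// Picks the shortest name. Among equally short names, the alphabetically
/// first one wins (an assumption made here to keep the result stable).
fn best_domain<'a>(names: &[&'a str]) -> Option<&'a str> {
    names
        .iter()
        .copied()
        .min_by(|a, b| a.len().cmp(&b.len()).then_with(|| a.cmp(b)))
}
```

For example, out of `cdn.example.org`, `example.org` and `www.example.org` it picks `example.org`. With no names at all it returns `None`.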

3.2 Ideas for future

4 Storage backends

Currently, there's only the in-memory backend for storing data. The querying interface is somewhat generic, and it should be possible to implement it on future backends. However, it will likely need some tweaking and refactoring (from both the interface and the usage point of view).

In addition to the in-memory backend, we will need the journal (into which the time slices from the in-memory backend regularly fall off) and several tiers of indexed bucket files.