1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
// Copyright 2017, CZ.NIC z.s.p.o. (http://www.nic.cz/)
//
// This file is part of the pakon system.
//
// Pakon is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
// Pakon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Pakon.  If not, see <http://www.gnu.org/licenses/>.

//! Traits used internally to query the data.
//!
//! As we have multiple sources of data (in-memory, the journal and the aggregated data), we define
//! several traits to handle them uniformly during the query.
//!
//! There are two important traits. One is for the containers (note that the in-memory is actually
//! multiple containers and the files on the disk is one container each), called
//! [`Container`](trait.Container.html). The other is on the bits of data ‒ flow slices ‒ which is
//! [`ValueSrc`](trait.ValueSrc.html). They depend on further traits.
//!
//! The plan of how the filtering is done is as follows:
//!
//! * Each large container is first checked if it *may* contain flows in the given interval (so it
//!   must implement [`InTimeInterval`](trait.InTimeInterval.html)).
//! * Then, it is checked if it *may* contain flows matching the filter. For that it must implement
//!   [`Container`](trait.Container.html). Note that some kinds of containers can have a trivial
//!   (eg `return MayUse`) implementation. Also, a container might refuse the query because it
//!   doesn't contain the right columns even if it may contain the relevant data.
//! * Each container that passes both preliminary checks is turned into an iterator with
//!   `into_iter` (from std).
//! * The iterator is iterated through and each item is checked for interval again (because the
//!   flows there might be shorter ‒ but if they span the whole length of the container, simple
//!   `return true` is enough) and is queried for relevant columns through the
//!   [`ValueSrc`](trait.ValueSrc.html), which are used both for filtering and aggregation.
//! * Each passing item is asked to provide `Stat` through the `ValueSrc` trait to add to
//!   the result.

#![deny(missing_docs)]

#[cfg(test)]
extern crate eui48;
extern crate libdata;
#[macro_use]
extern crate serde_derive;
#[cfg(test)]
#[macro_use]
extern crate serde_json;
#[cfg(test)]
extern crate test_help;

use std::cmp;
use std::time::{Duration, SystemTime};

use query::Query;
use libdata::column::{Ident, Value};
use libdata::time::Timeline;
use libdata::stats::Stats;

pub mod query;

/// A trait to check if the data fall into the queried time interval.
///
/// This is expected to be implemented on larger containers of potentially many flows as well as
/// on any flow slices provided by these containers. Only the actual interval and granularity
/// need to be provided, as `in_time_interval` has a default implementation. Furthermore, slices
/// spanning the whole container might simply return `true` from it.
pub trait InTimeInterval {
    /// The interval of the data inside.
    ///
    /// The tuple is `(start, end)` and `start` must not be larger than `end`.
    fn interval(&self) -> (SystemTime, SystemTime);

    /// Provides the granularity of the container.
    ///
    /// This specifies in how long slices the container cuts time. Note that this may be different
    /// than the time covered by the container (as the container may contain multiple consecutive
    /// time slices). Also, even for flow slices, this provides the information about the
    /// container, not the duration of the flow.
    fn granularity(&self) -> Duration;

    /// Checks whether the contained data lie within the queried interval.
    ///
    /// Note that the queried interval is something like ℝ* ‒ it allows -∞ and ∞ as well.
    ///
    /// The slice is considered inside the query interval if it is either fully inside or if it
    /// overlaps by at least half of the granularity length (which is something like rounding to
    /// whole time slices when considering if it's inside or not). This might act in a seemingly
    /// weird way for very short query intervals, as those would match nothing, but if we have
    /// very coarse data, we can't reasonably provide anything better.
    ///
    /// # Parameters
    ///
    /// * `start`: Start of the interval, or `None` if the interval is infinite to the left.
    /// * `end`: End of the interval, or `None` if it is infinite to the right.
    // Note that this method is tested as part of the implementation for time slice.
    fn in_time_interval(&self, start: Option<SystemTime>, end: Option<SystemTime>) -> bool {
        let (my_start, my_end) = self.interval();
        // An overlap of at least half a granularity window counts as „inside“.
        let half = self.granularity() / 2;

        // Does the query reach past our interval on each side? An open (None) end trivially
        // does. At the same time, clamp the query ends into our interval to get the overlapping
        // part ‒ a query that doesn't overlap at all naturally yields a crossed interval
        // (end < start) here.
        let reaches_left = start.map_or(true, |query_start| query_start <= my_start);
        let reaches_right = end.map_or(true, |query_end| query_end >= my_end);
        let overlap_start = start.map_or(my_start, |query_start| cmp::max(query_start, my_start));
        let overlap_end = end.map_or(my_end, |query_end| cmp::min(query_end, my_end));

        if reaches_left && reaches_right {
            // We fit completely inside the query interval.
            return true;
        }

        // A crossed (empty) overlap makes duration_since produce an error, which we treat as
        // „not inside“.
        overlap_end
            .duration_since(overlap_start)
            .map(|overlap_len| overlap_len >= half)
            .unwrap_or(false)
    }
}

/// Result of the `pre_filter` call from the [`Container`](trait.Container.html) trait.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum PreFilterResult {
    /// The container might contain the relevant data and it has all the needed columns, so it can
    /// be used.
    ///
    /// Note that the columns might be missing with *some* of the flow slices as being optional.
    MayUse,
    /// No relevant data may be inside.
    NoData,
    /// The data may be here, but some columns that would be requested are not present.
    MissingColumns,
}

/// A trait to choose which containers are to be examined to find relevant data.
///
/// This needs to check only the contained columns of the data, the times are checked by the
/// [`InTimeInterval`](trait.InTimeInterval.html) trait independently.
// TODO: How do we specify that we want this to be IntoIter returning something that is ValueSrc?
pub trait Container: InTimeInterval {
    /// Given the provided query, *may* the container contain relevant data?
    ///
    /// This is a cheap preliminary check; the answer distinguishes between possible data,
    /// certainly no data and missing columns (see
    /// [`PreFilterResult`](enum.PreFilterResult.html)).
    fn pre_filter(&self, query: &Query) -> PreFilterResult;
    /// Provides the timeline of data inside this container.
    ///
    /// The timeline reflects how the container is split into time slices.
    fn timeline(&self) -> Timeline;
}

/// A trait to extract values out of flow slices.
pub trait ValueSrc: InTimeInterval {
    /// Extracts the given column.
    ///
    /// Note that the caller must first check the container (with
    /// [`Container::pre_filter`](trait.Container.html#tymethod.pre_filter)) with the relevant
    /// query and must not ask for columns that are not accepted by the container (eg. if the
    /// container said `MissingColumns`). It is OK for the implementation to panic if it does.
    ///
    /// If the implementation signaled `MayUse` and it is missing some values in *some* flows
    /// (because they are optional), it shall produce empty set inside the value.
    ///
    /// Some implementation may produce multiple values if they have pre-aggregated input data. It
    /// must not happen to columns requested to aggregate by in the relevant `pre_filter` query.
    fn value(&self, ident: &Ident) -> Option<Value>;
    /// Provides the statistics.
    ///
    /// This provides the statistics (eg. sizes, speeds, etc) of the flow slice.
    fn stats(&self) -> &Stats;
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use libdata::flow;

    /// Checks the detection of being in the interval.
    #[test]
    fn in_time() {
        // Two kinds of fake flow slices, both living in a container with one-minute
        // granularity.
        enum FakeSlice {
            // Spans a full minute starting at the 2-minute mark.
            FullLength,
            // Spans only 20 ms centered at the 2-minute mark.
            VeryShort,
        }

        impl InTimeInterval for FakeSlice {
            fn interval(&self) -> (SystemTime, SystemTime) {
                let two_minutes = 2 * 1000 * 60;
                match *self {
                    FakeSlice::FullLength => {
                        let begin = flow::system_time_from_ms(two_minutes);
                        (begin, begin + self.granularity())
                    },
                    FakeSlice::VeryShort => {
                        (flow::system_time_from_ms(two_minutes - 10),
                         flow::system_time_from_ms(two_minutes + 10))
                    },
                }
            }
            fn granularity(&self) -> Duration {
                Duration::from_secs(60)
            }
        }
        // The middle of the interval of the full-length time slice (in ms).
        let middle = 1000 * (2 * 60 + 60 / 2);

        // Runs the interval check for a slice, with the ends given as optional ms timestamps.
        let check = |slice: FakeSlice, start: Option<u64>, end: Option<u64>| {
            let start = start.map(flow::system_time_from_ms);
            let end = end.map(flow::system_time_from_ms);
            slice.in_time_interval(start, end)
        };
        let full_interval = |start, end| check(FakeSlice::FullLength, start, end);
        let short_interval = |start, end| check(FakeSlice::VeryShort, start, end);
        // A query for the whole ethernity
        assert!(full_interval(None, None));
        // A query for the whole ethernity, but camouflaged
        assert!(full_interval(Some(0), None));
        // A query for 0-some time far in the future (relative to the slice)
        assert!(full_interval(None, Some(1_000_000_000)));
        assert!(full_interval(Some(0), Some(1_000_000_000)));
        assert!(full_interval(Some(middle - 1), None));
        assert!(full_interval(Some(middle - 1), Some(1_000_000_000)));
        assert!(!full_interval(Some(middle + 1), None));
        assert!(!full_interval(Some(middle + 1), Some(1_000_000_000)));
        assert!(full_interval(None, Some(middle + 1)));
        assert!(full_interval(Some(0), Some(middle + 1)));
        assert!(!full_interval(None, Some(middle - 1)));
        assert!(!full_interval(Some(0), Some(middle - 1)));
        assert!(!full_interval(Some(middle - 1), Some(middle + 1)));
        assert!(!full_interval(Some(middle + 1), Some(middle - 1)));
        assert!(!full_interval(Some(1_000_000_000), Some(2_000_000_000)));
        assert!(full_interval(Some(2 * 1000 * 60 + 1),
                              Some(3 * 1000 * 60 - 1)));
        // This one is inside the interval even when it is very short, because it fits fully inside
        assert!(short_interval(None, None));
        assert!(short_interval(Some(0), None));
        assert!(short_interval(None, Some(1_000_000_000)));
        assert!(short_interval(Some(0), Some(1_000_000_000)));
        // This is exactly the same, but it still counts as inside
        assert!(short_interval(Some(2 * 1000 * 60 - 10),
                               Some(2 * 1000 * 60 + 10)));
        // But this does not fit fully and is too short to count as interesting
        assert!(!short_interval(Some(2 * 1000 * 60 - 9),
                                Some(2 * 1000 * 60 + 10)));

        // Some degraded queries. These don't have any „inside“, so nothing really fits, but we
        // want to check it does't crash something.
        assert!(!short_interval(Some(2 * 1000 * 60),
                                Some(2 * 1000 * 60)));
        assert!(!short_interval(Some(1_000_000_000), Some(0)));
    }
}
}