1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
// Copyright 2017, CZ.NIC z.s.p.o. (http://www.nic.cz/)
//
// This file is part of the pakon system.
//
// Pakon is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Pakon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Pakon.  If not, see <http://www.gnu.org/licenses/>.

//! Traits used internally to query the data.
//!
//! As we have multiple sources of data (in-memory, the journal and the aggregated data), we
//! define several traits to handle them uniformly during the query.
//!
//! There are two important traits. One is for the containers (note that the in-memory is actually
//! multiple containers and the files on the disk is one container each), called
//! [`Container`](trait.Container.html). The other is on the bits of data ‒ flow slices ‒ which is
//! [`ValueSrc`](trait.ValueSrc.html). They depend on further traits.
//!
//! The plan of how the filtering is done is as follows:
//!
//! * Each large container is first checked if it *may* contain flows in the given interval (so it
//!   must implement [`InTimeInterval`](trait.InTimeInterval.html)).
//! * Then, it is checked if it *may* contain flows matching the filter. For that it must implement
//!   [`Container`](trait.Container.html). Note that some kinds of containers can have a trivial
//!   (eg `return MayUse`) implementation. Also, a container might refuse the query because it
//!   doesn't contain the right columns even if it may contain the relevant data.
//! * Each container that passes both preliminary checks is turned into an iterator with
`into_iter` (from std). //! * The iterator is iterated through and each item is checked for interval again (because the //! flows there might be shorter ‒ but if they span the whole length of the container, simple //! `return true` is enough) and is queried for relevant columns through the //! [`ValueSrc`](trait.ValueSrc.html), which are used both for filtering and aggregation. //! * Each passing item is asked to provide `Stat` through the `ValueSrc` trait to add to //! the result. #![deny(missing_docs)] #[cfg(test)] extern crate eui48; extern crate libdata; #[macro_use] extern crate serde_derive; #[cfg(test)] #[macro_use] extern crate serde_json; #[cfg(test)] extern crate test_help; use std::cmp; use std::time::{Duration, SystemTime}; use query::Query; use libdata::column::{Ident, Value}; use libdata::time::Timeline; use libdata::stats::Stats; pub mod query; /// A trait to check if the data fall into the queried time interval. /// /// This is expected to be implemented on larger containers of potentially many flows as well as /// on any flow slices provided by these containers. However, providing the actual interval is /// enough, as there's the default implementation. Furthermore, slices spanning the whole container /// might simply return `true`. pub trait InTimeInterval { /// The interval of the data inside. /// /// The tuple is `(start, end)` and `start` must not be larger than `end`. fn interval(&self) -> (SystemTime, SystemTime); /// Provides the granularity of the container. /// /// This specifies in how long slices the container cuts time. Note that this may be different /// than the time covered by the container (as the container may contain multiple consecutive /// time slices). Also, even for flow slices, this provides the information about the /// container, not the duration of the flow. fn granularity(&self) -> Duration; /// Returns if the queried interval is for the data contained. 
/// /// Note that the queries interval is something like ℝ* ‒ it allows -∞ and ∞ as well. /// /// The slice is considered inside the query interval if it is either fully inside or if it /// overlaps by at least half of the granularity length (which is something like rounding to /// whole time slices when considering if it's inside or not). This might act in a seemingly /// wierd way for very short query intervals, as that would match nothing, but if we have very /// coarse data, we can't reasonably provide anything better. /// /// # Parameters /// /// * `start`: Start of the interval, or `None` if the interval is infinite to the left. /// * `end`: End of the interval, or `None` if it is infinite to the right. // Note that this method is tested as part of the implementation for time slice. fn in_time_interval(&self, start: Option<SystemTime>, end: Option<SystemTime>) -> bool { /* * my_{start, end} ‒ ends of my interval. Both ends must exist. * query.{start, end} ‒ ends of the interval of the query, but these might not exist (eg. * are to infinity). * overlap_{start, end} ‒ ends of the part of time that overlaps. If there's no overlap, * this naturally produces a crossed interval: end < start. */ let (my_start, my_end) = self.interval(); // We assume the time slice is in the interval if it overlaps by at least half of its // length (or, if it is fully inside). let full = self.granularity(); let half = full / 2; // Precompute stuff to deal with the possible open start or end of the query interval. 
let (query_starts_before, overlap_start) = match start { None => (true, my_start), Some(query_start) => (query_start <= my_start, cmp::max(query_start, my_start)), }; let (query_ends_after, overlap_end) = match end { None => (true, my_end), Some(query_end) => (query_end >= my_end, cmp::min(query_end, my_end)), }; let fully_inside = query_starts_before && query_ends_after; let large_overlap = overlap_end.duration_since(overlap_start) .map(|overlap_len| overlap_len >= half) // In case overlap_end < overlap_start, this produces an error → turn it into false .unwrap_or(false); fully_inside || large_overlap } } /// Result of the `pre_filter` call from the [`PreFilter`](trait.PreFilter.html) trait. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum PreFilterResult { /// The container might contain the relevant data and it has all the needed columns, so it can /// be used. /// /// Note that the columns might be missing with *some* of the flow slices as being optional. MayUse, /// No relevant data may be inside. NoData, /// The data may be here, but some columns that whould be requested are not present. MissingColumns, } /// A trait to choose which containers are to be examined to find relevant data. /// /// This needs to check only the contained columns of the data, the times are checked by the /// [`InTimeInterval`](trait.InTimeInterval.html) trait independently. // TODO: How do we specify that we want this to be IntoIter returning something that is ValueSrc? pub trait Container: InTimeInterval { /// Given the provided query, *may* the container contain relevant data? fn pre_filter(&self, query: &Query) -> PreFilterResult; /// Provides the timeline of data inside this container. /// /// The timeline reflects how the container is split into time slices. fn timeline(&self) -> Timeline; } /// A trait to extract values out of flow slices. pub trait ValueSrc: InTimeInterval { /// Extracts the given column. 
/// /// Note that the caller must first check the container (with the /// [`PreFilter`](trait.PreFilter.html) with the relevant query and must not /// ask for columns that are not accepted by the container (eg. if the container said /// `MissingColumns`. It is OK for the implementation to panic if it does. /// /// If the implementation signaled `MayUse` and it is missing some values in *some* flows /// (because they are optional), it shall produce empty set inside the value. /// /// Some implementation may produce multiple values if they have pre-aggregated input data. It /// must not happen to columns requested to aggregate by in the relevant `PreFilter` query. fn value(&self, ident: &Ident) -> Option<Value>; /// Provides the statistics. /// /// This provides the statistics (eg. sizes, speeds, etc) of the flow slice. fn stats(&self) -> &Stats; } #[cfg(test)] mod tests { use std::time::Duration; use super::*; use libdata::flow; /// Checks the detection of being in the interval. #[test] fn in_time() { enum FakeSlice { FullLength, VeryShort, } impl InTimeInterval for FakeSlice { fn interval(&self) -> (SystemTime, SystemTime) { match *self { FakeSlice::FullLength => { let start = flow::system_time_from_ms(2 * 1000 * 60); let end = start + self.granularity(); (start, end) }, FakeSlice::VeryShort => { let start = flow::system_time_from_ms(2 * 1000 * 60 - 10); let end = flow::system_time_from_ms(2 * 1000 * 60 + 10); (start, end) }, } } fn granularity(&self) -> Duration { Duration::from_secs(60) } } // The middle of the interval of the full-length time slice let middle = 1000 * (2 * 60 + 60 / 2); // A check if the slice is in the given interval. 
let interval = |start: Option<u64>, end: Option<u64>, slice: FakeSlice| { slice.in_time_interval(start.map(flow::system_time_from_ms), end.map(flow::system_time_from_ms)) }; let full_interval = |start, end| interval(start, end, FakeSlice::FullLength); let short_interval = |start, end| interval(start, end, FakeSlice::VeryShort); // A query for the whole ethernity assert!(full_interval(None, None)); // A query for the whole ethernity, but camouflaged assert!(full_interval(Some(0), None)); // A query for 0-some time far in the future (relative to the slice) assert!(full_interval(None, Some(1_000_000_000))); assert!(full_interval(Some(0), Some(1_000_000_000))); assert!(full_interval(Some(middle - 1), None)); assert!(full_interval(Some(middle - 1), Some(1_000_000_000))); assert!(!full_interval(Some(middle + 1), None)); assert!(!full_interval(Some(middle + 1), Some(1_000_000_000))); assert!(full_interval(None, Some(middle + 1))); assert!(full_interval(Some(0), Some(middle + 1))); assert!(!full_interval(None, Some(middle - 1))); assert!(!full_interval(Some(0), Some(middle - 1))); assert!(!full_interval(Some(middle - 1), Some(middle + 1))); assert!(!full_interval(Some(middle + 1), Some(middle - 1))); assert!(!full_interval(Some(1_000_000_000), Some(2_000_000_000))); assert!(full_interval(Some(2 * 1000 * 60 + 1), Some(3 * 1000 * 60 - 1))); // This one is inside the interval even when it is very short, because it fits fully inside assert!(short_interval(None, None)); assert!(short_interval(Some(0), None)); assert!(short_interval(None, Some(1_000_000_000))); assert!(short_interval(Some(0), Some(1_000_000_000))); // This is exactly the same, but it still counts as inside assert!(short_interval(Some(2 * 1000 * 60 - 10), Some(2 * 1000 * 60 + 10))); // But this does not fit fully and is too short to count as interesting assert!(!short_interval(Some(2 * 1000 * 60 - 9), Some(2 * 1000 * 60 + 10))); // Some degraded queries. 
These don't have any „inside“, so nothing really fits, but we // want to check it does't crash something. assert!(!short_interval(Some(2 * 1000 * 60), Some(2 * 1000 * 60))); assert!(!short_interval(Some(1_000_000_000), Some(0))); } }