1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
// Copyright 2017, CZ.NIC z.s.p.o. (http://www.nic.cz/)
//
// This file is part of the pakon system.
//
// Pakon is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
//  (at your option) any later version.
//
// Pakon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Pakon.  If not, see <http://www.gnu.org/licenses/>.

//! The time and flow slices and protoslices.
//!
//! This module splits time into slices, cuts the flows into the corresponding pieces and stores
//! those pieces in the slices. It also keeps a proto-slice (one slice that hasn't finished yet).
//!
//! This is basically the part of the program that accumulates the data from the online stream of
//! reports from pakon-guts and transforms it into something usable.

use std::cmp;
use std::slice::Iter as SliceIter;
use std::time::{Duration, SystemTime};

use libdata::column::{Ident, FlowTags, Value};
use libdata::flow::{Count, Speed};
use libdata::time::Timeline;
use libdata::stats::{Stat, Stats};
use libquery::{Container, InTimeInterval, PreFilterResult, ValueSrc};
use libquery::query::Query;
use libutils::tunable::SLICE_LENGTH_SECONDS;

/// One flow slice.
///
/// This struct holds one flow slice ‒ an interval of a flow. Note that it does not have to occupy
/// the full time slice and that the ends may be a bit fuzzy.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Flow {
    /// The tags of the flow.
    ///
    /// The tags are things that don't change during the lifetime of the flow (that doesn't
    /// necessarily mean our knowledge of them can't change during the flow's lifetime). These are
    /// things like the IP addresses or domain names.
    ///
    /// This is shared between all the flow slices of a single flow, or at least of its in-memory
    /// representation.
    tags: FlowTags,
    /// The statistics (the part relevant to this time slice).
    ///
    /// Holds one `Stat` per direction (`dir_in`, `dir_out`).
    stats: Stats,
}

impl Flow {
    /// Constructs a new flow slice.
    ///
    /// It simply builds it from the relevant parts.
    ///
    /// # Params
    ///
    /// * tags: The tags of the flow. Note that due to interior mutability, the data there might
    ///   change in the future.
    /// * stats: Statistics (eg. how much was transferred, etc.) during the slice. These cover only
    ///   the slice, not the whole time since the start of the flow.
    pub fn new(tags: FlowTags, stats: Stats) -> Self {
        Flow {
            tags,
            stats,
        }
    }
    /// Builds a dummy test flow.
    ///
    /// The flow is mostly empty.
    ///
    /// * It is tagged `Direction::In` and as a TCP flow.
    /// * Its local port is 1 and its remote port is 2.
    /// * Its `dir_in` statistics hold a single packet of 120 bytes.
    /// * Its `dir_out` statistics carry no traffic, only the flow count.
    /// * Both directions span the interval from `UNIX_EPOCH` to 5 seconds later.
    ///
    /// # Parameters
    ///
    /// * local: Local endpoint IP address for the new flow.
    /// * remote: Remote endpoint IP address for the new flow.
    #[cfg(test)]
    pub fn dummy(local: ::std::net::IpAddr, remote: ::std::net::IpAddr) -> Self {
        use std::time::UNIX_EPOCH;

        use libdata::column::{Local, Remote, Tags};
        use libdata::flow::{Bytes, Count, Direction, IpProto, IpProtoRaw, Port, Speed};
        use libdata::stats::Stat;

        let mut tags = Tags::new();
        tags.insert(Direction::In);
        tags.insert(Local(local));
        tags.insert(Local(Port(1)));
        tags.insert(Remote(remote));
        tags.insert(Remote(Port(2)));
        tags.insert(IpProto::Tcp);
        // NOTE(review): if `IpProtoRaw` is the raw IP protocol number, TCP would be 6, not 16.
        // Left unchanged because existing tests may compare against this exact value — confirm.
        tags.insert(IpProtoRaw(16));

        let stats = Stats {
            dir_in: Stat {
                speed_duration: Duration::from_secs(5),
                flows: Count(1),
                flows_started: Count(0),
                size: Bytes(120),
                packets: Count(1),
                // 120 bytes over the 5-second speed duration.
                max_speed: Speed(24),
                start: Some(UNIX_EPOCH),
                end: Some(UNIX_EPOCH + Duration::from_secs(5)),
            },
            dir_out: Stat {
                flows: Count(1),
                start: Some(UNIX_EPOCH),
                end: Some(UNIX_EPOCH + Duration::from_secs(5)),
                ..Stat::default()
            },
        };

        Self::new(FlowTags::new(tags), stats)
    }
    /// Replaces real times with a 5-second interval at the start of the time.
    ///
    /// Some real flows are produced with real-life time stamps in them, which makes them hard to
    /// compare to a constant. This overwrites the start and end of both directions with the preset
    /// interval `(UNIX_EPOCH, UNIX_EPOCH + 5s)`.
    ///
    /// This avoids testing the actual time processing, but it makes testing the rest possible.
    #[cfg(test)]
    pub fn zero_time(&mut self) {
        use std::time::UNIX_EPOCH;

        self.stats.dir_in.start = Some(UNIX_EPOCH);
        self.stats.dir_in.end = Some(UNIX_EPOCH + Duration::from_secs(5));
        self.stats.dir_out.start = Some(UNIX_EPOCH);
        self.stats.dir_out.end = Some(UNIX_EPOCH + Duration::from_secs(5));
    }
}

// The trait is implemented for `&Flow` rather than `Flow`: iterating a `Time` hands out
// references, and queries consume those directly.
impl<'a> ValueSrc for &'a Flow {
    /// Looks up a single column in the tags shared by the whole flow.
    ///
    /// Returns an owned copy of the value, or `None` when the tag is not set.
    fn value(&self, ident: &Ident) -> Option<Value> {
        let tags = self.tags.borrow();
        tags.get(ident).cloned()
    }
    /// The statistics belonging to this flow slice only.
    fn stats(&self) -> &Stats {
        let flow: &Flow = self;
        &flow.stats
    }
}

impl<'a> InTimeInterval for &'a Flow {
    /// The time span covered by this flow slice.
    ///
    /// It is the union of both directions' intervals: the earlier of the two starts and the later
    /// of the two ends.
    ///
    /// # Panics
    ///
    /// If the statistics of either direction lack a start or an end time.
    fn interval(&self) -> (SystemTime, SystemTime) {
        // The statistics can be empty if they contain no flows. However, if we are *inside* a
        // flow, it makes no sense for them to be empty, so a missing time is a broken invariant.
        let dir_in = &self.stats.dir_in;
        let dir_out = &self.stats.dir_out;
        let start = cmp::min(
            dir_in.start.expect("Inbound stats of a flow slice have no start time"),
            dir_out.start.expect("Outbound stats of a flow slice have no start time"),
        );
        let end = cmp::max(
            dir_in.end.expect("Inbound stats of a flow slice have no end time"),
            dir_out.end.expect("Outbound stats of a flow slice have no end time"),
        );
        (start, end)
    }
    /// Flow slices are always reported with the configured slice length as their granularity.
    fn granularity(&self) -> Duration {
        Duration::from_secs(SLICE_LENGTH_SECONDS)
    }
}

/// One time slice.
///
/// It is produced by the [`ProtoTime`](struct.ProtoTime.html). It holds the flow slices that fall
/// into its `start`-`end` interval (see `Time::new` for the checks enforcing that).
#[derive(Debug, Eq, PartialEq)]
pub struct Time {
    /// The time when the slice starts.
    start: SystemTime,
    /// The time when the slice ends.
    end: SystemTime,
    /// The flow slices that live inside this time slice.
    flows: Vec<Flow>,
}

impl Time {
    /// Constructs a new time slice.
    ///
    /// It simply puts all the parts together, after validating them.
    ///
    /// # Params
    ///
    /// * start: The time when the slice starts.
    /// * end: The time when the slice ends.
    /// * flows: The flows that live inside.
    ///
    /// # Panics
    ///
    /// * If `end` is before `start`.
    /// * If some of the flows sneak out of the `start`-`end` interval or don't have the interval
    ///   set.
    /// * If the flows are invalid in some other way.
    pub fn new(start: SystemTime, end: SystemTime, flows: Vec<Flow>) -> Self {
        // An inverted slice would otherwise only blow up later, in `granularity`/`timeline`,
        // through `duration_since().unwrap()`; catch it where it is created.
        assert!(start <= end, "Time slice ends before it starts");
        let check = |stat: &Stat| {
            let stat_start = stat.start.expect("Flow slice without a start time");
            let stat_end = stat.end.expect("Flow slice without an end time");
            assert!(start <= stat_start, "Flow slice starts before its time slice");
            assert!(end >= stat_end, "Flow slice ends after its time slice");
            assert_eq!(Count(1), stat.flows, "This is a single flow!");
            assert!(stat.flows_started <= Count(1));
            assert!(stat.max_speed >= Speed::compute(stat.size, stat.speed_duration));
        };
        for flow in &flows {
            check(&flow.stats.dir_in);
            check(&flow.stats.dir_out);
        }
        Self {
            start,
            end,
            flows,
        }
    }
}

impl InTimeInterval for Time {
    fn interval(&self) -> (SystemTime, SystemTime) {
        (self.start, self.end)
    }
    fn granularity(&self) -> Duration {
        self.end.duration_since(self.start).unwrap()
    }
}

// Iterating over a borrowed time slice walks its flow slices, in the order they are stored.
impl<'a> IntoIterator for &'a Time {
    type Item = &'a Flow;
    type IntoIter = SliceIter<'a, Flow>;
    /// Hands out a borrowing iterator over the stored flow slices.
    fn into_iter(self) -> Self::IntoIter {
        let flows: &'a [Flow] = &self.flows;
        flows.iter()
    }
}

impl Container for Time {
    /// Always usable: the slice is held in memory in full, so no query is beyond it.
    fn pre_filter(&self, _query: &Query) -> PreFilterResult {
        PreFilterResult::MayUse
    }
    /// A timeline made of a single interval, starting at the slice start and lasting one
    /// granularity.
    fn timeline(&self) -> Timeline {
        let first = self.start;
        let last = first + self.granularity();
        Timeline::from_borders(vec![first, last])
    }
}

// Note: Some tests for this module live in test-helper-flow. They reuse its helpers, and we can't
// bring test-helper-flow in as a dependency here, as that would create a dependency cycle.