// Copyright 2017, CZ.NIC z.s.p.o. (http://www.nic.cz/)
//
// This file is part of the pakon system.
//
// Pakon is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Pakon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Pakon.  If not, see <http://www.gnu.org/licenses/>.

//! The reactor (event loop).
//!
//! This is the entry point into the (almost) infinite loop of handling events. It holds the logic
//! to orchestrate the other functionality.

use std::cell::RefCell;
use std::fmt::Debug;
use std::fs;
use std::io::{Error as IoError, ErrorKind, Result as IoResult};
use std::num::Wrapping;
use std::path::Path;
use std::rc::Rc;
use std::time::Duration;

use futures::{Future, IntoFuture, Stream, Sink};
use futures::future;
use futures::unsync::mpsc::{self, Receiver};
use libc;
use slog::Logger;
use tk_listen::ListenExt;
use tokio_core::reactor::{Core, Interval, Handle};
use tokio_signal::unix::Signal;
use tokio_uds::UnixListener;

use downstream;
use dsrc::{Dsrc, Factory as DsrcFactory};
use int_compute::{IntCompute, Factory as ComputeFactory};
use libflow;
use libflow::update::{CorkedUpdate, UpdateSender, UpdateSenderUnbounded};
use libgather::Gather;
use libutils::{CmdlineOpts, LocalBoxFuture};
use libutils::tunable::{DSRC_CHANNEL_SIZE, LISTEN_ERR_MILLIS, MAX_CLIENTS, SLICE_LENGTH_SECONDS};
use keeper::Keeper;
use wakeup::Manager;

/// Provides a future that resolves once a termination signal arrives.
///
/// A termination signal is one of:
///
/// * SIGINT
/// * SIGQUIT
/// * SIGTERM
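///
/// # Examples
///
/// A minimal sketch (not compiled here) of blocking until one of the signals arrives; it assumes
/// a tokio-core `Core` is already available:
///
/// ```ignore
/// let mut core = Core::new()?;
/// let handle = core.handle();
/// // Resolves with () once SIGINT, SIGQUIT or SIGTERM is delivered.
/// core.run(term_signals(&handle))?;
/// ```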
fn term_signals(handle: &Handle) -> LocalBoxFuture<(), IoError> {
    // Create a stream of signal wakeups for each. Then take only the first element from it and
    // turn it into ().
    let sigs = [libc::SIGINT, libc::SIGQUIT, libc::SIGTERM];
    let sig_handlers = sigs.iter()
        .map(|sig| {
            Signal::new(*sig, handle)
                // The stream will be created some time in the future, so once it happens, provide
                // it
                .flatten_stream()
                // Take just the first element (and the rest of the stream, which we throw away)
                .into_future()
                // Throw away the result, keeping just an empty wakeup
                .map(|_| ())
                // Throw away the rest of the stream on error and keep just the error
                .map_err(|(e, _)| e)
        });
    let wakeup = future::select_all(sig_handlers)
        .map(|_| ())
        .map_err(|(e, _, _)| e);
    Box::new(wakeup)
}

/// Processes the snapshots and feeds them into a proto slice.
///
/// It creates a proto slice internally and feeds it the snapshots from the passed stream. It also
/// closes the slices periodically (currently every minute, but that should be configurable in the
/// future).
///
/// Returns a future that never resolves (at least not successfully, but the errors are not
/// expected either).
///
/// # Parameters
///
/// * `logger`: The logger used inside the proto slice.
/// * `handle`: A core's handle used to start the timers.
/// * `update_stream`: The source of the updates. In the final application, this would be a
///    channel. However, the type is left generic for testing purposes.
/// * `datasources`: The data sources that are queried before closing every slice.
/// * `computes`: The internal computations that augment header data.
/// * `keeper`: The history keeper where the slices are fed into.
/// * `manager`: The wakeup manager to use.
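///
/// # Examples
///
/// A rough sketch (not compiled here) of wiring the feeder to an update channel, with no data
/// sources or internal computations registered; `handle` and `logger` are assumed to exist
/// already:
///
/// ```ignore
/// // `sender` would be handed to the data sources.
/// let (sender, receiver) = mpsc::channel(DSRC_CHANNEL_SIZE);
/// let feeder = slice_feeder(logger.clone(),
///                           &handle,
///                           receiver,
///                           Vec::new(), // no data sources
///                           Vec::new(), // no internal computations
///                           Keeper::new(logger.clone()),
///                           Manager::new());
/// // The future is not expected to resolve; run it alongside the rest of the reactor.
/// handle.spawn(feeder.map_err(|_| ()));
/// ```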
fn slice_feeder<S, E>(logger: Logger, handle: &Handle, update_stream: S,
                      mut datasources: Vec<Box<Dsrc>>, mut computes: Vec<Box<IntCompute>>,
                      keeper: Keeper, manager: Manager)
    -> LocalBoxFuture<(), IoError>
where
    S: Stream<Item = CorkedUpdate, Error = E> + 'static,
    E: Debug + 'static,
{
    let (event_sender, event_receiver) = mpsc::unbounded();
    // The gatherer needs to be shared between multiple futures here.
    let gather = Rc::new(RefCell::new(Gather::new(logger.clone(), event_sender)));

    // Repeatedly close the slices, every SLICE_LENGTH_SECONDS. The length is a compile-time
    // tunable ‒ one of these kinds of configurations that shouldn't be tweaked by an end user.
    let interval = match Interval::new(Duration::from_secs(SLICE_LENGTH_SECONDS), handle) {
        Ok(i) => i,
        Err(e) => return Box::new(Err(e).into_future()),
    };
    let tick_logger = logger.clone();
    let cork_gather = Rc::clone(&gather);
    let slice_gather = Rc::clone(&gather);
    let cork_logger = logger.clone();
    let interval = interval
        // Whenever we time out, ask the data sources to flush
        .and_then(move |_| {
            debug!(tick_logger, "Asking all data sources to tick");
            let done_logger = tick_logger.clone();
            let (cork, finished) = libflow::update::cork();
            for dsrc in &mut datasources {
                dsrc.tick(cork.clone());
            }
            finished
                .map(move |_| debug!(done_logger, "All data sources ticked"))
                .map_err(|_| panic!("Void can't happen"))
        })
        // Once they're all done, cork the events from the gather, so internal computations can be
        // done before we actually close the slice.
        .and_then(move |_| {
            debug!(cork_logger, "Corking the events from the gather");
            let (cork, finished) = libflow::update::cork();
            cork_gather.borrow().push_cork(cork);
            let done_logger = cork_logger.clone();
            finished
                .map(move |_| debug!(done_logger, "Internal computations done"))
                .map_err(|_| panic!("Void can't happen"))
        })
        // Then close the slice and wake up client queries if needed
        .for_each(move |_| {
            let slice = slice_gather.borrow_mut().close_slice();
            keeper.append(slice);
            manager.wakeup();
            Ok(())
        });

    // In parallel with that, process the events from the gatherer and pass them through the
    // internal computations. The computations already have the channel to send the updates back to
    // the gatherer, so we don't have to care about that here.
    let event_gather = Rc::clone(&gather);
    let event_logger = logger.clone();
    let events = event_receiver
        .for_each(move |event| {
            trace!(event_logger, "Triggering event {:?}", event);
            let gather = event_gather.borrow();
            for compute in &mut computes {
                compute.event(&event, &*gather);
            }
            Ok(())
        })
        .map_err(move |_| IoError::new(ErrorKind::BrokenPipe, "Closed event stream"));

    // And feed the updates (no matter where they come from) into the gatherer as well.
    let feed = update_stream
        .for_each(move |update| {
            match update {
                CorkedUpdate::Update(update) => {
                    trace!(logger, "Applying update {:?}", update);
                    gather.borrow_mut().update(update);
                },
                CorkedUpdate::Cork(cork) => {
                    trace!(logger, "Cork went through update channel");
                    drop(cork);
                },
            }
            Ok(())
        })
        .map_err(move |e| IoError::new(ErrorKind::Other, format!("{:?}", e)));

    // And finally, combine everything together. These things should never end, so it doesn't
    // matter if we use join or select.
    let all = interval.join3(events, feed)
        .map(|_| ());
    Box::new(all)
}

/// Listens for incoming downstream client connections.
///
/// This creates a unix-domain socket and listens on it. It accepts connections from clients and
/// handles them using the [`downstream`](../downstream/index.html) module. It allows at most
/// `MAX_CLIENTS` connected clients at the same time and recovers from usual IO errors.
///
/// The returned future never resolves successfully and only very serious errors (usually during
/// setup) are reported.
///
/// # Parameters
///
/// * `logger`: Log through this logger.
/// * `handle`: The handle to the tokio's `Core` to run on.
/// * `socket`: Path to the file socket to create and listen on.
/// * `keeper`: The flow history keeper which will be queried by the clients.
/// * `manager`: The wakeup manager the clients will plug themselves into.
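///
/// # Examples
///
/// A rough sketch (not compiled here); the socket path is made up and `core`, `logger`, `keeper`
/// and `manager` are assumed to exist already:
///
/// ```ignore
/// let listener = listen(logger.clone(),
///                       core.handle(),
///                       Path::new("/tmp/pakon.sock"), // hypothetical socket path
///                       keeper.clone(),
///                       manager.clone());
/// // The future only resolves on serious setup errors, so keep it running in the background.
/// core.handle().spawn(listener.map_err(|_| ()));
/// ```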
fn listen(logger: Logger, handle: Handle, socket: &Path, keeper: Keeper, manager: Manager)
    -> LocalBoxFuture<(), IoError>
{
    // Try removing the socket if it's there, but don't fail if it's not there or the removal
    // doesn't work for whatever reason. The bind below would produce the error we want to show.
    // This is just a convenience, so the socket doesn't have to be deleted every time the program
    // is restarted.
    drop(fs::remove_file(socket));
    let listener = UnixListener::bind(socket, &handle)
        .into_future()
        .and_then(move |listener| -> LocalBoxFuture<(), IoError> {
            debug!(logger, "Opened listening socket");
            let mut client_serial = Wrapping(0);
            let handled = listener.incoming()
                // If there's a serious error, like „too many open files“, wait a bit before
                // trying again.
                .sleep_on_error(Duration::from_millis(LISTEN_ERR_MILLIS), &handle)
                .map(move |(connection, addr)| {
                    // Delegate each new connection to the `downstream` module.
                    let logger = logger.new(o!("client" => format!("{:?}/{}", addr, client_serial)));
                    // Simple serial number for the clients, used in logging only
                    client_serial += Wrapping(1);
                    let err_logger = logger.clone();
                    downstream::handle(logger,
                                       &handle,
                                       connection,
                                       keeper.clone(),
                                       &manager)
                        .map_err(move |e| {
                            error!(err_logger, "Lost client: {}", e);
                        })
                })
                .listen(MAX_CLIENTS)
                .map_err(|_| IoError::new(ErrorKind::Other, "Listener error, should not happen"));
            Box::new(handled)
        });
    Box::new(listener)
}

/// An auxiliary builder structure to plug in all the data sources at startup.
///
/// The builder is used to accumulate all the needed data sources. Then it is used to start up the
/// reactor.
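///
/// # Examples
///
/// A rough sketch (not compiled here) of the intended use; the factory types are hypothetical
/// stand-ins for the real data sources and computations:
///
/// ```ignore
/// let mut reactor = Reactor::new(logger)?;
/// // Register the data sources and internal computations through their factories.
/// reactor.add_dsrc(SomeDsrcFactory::default());       // hypothetical factory
/// reactor.add_compute(SomeComputeFactory::default()); // hypothetical factory
/// // Hand over control; this runs until a termination signal arrives.
/// reactor.run(options)?;
/// ```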
pub struct Reactor {
    /// The base logger used by the reactor.
    ///
    /// Further child loggers shall be created for it.
    logger: Logger,
    /// The reactor core from Tokio.
    ///
    /// This shall drive the main application.
    core: Core,
    /// The sender the data sources will write updates into.
    sender: UpdateSender,
    /// The unbounded variant, for internal uses that don't need backpressure (and the related
    /// complexity)
    unbounded_sender: UpdateSenderUnbounded,
    /// The receiving end from the data sources.
    receiver: Receiver<CorkedUpdate>,
    /// The already created data sources.
    dsrcs: Vec<Box<Dsrc>>,
    /// The terminator futures of the data sources.
    terminators: Vec<LocalBoxFuture<(), ()>>,
    /// The internal computations we have registered, to post-process updates.
    computes: Vec<Box<IntCompute>>,
}

impl Reactor {
    /// Creates a new reactor.
    ///
    /// The reactor starts as empty, with no data sources.
    pub fn new(logger: Logger) -> IoResult<Self> {
        let (sender, receiver) = mpsc::channel(DSRC_CHANNEL_SIZE);
        let (unbounded_sender, unbounded_receiver) = mpsc::unbounded();
        let unbounded_logger = logger.clone();
        let unbounded_cp = sender.clone()
            .sink_map_err(move |e| {
                error!(unbounded_logger, "Error forwarding internal updates: {}", e);
            })
            .send_all(unbounded_receiver)
            .map(|_| ());
        let core = Core::new()?;
        core.handle().spawn(unbounded_cp);
        Ok(Self {
            logger,
            core,
            sender,
            unbounded_sender,
            receiver,
            dsrcs: Vec::new(),
            terminators: Vec::new(),
            computes: Vec::new(),
        })
    }
    /// Adds another data source.
    ///
    /// It calls the factory to create a new data source and installs it into the reactor.
    pub fn add_dsrc<F: DsrcFactory>(&mut self, factory: F) {
        debug!(self.logger, "Adding a new data source into the reactor");
        let (dsrc, terminated) = factory.create(&self.logger,
                                                self.core.handle(),
                                                self.sender.clone());
        self.dsrcs.push(dsrc);
        self.terminators.push(terminated);
    }
    /// Adds another internal computation.
    ///
    /// It calls the factory and installs the new computation into the reactor.
    pub fn add_compute<F: ComputeFactory>(&mut self, factory: F) {
        debug!(self.logger, "Adding a new internal compute processor");
        let compute = factory.create(&self.logger,
                                     self.core.handle(),
                                     self.unbounded_sender.clone());
        self.computes.push(compute);
    }
    /// Consumes the reactor builder and runs the main application loop.
    ///
    /// This is the main entry point of the application and it drives all its asynchronous events.
    /// It handles:
    ///
    /// * The timers for storage maintenance.
    /// * Signal handlers (to terminate gracefully).
    /// * Eating data from the data sources.
    /// * Listening and handling the downstream connections.
    ///
    /// # Parameters
    ///
    /// * `options`: The command line options the application has been run with.
    pub fn run(mut self, options: CmdlineOpts) -> IoResult<()> {
        debug!(self.logger, "Running the reactor");
        let handle = self.core.handle();
        // Terminate gracefully when we receive the termination signal
        let sig_terminator = term_signals(&handle);
        // The keeper of flows (a globalish object that holds the in-memory data, manages the data on
        // disk and allows querying it).
        let keeper = Keeper::new(self.logger.new(o!("context" => "keeper")));
        // The wakeup manager
        let manager = Manager::new();
        // Feed the received snapshots to the appropriate place.
        let feeder = slice_feeder(self.logger.new(o!("context" => "slice feeder")),
                                  &handle,
                                  self.receiver,
                                  self.dsrcs,
                                  self.computes,
                                  keeper.clone(),
                                  manager.clone());
        // And the downstream connections
        let listener = listen(self.logger.new(o!("context" => "listen")),
                              handle.clone(),
                              &options.downstream,
                              keeper,
                              manager);
        // Combine everything together.
        let logger = self.logger;
        let dsrcs = self.terminators.into_iter()
            .map(|dsrc| -> LocalBoxFuture<_, _> {
                let dsrc_logger = logger.clone();
                let dsrc = dsrc
                    .map(move |_| error!(dsrc_logger, "Premature data source termination"))
                    .map_err(|_| IoError::new(ErrorKind::Other, "Lost data source"));
                Box::new(dsrc)
            });
        let all = vec![feeder, listener, sig_terminator]
            .into_iter()
            .chain(dsrcs);
        let terminated = future::select_all(all)
            .map_err(|(e, _, _)| e)
            .map(|_| ());
        self.core.run(terminated)
    }
}

#[cfg(test)]
mod tests {
    extern crate nix;
    extern crate tempdir;

    use std::cell::Cell;
    use std::io::BufReader;
    use std::rc::Rc;
    use std::str;

    use futures::stream;
    use futures::future::Either;
    use self::nix::sys::signal::{Signal, raise};
    use self::tempdir::TempDir;
    use tokio_core::reactor::Timeout;
    use tokio_io::io;
    use tokio_uds::UnixStream;

    use super::*;
    use libdata::column::{Tags, Type};
    use libdata::flow::{IpProto};
    use libflow::update::{Cork, Key, Status, Update};
    use libutils;

    /// Check we receive the termination signals correctly.
    #[test]
    fn signal() {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let signals = term_signals(&handle);
        let sender = Timeout::new(Duration::from_millis(500), &handle)
            .unwrap()
            .and_then(|_| {
                raise(Signal::SIGTERM).unwrap();
                Ok(())
            });
        let all = signals.join(sender);
        core.run(all).unwrap();
    }

    /// Checks the slice feeder.
    ///
    /// This is basically a smoke test ‒ it only checks that nothing crashes. Real functionality
    /// tests are to be done in the slice itself.
    #[test]
    fn slice_feeder_smoke() {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let logger = libutils::test_logger();

        // Create a dummy update
        #[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
        struct Id;
        impl Type for Id {
            fn name() -> String { "test-flow-id".to_owned() }
        }
        let mut tags = Tags::new();
        tags.insert(IpProto::Tcp);
        let update = Update {
            keys: vec![Key::Simple(Id.into())],
            status: Status::Ongoing,
            tags,
            stats: None,
        };

        // Create a fake stream out of a single-element vector.
        let updates = vec![CorkedUpdate::Update(update)];
        let updates_stream = stream::iter_ok::<_, IoError>(updates);
        let keeper = Keeper::dummy();
        let manager = Manager::new();
        let process = slice_feeder(logger,
                                   &handle,
                                   updates_stream,
                                   Vec::new(),
                                   Vec::new(),
                                   keeper,
                                   manager);
        // As the slice feeder future never resolves on its own, we just terminate the test with a
        // timeout. We assume the single element has been processed by then and if it didn't
        // produce an error, everything is OK.
        let timeout = Timeout::new(Duration::from_secs(1), &handle).unwrap();
        // Make sure there are no errors when we run to the end of the stream.
        assert!(core.run(process.select(timeout)).is_ok());
    }

    /// Check the slice feeder calls the tick on data sources.
    #[test]
    fn slice_feeder_tick() {
        let done = Rc::new(Cell::new(false));
        struct D(Rc<Cell<bool>>);
        impl Dsrc for D {
            fn tick(&mut self, _cork: Cork) {
                self.0.set(true);
            }
        }
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let logger = libutils::test_logger();

        let keeper = Keeper::dummy();
        let manager = Manager::new();
        let dsrcs: Vec<Box<Dsrc>> = vec![Box::new(D(done.clone()))];
        let process = slice_feeder(logger,
                                   &handle,
                                   stream::empty::<_, IoError>(),
                                   dsrcs,
                                   Vec::new(),
                                   keeper,
                                   manager);
        // Make sure there's time for that tick.
        let timeout = Timeout::new(Duration::from_secs(SLICE_LENGTH_SECONDS + 1), &handle)
            .unwrap();
        // Make sure there are no errors when we run to the end of the stream.
        assert!(core.run(process.select(timeout)).is_ok());
        assert!(done.get());
    }

    /// Check the downstream listener.
    ///
    /// Check that when we start to listen, it actually accepts connections.
    #[test]
    fn downstream_connect() {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let logger = libutils::test_logger();
        let dir = TempDir::new("downstream_connect").unwrap();
        let path = dir.path().join("socket");
        let keeper = Keeper::dummy();
        let manager = Manager::new();
        // Run the listening end in the background.
        let listener = listen(logger, handle.clone(), &path, keeper, manager);
        handle.spawn(listener.map_err(|e| {
            panic!("{}", e);
        }));
        // We send a dummy request and check that something comes back. We don't really check the
        // validity of the response here.
        let socket = UnixStream::connect(&path, &handle).unwrap();
        let exchange = io::write_all(socket,
                                     &b"{\"jsonrpc\":\"2.0\",\"method\":\"no\",\"id\":42}\n"[..])
            .and_then(|(socket, _)| io::flush(socket))
            // The version notification
            .and_then(|socket| io::read_until(BufReader::new(socket), b'\n', Vec::new()))
            // And the response
            .and_then(|(socket, part1)| io::read_until(socket, b'\n', part1));
        // Make sure this doesn't run forever, that we get an answer in some reasonable time
        let timeout = Timeout::new(Duration::from_secs(5), &handle).unwrap();
        // Run the exchange
        match core.run(exchange.select2(timeout)) {
            Ok(Either::A(((_socket, data), _timeout))) => {
                // Check some basic sanity of the returned data
                let s = str::from_utf8(&data).unwrap();
                assert!(s.contains("jsonrpc"));
                assert!(s.contains("version"));
                assert!(s.contains("error"));
            },
            // Timeouts or errors shouldn't happen
            Ok(Either::B(_)) => panic!("Timeout happened"),
            Err(_) => panic!("An error happened"),
        }
    }
}