// Copyright 2017, CZ.NIC z.s.p.o. (http://www.nic.cz/)
//
// This file is part of the pakon system.
//
// Pakon is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Pakon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Pakon. If not, see <http://www.gnu.org/licenses/>.

//! An abstract interface for internally computed values
//!
//! Most headers for flows are generated by data sources. These are external, however. Sometimes
//! the values need to be computed based on other data (either on the same flow or data seen on
//! some other flows).
//!
//! This abstract interface ([`IntCompute`](trait.IntCompute.html)) allows for providing modules
//! that do just that ‒ watch events on flows and produce more potential updates.
//!
//! The computation gets events about changes on the flows. During these event callbacks it can
//! query the existing flows for further information (eg. when it wants to look into a different
//! flow than the one that caused the event) and produce updates to flows.
//!
//! Care should be taken not to cause infinite loops of updates; a computation should produce an
//! update only if something really changes.

#![deny(missing_docs)]

extern crate libdata;
extern crate libflow;
extern crate slog;
extern crate tokio_core;

use std::collections::HashSet;

use slog::Logger;
use tokio_core::reactor::Handle;

use libdata::column::{FlowTags, Ident, RefHeaders};
use libflow::update::{Cork, UpdateSenderUnbounded};

/// Some event the internal computation might want to react to.
///
/// All the variants that are caused by a concrete flow carry the `FlowTags` handle. It can be both
/// used to examine the flow (it contains the current headers of the flow) and to reference the
/// flow in a newly created update.
#[derive(Debug)]
pub enum Event {
    /// A flow has started.
    FlowStarted(FlowTags),
    /// A flow has been dropped.
    ///
    /// This isn't the same as the flow being closed. This is triggered when the flow is destroyed
    /// in the in-memory representation. Therefore, generating new updates for this flow is useless
    /// (there will be no flow to apply them to) and the purpose is mainly to trigger clearing of
    /// computations' internal caches.
    FlowTerminated(FlowTags),
    /// Something changed on a flow.
    FlowUpdated {
        /// Handle to the changed flow.
        flow: FlowTags,
        /// Identities of the columns that changed.
        ///
        /// This contains both the newly added columns and columns with updated values. Note that
        /// the value might have been replaced by the same value.
        columns: HashSet<Ident>,
    },
    /// There's a request to cork (flush) previous updates.
    ///
    /// If the computation produced any updates since the last `CorkRequest`, a clone of the
    /// provided `Cork` must be pushed through the sender channel. This ensures all the updates are
    /// applied before the corresponding time slice is closed.
    ///
    /// It is recommended to simply push the clone into the stream every time the event comes,
    /// without keeping track of whether any updates happened.
    CorkRequest(Cork),
}

/// An interface to query existing flows.
///
/// While an [`Event`](enum.Event.html) carries the `FlowTags` of the flow that caused it (and
/// therefore the flow can be examined directly), access to the other existing flows can be
/// performed through this interface. The interface is passed with every event.
pub trait Query {
    /// Performs a query of the existing flows.
    ///
    /// It returns an iterator over all the flows that match the filter. The filter is applied in
    /// a similar way as in the user's query ‒ a flow must match on all the present columns. To
    /// match on one, the value of the column on the flow must be present in the corresponding
    /// `RefHeader` of the filter (eg. the filter is in CNF).
    fn query<'a, 'b, 'r>(&'a self, filter: &'b RefHeaders<'b>)
        -> Box<Iterator<Item = FlowTags> + 'r>
    where
        'a: 'r,
        'b: 'r;
}

/// The internal computation itself.
///
/// This represents the internal computation. It can hold whatever state it needs (or simply
/// compute the values from the events on the fly) and it processes the events. The updates should
/// be pushed through the channel sender passed during its construction.
pub trait IntCompute {
    /// Processes a single event.
    ///
    /// This is the main callback of the internal computation. It gets called with each event
    /// produced and all the computation is expected to take place there.
    ///
    /// # Params
    ///
    /// * `event`: The event that happened.
    /// * `flow_query`: An interface to query all the existing flows. The computation can use it to
    ///   access all the flows, but it is not necessary for accessing the flow that originated the
    ///   event.
    ///
    /// The callback doesn't return anything directly. The computation shall push its updates
    /// through the sender channel.
    ///
    /// # Note
    ///
    /// You should *not* update the flow headers directly. Your update might need to be processed
    /// by some other internal computations and just modifying the flow headers doesn't trigger
    /// them.
    fn event(&mut self, event: &Event, flow_query: &Query);
}

/// A factory for a single type of internal computation.
///
/// This allows creating the computations in a somewhat generic way.
pub trait Factory {
    /// Creates the corresponding computation.
    ///
    /// # Params
    ///
    /// * `logger`: The logger for internal use of the computation. The factory should create a
    ///   child logger with the correct name (eg. setting its `context` variable).
    /// * `handle`: The handle to a tokio reactor core. Most computations will not need to do
    ///   anything asynchronous, but it is possible some of them might, therefore they can use it.
    /// * `sender`: The sink where the generated updates should be sent.
    fn create(self, logger: &Logger, handle: Handle, sender: UpdateSenderUnbounded)
        -> Box<IntCompute>;
}
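The module docs stress that a computation must produce an update only when something really changes, or it risks an infinite update loop (its own update re-triggers `FlowUpdated`, which produces another update, and so on). The standalone sketch below illustrates that rule. It is illustrative only and does not use the real `libdata`/`libflow` types: `FlowId`, the reduced one-variant `Event`, the `bytes_up`/`bytes_down`/`bytes_total` column names, and the `updates` vector (standing in for the sender channel) are all hypothetical stand-ins.

```rust
use std::collections::HashSet;

// Hypothetical stand-ins for this sketch; the real `FlowTags`, `Ident` and
// `Event` live in `libdata`/`libflow` and are richer than this.
type FlowId = u64;
type Ident = String;

enum Event {
    /// Reduced mirror of `Event::FlowUpdated`: which flow changed and which
    /// columns were added or updated on it.
    FlowUpdated { flow: FlowId, columns: HashSet<Ident> },
}

/// A computation deriving a hypothetical `bytes_total` column from
/// `bytes_up` and `bytes_down`. It reacts only when one of its *input*
/// columns changed; writing `bytes_total` itself triggers another
/// `FlowUpdated`, but that event's column set won't match, so the loop
/// terminates ‒ this is the "update only if something really changes" rule.
struct TotalBytes {
    // Stands in for the update sender channel of the real interface.
    updates: Vec<(FlowId, Ident)>,
}

impl TotalBytes {
    fn event(&mut self, event: &Event) {
        match *event {
            Event::FlowUpdated { flow, ref columns } => {
                // Only the columns this computation depends on are relevant.
                if columns.contains("bytes_up") || columns.contains("bytes_down") {
                    self.updates.push((flow, "bytes_total".to_string()));
                }
            }
        }
    }
}
```

A real implementation would additionally handle `FlowStarted`/`FlowTerminated` to manage per-flow caches and forward a `Cork` clone on every `CorkRequest`, as the doc comments above describe.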