Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

import sys 

import logging 

import inspect 

import signal 

 

from types import SimpleNamespace 

 

import zmq 

 

from .network import SN 

from .network import get_arg_parser 

from .messages import encode_msg, parse_msg 

from .exceptions import * 

 

 

class LoopHardFail(Exception):
    """Unrecoverable failure of the box loop.

    Raised by ``SNBox.run_loop`` when too many iterations fail in a row and
    handled in ``SNBox.run``, which logs it and exits the process.
    """

 

 

class LoopFail(Exception):
    """Failure of a single loop iteration.

    Raised by the ``process_result`` implementations in this module when a
    box produces an output message that cannot be encoded or sent.
    """

 

 

class SNBox():
    """Abstract base of all boxes: owns the contexts and drives the main loop.

    Concrete box kinds implement the "core methods" (``check_configuration``,
    ``get_processed_message``, ``process_result``); final boxes optionally
    implement the public API (``setup``, ``teardown``,
    ``before_first_request``, ``process``).  ``run`` ties these together.
    """

    # Number of consecutive failed iterations tolerated by run_loop() before
    # it gives up with LoopHardFail.
    MAX_ERRORS_IN_ROW = 10

    def __init__(self, box_name, argparser=None):
        """Create the ZMQ and SN contexts and initialise the loop state.

        Args:
            box_name: Name of the box; also used as the logger name.
            argparser: Optional argument parser handed to ``SN``.  When it is
                falsy, ``get_arg_parser()`` provides the parser instead.
        """
        # Local contexts for dependencies
        self.zmq_ctx = zmq.Context.instance()
        self.sn_ctx = SN(self.zmq_ctx, argparser or get_arg_parser())
        # Important values provided to box
        self.name = box_name
        self.logger = logging.getLogger(box_name)
        self.args = self.sn_ctx.args
        # Error management of the loop
        self.loop_continue = True
        self.errors_in_row = 0
        # User data
        # Data generated by the setup function are placed into a separate
        # variable. A final box shouldn't use "self" - we want to isolate
        # its values.
        self.ctx = None

    # Core methods - will be implemented in non-abstract boxes
    def check_configuration(self):
        """Validate the box configuration; must be overridden."""
        raise NotImplementedError("check_configuration")

    def get_processed_message(self):
        """Produce the result of one loop iteration; must be overridden."""
        raise NotImplementedError("get_processed_message")

    def process_result(self, result):
        """Consume the result of one loop iteration; must be overridden."""
        raise NotImplementedError("process_result")

    # Public API for boxes - will be optionally implemented in final boxes
    def setup(self):
        """Return a dictionary of user data; it becomes ``self.ctx``."""
        return {}

    def teardown(self):
        """Clean up whatever ``setup`` created.  Does nothing by default."""
        pass

    def before_first_request(self):
        """Hook called once before the loop starts.

        A truthy return value is handed to ``process_result``.
        """
        pass

    def process(self, msg_type, payload):
        """Handle one message; must be overridden by the final box."""
        raise NotImplementedError("process")

    # Provided functionality - should be final implementation
    def run(self):
        """Check the configuration, run the loop and always clean up.

        A ``LoopHardFail`` is logged and turned into ``sys.exit(1)``;
        ``KeyboardInterrupt`` ends the run silently.  Any other exception
        propagates to the caller after the clean-up has been performed.
        """
        # This is the only way to be sure that the check will be called.
        # Constructors will be overwritten in non-abstract boxes.
        self.check_configuration()

        try:
            self.ctx = self.get_user_data()

            self.logger.info("SNBox starting loop for %s box", self.name)
            self.register_signals()

            self.before_loop()
            self.run_loop()

        except LoopHardFail as e:
            self.logger.error("Hard Fail of box: %s", self.name)
            self.logger.exception(e)
            # The finally clause still runs: sys.exit() raises SystemExit,
            # which is not caught here.
            sys.exit(1)

        except KeyboardInterrupt:
            pass

        finally:
            # The local contexts must be released even when the user
            # supplied teardown() raises, hence the nested try/finally.
            try:
                self.teardown()  # Clean-up data generated by setup()
            finally:
                self.teardown_box()  # Clean-up my local contexts

    def get_user_data(self):
        """Call ``setup`` and wrap the returned dict in a ``SimpleNamespace``.

        Raises:
            SetupError: If ``setup`` returned anything but a dictionary.
        """
        user_data = self.setup()

        if isinstance(user_data, dict):
            return SimpleNamespace(**user_data)
        else:
            raise SetupError("Setup function didn't return a dictionary")

    def register_signals(self):
        """Install a handler for SIGTERM, SIGQUIT and SIGABRT.

        The handler only logs the signal and clears ``loop_continue``, so
        ``run_loop`` stops the next time it evaluates its loop condition.
        """
        def signal_handler(signum, frame):
            self.logger.info("Signal %s received", signum)
            self.loop_continue = False

        for sig in [signal.SIGTERM, signal.SIGQUIT, signal.SIGABRT]:
            signal.signal(sig, signal_handler)

    def before_loop(self):
        """Run ``before_first_request`` and process its result, if any."""
        result = self.before_first_request()
        if result:
            self.process_result(result)

    def teardown_box(self):
        """Destroy the ZMQ context and log the shutdown."""
        self.zmq_ctx.destroy()
        self.logger.info("SNBox shutting down box %s", self.name)

    def run_loop(self):
        """Process messages until stopped.

        The loop ends when ``loop_continue`` is cleared or when an iteration
        raises ``StopIteration``.  ``SetupError`` and ``NotImplementedError``
        are re-raised immediately.  Any other exception is logged and
        counted; a successful iteration resets the counter.

        Raises:
            LoopHardFail: After more than ``MAX_ERRORS_IN_ROW`` consecutive
                failed iterations.
        """
        while self.loop_continue:
            try:
                result = self.get_processed_message()
                self.process_result(result)
                self.errors_in_row = 0

            except StopIteration:
                self.logger.info("Box %s raised StopIteration", self.name)
                break

            except (SetupError, NotImplementedError):
                # These errors are considered show-stoppers.
                # They mean a programmer error and there is no reason for
                # trying to recover.
                raise

            except Exception as e:
                self.logger.error("Uncaught exception from loop: %s", type(e).__name__)
                self.logger.exception(e)

                self.errors_in_row += 1
                if self.errors_in_row > self.MAX_ERRORS_IN_ROW:
                    raise LoopHardFail("Many errors in row.")

    # Helper methods
    def get_socket(self, sock_name):
        """Return the socket named ``sock_name`` from the SN context.

        Returns:
            The socket, or ``None`` when the SN context raises
            ``UndefinedSocketError`` for that name.  Reporting a missing
            socket is left to ``check_configuration``.
        """
        try:
            return self.sn_ctx.get_socket(sock_name)

        except UndefinedSocketError:
            # Deliberate best effort: a missing socket is not fatal here.
            self.logger.debug("Socket %s is not defined", sock_name)
            return None

 

 

class SNPipelineBox(SNBox):
    """Box with one input and one output socket.

    Every message received on the "in" socket is parsed and handed to
    ``process``; a truthy result is encoded and sent on the "out" socket.
    """

    def __init__(self, box_name, argparser=None):
        """Initialise the base box and look up the "in" and "out" sockets."""
        super().__init__(box_name, argparser)
        self.socket_recv = self.get_socket("in")
        self.socket_send = self.get_socket("out")

    def check_configuration(self):
        """Ensure both sockets were provided.

        Raises:
            SetupError: If the input or the output socket is missing.
        """
        if not self.socket_recv:
            raise SetupError("Input socket wasn't provided")
        if not self.socket_send:
            raise SetupError("Output socket wasn't provided")

    def teardown_box(self):
        """Close both sockets, then run the base clean-up."""
        self.socket_recv.close()
        self.socket_send.close()
        super().teardown_box()

    def get_processed_message(self):
        """Receive one multipart message, parse it and return ``process``'s result."""
        msg = self.socket_recv.recv_multipart()
        msg_type, payload = parse_msg(msg)

        return self.process(msg_type, payload)

    def process_result(self, result):
        """Encode and send a ``(msg_type, payload)`` result.

        A falsy result means the box has nothing to send and is ignored.

        Raises:
            LoopFail: If the result cannot be unpacked into exactly two
                items, or if encoding/sending raises ``TypeError``,
                ``ValueError`` or ``InvalidMsgError``.
        """
        if not result:
            # The box hasn't any reasonable answer
            return

        try:
            msg_type, payload = result
            msg_out = encode_msg(msg_type, payload)
            self.socket_send.send_multipart(msg_out)

        except (TypeError, ValueError, InvalidMsgError) as err:
            # TypeError covers a truthy result that is not iterable at all
            # (e.g. a number), which unpacking reports as TypeError.
            raise LoopFail("Generated broken output message. Possibly bug in box.") from err

 

 

class SNGeneratorBox(SNBox):
    """Output-only box whose ``process`` method is a generator.

    Each value yielded by ``process`` is treated as one result; a truthy
    result is encoded and sent on the "out" socket.
    """

    def __init__(self, box_name, argparser=None):
        """Initialise the base box, look up "out" and create the iterator.

        Raises:
            SetupError: Propagated from ``check_configuration``.
        """
        super().__init__(box_name, argparser)
        self.socket_send = self.get_socket("out")

        # Make sure process() is a generator function before trying to get
        # an iterator from it.
        self.check_configuration()

        self.process_iterator = self.process()

    def check_configuration(self):
        """Ensure the output socket exists and ``process`` is a generator function.

        Raises:
            SetupError: If either condition does not hold.
        """
        if not self.socket_send:
            raise SetupError("Output socket wasn't provided")
        if not inspect.isgeneratorfunction(self.process):
            raise SetupError("Generator is expected for output-only box")

    def teardown_box(self):
        """Close the output socket, then run the base clean-up."""
        self.socket_send.close()
        super().teardown_box()

    def get_processed_message(self):
        """Return the next value yielded by ``process``.

        ``StopIteration`` from an exhausted generator propagates to the
        caller.
        """
        return next(self.process_iterator)

    def process_result(self, result):
        """Encode and send a ``(msg_type, payload)`` result.

        A falsy result means the box has nothing to send and is ignored.

        Raises:
            LoopFail: If the result cannot be unpacked into exactly two
                items, or if encoding/sending raises ``TypeError``,
                ``ValueError`` or ``InvalidMsgError``.
        """
        if not result:
            # The box hasn't any reasonable answer
            return

        try:
            msg_type, payload = result
            msg_out = encode_msg(msg_type, payload)
            self.socket_send.send_multipart(msg_out)

        except (TypeError, ValueError, InvalidMsgError) as err:
            # TypeError covers a truthy result that is not iterable at all
            # (e.g. a number), which unpacking reports as TypeError.
            raise LoopFail("Generated broken output message. Possibly bug in box.") from err

 

 

class SNTerminationBox(SNBox):
    """Input-only box: consumes messages from "in" and must not answer."""

    def __init__(self, box_name, argparser=None):
        """Initialise the base box and look up the "in" socket."""
        super().__init__(box_name, argparser)
        self.socket_recv = self.get_socket("in")

    def check_configuration(self):
        """Raise ``SetupError`` when the input socket is missing."""
        if self.socket_recv:
            return
        raise SetupError("Input socket wasn't provided")

    def teardown_box(self):
        """Close the input socket, then run the base clean-up."""
        self.socket_recv.close()
        super().teardown_box()

    def get_processed_message(self):
        """Receive and parse one multipart message and pass it to ``process``."""
        frames = self.socket_recv.recv_multipart()
        kind, body = parse_msg(frames)
        return self.process(kind, body)

    def process_result(self, result):
        """Accept only falsy results; anything else raises ``SetupError``."""
        if not result:
            return
        raise SetupError("Input-only box generated output message. Possibly bug in box.")

 

 

class SNMultipleOutputPipelineBox(SNPipelineBox):
    """Pipeline box whose ``process`` returns an iterable of results."""

    def process_result(self, result):
        """Send every item of ``result`` through the parent's ``process_result``.

        A falsy ``result`` is ignored.
        """
        if not result:
            return

        for item in result:
            super().process_result(item)