Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

import re 

import socket 

 

import zmq 

 

from .argparser import get_arg_parser 

from .exceptions import * 

 

 

class Resource:
    """Validated description of a single endpoint belonging to a named socket.

    A resource is built from five items: ``name``, ``direction``,
    ``sock_type``, ``address`` and ``port``.  All of them are validated in
    ``__init__``; an invalid item raises ``SockConfigError``.
    """

    NAME = re.compile("[a-z0-9_-]+")
    SIMPLE_ADDRESS = re.compile("[a-z0-9_-]+")
    DIRECTIONS = [
        "connect",
        "bind",
    ]
    SOCK_TYPES = [
        "REQ",
        "REP",
        "DEALER",
        "ROUTER",
        "PUB",
        "SUB",
        "PUSH",
        "PULL",
        "PAIR",
    ]

    def __init__(self, name, direction, sock_type, address, port):
        """Validate the five resource items and store them on the instance.

        Args:
            name: Resource name; must consist only of ``a-z``, ``0-9``,
                ``_`` and ``-``.
            direction: One of ``Resource.DIRECTIONS``.
            sock_type: One of ``Resource.SOCK_TYPES``.
            address: IPv4 address, IPv6 address (optionally wrapped in
                ``[]``), ``"*"`` or a simple host name.
            port: Port as an ``int`` or a string convertible by ``int()``;
                must be within 1-65535.

        Raises:
            SockConfigError: If any item fails validation.
        """
        # fullmatch: the whole name must be admissible, not only its prefix
        # (``match`` would accept e.g. "api!bad" because "api" matches).
        if not Resource.NAME.fullmatch(name):
            raise SockConfigError("Inadmissible characters in resource name")

        if direction not in Resource.DIRECTIONS:
            raise SockConfigError("Inadmissible or empty value for direction (use connect or bind)")

        if sock_type not in Resource.SOCK_TYPES:
            raise SockConfigError("Inadmissible or empty socket type")

        if not self.check_address(address):
            raise SockConfigError("Inadmissible characters in resource address")

        try:
            port_number = int(port)
        except (TypeError, ValueError) as err:
            # TypeError covers non-string, non-numeric values such as None.
            raise SockConfigError("Port must be a number") from err

        if not 1 <= port_number <= 65535:
            raise SockConfigError("Port number is out of range (1-65535)")

        # Cross-field rule: the wildcard address only makes sense for bind.
        if address == "*" and direction != "bind":
            raise SockConfigError("On '*' is only bind operation permitted")

        self.name = name
        self.direction = direction
        self.sock_type = sock_type
        self.address = address
        self.port = port_number

    def check_address(self, address):
        """Return True when ``address`` is acceptable as a resource address.

        Accepted forms, tried in order: an IPv4 literal, an IPv6 literal
        (surrounding ``[`` / ``]`` characters are stripped first), the
        wildcard ``"*"``, and anything whose beginning matches
        ``Resource.SIMPLE_ADDRESS``.
        """
        try:
            socket.inet_pton(socket.AF_INET, address)
        except OSError:
            pass
        else:
            return True

        try:
            socket.inet_pton(socket.AF_INET6, address.strip("[]"))
        except OSError:
            pass
        else:
            return True

        if address == "*":
            return True

        # NOTE(review): this is a prefix match only, so dotted host names
        # ("host.example.com") are accepted because their first label
        # matches.  Tightening it to a full match would reject them, so it
        # is deliberately left unchanged here - confirm the intended rule.
        return bool(Resource.SIMPLE_ADDRESS.match(address))

    def get_connection_string(self):
        """Return the endpoint in the form ``tcp://<address>:<port>``."""
        return "tcp://{}:{}".format(self.address, self.port)

    @classmethod
    def from_string(cls, arg):
        """Build a resource from ``"name,direction,sock_type,address,port"``.

        Raises:
            SockConfigError: If the string does not contain exactly five
                comma-separated items, or if any item is invalid.
        """
        items = arg.split(",")

        if len(items) != 5:
            raise SockConfigError("Bad count of resource string items")

        return cls(*items)

    def _key(self):
        """Return the tuple of fields that defines equality and hashing."""
        return (self.name, self.direction, self.sock_type, self.address, self.port)

    def __eq__(self, other):
        """Two resources are equal when all five fields are equal."""
        if not isinstance(other, Resource):
            # Let Python try the reflected comparison instead of failing
            # with AttributeError on unrelated objects.
            return NotImplemented
        return self._key() == other._key()

    def __ne__(self, other):
        """Inverse of ``__eq__``; propagates ``NotImplemented`` unchanged."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        """Hash consistent with ``__eq__`` so resources can live in sets."""
        return hash(self._key())

 

 

class Socket:
    """Blueprint of one named ZMQ socket assembled from ``Resource`` objects.

    Resources are collected with ``add_resource``; ``build`` then creates
    the ZMQ socket from a context, applies the stored configuration and
    binds or connects it to the collected endpoints.
    """

    SOCKET_TYPE_MAP = {
        "REQ": zmq.REQ,
        "REP": zmq.REP,
        "DEALER": zmq.DEALER,
        "ROUTER": zmq.ROUTER,
        "PUB": zmq.PUB,
        "SUB": zmq.SUB,
        "PUSH": zmq.PUSH,
        "PULL": zmq.PULL,
        "PAIR": zmq.PAIR,
    }

    def __init__(self, name, **configuration):
        """Create an empty blueprint.

        Args:
            name: Name every added resource must carry.
            **configuration: Options read by ``configure`` (currently only
                the ``ipv6`` key is consulted).
        """
        self.name = name
        self.resources = []
        self.my_type = None        # socket type string, taken from the first resource
        self.my_direction = None   # "bind" or "connect", taken from the first resource
        self.configuration = configuration
        self.setup_done = False    # True once at least one resource was added

    def check_resource(self, resource):
        """Validate ``resource`` against this socket and adopt type/direction.

        On the first accepted resource the socket takes over its
        ``sock_type`` and ``direction``.

        Raises:
            SockConfigError: If the resource has a different name, socket
                type or direction, if the socket is already bound, or if
                the resource was already added.
        """
        if self.name != resource.name:
            raise SockConfigError("Putting bad resource to socket")

        if self.my_type and self.my_type != resource.sock_type:
            raise SockConfigError("New resource is different type than current Socket type")

        if self.setup_done and self.my_direction == "bind":
            raise SockConfigError("Socket can have only one bind operation")

        # Without this check a "bind" resource added to a "connect" socket
        # would silently be connected to in build().
        if self.my_direction and self.my_direction != resource.direction:
            raise SockConfigError("New resource has different direction than current Socket direction")

        if resource in self.resources:
            raise SockConfigError("Resource duplication")

        # All checks passed - adopt the properties of the first resource.
        if not self.my_type:
            self.my_type = resource.sock_type

        if not self.my_direction:
            self.my_direction = resource.direction

    def add_resource(self, resource):
        """Validate ``resource`` and append it to this socket's endpoints.

        Raises:
            SockConfigError: Propagated from ``check_resource``.
        """
        self.check_resource(resource)
        self.resources.append(resource)
        self.setup_done = True

    def build(self, ctx, name, sock_type=None):
        """Create, configure and bind/connect the ZMQ socket.

        Args:
            ctx: Object providing ``socket(type)`` (a ZMQ context).
            name: Requested socket name; must equal ``self.name``.
            sock_type: Optional socket type string; when given it must
                equal the type adopted from the resources.

        Returns:
            The socket created by ``ctx.socket``, bound to the first
            resource or connected to every resource.

        Raises:
            SockConfigError: If the name or type does not match, or if no
                resource has been added yet.
        """
        if self.name != name:
            raise SockConfigError("Name of requested resource is invalid")

        if sock_type and self.my_type != sock_type:
            raise SockConfigError("Unmatched socket type with requested one")

        if not self.resources:
            # Without resources there is neither a type nor an endpoint.
            raise SockConfigError("Socket has no resources to bind or connect")

        sock = ctx.socket(Socket.SOCKET_TYPE_MAP[self.my_type])

        try:
            self.configure(sock)

            if self.my_direction == "bind":
                sock.bind(self.resources[0].get_connection_string())
            else:
                for resource in self.resources:
                    sock.connect(resource.get_connection_string())
        except Exception:
            # Do not leak a half-initialised socket to the context.
            sock.close()
            raise

        return sock

    def configure(self, socket):
        """Apply the stored configuration and a 1 s linger to ``socket``."""
        if "ipv6" in self.configuration:
            socket.ipv6 = self.configuration["ipv6"]
        socket.setsockopt(zmq.LINGER, 1*1000)  # In msec

 

 

class SN:
    """ This class serves as a container for all resources. This class provides
    an API-like interface for requesting ZMQ sockets based on available
    resources.
    """

    def __init__(self, ctx, argparser=None):
        """Parse the command line and prepare all socket blueprints.

        Args:
            ctx: ZMQ context later passed to ``Socket.build``.
            argparser: Optional parser object providing ``parse_args()``;
                when omitted, the parser from ``get_arg_parser()`` is used.
        """
        ## Gather data
        self.context = ctx

        parser = argparser if argparser is not None else get_arg_parser()
        self.args = parser.parse_args()

        ## Build all necessary configuration
        self.build_global_configuration()
        self.parse_resources()
        self.build_sockets()

    def build_global_configuration(self):
        """Derive the configuration shared by every socket from the arguments."""
        self.global_configuration = {
            "ipv6": not self.args.disable_ipv6,
        }

    def parse_resources(self):
        """Convert every resource string from the arguments to a ``Resource``."""
        # Treat a missing option (None) as "no resources" instead of failing
        # with TypeError.  NOTE(review): presumably the option is collected
        # with an append-style action - confirm against get_arg_parser().
        resource_strings = self.args.resource or ()
        self.resources = [Resource.from_string(res) for res in resource_strings]

    def build_sockets(self):
        """Group the parsed resources by name into ``Socket`` blueprints."""
        self.sockets = {}

        for resource in self.resources:
            if resource.name not in self.sockets:
                self.sockets[resource.name] = Socket(resource.name, **self.global_configuration)

            self.sockets[resource.name].add_resource(resource)

    def get_socket(self, *requested_sockets):
        """ Gets multiple socket names in 'get_socket(name1, name2,...)'
        or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
        their combinations. Returns list of all available ZMQ sockets with the
        required names (a single socket, not a list, when exactly one was
        requested). Exception is raised when there is no socket with the
        desired name or when the socket is of another type.
        """
        built = []

        try:
            for request in requested_sockets:
                # isinstance also accepts tuple subclasses (e.g. namedtuples).
                if isinstance(request, tuple):
                    sock_name, sock_type = request
                else:
                    sock_name, sock_type = request, None

                if sock_name not in self.sockets:
                    raise UndefinedSocketError("Requesting undefined socket")

                blueprint = self.sockets[sock_name]
                built.append(blueprint.build(self.context, sock_name, sock_type))
        except Exception:
            # The caller never receives the sockets built so far; close
            # them so they are not leaked.
            for sock in built:
                sock.close()
            raise

        if len(built) == 1:
            return built[0]

        return built