hobbes
a language, embedded compiler, and runtime for efficient dynamic expression evaluation, data storage and analysis
net.H
Go to the documentation of this file.
1 /*
2  * net : structured communication between processes
3  *
4  * use DEFINE_NET_CLIENT(T, C) to create a type T to send/receive through the commands in C
5  * each tuple in C is written (N, T, E) where
6  * N is a name for a member function in T to mediate the network command
7  * T is a C++ _function type_ (the type we expect N to have)
8  * E is a constant string expression to evaluate in the remote process
9  *
10  */
11 
12 #ifndef HNET_H_INCLUDED
13 #define HNET_H_INCLUDED
14 
15 #include <vector>
16 #include <queue>
17 #include <functional>
18 #include <string>
19 #include <sstream>
20 #include <tuple>
21 #include <map>
22 #include <stdexcept>
23 
24 #include <sys/types.h>
25 #include <sys/un.h>
26 #include <sys/socket.h>
27 #include <sys/ioctl.h>
28 #include <netinet/in.h>
29 #include <netdb.h>
30 #include <unistd.h>
31 #include <fcntl.h>
32 #include <string.h>
33 
34 namespace hobbes { namespace net {
35 
36 #define HNET_VERSION ((uint32_t)0x00010000)
37 #define HNET_CMD_DEFEXPR ((uint8_t)0)
38 #define HNET_CMD_INVOKE ((uint8_t)2)
39 #define HNET_RESULT_FAIL 0
40 
41 typedef std::vector<uint8_t> bytes;
42 
43 // basic socket I/O
44 inline void sendData(int socket, const uint8_t* d, size_t sz) {
45  size_t i = 0;
46  while (i < sz) {
47  ssize_t c = ::send(socket, d + i, sz - i, 0);
48  if (c < 0) {
49  throw std::runtime_error("Couldn't write to socket: " + std::string(strerror(errno)));
50  }
51  i += c;
52  }
53 }
54 
55 inline void sendString(int socket, const std::string& s) {
56  size_t n = s.size();
57  sendData(socket, (const uint8_t*)&n, sizeof(n));
58  sendData(socket, (const uint8_t*)s.data(), n);
59 }
60 
61 inline void sendBytes(int socket, const bytes& x) {
62  size_t n = x.size();
63  sendData(socket, (const uint8_t*)&n, sizeof(n));
64  if (n > 0) {
65  sendData(socket, &x[0], n);
66  }
67 }
68 
69 
70 inline void recvData(int socket, uint8_t* d, size_t sz) {
71  size_t i = 0;
72  while (i < sz) {
73  ssize_t di = recv(socket, d + i, sz - i, 0);
74 
75  if (di < 0) {
76  if (errno != EINTR) {
77  throw std::runtime_error("Couldn't read socket: " + std::string(strerror(errno)));
78  }
79  } else if (di == 0) {
80  throw std::runtime_error("Remote process closed session prematurely");
81  } else {
82  i += di;
83  }
84  }
85 }
86 
87 inline void recvString(int socket, std::string* x) {
88  size_t n = 0;
89  recvData(socket, (uint8_t*)&n, sizeof(n));
90 
91  x->resize(n);
92  recvData(socket, (uint8_t*)&((*x)[0]), n);
93 }
94 
95 inline void setBlockingBit(int socket, bool block) {
96  int f = fcntl(socket, F_GETFL, 0);
97  if (f == -1) f = 0;
98  fcntl(socket, F_SETFL, block ? (f & (~O_NONBLOCK)) : (f | O_NONBLOCK));
99 }
100 
101 inline size_t recvDataPartial(int socket, uint8_t* d, size_t sz) {
102  ssize_t di = recv(socket, d, sz, 0);
103 
104  if (di == 0) {
105  throw std::runtime_error("Remote process closed session prematurely");
106  } else if (di < 0) {
107  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
108  return 0;
109  } else {
110  throw std::runtime_error("Couldn't read socket: " + std::string(strerror(errno)));
111  }
112  }
113  return (size_t)di;
114 }
115 
116 // socket connection and session initiation
117 struct RPCDef {
118  RPCDef(uint32_t id = 0, const std::string& expr = "", const bytes& willPut = bytes(), const bytes& willGet = bytes()) :
120  {
121  }
122 
123  uint32_t id; // how will this RPC be identified?
124  std::string expr; // what expression will be applied for this RPC on the remote side?
125  bytes willPut; // what type will be sent?
126  bytes willGet; // what type will be received?
127 };
128 typedef std::vector<RPCDef> RPCDefs;
129 
130 // initiate a session on a connected socket by sending all of the RPC defs
131 inline int initSession(int s, const RPCDefs& rpcds) {
132  uint32_t version = HNET_VERSION;
133  sendData(s, (const uint8_t*)&version, sizeof(version));
134 
135  for (const auto& rpcd : rpcds) {
136  uint8_t defCmd = HNET_CMD_DEFEXPR;
137  sendData(s, &defCmd, sizeof(defCmd));
138  sendData(s, (const uint8_t*)&rpcd.id, sizeof(rpcd.id));
139  sendString(s, rpcd.expr);
140  sendBytes(s, rpcd.willPut);
141  sendBytes(s, rpcd.willGet);
142 
143  uint8_t result = HNET_RESULT_FAIL;
144  recvData(s, &result, sizeof(result));
145  if (result == HNET_RESULT_FAIL) {
146  std::string err;
147  recvString(s, &err);
148  std::ostringstream m;
149  m << "While trying to define '" << rpcd.expr << "' with id=" << rpcd.id << ": " << err << std::flush;
150  throw std::runtime_error(m.str());
151  }
152  }
153  return s;
154 }
155 
156 inline int makeConnection(int s, sockaddr* saddr, size_t len) {
157  if (connect(s, saddr, len) == -1) {
158  std::ostringstream ss;
159  ss << "Unable to connect socket: " << strerror(errno) << std::flush;
160  close(s);
161  throw std::runtime_error(ss.str());
162  }
163 
164  fd_set wd;
165  FD_ZERO(&wd);
166  FD_SET(s, &wd);
167 
168  if (select(s + 1, 0, &wd, 0, 0) == -1) {
169  std::ostringstream ss;
170  ss << "Failed to connect socket while waiting for writeability: " << strerror(errno) << std::flush;
171  close(s);
172  throw std::runtime_error(ss.str());
173  }
174 
175  return s;
176 }
177 
178 inline int makeConnection(const hostent& host, int port) {
179  int s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
180  if (s == -1) {
181  throw std::runtime_error("Unable to allocate socket: " + std::string(strerror(errno)));
182  }
183 
184  sockaddr_in addr;
185  memset(&addr, 0, sizeof(addr));
186  addr.sin_family = AF_INET;
187  addr.sin_addr = *((in_addr*)(host.h_addr_list[0]));
188  addr.sin_port = htons(port);
189 
190  return makeConnection(s, (sockaddr*)&addr, sizeof(addr));
191 }
192 
193 inline int makeConnection(const std::string& host, size_t port) {
194  if (const hostent* h = (host.size() > 0 && std::isdigit(host[0]) != 0) ? gethostbyaddr(host.c_str(), host.size(), AF_INET) : gethostbyname(host.c_str())) {
195  return makeConnection(*h, port);
196  } else {
197  throw std::runtime_error("Unable to resolve host: " + host);
198  }
199 }
200 
201 inline int makeConnection(const std::string& host, const std::string& port) {
202  std::istringstream ss(port);
203  size_t p = 0;
204  ss >> p;
205  if (ss) {
206  return makeConnection(host, p);
207  } else {
208  struct servent* s = getservbyname(port.c_str(), "tcp");
209  if (s != 0) {
210  return makeConnection(host, ntohs(s->s_port));
211  } else {
212  throw std::runtime_error("Failed to resolve port name: " + port);
213  }
214  }
215 }
216 
217 inline int makeConnection(const std::string& hostport) {
218  auto p = hostport.find(":");
219  if (p == std::string::npos) {
220  throw std::runtime_error("Failed to determine port: " + hostport);
221  } else {
222  return makeConnection(hostport.substr(0, p), hostport.substr(p + 1, hostport.size()));
223  }
224 }
225 
226 // very basic macro metaprogramming
227 #define _HNET_FIRST(a, ...) a
228 #define _HNET_SECOND(a, b, ...) b
229 #define _HNET_JOIN(a,b) a ## b
230 #define _HNET_IS_NEGATE(...) _HNET_SECOND(__VA_ARGS__, 0)
231 #define _HNET_NOT(x) _HNET_IS_NEGATE(_HNET_JOIN(_HNET__NOT_, x))
232 #define _HNET__NOT_0 NEGATE, 1
233 #define _HNET_BOOL(x) _HNET_NOT(_HNET_NOT(x))
234 #define _HNET_IF_ELSE(condition) _HNET__IF_ELSE(_HNET_BOOL(condition))
235 #define _HNET__IF_ELSE(condition) _HNET_JOIN(_HNET__IF_, condition)
236 #define _HNET__IF_1(...) __VA_ARGS__ _HNET__IF_1_ELSE
237 #define _HNET__IF_0(...) _HNET__IF_0_ELSE
238 #define _HNET__IF_1_ELSE(...)
239 #define _HNET__IF_0_ELSE(...) __VA_ARGS__
240 #define _HNET_EMPTY()
241 #define _HNET_EVAL(...) _HNET_EVAL256(__VA_ARGS__)
242 #define _HNET_EVAL256(...) _HNET_EVAL128(_HNET_EVAL128(__VA_ARGS__))
243 #define _HNET_EVAL128(...) _HNET_EVAL64(_HNET_EVAL64(__VA_ARGS__))
244 #define _HNET_EVAL64(...) _HNET_EVAL32(_HNET_EVAL32(__VA_ARGS__))
245 #define _HNET_EVAL32(...) _HNET_EVAL16(_HNET_EVAL16(__VA_ARGS__))
246 #define _HNET_EVAL16(...) _HNET_EVAL8(_HNET_EVAL8(__VA_ARGS__))
247 #define _HNET_EVAL8(...) _HNET_EVAL4(_HNET_EVAL4(__VA_ARGS__))
248 #define _HNET_EVAL4(...) _HNET_EVAL2(_HNET_EVAL2(__VA_ARGS__))
249 #define _HNET_EVAL2(...) _HNET_EVAL1(_HNET_EVAL1(__VA_ARGS__))
250 #define _HNET_EVAL1(...) __VA_ARGS__
251 #define _HNET_DEFER2(m) m _HNET_EMPTY _HNET_EMPTY()()
252 #define _HNET_HAS_PARGS(...) _HNET_BOOL(_HNET_FIRST(_HNET__EOAP_ __VA_ARGS__)())
253 #define _HNET__EOAP_(...) _HNET_BOOL(_HNET_FIRST(_HNET__EOA_ __VA_ARGS__)())
254 #define _HNET__EOA_() 0
255 #define _HNET_MAP(f, VS...) _HNET_EVAL(_HNET_MAPP(f, VS))
256 #define _HNET_MAPP(f, H, T...) \
257  f H \
258  _HNET_IF_ELSE(_HNET_HAS_PARGS(T))( \
259  _HNET_DEFER2(_HNET__MAPP)()(f, T) \
260  )( \
261  )
262 #define _HNET__MAPP() _HNET_MAPP
263 
264 /*****************************
265  * BEGIN serialization by type / type-family
266  *****************************/
267 template <typename T, typename P = void>
268  struct io {
269  };
270 
271 // type serialization
272 #define _HNET_TYCTOR_PRIM ((int)0)
273 #define _HNET_TYCTOR_TVAR ((int)2)
274 #define _HNET_TYCTOR_FIXEDARR ((int)4)
275 #define _HNET_TYCTOR_ARR ((int)5)
276 #define _HNET_TYCTOR_VARIANT ((int)6)
277 #define _HNET_TYCTOR_STRUCT ((int)7)
278 #define _HNET_TYCTOR_SIZE ((int)11)
279 #define _HNET_TYCTOR_RECURSIVE ((int)13)
280 
281 template <typename T>
282  void w(const T& x, bytes* out) {
283  out->insert(out->end(), (uint8_t*)&x, ((uint8_t*)&x) + sizeof(x));
284  }
285 inline void ws(const char* x, bytes* out) {
286  size_t n = strlen(x);
287  w(n, out);
288  out->insert(out->end(), x, x + n);
289 }
290 inline void ws(const std::string& x, bytes* out) {
291  w((size_t)x.size(), out);
292  out->insert(out->end(), x.begin(), x.end());
293 }
294 inline void ws(const bytes& x, bytes* out) {
295  w((size_t)x.size(), out);
296  out->insert(out->end(), x.begin(), x.end());
297 }
298 
299 inline void encode_primty(const char* tn, bytes* out) {
300  w(_HNET_TYCTOR_PRIM, out);
301  ws(tn, out);
302  w((bool)false, out);
303 }
304 
305 // primitive serialization
306 #define _HNET_DEFINE_PRIMTYS(T, n) \
307  template <> \
308  struct io<T> { \
309  typedef void can_memcpy; \
310  static void encode(bytes* out) { encode_primty(n, out); } \
311  static std::string describe() { return n; } \
312  static void write(int s, const T& x) { sendData(s, (const uint8_t*)&x, sizeof(x)); } \
313  static void read(int s, T* x) { recvData(s, (uint8_t*)x, sizeof(T)); } \
314  typedef uint8_t async_read_state; \
315  static void prepare(uint8_t* o) { *o = 0; } \
316  static bool accum(int s, uint8_t* o, T* x) { *o += recvDataPartial(s, ((uint8_t*)x) + *o, sizeof(T) - *o); return *o == sizeof(T); } \
317  }
318 _HNET_DEFINE_PRIMTYS(bool, "bool");
319 _HNET_DEFINE_PRIMTYS(uint8_t, "byte");
320 _HNET_DEFINE_PRIMTYS(char, "char");
321 _HNET_DEFINE_PRIMTYS(int16_t, "short");
322 _HNET_DEFINE_PRIMTYS(uint16_t, "short");
323 _HNET_DEFINE_PRIMTYS(int32_t, "int");
324 _HNET_DEFINE_PRIMTYS(uint32_t, "int");
325 _HNET_DEFINE_PRIMTYS(int64_t, "long");
326 _HNET_DEFINE_PRIMTYS(uint64_t, "long");
327 #ifdef __clang__
328 _HNET_DEFINE_PRIMTYS(size_t, "long");
329 #endif
330 _HNET_DEFINE_PRIMTYS(float, "float");
331 _HNET_DEFINE_PRIMTYS(double, "double");
332 
333 template <typename T, typename P = void>
334  struct cannot_memcpy {
335  typedef void type;
336  };
337 template <typename T>
338  struct cannot_memcpy<T, typename io<T>::can_memcpy> {
339  };
340 
341 // support unit
342 struct unit { unit() { } };
343 
344 template <>
345  struct io<unit> {
346  static void encode(bytes* out) { encode_primty("unit", out); }
347  static std::string describe() { return "()"; }
348  static void write(int s, const unit&) { }
349  static void read(int s, unit*) { }
350  };
351 
352 // support enumerations
353 #define HNET_ENUM_CTOR_DEF(n) n ,
354 #define HNET_ENUM_CTOR_CTOR(n) static const SelfT n() { return SelfT(SelfT::Enum::n); }
355 #define HNET_ENUM_CTORC_SUCC(n) +1
356 #define HNET_ENUM_CTOR_STR(n) + "|" #n
357 #define HNET_ENUM_CTOR_ENCODE(n) \
358  ::hobbes::net::ws(#n, out); \
359  ::hobbes::net::w((uint32_t)(Enum :: n), out); \
360  ::hobbes::net::encode_primty("unit", out);
361 #define HNET_ENUM_TOSTR_CASE(n) \
362  case SelfT :: Enum :: n: o << "|" #n "|"; break;
363 
364 #define DEFINE_HNET_ENUM(T, CTORS...) \
365  struct T { \
366  typedef void is_hnet_enum; \
367  enum class Enum : uint32_t { \
368  _HNET_MAP(HNET_ENUM_CTOR_DEF, CTORS) \
369  COUNT \
370  }; \
371  Enum value; \
372  T() : value() { } \
373  T(Enum v) : value(v) { } \
374  T& operator=(Enum v) { this->value = v; return *this; } \
375  operator Enum() { return this->value; } \
376  typedef T SelfT; \
377  _HNET_MAP(HNET_ENUM_CTOR_CTOR, CTORS) \
378  static void encode(::hobbes::net::bytes* out) { \
379  ::hobbes::net::w(_HNET_TYCTOR_VARIANT, out); \
380  ::hobbes::net::w((size_t)(0 _HNET_MAP(HNET_ENUM_CTORC_SUCC, CTORS)), out); \
381  _HNET_MAP(HNET_ENUM_CTOR_ENCODE, CTORS); \
382  } \
383  static std::string describe() { \
384  return (std::string("") _HNET_MAP(HNET_ENUM_CTOR_STR, CTORS)).substr(1); \
385  } \
386  bool operator==(const T& rhs) const { return this->value == rhs.value; } \
387  }; \
388  inline std::ostream& operator<<(std::ostream& o, const T& x) { \
389  typedef T SelfT; \
390  switch (x.value) { \
391  _HNET_MAP(HNET_ENUM_TOSTR_CASE, CTORS) \
392  default: \
393  o << "|?|"; \
394  break; \
395  } \
396  return o; \
397  }
398 
399 template <typename T>
400  struct io<T, typename T::is_hnet_enum> {
401  typedef void can_memcpy;
402  static void encode(bytes* out) { T::encode(out); }
403  static std::string describe() { return T::describe(); }
404  static size_t size(const T&) { return sizeof(T); }
405  static void write(int s, const T& x) { io<uint32_t>::write(s, (uint32_t)x.value); }
406  static void read(int s, T* x) { io<uint32_t>::read(s, (uint32_t*)&x->value); }
407 
408  typedef uint8_t async_read_state;
409  static void prepare(uint8_t* o) { *o = 0; }
410  static bool accum(int s, uint8_t* o, T* x) { *o += recvDataPartial(s, ((uint8_t*)x) + *o, sizeof(uint32_t) - *o); return *o == sizeof(uint32_t); }
411  };
412 
413 // support variants (with and without explicit constructor names)
414 #define DEFINE_HNET_VARIANT_GEN(T, VDECL, VCTORS, VCOPY, VDESTROY, CTORCOUNT, VENCODE, VDESC, VWRITE, VREAD, VVISITCASE, VEQCASE, CTAGS, CDATA, CAIOSTATES, CAIOSTATEINIT, CAIOREAD) \
415  template <typename R> \
416  struct T##Visitor { \
417  VDECL \
418  }; \
419  struct T { \
420  typedef void is_hnet_variant; \
421  typedef T SelfT; \
422  T() : tag(Enum::COUNT) { } \
423  VCTORS \
424  T(const T& rhs) : tag(rhs.tag) { \
425  switch (this->tag) { \
426  VCOPY \
427  default: break; \
428  } \
429  } \
430  ~T() { \
431  switch (this->tag) { \
432  VDESTROY \
433  default: break; \
434  } \
435  } \
436  T& operator=(const T& rhs) { \
437  if (this == &rhs) return *this; \
438  switch (this->tag) { \
439  VDESTROY \
440  default: break; \
441  } \
442  this->tag = rhs.tag; \
443  switch (this->tag) { \
444  VCOPY \
445  default: break; \
446  } \
447  return *this; \
448  } \
449  static void encode(::hobbes::net::bytes* out) { \
450  ::hobbes::net::w(_HNET_TYCTOR_VARIANT, out); \
451  ::hobbes::net::w((size_t)(0 CTORCOUNT), out); \
452  VENCODE; \
453  } \
454  static std::string describe() { \
455  return "|" + (std::string("") VDESC).substr(1) + "|"; \
456  } \
457  void write(int s) const { \
458  switch (this->tag) { \
459  VWRITE \
460  default: break; \
461  } \
462  } \
463  void read(int s) { \
464  switch (this->tag) { \
465  VDESTROY \
466  default: break; \
467  } \
468  this->tag = Enum::COUNT; \
469  ::hobbes::net::io<uint32_t>::read(s, (uint32_t*)&this->tag); \
470  switch (this->tag) { \
471  VREAD \
472  default: break; \
473  } \
474  } \
475  template <typename R> \
476  R visit(const T##Visitor<R>& v) const { \
477  switch (this->tag) { \
478  VVISITCASE \
479  default: throw std::runtime_error("while deconstructing the " #T " variant, cannot decide payload type because tag is invalid"); \
480  } \
481  } \
482  bool operator==(const T& rhs) const { \
483  if (this->tag != rhs.tag) { \
484  return false; \
485  } else { \
486  switch (this->tag) { \
487  VEQCASE \
488  default: return false; \
489  } \
490  } \
491  } \
492  private: \
493  enum class Enum : uint32_t { \
494  CTAGS \
495  COUNT \
496  }; \
497  Enum tag; \
498  union { \
499  char data[1]; \
500  CDATA \
501  }; \
502  public: \
503  typedef ::hobbes::net::io<uint32_t>::async_read_state TagState; \
504  union PayloadState { \
505  char data[1]; \
506  CAIOSTATES \
507  }; \
508  struct async_read_state { \
509  bool readTag; \
510  TagState tagS; \
511  PayloadState payloadS; \
512  }; \
513  static void prepare(async_read_state* o) { o->readTag = true; ::hobbes::net::io<uint32_t>::prepare(&o->tagS); } \
514  static bool accum(int s, async_read_state* o, T* x) { \
515  if (o->readTag) { \
516  if (::hobbes::net::io<uint32_t>::accum(s, &o->tagS, (uint32_t*)&x->tag)) { \
517  o->readTag = false; \
518  switch (x->tag) { \
519  CAIOSTATEINIT \
520  default: break; \
521  } \
522  } \
523  } else { \
524  switch (x->tag) { \
525  CAIOREAD \
526  default: break; \
527  } \
528  } \
529  return false; \
530  } \
531  }
532 
533 // (with implicit ctor names)
534 #define HNET_VARIANT_CTOR(n, t) static SelfT n(const t & x) { SelfT r; r.tag = Enum::tag_##n; new (r.data) t(x); return r; }
535 #define HNET_VARIANT_CTOR_STR(n, t) + "," #n ":" + ::hobbes::net::io< t >::describe()
536 #define HNET_VARIANT_CTOR_TAG(n, t) tag_##n,
537 #define HNET_VARIANT_WRITE_CASE(n, t) case Enum::tag_##n: ::hobbes::net::io<int>::write(s, (int)this->tag); ::hobbes::net::io< t >::write(s, this->n##_data); break;
538 #define HNET_VARIANT_READ_CASE(n, t) case Enum::tag_##n: new (this->data) t(); ::hobbes::net::io< t >::read(s, &this->n##_data); break;
539 #define HNET_VARIANT_PCOPY(n, t) case Enum::tag_##n: new (this->data) t(rhs.n##_data); break;
540 #define HNET_VARIANT_PDESTROY(n, t) case Enum::tag_##n: { typedef t __DT; ((__DT*)&this->n##_data)->~__DT(); } break;
541 #define HNET_VARIANT_SUCC(n, t) +1
542 #define HNET_VARIANT_CTOR_OPAQUEDATA(n, t) t n##_data;
543 #define HNET_VARIANT_CTOR_ENCODE(n, t) \
544  ::hobbes::net::ws(#n, out); \
545  ::hobbes::net::w((uint32_t)Enum::tag_##n, out); \
546  ::hobbes::net::io< t >::encode(out);
547 
548 #define HNET_VARIANT_VDECL(n, t) virtual R n(const t & x) const = 0;
549 #define HNET_VARIANT_VCASE(n, t) case Enum::tag_##n: return v.n (this->n##_data);
550 #define HNET_VARIANT_EQCASE(n, t) case Enum::tag_##n: return (this->n##_data == rhs.n##_data);
551 
552 #define HNET_VARIANT_AIOSTATE_DATA(n, t) ::hobbes::net::io<t>::async_read_state n##_aioS;
553 #define HNET_VARIANT_AIOSTATE_INIT(n, t) case Enum::tag_##n: { new (o->payloadS.data) ::hobbes::net::io<t>::async_read_state(); ::hobbes::net::io<t>::prepare(&o->payloadS.n##_aioS); new (x->data) t(); } break;
554 #define HNET_VARIANT_AIOSTATE_ACCUM(n, t) case Enum::tag_##n: return ::hobbes::net::io<t>::accum(s, &o->payloadS.n##_aioS, &x->n##_data);
555 
556 #define DEFINE_HNET_VARIANT(T, CTORS...) \
557  DEFINE_HNET_VARIANT_GEN(T, _HNET_MAP(HNET_VARIANT_VDECL, CTORS), _HNET_MAP(HNET_VARIANT_CTOR, CTORS), _HNET_MAP(HNET_VARIANT_PCOPY, CTORS), _HNET_MAP(HNET_VARIANT_PDESTROY, CTORS), _HNET_MAP(HNET_VARIANT_SUCC, CTORS), _HNET_MAP(HNET_VARIANT_CTOR_ENCODE, CTORS), _HNET_MAP(HNET_VARIANT_CTOR_STR, CTORS), _HNET_MAP(HNET_VARIANT_WRITE_CASE, CTORS), _HNET_MAP(HNET_VARIANT_READ_CASE, CTORS), _HNET_MAP(HNET_VARIANT_VCASE, CTORS), _HNET_MAP(HNET_VARIANT_EQCASE, CTORS), _HNET_MAP(HNET_VARIANT_CTOR_TAG, CTORS), _HNET_MAP(HNET_VARIANT_CTOR_OPAQUEDATA, CTORS), _HNET_MAP(HNET_VARIANT_AIOSTATE_DATA, CTORS), _HNET_MAP(HNET_VARIANT_AIOSTATE_INIT, CTORS), _HNET_MAP(HNET_VARIANT_AIOSTATE_ACCUM, CTORS))
558 
559 // (with explicit ctor names)
560 #define HNET_VARIANT_LBL_CTOR(n, lbl, t) static SelfT n(const t & x) { SelfT r; r.tag = Enum::tag_##n; new (r.data) t(x); return r; }
561 #define HNET_VARIANT_LBL_CTOR_STR(n, lbl, t) + "," #n ":" + ::hobbes::net::io< t >::describe()
562 #define HNET_VARIANT_LBL_CTOR_TAG(n, lbl, t) tag_##n,
563 #define HNET_VARIANT_LBL_WRITE_CASE(n, lbl, t) case Enum::tag_##n: ::hobbes::net::io<int>::write(s, (int)this->tag); ::hobbes::net::io< t >::write(s, this->n##_data); break;
564 #define HNET_VARIANT_LBL_READ_CASE(n, lbl, t) case Enum::tag_##n: new (this->data) t(); ::hobbes::net::io< t >::read(s, &this->n##_data); break;
565 #define HNET_VARIANT_LBL_PCOPY(n, lbl, t) case Enum::tag_##n: new (this->data) t(rhs.n##_data); break;
566 #define HNET_VARIANT_LBL_PDESTROY(n, lbl, t) case Enum::tag_##n: { typedef t __DT; ((__DT*)&this->n##_data)->~__DT(); } break;
567 #define HNET_VARIANT_LBL_SUCC(n, lbl, t) +1
568 #define HNET_VARIANT_LBL_CTOR_OPAQUEDATA(n, lbl, t) t n##_data;
569 #define HNET_VARIANT_LBL_CTOR_ENCODE(n, lbl, t) \
570  ::hobbes::net::ws(#lbl, out); \
571  ::hobbes::net::w((uint32_t)Enum::tag_##n, out); \
572  ::hobbes::net::io< t >::encode(out);
573 
574 #define HNET_VARIANT_LBL_VDECL(n, lbl, t) virtual R n(const t & x) const = 0;
575 #define HNET_VARIANT_LBL_VCASE(n, lbl, t) case Enum::tag_##n: return v. n (this->n##_data);
576 #define HNET_VARIANT_LBL_EQCASE(n, lbl, t) case Enum::tag_##n: return (this->n##_data == rhs.n##_data);
577 
578 #define HNET_VARIANT_LBL_CTOR_AIOSTATE_DATA(n, lbl, t) ::hobbes::net::io<t>::async_read_state n##_aioS;
579 #define HNET_VARIANT_LBL_CTOR_AIOSTATE_INIT(n, lbl, t) case Enum::tag_##n: { new (o->payloadS.data) ::hobbes::net::io<t>::async_read_state(); ::hobbes::net::io<t>::prepare(&o->payloadS.n##_aioS); new (x->data) t(); } break;
580 #define HNET_VARIANT_LBL_CTOR_AIOSTATE_ACCUM(n, lbl, t) case Enum::tag_##n: return ::hobbes::net::io<t>::accum(s, &o->payloadS.n##_aioS, &x->n##_data);
581 
582 #define DEFINE_HNET_VARIANT_WITH_LABELS(T, CTORS...) \
583  DEFINE_HNET_VARIANT_GEN(T, _HNET_MAP(HNET_VARIANT_LBL_VDECL, CTORS), _HNET_MAP(HNET_VARIANT_LBL_CTOR, CTORS), _HNET_MAP(HNET_VARIANT_LBL_PCOPY, CTORS), _HNET_MAP(HNET_VARIANT_LBL_PDESTROY, CTORS), _HNET_MAP(HNET_VARIANT_LBL_SUCC, CTORS), _HNET_MAP(HNET_VARIANT_LBL_CTOR_ENCODE, CTORS), _HNET_MAP(HNET_VARIANT_LBL_CTOR_STR, CTORS), _HNET_MAP(HNET_VARIANT_LBL_WRITE_CASE, CTORS), _HNET_MAP(HNET_VARIANT_LBL_READ_CASE, CTORS), _HNET_MAP(HNET_VARIANT_LBL_VCASE, CTORS), _HNET_MAP(HNET_VARIANT_LBL_EQCASE, CTORS), _HNET_MAP(HNET_VARIANT_LBL_CTOR_TAG, CTORS), _HNET_MAP(HNET_VARIANT_LBL_CTOR_OPAQUEDATA, CTORS), _HNET_MAP(HNET_VARIANT_LBL_CTOR_AIOSTATE_DATA, CTORS), _HNET_MAP(HNET_VARIANT_LBL_CTOR_AIOSTATE_INIT, CTORS), _HNET_MAP(HNET_VARIANT_LBL_CTOR_AIOSTATE_ACCUM, CTORS))
584 
585 template <typename T>
586  struct io<T, typename T::is_hnet_variant> {
587  static void encode(bytes* out) { T::encode(out); }
588  static std::string describe() { return T::describe(); }
589  static void write(int s, const T& x) { return x.write(s); }
590  static void read(int s, T* x) { return x->read(s); }
591 
592  // async reading of variants
593  typedef typename T::async_read_state async_read_state;
594  static void prepare(async_read_state* o) { T::prepare(o); }
595  static bool accum(int s, async_read_state* o, T* x) { return T::accum(s, o, x); }
596  };
597 
598 // support pairs
599 template <typename U, typename V>
600  struct io<std::pair<U,V>> {
601  static void encode(bytes* out) {
602  w(_HNET_TYCTOR_STRUCT, out);
603  w((size_t)2, out);
604 
605  ws(".f0", out);
606  w((int)-1, out);
607  io<U>::encode(out);
608 
609  ws(".f1", out);
610  w((int)-1, out);
611  io<V>::encode(out);
612  }
613  static std::string describe() {
614  return "(" + io<U>::describe() + "*" + io<V>::describe() + ")";
615  }
616  static void write(int s, const std::pair<U,V>& p) {
617  io<U>::write(s, p.first);
618  io<V>::write(s, p.second);
619  }
620  static void read(int s, std::pair<U,V>* p) {
621  io<U>::read(s, &p->first);
622  io<V>::read(s, &p->second);
623  }
624 
625  // async reading of pairs
626  struct async_read_state {
627  typedef typename io<U>::async_read_state Ustate;
628  typedef typename io<V>::async_read_state Vstate;
629 
630  bool readFirst;
631  Ustate fstState;
632  Vstate sndState;
633  };
634 
635  static void prepare(async_read_state* o) {
636  o->readFirst = true;
637  io<U>::prepare(&o->fstState);
638  io<V>::prepare(&o->sndState);
639  }
640 
641  static bool accum(int s, async_read_state* o, std::pair<U, V>* x) {
642  if (o->readFirst) {
643  if (io<U>::accum(s, &o->fstState, &x->first)) {
644  o->readFirst = false;
645  }
646  } else {
647  if (io<V>::accum(s, &o->sndState, &x->second)) {
648  return true;
649  }
650  }
651  return false;
652  }
653  };
654 
655 // support vectors of mem-copyable type
656 template <typename T>
657  struct io<std::vector<T>, typename io<T>::can_memcpy> {
658  static void encode(bytes* out) { w(_HNET_TYCTOR_ARR, out); io<T>::encode(out); }
659  static std::string describe() { return "[" + io<T>::describe() + "]"; }
660  static void write(int s, const std::vector<T>& x) { size_t n = x.size(); io<size_t>::write(s, n); if (n > 0) sendData(s, (const uint8_t*)&x[0], sizeof(T) * n); }
661  static void read(int s, std::vector<T>* x) { size_t n = 0; io<size_t>::read(s, &n); x->resize(n); if (n > 0) recvData(s, (uint8_t*)&(*x)[0], n); }
662 
663  // async reading of mem-copyable vectors
664  struct async_read_state {
666 
667  bool readLen;
668  LenS lenS;
669  size_t bytesRead;
670  size_t byteLen;
671  };
672 
673  static void prepare(async_read_state* o) {
674  o->readLen = true;
675  io<size_t>::prepare(&o->lenS);
676  }
677 
678  static bool accum(int s, async_read_state* o, std::vector<T>* x) {
679  if (o->readLen) {
680  if (io<size_t>::accum(s, &o->lenS, &o->byteLen)) {
681  x->resize(o->byteLen);
682  o->bytesRead = 0;
683  o->byteLen = sizeof(T) * o->byteLen;
684  o->readLen = false;
685  }
686  } else {
687  uint8_t* buf = (uint8_t*)&((*x)[0]);
688  o->bytesRead += recvDataPartial(s, buf + o->bytesRead, o->byteLen - o->bytesRead);
689  }
690  return !o->readLen && o->bytesRead == o->byteLen;
691  }
692  };
693 
694 template <typename T>
695  struct io<std::vector<T>, typename cannot_memcpy<T>::type> {
696  static void encode(bytes* out) { w(_HNET_TYCTOR_ARR, out); io<T>::encode(out); }
697  static std::string describe() { return "[" + io<T>::describe() + "]"; }
698  static void write(int s, const std::vector<T>& x) {
699  size_t n = x.size();
700  io<size_t>::write(s, n);
701  for (size_t i = 0; i < n; ++i) {
702  io<T>::write(s, x[i]);
703  }
704  }
705  static void read(int s, std::vector<T>* x) {
706  size_t n = 0;
707  io<size_t>::read(s, &n);
708  x->resize(n);
709  for (size_t i = 0; i < n; ++i) {
710  io<T>::read(s, &(*x)[i]);
711  }
712  }
713 
714  // async reading of vectors
715  struct async_read_state {
717  typedef typename io<T>::async_read_state ElemS;
718 
719  bool readLen;
720  LenS lenS;
721  size_t idx;
722  ElemS elemS;
723  };
724 
725  static void prepare(async_read_state* o) {
726  o->readLen = true;
727  io<size_t>::prepare(&o->lenS);
728  }
729 
730  static bool accum(int s, async_read_state* o, std::vector<T>* x) {
731  if (o->readLen) {
732  if (io<size_t>::accum(s, &o->lenS, &o->idx)) {
733  x->resize(o->idx);
734  o->idx = 0;
735  o->readLen = false;
736  io<T>::prepare(&o->elemS);
737  }
738  } else {
739  if (io<T>::accum(s, &o->elemS, &(*x)[o->idx])) {
740  ++o->idx;
741  io<T>::prepare(&o->elemS);
742  }
743  }
744  return !o->readLen && o->idx == x->size();
745  }
746  };
747 
748 // support maps (as if vectors of pairs)
749 template <typename K, typename T>
750  struct io<std::map<K,T>> {
751  static void encode(bytes* out) { io<std::vector<std::pair<K,T>>>::encode(out); }
752  static std::string describe() { return io<std::vector<std::pair<K,T>>>::describe(); }
753  static void write(int s, const std::map<K,T>& x) {
754  size_t n = x.size();
755  io<size_t>::write(s, n);
756  for (const auto& xp : x) {
757  io<K>::write(s, xp.first);
758  io<T>::write(s, xp.second);
759  }
760  }
761  static void read(int s, std::map<K,T>* x) {
762  size_t n = 0;
763  io<size_t>::read(s, &n);
764  for (size_t i = 0; i < n; ++i) {
765  K k;
766  io<K>::read(s, &k);
767  T t;
768  io<T>::read(s, &t);
769  (*x)[k] = t;
770  }
771  }
772 
774  typedef typename io<K>::async_read_state KS;
775  typedef typename io<T>::async_read_state TS;
776  enum class ReadS : uint8_t { LenS, KS, TS };
777 
778  struct async_read_state {
780  LenS lenS;
781  size_t len;
782  KS kS;
783  K k;
784  TS tS;
785  T t;
786  };
787  static void prepare(async_read_state* o) {
788  o->readS = ReadS::LenS;
789  io<size_t>::prepare(&o->lenS);
790  }
791  static bool accum(int s, async_read_state* o, std::map<K,T>* x) {
792  switch (o->readS) {
793  case ReadS::LenS:
794  if (io<size_t>::accum(s, &o->lenS, &o->len)) {
795  o->readS = ReadS::KS;
796  io<K>::prepare(&o->kS);
797  }
798  break;
799  case ReadS::KS:
800  if (io<K>::accum(s, &o->kS, &o->k)) {
801  o->readS = ReadS::TS;
802  io<T>::prepare(&o->tS);
803  }
804  break;
805  case ReadS::TS:
806  if (io<T>::accum(s, &o->tS, &o->t)) {
807  (*x)[o->k] = o->t;
808  --o->len;
809  o->readS = ReadS::KS;
810  io<K>::prepare(&o->kS);
811  }
812  break;
813  }
814  return o->readS != ReadS::LenS && o->len == 0;
815  }
816  };
817 
818 // support strings (but const char* can only be sent, not received)
819 template <>
820  struct io<const char*> {
821  static void encode(bytes* out) { w(_HNET_TYCTOR_ARR, out); io<char>::encode(out); }
822  static std::string describe() { return "[char]"; }
823  static void write(int s, const char* x) { size_t n = strlen(x); io<size_t>::write(s, n); sendData(s, (const uint8_t*)x, n); }
824  };
825 template <>
826  struct io<std::string> {
827  static void encode(bytes* out) { w(_HNET_TYCTOR_ARR, out); io<char>::encode(out); }
828  static std::string describe() { return "[char]"; }
829  static void write(int s, const std::string& x) { io<size_t>::write(s, x.size()); sendData(s, (const uint8_t*)x.data(), x.size()); }
830  static void read(int s, std::string* x) { size_t n = 0; io<size_t>::read(s, &n); x->resize(n); recvData(s, (uint8_t*)x->data(), n); }
831 
832  // async reading of strings
833  struct async_read_state {
835 
836  bool readLen;
837  LenS lenS;
838  size_t bytesRead;
839  size_t byteLen;
840  };
841 
842  static void prepare(async_read_state* o) {
843  o->readLen = true;
844  io<size_t>::prepare(&o->lenS);
845  }
846 
847  static bool accum(int s, async_read_state* o, std::string* x) {
848  if (o->readLen) {
849  if (io<size_t>::accum(s, &o->lenS, &o->byteLen)) {
850  x->resize(o->byteLen);
851  o->bytesRead = 0;
852  o->readLen = false;
853  }
854  } else {
855  uint8_t* buf = (uint8_t*)&((*x)[0]);
856  o->bytesRead += recvDataPartial(s, buf + o->bytesRead, o->byteLen - o->bytesRead);
857  }
858  return !o->readLen && o->bytesRead == o->byteLen;
859  }
860  };
861 
862 // support standard, reflective structs
863 #define HNET_FIELDC_SUCC(t,n) +1
864 #define HNET_FIELD_COUNT(FIELDS...) (0 _HNET_MAP(HNET_FIELDC_SUCC, FIELDS))
865 
866 #define HNET_FIELDW_SUCC(t,n) ::hobbes::net::io< t >::write(s, this-> n);
867 #define HNET_FIELD_WRITES(FIELDS...) _HNET_MAP(HNET_FIELDW_SUCC, FIELDS)
868 
869 #define HNET_FIELDR_SUCC(t,n) ::hobbes::net::io< t >::read(s, &this-> n);
870 #define HNET_FIELD_READS(FIELDS...) _HNET_MAP(HNET_FIELDR_SUCC, FIELDS)
871 
872 #define HNET_STRUCT_FIELD(t, n) t n;
873 #define HNET_STRUCT_FIELD_ENC(t, n) ::hobbes::net::ws(#n, out); ::hobbes::net::w((int)-1, out); ::hobbes::net::io< t >::encode(out);
874 #define HNET_STRUCT_FIELD_DESC(t, n) + (", " #n " : " + ::hobbes::net::io< t >::describe())
875 
876 #define HNET_FIELD_ASYNC_READ_IDX_DEF(_, n) readAt_##n,
877 #define HNET_FIELD_ASYNC_READ_STATE_DEC(t, n) ::hobbes::net::io< t >::async_read_state n;
878 #define HNET_FIELD_ASYNC_READ_STATE_INIT(t, n) ::hobbes::net::io< t >::prepare(&o->fieldStates.n);
879 #define HNET_FIELD_ASYNC_READ_FIELD(t, n) case FieldIndex::readAt_##n: if (::hobbes::net::io< t >::accum(s, &o->fieldStates.n, &x->n)) { o->idx = (FieldIndex)(((uint32_t)o->idx) + 1); } break;
880 
881 #define DEFINE_HNET_STRUCT(T, FIELDS...) \
882  struct T { \
883  _HNET_MAP(HNET_STRUCT_FIELD, FIELDS) /* struct fields */ \
884  typedef void is_hnet_struct; /* identify this type as a struct */ \
885  static void encode(::hobbes::net::bytes* out) { \
886  size_t arity = HNET_FIELD_COUNT(FIELDS); \
887  if (arity > 0) { \
888  ::hobbes::net::w(_HNET_TYCTOR_STRUCT, out); \
889  ::hobbes::net::w(arity, out); \
890  _HNET_MAP(HNET_STRUCT_FIELD_ENC, FIELDS) \
891  } else { \
892  ::hobbes::net::encode_primty("unit", out); \
893  } \
894  } \
895  static std::string describe() { \
896  return "{" + ( std::string("") _HNET_MAP(HNET_STRUCT_FIELD_DESC, FIELDS) ).substr(2) + "}"; \
897  } \
898  void write(int s) const { \
899  HNET_FIELD_WRITES(FIELDS); \
900  } \
901  void read(int s) { \
902  HNET_FIELD_READS(FIELDS); \
903  } \
904  /* async reading */ \
905  enum class FieldIndex : uint32_t { \
906  _ZeroInit = 0, \
907  _HNET_MAP(HNET_FIELD_ASYNC_READ_IDX_DEF, FIELDS) \
908  _FieldListEnd \
909  }; \
910  struct async_read_state { \
911  FieldIndex idx; \
912  struct { \
913  _HNET_MAP(HNET_FIELD_ASYNC_READ_STATE_DEC, FIELDS) \
914  } fieldStates; \
915  }; \
916  static void prepare(async_read_state* o) { \
917  o->idx = (FieldIndex)1; \
918  _HNET_MAP(HNET_FIELD_ASYNC_READ_STATE_INIT, FIELDS) \
919  } \
920  static bool accum(int s, async_read_state* o, T* x) { \
921  switch (o->idx) { \
922  _HNET_MAP(HNET_FIELD_ASYNC_READ_FIELD, FIELDS) \
923  default: break; \
924  } \
925  return o->idx == FieldIndex::_FieldListEnd; \
926  } \
927  }
928 
929 template <typename T>
930  struct io<T, typename T::is_hnet_struct> {
931  static void encode(bytes* out) { T::encode(out); }
932  static std::string describe() { return T::describe(); }
933  static void write(int s, const T& x) { x.write(s); }
934  static void read(int s, T* x) { x->read(s); }
935 
936  typedef typename T::async_read_state async_read_state;
937  static void prepare(async_read_state* o) { T::prepare(o); }
938  static bool accum(int s, async_read_state* o, T* x) { return T::accum(s, o, x); }
939  };
940 
941 // tuple/sequence serialization
942 template <typename ... Ts>
943  struct oSeq {
944  static const size_t count = 0;
945  static void encode(size_t,bytes*) { }
946  static std::string describe() { return ""; }
947  static void write(int, const Ts&...) { }
948  };
949 template <typename T, typename ... Ts>
950  struct oSeq<T, Ts...> {
951  static const size_t count = 1 + oSeq<Ts...>::count;
952 
953  static void encode(size_t f, bytes* out) {
954  char fn[32];
955  snprintf(fn, sizeof(fn), ".f%lld", (unsigned long long)f);
956  ws(fn, out);
957  w((int)-1, out);
958  io<T>::encode(out);
959 
960  oSeq<Ts...>::encode(f+1, out);
961  }
962  static std::string describe() {
963  return io<T>::describe() + "*" + oSeq<Ts...>::describe();
964  }
965  static void write(int socket, const T& x, const Ts&... xs) {
966  io<T>::write(socket, x);
967  oSeq<Ts...>::write(socket, xs...);
968  }
969  };
970 
971 template <size_t i, size_t e, typename ... Ts>
972  struct ioTuple {
973  static void write(int s, const std::tuple<Ts...>& t) {
974  io<typename std::tuple_element<i, std::tuple<Ts...>>::type>::write(s, std::get<i>(t));
976  }
977  static void read(int s, std::tuple<Ts...>* t) {
978  io<typename std::tuple_element<i, std::tuple<Ts...>>::type>::read(s, &std::get<i>(*t));
980  }
981  };
982 template <size_t e, typename ... Ts>
983  struct ioTuple<e, e, Ts...> {
984  static void write(int s, const std::tuple<Ts...>&) { }
985  static void read(int s, std::tuple<Ts...>*) { }
986  };
987 
988 template <typename ... Ts>
989  struct aioTupleState {
990  typedef std::tuple<> async_read_state;
991  static void prepare(async_read_state*) { }
992  };
993 template <typename T, typename ... Ts>
994  struct aioTupleState<T, Ts...> {
995  typedef typename io<T>::async_read_state FstRS;
996  typedef typename aioTupleState<Ts...>::async_read_state SndRS;
997  typedef std::pair<FstRS, SndRS> async_read_state;
998 
999  static void prepare(async_read_state* o) {
1000  io<T>::prepare(&o->first);
1001  aioTupleState<Ts...>::prepare(&o->second);
1002  }
1003  };
1004 template <size_t i, typename T>
1005  struct aioTupleStateAt { };
1006 template <typename U, typename V>
1007  struct aioTupleStateAt<0, std::pair<U,V>> {
1008  typedef U type;
1009  static U* get(std::pair<U,V>* x) { return &x->first; }
1010  };
1011 template <size_t i, typename U, typename V>
1012  struct aioTupleStateAt<i, std::pair<U,V>> {
1013  typedef typename aioTupleStateAt<i-1, V>::type type;
1014  static type* get(std::pair<U,V>* x) { return aioTupleStateAt<i-1, V>::get(&x->second); }
1015  };
1016 
1017 template <size_t i, size_t e, typename ... Ts>
1018  struct aioTupleRead {
1019  static bool accum(int s, size_t ti, typename aioTupleState<Ts...>::async_read_state* o, std::tuple<Ts...>* x) {
1020  if (ti == i) {
1021  return io<typename std::tuple_element<i, std::tuple<Ts...>>::type>::accum(s, aioTupleStateAt<i, typename aioTupleState<Ts...>::async_read_state>::get(o), &std::get<i>(*x));
1022  } else {
1023  return aioTupleRead<i+1, e, Ts...>::accum(s, ti, o, x);
1024  }
1025  }
1026  };
1027 template <size_t e, typename ... Ts>
1028  struct aioTupleRead<e, e, Ts...> {
1029  static bool accum(int s, size_t, typename aioTupleState<Ts...>::async_read_state*, std::tuple<Ts...>*) { return true; }
1030  };
1031 
1032 template <typename ... Ts>
1033  struct io< std::tuple<Ts...> > {
1034  static void encode(bytes* out) {
1035  size_t arity = oSeq<Ts...>::count;
1036 
1037  if (arity > 0) {
1038  w(_HNET_TYCTOR_STRUCT, out);
1039  w(arity, out);
1040  oSeq<Ts...>::encode(0, out);
1041  } else {
1042  encode_primty("unit", out);
1043  }
1044  }
1045  static std::string describe() {
1046  std::string x = oSeq<Ts...>::describe();
1047  return (x.size() > 1) ? x.substr(0,x.size()-1) : x;
1048  }
1049  static void write(int s, const std::tuple<Ts...>& t) {
1050  ioTuple<0, std::tuple_size<std::tuple<Ts...>>::value, Ts...>::write(s, t);
1051  }
1052  static void read(int s, std::tuple<Ts...>* t) {
1053  ioTuple<0, std::tuple_size<std::tuple<Ts...>>::value, Ts...>::read(s, t);
1054  }
1055 
1056  // async reading of tuples
1058  static void prepare(async_read_state* o) { o->first = 0; aioTupleState<Ts...>::prepare(&o->second); }
1059  static bool accum(int s, async_read_state* o, std::tuple<Ts...>* x) {
1060  if (aioTupleRead<0, std::tuple_size<std::tuple<Ts...>>::value, Ts...>::accum(s, o->first, &o->second, x)) {
1061  ++o->first;
1062  }
1063  return o->first == oSeq<Ts...>::count;
1064  }
1065  };
1066 
1067 // support opaque type aliases
1068 #define DEFINE_HNET_TYPE_ALIAS(_ATY, _REPTY) \
1069  struct _ATY { \
1070  typedef void is_hnet_alias; \
1071  typedef _REPTY type; \
1072  static const char* name() { return #_ATY; } \
1073  inline operator _REPTY() { return this->value; } \
1074  _REPTY value; \
1075  _ATY() : value() { } \
1076  _ATY(const _REPTY& x) : value(x) { } \
1077  _ATY(const _ATY& x) : value(x.value) { } \
1078  _ATY& operator=(const _ATY& x) { this->value = x.value; return *this; } \
1079  }
1080 
1081 template <typename T>
1082  struct io<T, typename T::is_hnet_alias> {
1083  static void encode(bytes* out) { w(_HNET_TYCTOR_PRIM, out); ws(T::name(), out); w((bool)true, out); io<typename T::type>::encode(out); }
1084  static std::string describe() { return T::name(); }
1085  static void write(int s, const T& x) { io<typename T::type>::write(s, x.value); }
1086  static void read(int s, T* x) { io<typename T::type>::read(s, &x->value); }
1087 
1089  static void prepare(async_read_state* o) { io<typename T::type>::prepare(o); }
1090  static bool accum(int s, async_read_state* o, T* x) { return io<typename T::type>::accum(s, o, &x->value); }
1091  };
1092 /*****************************
1093  * END serialization by type / type-family
1094  *****************************/
1095 
1096 
1097 // convert C++ compile-time types to hobbes type descriptions
1098 template <typename F>
1099  struct RPCTyDef {
1100  };
1101 template <typename R, typename ... Args>
1102  struct RPCTyDef<R(Args...)> {
1103  static bytes inputType() { bytes r; io<std::tuple<Args...>>::encode(&r); return r; }
1104  static bytes outputType() { bytes r; io<R>::encode(&r); return r; }
1105  };
1106 template <typename ... Args>
1107  struct RPCTyDef<void(Args...)> {
1108  static bytes inputType() { bytes r; io<std::tuple<Args...>>::encode(&r); return r; }
1109  static bytes outputType() { bytes r; encode_primty("unit", &r); return r; }
1110  };
1111 
1112 // synchronous request/reply
1113 template <typename F>
1114  struct RPCFunc {
1115  };
1116 template <typename R, typename ... Args>
1117  struct RPCFunc<R(Args...)> {
1118  RPCFunc(int* socket, uint32_t exprid) : socket(socket), exprid(exprid) { }
1119 
1120  R operator()(const Args&... args) {
1121  int s = *this->socket;
1122 
1124  io<uint32_t>::write(s, this->exprid);
1125  oSeq<Args...>::write(s, args...);
1126 
1127  R result;
1128  io<R>::read(s, &result);
1129  return result;
1130  }
1131  private:
1132  int* socket;
1133  uint32_t exprid;
1134  };
1135 template <typename ... Args>
1136  struct RPCFunc<void(Args...)> {
1137  RPCFunc(int* socket, uint32_t exprid) : socket(socket), exprid(exprid) { }
1138 
1139  void operator()(const Args&... args) {
1140  int s = *this->socket;
1141 
1143  io<uint32_t>::write(s, this->exprid);
1144  oSeq<Args...>::write(s, args...);
1145  }
1146  private:
1147  int* socket;
1148  uint32_t exprid;
1149  };
1150 
1151 #define _HNET_CLIENT_MAKE_EXPRID(n, _, __) , exprID_##n
1152 #define _HNET_CLIENT_MAKE_RPCDEF(n, t, e) result.push_back(::hobbes::net::RPCDef((uint32_t)exprID_##n, e, ::hobbes::net::RPCTyDef<t>::inputType(), ::hobbes::net::RPCTyDef<t>::outputType()));
1153 #define _HNET_CLIENT_INIT_RPCFUNC(n, t, _) , n(&this->s, (uint32_t)exprID_##n)
1154 #define _HNET_CLIENT_MAKE_RPCFUNC(n, t, _) ::hobbes::net::RPCFunc<t> n;
1155 
1156 #define DEFINE_NET_CLIENT(T, C...) \
1157  class T { \
1158  private: \
1159  int s; \
1160  public: \
1161  T(int fd) : s(::hobbes::net::initSession(fd, makeRPCDefs())) _HNET_MAP(_HNET_CLIENT_INIT_RPCFUNC, C) { } \
1162  T(const std::string& host, size_t port) : T(::hobbes::net::makeConnection(host, port)) { } \
1163  T(const std::string& host, const std::string& port) : T(::hobbes::net::makeConnection(host, port)) { } \
1164  T(const std::string& hostport) : T(::hobbes::net::makeConnection(hostport)) { } \
1165  virtual ~T() { closeC(); } \
1166  int fd() const { return this->s; } \
1167  void reconnect(int fd) { closeC(); this->s = ::hobbes::net::initSession(fd, makeRPCDefs()); } \
1168  void reconnect(const std::string& host, size_t port) { reconnect(::hobbes::net::makeConnection(host, port)); } \
1169  void reconnect(const std::string& host, const std::string& port) { reconnect(::hobbes::net::makeConnection(host, port)); } \
1170  void reconnect(const std::string& hostport) { reconnect(::hobbes::net::makeConnection(hostport)); } \
1171  \
1172  _HNET_MAP(_HNET_CLIENT_MAKE_RPCFUNC, C) \
1173  private: \
1174  enum ExprIDs { \
1175  NullExpr = 0 \
1176  _HNET_MAP(_HNET_CLIENT_MAKE_EXPRID, C) \
1177  }; \
1178  static ::hobbes::net::RPCDefs makeRPCDefs() { \
1179  ::hobbes::net::RPCDefs result; \
1180  _HNET_MAP(_HNET_CLIENT_MAKE_RPCDEF, C) \
1181  return result; \
1182  } \
1183  void closeC() { \
1184  ::close(this->s); \
1185  } \
1186  };
1187 
1188 // asynchronous request/reply
1189 struct AsyncReader { virtual bool readAndFinish() = 0; };
1190 struct AsyncScheduler { virtual void enqueue(AsyncReader*) = 0; };
1191 
1192 template <typename F>
1193  struct AsyncRPCFunc {
1194  };
1195 template <typename R, typename ... Args>
1196  struct AsyncRPCFunc<R(Args...)> : public AsyncReader {
1197  typedef std::function<void(const R&)> K;
1198 
1199  AsyncRPCFunc(AsyncScheduler* sched, int* socket, uint32_t exprid) :
1200  sched(sched), socket(socket), exprid(exprid)
1201  {
1202  io<R>::prepare(&this->pr);
1203  }
1204 
1205  void operator()(const Args&... args, const K& k) {
1206  int s = *this->socket;
1207 
1208  // block to write input
1209  setBlockingBit(s, true);
1211  io<uint32_t>::write(s, this->exprid);
1212  oSeq<Args...>::write(s, args...);
1213 
1214  // don't block to read output
1215  setBlockingBit(s, false);
1216  this->ks.push(k);
1217  this->sched->enqueue(this);
1218  }
1219 
1220  bool readAndFinish() {
1221  if (io<R>::accum(*this->socket, &this->pr, &this->r)) {
1222  this->ks.front()(this->r);
1223  this->ks.pop();
1224  this->r = R();
1225  io<R>::prepare(&this->pr);
1226  return true;
1227  } else {
1228  return false;
1229  }
1230  }
1231  private:
1233  int* socket;
1234  uint32_t exprid;
1235 
1237  typedef std::queue<K> KS;
1238 
1239  KS ks;
1240  R r;
1241  async_read_state pr;
1242  };
1243 template <typename ... Args>
1244  struct AsyncRPCFunc<void(Args...)> {
1245  AsyncRPCFunc(AsyncScheduler*, int* socket, uint32_t exprid) : socket(socket), exprid(exprid) { }
1246 
1247  void operator()(const Args&... args) {
1248  int s = *this->socket;
1249 
1250  // block to write input
1251  setBlockingBit(s, true);
1253  io<uint32_t>::write(s, this->exprid);
1254  oSeq<Args...>::write(s, args...);
1255 
1256  // don't block to read output
1257  setBlockingBit(s, false);
1258  }
1259  private:
1260  int* socket;
1261  uint32_t exprid;
1262  };
1263 
1264 #define _HNET_CLIENT_INIT_ASYNC_RPCFUNC(n, t, _) , n(this, &this->s, (uint32_t)exprID_##n)
1265 #define _HNET_CLIENT_MAKE_ASYNC_RPCFUNC(n, t, _) ::hobbes::net::AsyncRPCFunc<t> n;
1266 
1267 #define DEFINE_ASYNC_NET_CLIENT(T, C...) \
1268  class T : public ::hobbes::net::AsyncScheduler { \
1269  private: \
1270  int s; \
1271  public: \
1272  T(int fd) : s(::hobbes::net::initSession(fd, makeRPCDefs())) _HNET_MAP(_HNET_CLIENT_INIT_ASYNC_RPCFUNC, C) { } \
1273  T(const std::string& host, size_t port) : T(::hobbes::net::makeConnection(host, port)) { } \
1274  T(const std::string& host, const std::string& port) : T(::hobbes::net::makeConnection(host, port)) { } \
1275  T(const std::string& hostport) : T(::hobbes::net::makeConnection(hostport)) { } \
1276  virtual ~T() { closeC(); } \
1277  int fd() const { return this->s; } \
1278  void reconnect(int fd) { closeC(); this->s = ::hobbes::net::initSession(fd, makeRPCDefs()); } \
1279  void reconnect(const std::string& host, size_t port) { reconnect(::hobbes::net::makeConnection(host, port)); } \
1280  void reconnect(const std::string& host, const std::string& port) { reconnect(::hobbes::net::makeConnection(host, port)); } \
1281  void reconnect(const std::string& hostport) { reconnect(::hobbes::net::makeConnection(hostport)); } \
1282  void step() { while (this->asyncReaders.size() > 0 && this->asyncReaders.front()->readAndFinish()) { this->asyncReaders.pop(); } } \
1283  size_t pendingRequests() const { return this->asyncReaders.size(); } \
1284  \
1285  _HNET_MAP(_HNET_CLIENT_MAKE_ASYNC_RPCFUNC, C) \
1286  private: \
1287  enum ExprIDs { \
1288  NullExpr = 0 \
1289  _HNET_MAP(_HNET_CLIENT_MAKE_EXPRID, C) \
1290  }; \
1291  static ::hobbes::net::RPCDefs makeRPCDefs() { \
1292  ::hobbes::net::RPCDefs result; \
1293  _HNET_MAP(_HNET_CLIENT_MAKE_RPCDEF, C) \
1294  return result; \
1295  } \
1296  std::queue<::hobbes::net::AsyncReader*> asyncReaders; \
1297  void enqueue(::hobbes::net::AsyncReader* r) { this->asyncReaders.push(r); } \
1298  void closeC() { \
1299  ::close(this->s); \
1300  this->asyncReaders = std::queue<::hobbes::net::AsyncReader*>(); \
1301  } \
1302  };
1303 
1304 }}
1305 
1306 #endif
1307 
static void encode(bytes *out)
Definition: net.H:587
static bool accum(int s, async_read_state *o, std::vector< T > *x)
Definition: net.H:730
static void prepare(async_read_state *o)
Definition: net.H:725
static bytes outputType()
Definition: net.H:1109
Definition: net.H:1018
#define _HNET_TYCTOR_ARR
Definition: net.H:275
void type
Definition: net.H:335
void write(int fd, const char *s)
Definition: www.C:23
bytes willGet
Definition: net.H:126
aioTupleStateAt< i-1, V >::type type
Definition: net.H:1013
aioTupleState< Ts... >::async_read_state SndRS
Definition: net.H:996
static void write(int s, const T &x)
Definition: net.H:405
io< V >::async_read_state Vstate
Definition: net.H:628
io< size_t >::async_read_state LenS
Definition: net.H:773
static void write(int s, const std::string &x)
Definition: net.H:829
static void prepare(async_read_state *o)
Definition: net.H:673
static void encode(bytes *out)
Definition: net.H:658
static std::string describe()
Definition: net.H:1045
static void prepare(async_read_state *o)
Definition: net.H:787
io< U >::async_read_state Ustate
Definition: net.H:627
std::vector< RPCDef > RPCDefs
Definition: net.H:128
void recvData(int socket, uint8_t *d, size_t sz)
Definition: net.H:70
Definition: net.H:1193
void encode(const PrimitivePtr &, std::ostream &)
Definition: expr.C:1674
static void encode(bytes *out)
Definition: net.H:1083
static void encode(bytes *out)
Definition: net.H:931
static void encode(bytes *out)
Definition: net.H:402
static void encode(size_t, bytes *)
Definition: net.H:945
void read(gzbuffer *in, uint8_t *b, size_t n)
Definition: batchrecv.C:86
static void write(int socket, const T &x, const Ts &... xs)
Definition: net.H:965
static size_t size(const T &)
Definition: net.H:404
int * socket
Definition: net.H:1147
std::tuple async_read_state
Definition: net.H:990
static void read(int s, std::tuple< Ts... > *t)
Definition: net.H:977
static void encode(bytes *out)
Definition: net.H:827
Definition: net.H:1099
uint32_t exprid
Definition: net.H:1133
std::vector< uint8_t > bytes
Definition: net.H:41
static void encode(bytes *out)
Definition: net.H:346
static void write(int s, const std::tuple< Ts... > &)
Definition: net.H:984
Definition: pattern.H:281
std::pair< std::string, std::string > pair
Definition: str.H:220
static bool accum(int s, async_read_state *o, T *x)
Definition: net.H:938
static bytes inputType()
Definition: net.H:1103
std::string expr
Definition: net.H:124
int * socket
Definition: net.H:1260
R operator()(const Args &... args)
Definition: net.H:1120
static void write(int s, const T &x)
Definition: net.H:589
static void write(int s, const T &x)
Definition: net.H:1085
static std::string describe()
Definition: net.H:828
void operator()(const Args &... args, const K &k)
Definition: net.H:1205
static void read(int s, T *x)
Definition: net.H:406
AsyncScheduler * sched
Definition: net.H:1232
static bool accum(int s, async_read_state *o, T *x)
Definition: net.H:595
static void prepare(async_read_state *)
Definition: net.H:991
static bool accum(int s, async_read_state *o, std::pair< U, V > *x)
Definition: net.H:641
Definition: boot.H:7
uint32_t id
Definition: net.H:123
static void write(int s, const std::map< K, T > &x)
Definition: net.H:753
uint32_t exprid
Definition: net.H:34
static std::string describe()
Definition: net.H:1084
Definition: net.H:117
io< size_t >::async_read_state LenS
Definition: net.H:834
static std::string describe()
Definition: net.H:588
static void read(int s, std::vector< T > *x)
Definition: net.H:661
static void write(int s, const std::tuple< Ts... > &t)
Definition: net.H:1049
static void read(int s, std::vector< T > *x)
Definition: net.H:705
static void read(int s, std::pair< U, V > *p)
Definition: net.H:620
static void prepare(async_read_state *o)
Definition: net.H:999
RPCDef(uint32_t id=0, const std::string &expr="", const bytes &willPut=bytes(), const bytes &willGet=bytes())
Definition: net.H:118
static bool accum(int s, size_t ti, typename aioTupleState< Ts... >::async_read_state *o, std::tuple< Ts... > *x)
Definition: net.H:1019
std::function< void(const R &)> K
Definition: net.H:1197
Definition: net.H:334
bool readAndFinish()
Definition: net.H:1220
static void encode(bytes *out)
Definition: net.H:751
Definition: net.H:268
uint32_t exprid
Definition: net.H:1234
RPCFunc(int *socket, uint32_t exprid)
Definition: net.H:1118
void sendString(int socket, const std::string &s)
Definition: net.H:55
void sendData(int socket, const uint8_t *d, size_t sz)
Definition: net.H:44
#define _HNET_TYCTOR_STRUCT
Definition: net.H:277
static void encode(bytes *out)
Definition: net.H:1034
T * get(variant< Ctors... > &v)
Definition: variant.H:156
T select(const std::vector< T > &xs, I i)
Definition: array.H:92
io< K >::async_read_state KS
Definition: net.H:774
int initSession(int s, const RPCDefs &rpcds)
Definition: net.H:131
static std::string describe()
Definition: net.H:613
void setBlockingBit(int socket, bool block)
Definition: net.H:95
static void read(int s, T *x)
Definition: net.H:1086
void encode_primty(const char *tn, bytes *out)
Definition: net.H:299
static void write(int s, const char *x)
Definition: net.H:823
void ws(const char *x, bytes *out)
Definition: net.H:285
Definition: net.H:989
io< typename T::type >::async_read_state async_read_state
Definition: net.H:1088
T::async_read_state async_read_state
Definition: net.H:936
static std::string describe()
Definition: net.H:822
Definition: net.H:943
static void write(int s, const T &x)
Definition: net.H:933
static bytes outputType()
Definition: net.H:1104
void operator()(const Args &... args)
Definition: net.H:1247
void write(imagefile *f, const T &x)
Definition: file.C:188
uint32_t exprid
Definition: net.H:1261
static void encode(bytes *out)
Definition: net.H:601
static void write(int s, const std::vector< T > &x)
Definition: net.H:660
static void read(int s, std::map< K, T > *x)
Definition: net.H:761
static bool accum(int s, async_read_state *o, std::map< K, T > *x)
Definition: net.H:791
#define HNET_CMD_INVOKE
Definition: net.H:38
static void prepare(async_read_state *o)
Definition: net.H:635
static void prepare(async_read_state *o)
Definition: net.H:842
static std::string describe()
Definition: net.H:946
static void prepare(async_read_state *o)
Definition: net.H:1058
static bool accum(int s, size_t, typename aioTupleState< Ts... >::async_read_state *, std::tuple< Ts... > *)
Definition: net.H:1029
io< T >::async_read_state TS
Definition: net.H:775
static void read(int s, std::string *x)
Definition: net.H:830
void operator()(const Args &... args)
Definition: net.H:1139
static std::string describe()
Definition: net.H:752
static bool accum(int s, async_read_state *o, T *x)
Definition: net.H:1090
size_t r(const reader::MetaData &md, size_t o, T *t)
Definition: storage.H:1730
#define HNET_RESULT_FAIL
Definition: net.H:39
void w(const T &x, bytes *out)
Definition: net.H:282
static bool accum(int s, async_read_state *o, std::string *x)
Definition: net.H:847
static void write(int s, const std::vector< T > &x)
Definition: net.H:698
int * socket
Definition: net.H:1233
static bool accum(int s, async_read_state *o, std::tuple< Ts... > *x)
Definition: net.H:1059
Definition: net.H:1114
int * socket
Definition: net.H:1132
static void read(int s, std::tuple< Ts... > *)
Definition: net.H:985
int makeConnection(int s, sockaddr *saddr, size_t len)
Definition: net.H:156
size_t recvDataPartial(int socket, uint8_t *d, size_t sz)
Definition: net.H:101
static void write(int, const Ts &...)
Definition: net.H:947
#define HNET_CMD_DEFEXPR
Definition: net.H:37
ExprPtr fn(const str::seq &vns, const ExprPtr &b, const LexicalAnnotation &la)
Definition: expr.H:837
std::pair< FstRS, SndRS > async_read_state
Definition: net.H:997
#define out
Definition: netio.H:19
static bytes inputType()
Definition: net.H:1108
io< T >::async_read_state FstRS
Definition: net.H:995
uint32_t result
Definition: regex.C:376
static std::string describe()
Definition: net.H:347
static void prepare(async_read_state *o)
Definition: net.H:937
#define _HNET_TYCTOR_PRIM
Definition: net.H:272
void sendBytes(int socket, const bytes &x)
Definition: net.H:61
static void write(int s, const unit &)
Definition: net.H:348
static bool accum(int s, async_read_state *o, std::vector< T > *x)
Definition: net.H:678
static std::string describe()
Definition: net.H:403
uint8_t async_read_state
Definition: net.H:408
_HNET_DEFINE_PRIMTYS(bool, "bool")
static void read(int s, T *x)
Definition: net.H:934
static void prepare(async_read_state *o)
Definition: net.H:594
RPCFunc(int *socket, uint32_t exprid)
Definition: net.H:1137
std::map< std::string, llvm::Value * > Args
Definition: dfa.C:1276
unit()
Definition: net.H:342
AsyncRPCFunc(AsyncScheduler *, int *socket, uint32_t exprid)
Definition: net.H:1245
AsyncRPCFunc(AsyncScheduler *sched, int *socket, uint32_t exprid)
Definition: net.H:1199
static void write(int s, const std::pair< U, V > &p)
Definition: net.H:616
static void write(int s, const std::tuple< Ts... > &t)
Definition: net.H:973
io< R >::async_read_state async_read_state
Definition: net.H:1236
static std::string describe()
Definition: net.H:962
MonoTypePtr tuple(const MonoTypes &mtys=MonoTypes())
Definition: type.H:1068
Definition: net.H:972
static void read(int s, T *x)
Definition: net.H:590
LexicalAnnotation m(const YYLTYPE &p)
Definition: hexpr.parse.C:127
#define HNET_VERSION
Definition: net.H:36
static void read(int s, std::tuple< Ts... > *t)
Definition: net.H:1052
static void prepare(async_read_state *o)
Definition: net.H:1089
static void encode(size_t f, bytes *out)
Definition: net.H:953
static std::string describe()
Definition: net.H:932
Definition: net.H:1189
static void read(int s, unit *)
Definition: net.H:349
std::queue< K > KS
Definition: net.H:1237
static std::string describe()
Definition: net.H:659
static void encode(bytes *out)
Definition: net.H:821
std::pair< size_t, typename aioTupleState< Ts... >::async_read_state > async_read_state
Definition: net.H:1057
static void prepare(uint8_t *o)
Definition: net.H:409
uint32_t exprid
Definition: net.H:1148
static bool accum(int s, uint8_t *o, T *x)
Definition: net.H:410
void read(const imagefile *f, T *x)
Definition: file.C:215
Definition: net.H:1190
Definition: net.H:342
void recvString(int socket, std::string *x)
Definition: net.H:87
T::async_read_state async_read_state
Definition: net.H:593
async_read_state pr
Definition: net.H:1241
Definition: net.H:1005
bytes willPut
Definition: net.H:125