hobbes
a language, embedded compiler, and runtime for efficient dynamic expression evaluation, data storage and analysis
storage.H
Go to the documentation of this file.
1 /*
2  * storage : structured storage of application data
3  *
4  * use DEFINE_STORAGE_GROUP(G, C, QoS, T) to create storage group / transaction context
5  * use DECLARE_STORAGE_GROUP(G) to forward-declare the storage group G (suitable for declaration in program headers
6  * use HSTORE(G,N,V0,V1,...) to record the data V0,V1,... with the name N in the group G
7  * use HLOG (G,N,"text display",V0,V1,...) to record the data V0,V1,... with the name N in the group G (with the display hint "text display" to reconstruct text)
8  *
9  */
10 
11 #ifndef HSTORE_H_INCLUDED
12 #define HSTORE_H_INCLUDED
13 
14 #include <map>
15 #include <vector>
16 #include <string>
17 #include <string.h>
18 #include <tuple>
19 #include <functional>
20 #include <iostream>
21 #include <sstream>
22 #include <stdexcept>
23 #include <stdlib.h>
24 #include <stdint.h>
25 #include <assert.h>
26 
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/stat.h>
30 #include <sys/mman.h>
31 #include <sys/un.h>
32 #include <sys/socket.h>
33 #include <sys/ioctl.h>
34 #include <netinet/in.h>
35 #include <netdb.h>
36 #include <fcntl.h>
37 
38 // a few things have to be OS-specific here
39 // * waiting in shared memory
40 #if defined(__APPLE__) && defined(__MACH__)
41 namespace hobbes { namespace storage {
42 static inline void waitForUpdate(volatile uint32_t* p, int eqV) {
43  while (true) {
44  for (size_t c = 0; c < 4096; ++c) {
45  if (*p != eqV) {
46  return;
47  }
48  }
49  usleep(500);
50  }
51 }
52 static inline void wakeN(volatile uint32_t* p, int c) {
53  // assume the waiter is polling, nothing to do
54 }
55 }}
56 #else
57 #include <linux/futex.h>
58 #include <sys/syscall.h>
59 
60 namespace hobbes { namespace storage {
61 static inline long sys_futex(volatile uint32_t* p, int op, int v, struct timespec* timeout, void* p2, int v2) {
62  return syscall(SYS_futex, p, op, v, timeout, p2, v2);
63 }
64 static inline void waitForUpdate(volatile uint32_t* p, int eqV) {
65  sys_futex(p, FUTEX_WAIT, eqV, 0, 0, 0);
66 }
67 static inline void wakeN(volatile uint32_t* p, int c) {
68  sys_futex(p, FUTEX_WAKE, c, 0, 0, 0);
69 }
70 }}
71 #endif
72 
73 namespace hobbes { namespace storage {
74 
75 #define HSTORE_VERSION ((uint32_t)0x00010000)
76 
77 typedef std::vector<uint8_t> bytes;
78 
79 // write transactions into shared memory
80 #define _HSTORE_LIKELY(x) __builtin_expect((x),1)
81 #define _HSTORE_UNLIKELY(x) __builtin_expect((x),0)
82 
83 template <typename T>
84  T align(T x, T m) {
85  if (m == 0 || (x % m) == 0) {
86  return x;
87  } else {
88  return (1 + (x / m)) * m;
89  }
90  }
91 
92 static inline void uxchg(volatile uint32_t* px, uint32_t nx) {
93  __asm__ __volatile__(
94  "xchgl %0,%1"
95  :"=r" (nx)
96  :"m" (*px), "0" (nx)
97  :"memory"
98  );
99 }
100 #define xchg __sync_lock_test_and_set
101 
102 // define a local socket for registering new storage queues
103 inline void mqwrite(int fd, const uint8_t* x, size_t len) {
104  size_t i = 0;
105  while (i < len) {
106  ssize_t c = write(fd, x + i, len - i);
107  if (c < 0) {
108  if (errno != EINTR) {
109  throw std::runtime_error("Couldn't write to socket: " + std::string(strerror(errno)));
110  }
111  } else {
112  i += c;
113  }
114  }
115 }
116 
117 inline int mqconnect(const std::string& fileName) {
118  int r = socket(AF_UNIX, SOCK_STREAM, 0);
119  if (r == -1) {
120  throw std::runtime_error("Unable to allocate socket: " + std::string(strerror(errno)));
121  }
122 
123  sockaddr_un addr;
124  memset(&addr, 0, sizeof(addr));
125  addr.sun_family = AF_UNIX;
126  snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", fileName.c_str());
127 
128  sockaddr* saddr = (sockaddr*)&addr;
129  size_t len = sizeof(addr);
130  if (connect(r, saddr, len) == -1) {
131  std::string emsg = "Unable to connect socket: " + std::string(strerror(errno));
132  close(r);
133  throw std::runtime_error(emsg);
134  }
135 
136  fd_set wd;
137  FD_ZERO(&wd);
138  FD_SET(r, &wd);
139  if (select(r + 1, 0, &wd, 0, 0) == -1) {
140  std::string emsg = "Failed to connect socket while waiting for writeability: " + std::string(strerror(errno));
141  close(r);
142  throw std::runtime_error(emsg);
143  }
144 
145  uint32_t version = HSTORE_VERSION;
146  mqwrite(r, (const uint8_t*)&version, sizeof(version));
147  return r;
148 }
149 
150 inline int mqlisten(const std::string& fileName) {
151  int s = socket(AF_UNIX, SOCK_STREAM, 0);
152  if (s == -1) {
153  throw std::runtime_error("Unable to allocate socket: " + std::string(strerror(errno)));
154  }
155 
156  sockaddr_un addr;
157  memset(&addr, 0, sizeof(addr));
158  addr.sun_family = AF_UNIX;
159  unlink(fileName.c_str());
160  snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", fileName.c_str());
161 
162  if (bind(s, (sockaddr*)&addr, sizeof(addr)) == -1) {
163  std::string emsg = "Unable to bind socket to file: " + fileName + " (" + std::string(strerror(errno)) + ")";
164  close(s);
165  throw std::runtime_error(emsg);
166  }
167 
168  // and then start to listen
169  if (listen(s, SOMAXCONN) == -1) {
170  std::string emsg = "Unable to listen socket on file: " + fileName + " (" + std::string(strerror(errno)) + ")";
171  close(s);
172  throw std::runtime_error(emsg);
173  }
174 
175  return s;
176 }
177 
178 inline std::string tempDir() {
179  const char* td = 0;
180  if ((td = ::getenv("TMPDIR"))) {
181  if (::strlen(td) > 0 && ::access(td, W_OK) == 0) {
182  return std::string(td);
183  }
184  }
185  if ((td = ::getenv("TMP"))) {
186  if (::strlen(td) > 0 && ::access(td, W_OK) == 0) {
187  return std::string(td);
188  }
189  }
190  if (::access("/var/tmp", W_OK) == 0) {
191  return "/var/tmp";
192  }
193  return "/tmp";
194 }
195 
196 inline int connectGroupHost(const std::string& groupName) {
197  return mqconnect(tempDir() + "/hstore." + groupName + ".sk");
198 }
199 
200 inline int makeGroupHost(const std::string& groupName) {
201  return mqlisten(tempDir() + "/hstore." + groupName + ".sk");
202 }
203 
204 // identify this process/thread
205 typedef std::pair<uint64_t, uint64_t> ProcThread;
206 
207 inline ProcThread thisProcThread() {
208  ProcThread r;
209  r.first = (uint64_t)getpid();
210 #if defined(__APPLE__) && defined(__MACH__)
211  pthread_threadid_np(0, &r.second);
212 #else
213  r.second = (uint64_t)syscall(SYS_gettid);
214 #endif
215  return r;
216 }
217 
218 // derive a name for a shared memory region with this group name in this thread/process
219 inline std::string sharedMemName(const std::string& groupName, const ProcThread& pt) {
220  std::ostringstream ss;
221  ss << "/" << groupName << "." << pt.first << "." << pt.second;
222  return ss.str();
223 }
224 inline std::string sharedMemName(const std::string& groupName) {
225  return sharedMemName(groupName, thisProcThread());
226 }
227 
228 // register the allocation of a shared memory region with this group name in this thread/process
229 inline void registerSHMAlloc(int* mqserver, const std::string& groupName) {
230  if (*mqserver < 0) {
231  *mqserver = connectGroupHost(groupName);
232  }
233 
234  ProcThread pt = thisProcThread();
235  uint8_t msg[1+sizeof(ProcThread)];
236  msg[0] = 0;
237  memcpy(msg+1, &pt, sizeof(ProcThread));
238  mqwrite(*mqserver, msg, sizeof(msg));
239 }
240 
241 // between queue readers and writers, there are only three states of concern:
242 // 0: both reader and writer are making progress
243 // 1: the reader is blocked waiting for new values (empty queue)
244 // 2: the writer is blocked waiting for the reader to catch up (full queue)
245 #define _HSTORE_STATE_UNBLOCKED 0
246 #define _HSTORE_STATE_READER_WAITING 1
247 #define _HSTORE_STATE_WRITER_WAITING 2
248 
251  valuesz(0), count(0), wstate(0), readerIndex(0), writerIndex(0), data(0) {
252  }
253 
254  pqueue_config(size_t valuesz, size_t count, uint32_t* wstate, uint32_t* ri, uint32_t* wi, uint8_t* data) :
255  valuesz(valuesz), count(count), wstate(wstate), readerIndex(ri), writerIndex(wi), data(data)
256  {
257  }
258 
259  size_t valuesz; // how large is one "value" or queue element?
260  size_t count; // how many "values" (of size 'valuesz') are there indexable from 'data'?
261  volatile uint32_t* wstate; // inter-process wait state : 0=no waiting, 1=reader waiting, 2=writer waiting
262  volatile uint32_t* readerIndex; // where is the reader in the data sequence?
263  volatile uint32_t* writerIndex; // where is the writer in the data sequence?
264  uint8_t* data; // the actual queue data
265 };
266 
267 // shared memory queue data
269  uint32_t ready; // set to 1 when the queue has been fully constructed and is ready to read
270  size_t valsz; // the size of a single "queue value"
271  size_t count; // the number of queue values defined in the queue
272  size_t metasz; // the size of the following meta-data section
273 };
274 
275 struct ShQueueData {
276  uint32_t wstate;
277  uint32_t ri;
278  uint32_t wi;
279  uint32_t unused;
280 };
281 
282 // write data into shared memory
283 class writer {
284 private:
285  std::string shmname;
286  int shmfd;
288 
289  inline volatile uint32_t* waitState() const { return this->cfg.wstate; }
290  inline volatile uint32_t* readIndex() const { return this->cfg.readerIndex; }
291  inline volatile uint32_t* writeIndex() const { return this->cfg.writerIndex; }
292  inline uint8_t* value(size_t i) const { return this->cfg.data + (i*this->cfg.valuesz); }
293  inline uint32_t nextIndex(volatile uint32_t* i) const { return (*i + 1) % this->cfg.count; }
294 public:
295  writer(const bytes& meta, const std::string& shmname, size_t qvalsz, size_t count) {
296  shm_unlink(shmname.c_str());
297 
298  // sections of shared memory should be aligned to page boundaries
299  long pagesz = sysconf(_SC_PAGESIZE);
300  if (pagesz == -1) {
301  throw std::runtime_error("Failed to query system page size for '" + shmname + "': " + strerror(errno));
302  }
303 
304  // create the shared memory region
305  int shfd = shm_open(shmname.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
306  if (shfd == -1) {
307  throw std::runtime_error("Failed to allocate shared memory for '" + shmname + "': " + strerror(errno));
308  }
309 
310  // our meta-data section comes first up to the first page boundary
311  // then our data section comes next
312  size_t metaLen = align<size_t>(sizeof(ShQueueHeader) + meta.size(), pagesz);
313  size_t dataLen = align<size_t>(sizeof(ShQueueData) + qvalsz*count, pagesz);
314  size_t memLen = metaLen + dataLen;
315 
316  // allocate this much data
317  if (ftruncate(shfd, memLen) == -1) {
318  throw std::runtime_error("Failed to truncate shared memory for '" + shmname + "': " + strerror(errno));
319  }
320 
321  uint8_t* mem = (uint8_t*)mmap(0, memLen, PROT_READ | PROT_WRITE, MAP_SHARED, shfd, 0);
322  if (mem == MAP_FAILED) {
323  throw std::runtime_error("Failed to map bytes out of shared memory for '" + shmname + "': " + strerror(errno));
324  }
325 
326  // write meta data
327  ShQueueHeader* hdr = (ShQueueHeader*)mem;
328  hdr->valsz = qvalsz;
329  hdr->count = count;
330  hdr->metasz = meta.size();
331  memcpy(mem + sizeof(ShQueueHeader), &meta[0], meta.size());
332 
333  // OK, this queue is fully initialized
334  uxchg(&hdr->ready, 1);
335 
336  // now make this pqueue config
337  ShQueueData* sqd = (ShQueueData*)(mem + metaLen);
338 
339  this->shmname = shmname;
340  this->shmfd = shfd;
341  this->cfg.valuesz = qvalsz;
342  this->cfg.count = count;
343  this->cfg.wstate = &sqd->wstate;
344  this->cfg.readerIndex = &sqd->ri;
345  this->cfg.writerIndex = &sqd->wi;
346  this->cfg.data = mem + metaLen + sizeof(ShQueueData);
347  }
348 
350  shm_unlink(this->shmname.c_str());
351  }
352 
353  inline const pqueue_config& config() const { return this->cfg; }
354 
355  uint8_t* next() {
356  uint32_t nwi = nextIndex(writeIndex());
357 
358  while (_HSTORE_UNLIKELY(*readIndex() == nwi)) {
359  // the reader is behind and we've caught up with it, switch into writer-wait mode
360  switch (xchg(waitState(), _HSTORE_STATE_WRITER_WAITING)) {
362  // we previously were unblocked
363  // make sure that we still need to block the writer (in case the read index moved while we were getting here)
364  // then block while we're in writer-wait state
365  if (*readIndex() == nwi) {
367  }
368  break;
370  // we previously were in reader-wait state (this should practically never happen)
371  // since we wait to write anyway, unblock the reader and try again
372  uxchg(waitState(), _HSTORE_STATE_UNBLOCKED);
373  wakeN(waitState(), 1);
374  break;
375  }
376  }
377  return value(*writeIndex());
378  }
379 
380  uint8_t* pollNext() {
381  if (_HSTORE_UNLIKELY(*readIndex() == nextIndex(writeIndex()))) {
382  return 0;
383  } else {
384  return value(*writeIndex());
385  }
386  }
387 
388  void push() {
389  uxchg(writeIndex(), nextIndex(writeIndex()));
390 
391  // when the writer advances, the reader can be unblocked
393  wakeN(waitState(), 1);
394  }
395  }
396 };
397 
398 // shared memory pages can be marked as representing four possible conditions:
399 // 1) page is published but state is undetermined (contingent on being able to acquire another page without blocking)
400 // 2) page is a continuation of its transaction (intermediate in the total transaction)
401 // 3) page successfully terminates the transaction (commit)
402 // 4) page prematurely terminates the transaction (rollback)
403 #define _HSTORE_PAGE_STATE_TENTATIVE ((uint8_t)0)
404 #define _HSTORE_PAGE_STATE_CONT ((uint8_t)1)
405 #define _HSTORE_PAGE_STATE_COMMIT ((uint8_t)2)
406 #define _HSTORE_PAGE_STATE_ROLLBACK ((uint8_t)3)
407 
408 // writers may record data 'unreliably' (non-blocking) or 'reliably' (block iff waiting for consumer to catch up)
409 enum PipeQOS {
410  Reliable = 0,
412 };
413 
414 // wpipe : a "write pipe" (to write an 'arbitrary-length' sequence of data) on top of writers
415 class wpipe {
416 private:
418  uint8_t* page;
419  size_t pagesz;
420  uint32_t offset;
422 
423  void markPage(uint8_t c) {
424  *((uint32_t*)(this->page + this->pagesz)) = (((uint32_t)c) << 24) | this->offset;
425  }
426 
427  inline bool reliable() const {
428  return this->qos == Reliable;
429  }
430 
431  bool stepPage() {
432  if (reliable()) {
433  markPage(_HSTORE_PAGE_STATE_CONT);
434  this->wq->push();
435 
436  this->page = this->wq->next();
437  this->offset = 0;
438  return true;
439  } else {
441  this->wq->push();
442  uint8_t* npage = this->wq->pollNext();
444 
445  this->page = npage;
446  this->offset = 0;
447  return npage;
448  }
449  }
450 public:
451  wpipe(writer* wq, PipeQOS qos = Reliable) : wq(wq), pagesz(wq->config().valuesz - sizeof(uint32_t)), offset(0), qos(qos) {
452  if (wq->config().valuesz <= sizeof(uint32_t)) {
453  throw std::runtime_error("queue page size too small for use as shared memory pipe");
454  }
455  this->page = wq->pollNext();
456  }
457 
458  void commit() {
459  if (_HSTORE_LIKELY(this->page != 0)) {
460  markPage(_HSTORE_PAGE_STATE_COMMIT);
461  this->wq->push();
462  }
463  this->page = reliable() ? this->wq->next() : this->wq->pollNext();
464  this->offset = 0;
465  }
466 
467  bool hasSpaceFor(size_t sz) const {
468  return this->page && sz < (this->pagesz - this->offset);
469  }
470 
471  // write a block of bytes within a frame
472  bool write(const uint8_t* src, size_t sz) {
473  // just for unreliable pipes, we might enter here without a page
474  if (_HSTORE_UNLIKELY(!this->page)) {
475  return false;
476  }
477 
478  size_t remsz = this->pagesz - this->offset;
479 
480  // most of the time we'll be writing small chunks of data
481  if (_HSTORE_LIKELY(sz < remsz)) {
482  memcpy(this->page + this->offset, src, sz);
483  this->offset += sz;
484  return true;
485  }
486 
487  // now we might cross any number of pages
488  // write the src prefix to the remaining page space
489  // write all intermediate complete pages
490  // write the src suffix to the last page
491  memcpy(this->page + this->offset, src, remsz);
492  this->offset += remsz;
493 
494  size_t so = remsz;
495  while (stepPage() && sz - so >= this->pagesz) {
496  memcpy(this->page, src + so, this->pagesz);
497  so += this->pagesz;
498  }
499  if (!this->page) {
500  return false;
501  } else {
502  if (so < sz) {
503  this->offset = sz - so;
504  memcpy(this->page, src + so, this->offset);
505  }
506  return true;
507  }
508  }
509 };
510 
512  int shfd;
513  uint8_t* data;
514  size_t datasz;
515  size_t pagesz;
516 };
517 
518 inline QueueConnection consumeQueue(const std::string& shmname) {
519  // sections of shared memory are aligned to page boundaries
520  long pagesz = sysconf(_SC_PAGESIZE);
521  if (pagesz == -1) {
522  throw std::runtime_error("Failed to query system page size for '" + shmname + "': " + strerror(errno));
523  }
524 
525  // see if we can open this shared memory region
526  int shfd = shm_open(shmname.c_str(), O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
527  if (shfd == -1) {
528  throw std::runtime_error("Failed to open shared memory for '" + shmname + "': " + strerror(errno));
529  }
530 
531  // make sure that the region has been sized
532  struct stat msb;
533  if (fstat(shfd, &msb) < 0) {
534  close(shfd);
535  throw std::runtime_error("Failed to stat shared memory for '" + shmname + "': " + strerror(errno));
536  }
537  if (msb.st_size <= 0) {
538  close(shfd);
539  throw std::runtime_error("Shared memory for '" + shmname + "' is not ready");
540  }
541 
542  // map memory for this data, ensure it is in a good state
543  uint8_t* mem = (uint8_t*)mmap(0, msb.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shfd, 0);
544  if (mem == MAP_FAILED) {
545  close(shfd);
546  throw std::runtime_error("Failed to map bytes out of shared memory for '" + shmname + "': " + strerror(errno));
547  }
548 
549  if (!((ShQueueHeader*)mem)->ready) {
550  munmap(mem, msb.st_size);
551  close(shfd);
552  throw std::runtime_error("Not ready to consume shared memory for '" + shmname + "'");
553  }
554 
555  // ok, if we get here then we can take ownership of the queue (hence unlink it)
556  shm_unlink(shmname.c_str());
557 
558  QueueConnection c;
559  c.shfd = shfd;
560  c.data = mem;
561  c.datasz = msb.st_size;
562  c.pagesz = pagesz;
563  return c;
564 }
565 
566 inline QueueConnection consumeGroup(const std::string& gname, const ProcThread& pt) {
567  return consumeQueue(sharedMemName(gname, pt));
568 }
569 
570 // read data out of shared memory
571 class reader {
572 private:
573  int shfd;
574  const uint8_t* metad;
575  size_t metasz;
577 
578  inline volatile uint32_t* waitState() const { return this->cfg.wstate; }
579  inline volatile uint32_t* readIndex() const { return this->cfg.readerIndex; }
580  inline volatile uint32_t* writeIndex() const { return this->cfg.writerIndex; }
581  inline uint8_t* value(size_t i) const { return this->cfg.data + (i*this->cfg.valuesz); }
582  inline uint32_t nextIndex(volatile uint32_t* i) const { return (*i + 1) % this->cfg.count; }
583 public:
584  reader(const QueueConnection& qc) : shfd(qc.shfd) {
585  // prepare to read the queue description
586  ShQueueHeader* hdr = (ShQueueHeader*)qc.data;
587  size_t metaLen = align<size_t>(sizeof(ShQueueHeader) + hdr->metasz, qc.pagesz);
588  ShQueueData* sqd = (ShQueueData*)(qc.data + metaLen);
589 
590  // now we should have enough to read out of this queue
591  this->metad = qc.data + sizeof(ShQueueHeader);
592  this->metasz = hdr->metasz;
593  this->cfg.valuesz = hdr->valsz;
594  this->cfg.count = hdr->count;
595  this->cfg.wstate = &sqd->wstate;
596  this->cfg.readerIndex = &sqd->ri;
597  this->cfg.writerIndex = &sqd->wi;
598  this->cfg.data = qc.data + metaLen + sizeof(ShQueueData);
599  }
600 
602  close(this->shfd);
603  }
604 
605  inline const pqueue_config& config() const { return this->cfg; }
606 
607  // access queue init data; (null,0) if no data was specified
608  typedef std::pair<const uint8_t*, size_t> MetaData;
609  MetaData meta() const {
610  return MetaData(this->metad, this->metasz);
611  }
612 
613  // get the next value in the queue, blocking if necessary
614  uint8_t* next() {
615  uint32_t ri = *readIndex();
616 
617  while (_HSTORE_UNLIKELY(*writeIndex() == ri)) {
618  // there's nothing to read, switch into reader-wait mode
619  switch (xchg(waitState(), _HSTORE_STATE_READER_WAITING)) {
621  // we previously were unblocked
622  // make sure that we still need to block the reader (in case the write index moved while we were getting here)
623  // then block while we're in reader-wait state
624  if (*writeIndex() == ri) {
626  }
627  break;
629  // we previously were in writer-wait state (this should practically never happen)
630  // since we wait to read anyway, unblock the writer and try again
631  uxchg(waitState(), _HSTORE_STATE_UNBLOCKED);
632  wakeN(waitState(), 1);
633  break;
634  }
635  }
636  return value(*readIndex());
637  }
638 
639  // get the next value in the queue if one is present, else null
640  uint8_t* pollNext() {
641  if (_HSTORE_UNLIKELY(*writeIndex() == *readIndex())) {
642  return 0;
643  } else {
644  return value(*readIndex());
645  }
646  }
647 
648  // remove the next value from the queue (increment the read index)
649  void pop() {
650  uxchg(readIndex(), nextIndex(readIndex()));
651 
652  // when the reader advances, the writer can be unblocked
654  wakeN(waitState(), 1);
655  }
656  }
657 };
658 
659 // rpipe : a "read pipe" on top of readers
660 class rpipe {
661 private:
663  const uint8_t* page;
664  size_t pagesz;
665  uint32_t offset;
666 public:
667  rpipe(reader* rq) : rq(rq), page(0), pagesz(rq->config().valuesz - sizeof(uint32_t)), offset(0) {
668  if (rq->config().valuesz <= sizeof(uint32_t)) {
669  throw std::runtime_error("queue page size too small for use as shared memory pipe");
670  }
671  }
672 
673  // read a range of bytes out of the 'pipe' into a user-supplied buffer
674  size_t read(uint8_t* dst, size_t sz, uint8_t* state) {
675  if (!this->page) {
676  this->page = this->rq->next();
677  }
678 
679  size_t doff = 0;
680 
681  while (sz != 0 && this->page) {
682  // how much can we copy out of the current page into the dest buffer?
683  volatile uint32_t* pend = (uint32_t*)(this->page + this->pagesz);
684  size_t rpagesz = *pend & ~(0xFF << 24);
685  size_t availPage = rpagesz - this->offset;
686  size_t csz = (sz < availPage) ? sz : availPage;
687 
688  // copy as much as we can
689  memcpy(dst + doff, this->page + this->offset, csz);
690 
691  // step through this segment we've copied
692  this->offset += csz;
693  doff += csz;
694  sz -= csz;
695 
696  // if we've hit the end of the page, mark it done and then try for the next one
697  if (this->offset == rpagesz) {
698  // wait for the producer to decide what state the page is in
699  while ((*pend >> 24) == _HSTORE_PAGE_STATE_TENTATIVE);
700  uint8_t ps = *pend >> 24;
701  if (state) *state = ps;
702 
703  // if this page is just a continuation, continue reading
704  // else we've terminated a transaction
705  this->rq->pop();
706  this->page = (ps == _HSTORE_PAGE_STATE_CONT) ? this->rq->pollNext() : 0;
707  this->offset = 0;
708  }
709  }
710 
711  return doff;
712  }
713 };
714 
715 // type/value serialization (guarded by compile-time predicates)
716 #define _HSTORE_TYCTOR_PRIM ((int)0)
717 #define _HSTORE_TYCTOR_TVAR ((int)2)
718 #define _HSTORE_TYCTOR_FIXEDARR ((int)4)
719 #define _HSTORE_TYCTOR_ARR ((int)5)
720 #define _HSTORE_TYCTOR_VARIANT ((int)6)
721 #define _HSTORE_TYCTOR_STRUCT ((int)7)
722 #define _HSTORE_TYCTOR_SIZE ((int)11)
723 #define _HSTORE_TYCTOR_RECURSIVE ((int)13)
724 
725 template <typename T, typename P = void>
726  struct store {
727  };
728 template <typename T>
729  void w(const T& x, bytes* out) {
730  out->insert(out->end(), (uint8_t*)&x, ((uint8_t*)&x) + sizeof(x));
731  }
732 inline void ws(const char* x, bytes* out) {
733  size_t n = strlen(x);
734  w(n, out);
735  out->insert(out->end(), x, x + n);
736 }
737 inline void ws(const std::string& x, bytes* out) {
738  w((size_t)x.size(), out);
739  out->insert(out->end(), x.begin(), x.end());
740 }
741 inline void ws(const bytes& x, bytes* out) {
742  w((size_t)x.size(), out);
743  out->insert(out->end(), x.begin(), x.end());
744 }
745 
746 inline void encode_primty(const char* tn, bytes* out) {
747  w(_HSTORE_TYCTOR_PRIM, out);
748  ws(tn, out);
749  w((bool)false, out);
750 }
751 #define _HSTORE_DEFINE_PRIMTYS(T, n) \
752  template <> \
753  struct store<T> { \
754  typedef void can_memcpy; \
755  static void encode(bytes* out) { encode_primty(n, out); } \
756  static std::string describe() { return n; } \
757  static size_t size(const T&) { return sizeof(T); } \
758  static bool write(wpipe& p, const T& x) { return p.write((const uint8_t*)&x, sizeof(x)); } \
759  }
760 
761 _HSTORE_DEFINE_PRIMTYS(bool, "bool");
762 _HSTORE_DEFINE_PRIMTYS(uint8_t, "byte");
763 _HSTORE_DEFINE_PRIMTYS(char, "char");
764 _HSTORE_DEFINE_PRIMTYS(int16_t, "short");
765 _HSTORE_DEFINE_PRIMTYS(uint16_t, "short");
766 _HSTORE_DEFINE_PRIMTYS(int32_t, "int");
767 _HSTORE_DEFINE_PRIMTYS(uint32_t, "int");
768 _HSTORE_DEFINE_PRIMTYS(int64_t, "long");
769 _HSTORE_DEFINE_PRIMTYS(uint64_t, "long");
770 #ifdef __clang__
771 _HSTORE_DEFINE_PRIMTYS(size_t, "long");
772 #endif
773 _HSTORE_DEFINE_PRIMTYS(float, "float");
774 _HSTORE_DEFINE_PRIMTYS(double, "double");
775 
776 template <typename T, typename P = void>
777  struct cannot_memcpy {
778  typedef void type;
779  };
780 template <typename T>
781  struct cannot_memcpy<T, typename store<T>::can_memcpy> {
782  };
783 
784 template <typename ... Ts>
786  static const size_t count = 0;
787  static void encode(size_t,bytes*) { }
788  static std::string describe() { return ""; }
789  static size_t size(const Ts&...) { return 0; }
790  static bool write(wpipe&,const Ts&...) { return true; }
791  };
792 template <typename T, typename ... Ts>
793  struct store_tuple_types<T, Ts...> {
794  static const size_t count = 1 + store_tuple_types<Ts...>::count;
795 
796  static void encode(size_t f, bytes* out) {
797  char fn[32];
798  snprintf(fn, sizeof(fn), ".f%lld", (unsigned long long)f);
799  ws(fn, out);
800  w((int)-1, out);
801  store<T>::encode(out);
802 
804  }
805  static std::string describe() {
807  }
808  static size_t size(const T& x, const Ts&... xs) {
810  }
811  static bool write(wpipe& p, const T& x, const Ts&... xs) {
812  return store<T>::write(p, x) && store_tuple_types<Ts...>::write(p, xs...);
813  }
814  };
815 template <size_t i, size_t e, typename ... Ts>
816  struct storeTuple {
817  static size_t size(const std::tuple<Ts...>& t) {
818  return store<typename std::tuple_element<i, std::tuple<Ts...>>::type>::size(std::get<i>(t)) +
820  }
821  static bool write(wpipe& p, const std::tuple<Ts...>& t) {
822  return store<typename std::tuple_element<i, std::tuple<Ts...>>::type>::write(p, std::get<i>(t)) &&
824  }
825  };
826 template <size_t e, typename ... Ts>
827  struct storeTuple<e, e, Ts...> {
828  static size_t size(const std::tuple<Ts...>&) {
829  return 0;
830  }
831  static bool write(wpipe&, const std::tuple<Ts...>&) {
832  return true;
833  }
834  };
835 template <typename ... Ts>
836  struct store< std::tuple<Ts...> > {
837  static const size_t arity = store_tuple_types<Ts...>::count;
838 
839  static void encode(bytes* out) {
840  size_t localArity = arity; // required because 'arity' doesn't have an address
841 
842  if (localArity > 0) {
843  w(_HSTORE_TYCTOR_STRUCT, out);
844  w(localArity, out);
846  } else {
847  encode_primty("unit", out);
848  }
849  }
850  static std::string describe() {
851  std::string x = store_tuple_types<Ts...>::describe();
852  return (x.size() > 1) ? x.substr(0,x.size()-1) : x;
853  }
854  static size_t size(const std::tuple<Ts...>& t) {
855  return storeTuple<0, std::tuple_size<std::tuple<Ts...>>::value, Ts...>::size(t);
856  }
857  static bool write(wpipe& p, const std::tuple<Ts...>& t) {
858  return storeTuple<0, std::tuple_size<std::tuple<Ts...>>::value, Ts...>::write(p, t);
859  }
860  };
861 
862 // very basic macro metaprogramming
863 #define _HSTORE_FIRST(a, ...) a
864 #define _HSTORE_SECOND(a, b, ...) b
865 #define _HSTORE_JOIN(a,b) a ## b
866 #define _HSTORE_IS_NEGATE(...) _HSTORE_SECOND(__VA_ARGS__, 0)
867 #define _HSTORE_NOT(x) _HSTORE_IS_NEGATE(_HSTORE_JOIN(_HSTORE__NOT_, x))
868 #define _HSTORE__NOT_0 NEGATE, 1
869 #define _HSTORE_BOOL(x) _HSTORE_NOT(_HSTORE_NOT(x))
870 #define _HSTORE_IF_ELSE(condition) _HSTORE__IF_ELSE(_HSTORE_BOOL(condition))
871 #define _HSTORE__IF_ELSE(condition) _HSTORE_JOIN(_HSTORE__IF_, condition)
872 #define _HSTORE__IF_1(...) __VA_ARGS__ _HSTORE__IF_1_ELSE
873 #define _HSTORE__IF_0(...) _HSTORE__IF_0_ELSE
874 #define _HSTORE__IF_1_ELSE(...)
875 #define _HSTORE__IF_0_ELSE(...) __VA_ARGS__
876 #define _HSTORE_EMPTY()
877 #define _HSTORE_EVAL(...) _HSTORE_EVAL256(__VA_ARGS__)
878 #define _HSTORE_EVAL256(...) _HSTORE_EVAL128(_HSTORE_EVAL128(__VA_ARGS__))
879 #define _HSTORE_EVAL128(...) _HSTORE_EVAL64(_HSTORE_EVAL64(__VA_ARGS__))
880 #define _HSTORE_EVAL64(...) _HSTORE_EVAL32(_HSTORE_EVAL32(__VA_ARGS__))
881 #define _HSTORE_EVAL32(...) _HSTORE_EVAL16(_HSTORE_EVAL16(__VA_ARGS__))
882 #define _HSTORE_EVAL16(...) _HSTORE_EVAL8(_HSTORE_EVAL8(__VA_ARGS__))
883 #define _HSTORE_EVAL8(...) _HSTORE_EVAL4(_HSTORE_EVAL4(__VA_ARGS__))
884 #define _HSTORE_EVAL4(...) _HSTORE_EVAL2(_HSTORE_EVAL2(__VA_ARGS__))
885 #define _HSTORE_EVAL2(...) _HSTORE_EVAL1(_HSTORE_EVAL1(__VA_ARGS__))
886 #define _HSTORE_EVAL1(...) __VA_ARGS__
887 #define _HSTORE_DEFER2(m) m _HSTORE_EMPTY _HSTORE_EMPTY()()
888 #define _HSTORE_HAS_PARGS(...) _HSTORE_BOOL(_HSTORE_FIRST(_HSTORE__EOAP_ __VA_ARGS__)())
889 #define _HSTORE__EOAP_(...) _HSTORE_BOOL(_HSTORE_FIRST(_HSTORE__EOA_ __VA_ARGS__)())
890 #define _HSTORE__EOA_() 0
891 #define _HSTORE_MAP(f, VS...) _HSTORE_EVAL(_HSTORE_MAPP(f, VS))
892 #define _HSTORE_MAPP(f, H, T...) \
893  f H \
894  _HSTORE_IF_ELSE(_HSTORE_HAS_PARGS(T))( \
895  _HSTORE_DEFER2(_HSTORE__MAPP)()(f, T) \
896  )( \
897  )
898 #define _HSTORE__MAPP() _HSTORE_MAPP
899 
900 // support storing packed, reflective structs
901 #define HSTORE_FIELDC_SUCC(t,n) +1
902 #define HSTORE_FIELD_COUNT(FIELDS...) (0 _HSTORE_MAP(HSTORE_FIELDC_SUCC, FIELDS))
903 
904 #define HSTORE_FIELDS_SUCC(t,n) +::hobbes::storage::store< t >::size(this-> n)
905 #define HSTORE_FIELD_SIZE(FIELDS...) (0 _HSTORE_MAP(HSTORE_FIELDS_SUCC, FIELDS))
906 
907 #define HSTORE_FIELDW_SUCC(t,n) &&::hobbes::storage::store< t >::write(p, this-> n)
908 #define HSTORE_FIELD_WRITES(FIELDS...) (true _HSTORE_MAP(HSTORE_FIELDW_SUCC, FIELDS))
909 
910 #define HSTORE_STRUCT_FIELD(t, n) t n;
911 #define HSTORE_STRUCT_FIELD_ENC(t, n) ::hobbes::storage::ws(#n, out); ::hobbes::storage::w((int)-1, out); ::hobbes::storage::store< t >::encode(out);
912 #define HSTORE_STRUCT_FIELD_DESC(t, n) + (", " #n " : " + ::hobbes::storage::store< t >::describe())
913 
914 #define DEFINE_PACKED_HSTORE_STRUCT(T, FIELDS...) \
915  struct T { \
916  _HSTORE_MAP(HSTORE_STRUCT_FIELD, FIELDS) /* struct fields */ \
917  typedef void is_packed_hstore_struct; /* identify this type as a packed struct */ \
918  static void encode(::hobbes::storage::bytes* out) { \
919  size_t localArity = HSTORE_FIELD_COUNT(FIELDS); \
920  if (localArity > 0) { \
921  ::hobbes::storage::w(_HSTORE_TYCTOR_STRUCT, out); \
922  ::hobbes::storage::w(localArity, out); \
923  _HSTORE_MAP(HSTORE_STRUCT_FIELD_ENC, FIELDS) \
924  } else { \
925  ::hobbes::storage::encode_primty("unit", out); \
926  } \
927  } \
928  static std::string describe() { \
929  return "{" + ( std::string("") _HSTORE_MAP(HSTORE_STRUCT_FIELD_DESC, FIELDS) ).substr(2) + "}"; \
930  } \
931  } __attribute__((packed))
932 
933 template <typename T>
934  struct store<T, typename T::is_packed_hstore_struct> {
935  typedef void can_memcpy;
936  static void encode(bytes* out) { T::encode(out); }
937  static std::string describe() { return T::describe(); }
938  static size_t size(const T&) { return sizeof(T); }
939  static bool write(wpipe& p, const T& x) { return p.write((const uint8_t*)&x, sizeof(x)); }
940  };
941 
942 // support storing standard, reflective structs
943 #define DEFINE_HSTORE_STRUCT(T, FIELDS...) \
944  struct T { \
945  _HSTORE_MAP(HSTORE_STRUCT_FIELD, FIELDS) /* struct fields */ \
946  typedef void is_hstore_struct; /* identify this type as a struct */ \
947  static void encode(::hobbes::storage::bytes* out) { \
948  size_t localArity = HSTORE_FIELD_COUNT(FIELDS); \
949  if (localArity > 0) { \
950  ::hobbes::storage::w(_HSTORE_TYCTOR_STRUCT, out); \
951  ::hobbes::storage::w(localArity, out); \
952  _HSTORE_MAP(HSTORE_STRUCT_FIELD_ENC, FIELDS) \
953  } else { \
954  ::hobbes::storage::encode_primty("unit", out); \
955  } \
956  } \
957  static std::string describe() { \
958  return "{" + ( std::string("") _HSTORE_MAP(HSTORE_STRUCT_FIELD_DESC, FIELDS) ).substr(2) + "}"; \
959  } \
960  size_t size() const { \
961  return HSTORE_FIELD_SIZE(FIELDS); \
962  } \
963  bool write(::hobbes::storage::wpipe& p) const { \
964  return HSTORE_FIELD_WRITES(FIELDS); \
965  } \
966  }
967 
968 template <typename T>
969  struct store<T, typename T::is_hstore_struct> {
970  static void encode(bytes* out) { T::encode(out); }
971  static std::string describe() { return T::describe(); }
972  static size_t size(const T& x) { return x.size(); }
973  static bool write(wpipe& p, const T& x) { return x.write(p); }
974  };
975 
976 // support storing standard pairs of types
977 template <typename U, typename V>
978  struct store<std::pair<U,V>> {
979  static void encode(bytes* out) {
980  w(_HSTORE_TYCTOR_STRUCT, out);
981  w((size_t)2, out);
982 
983  ws(".f0", out);
984  w((int)-1, out);
985  store<U>::encode(out);
986 
987  ws(".f1", out);
988  w((int)-1, out);
989  store<V>::encode(out);
990  }
991  static std::string describe() {
992  return "(" + store<U>::describe() + "*" + store<V>::describe() + ")";
993  }
994  static size_t size(const std::pair<U, V>& x) {
995  return store<U>::size(x.first) + store<V>::size(x.second);
996  }
997  static bool write(wpipe& p, const std::pair<U, V>& x) {
998  return store<U>::write(p, x.first) && store<V>::write(p, x.second);
999  }
1000  };
1001 
1002 // support storing reflective enumerations
1003 #define HSTORE_ENUM_CTOR_DEF(n) n ,
1004 #define HSTORE_ENUM_CTOR_CTOR(n) static const SelfT n() { return SelfT(SelfT::Enum::n); }
1005 #define HSTORE_ENUM_CTORC_SUCC(n) +1
1006 #define HSTORE_ENUM_CTOR_STR(n) + "|" #n
1007 #define HSTORE_ENUM_CTOR_ENCODE(n) \
1008  ::hobbes::storage::ws(#n, out); \
1009  ::hobbes::storage::w((uint32_t)(Enum :: n), out); \
1010  ::hobbes::storage::encode_primty("unit", out);
1011 
1012 #define DEFINE_HSTORE_ENUM(T, CTORS...) \
1013  struct T { \
1014  typedef void is_hstore_enum; \
1015  enum class Enum : uint32_t { \
1016  _HSTORE_MAP(HSTORE_ENUM_CTOR_DEF, CTORS) \
1017  COUNT \
1018  }; \
1019  Enum value; \
1020  T() : value() { } \
1021  T(Enum v) : value(v) { } \
1022  T& operator=(Enum v) { this->value = v; return *this; } \
1023  operator Enum() { return this->value; } \
1024  typedef T SelfT; \
1025  _HSTORE_MAP(HSTORE_ENUM_CTOR_CTOR, CTORS) \
1026  static void encode(::hobbes::storage::bytes* out) { \
1027  ::hobbes::storage::w(_HSTORE_TYCTOR_VARIANT, out); \
1028  ::hobbes::storage::w((size_t)(0 _HSTORE_MAP(HSTORE_ENUM_CTORC_SUCC, CTORS)), out); \
1029  _HSTORE_MAP(HSTORE_ENUM_CTOR_ENCODE, CTORS); \
1030  } \
1031  static std::string describe() { \
1032  return (std::string("") _HSTORE_MAP(HSTORE_ENUM_CTOR_STR, CTORS)).substr(1); \
1033  } \
1034  }
1035 
1036 template <typename T>
1037  struct store<T, typename T::is_hstore_enum> {
1038  typedef void can_memcpy;
1039  static void encode(bytes* out) { T::encode(out); }
1040  static std::string describe() { return T::describe(); }
1041  static size_t size(const T&) { return sizeof(T); }
1042  static bool write(wpipe& p, const T& x) { return p.write((const uint8_t*)&x, sizeof(x)); }
1043  };
1044 
1045 // support storing variants (with and without explicit constructor names)
1046 #define DEFINE_HSTORE_VARIANT_GEN(T, VDECL, VCTORS, VCOPY, VDESTROY, CTORCOUNT, VENCODE, VDESC, VSIZE, VWRITE, VVISITCASE, VEQCASE, CTAGS, CDATA) \
1047  template <typename R> \
1048  struct T##Visitor { \
1049  VDECL \
1050  }; \
1051  struct T { \
1052  typedef void is_hstore_variant; \
1053  typedef T SelfT; \
1054  T() : tag(Enum::COUNT) { } \
1055  VCTORS \
1056  T(const T& rhs) : tag(rhs.tag) { \
1057  switch (this->tag) { \
1058  VCOPY \
1059  default: break; \
1060  } \
1061  } \
1062  ~T() { \
1063  switch (this->tag) { \
1064  VDESTROY \
1065  default: break; \
1066  } \
1067  } \
1068  T& operator=(const T& rhs) { \
1069  if (this == &rhs) return *this; \
1070  switch (this->tag) { \
1071  VDESTROY \
1072  default: break; \
1073  } \
1074  this->tag = rhs.tag; \
1075  switch (this->tag) { \
1076  VCOPY \
1077  default: break; \
1078  } \
1079  return *this; \
1080  } \
1081  static void encode(::hobbes::storage::bytes* out) { \
1082  ::hobbes::storage::w(_HSTORE_TYCTOR_VARIANT, out); \
1083  ::hobbes::storage::w((size_t)(0 CTORCOUNT), out); \
1084  VENCODE; \
1085  } \
1086  static std::string describe() { \
1087  return "|" + (std::string("") VDESC).substr(1) + "|"; \
1088  } \
1089  size_t wireSize() const { \
1090  switch (this->tag) { \
1091  VSIZE \
1092  default: return 0; \
1093  } \
1094  } \
1095  bool write(::hobbes::storage::wpipe& p) const { \
1096  switch (this->tag) { \
1097  VWRITE \
1098  default: return false; \
1099  } \
1100  } \
1101  template <typename R> \
1102  R visit(const T##Visitor<R>& v) const { \
1103  switch (this->tag) { \
1104  VVISITCASE \
1105  default: throw std::runtime_error("while deconstructing the " #T " variant, cannot decide payload type because tag is invalid"); \
1106  } \
1107  } \
1108  bool operator==(const T& rhs) const { \
1109  if (this->tag != rhs.tag) { \
1110  return false; \
1111  } else { \
1112  switch (this->tag) { \
1113  VEQCASE \
1114  default: return false; \
1115  } \
1116  } \
1117  } \
1118  private: \
1119  enum class Enum : uint32_t { \
1120  CTAGS \
1121  COUNT \
1122  }; \
1123  Enum tag; \
1124  union { \
1125  char data[1]; \
1126  CDATA \
1127  }; \
1128  }
1129 
1130 // (with implicit ctor names)
1131 #define HSTORE_VARIANT_CTOR(n, t) static SelfT n(const t & x) { SelfT r; r.tag = Enum::tag_##n; new (r.data) t(x); return r; }
1132 #define HSTORE_VARIANT_CTOR_STR(n, t) + "," #n ":" + ::hobbes::storage::store< t >::describe()
1133 #define HSTORE_VARIANT_CTOR_TAG(n, t) tag_##n,
1134 #define HSTORE_VARIANT_SIZE_CASE(n, t) case Enum::tag_##n: return sizeof(int) + ::hobbes::storage::store< t >::size(this->n##_data);
1135 #define HSTORE_VARIANT_WRITE_CASE(n, t) case Enum::tag_##n: return ::hobbes::storage::store<int>::write(p, (int)this->tag) && ::hobbes::storage::store< t >::write(p, this->n##_data);
1136 #define HSTORE_VARIANT_PCOPY(n, t) case Enum::tag_##n: new (this->data) t(rhs.n##_data); break;
1137 #define HSTORE_VARIANT_PDESTROY(n, t) case Enum::tag_##n: { typedef t __DT; ((__DT*)&this->n##_data)->~__DT(); } break;
1138 #define HSTORE_VARIANT_SUCC(n, t) +1
1139 #define HSTORE_VARIANT_CTOR_OPAQUEDATA(n, t) t n##_data;
1140 #define HSTORE_VARIANT_CTOR_ENCODE(n, t) \
1141  ::hobbes::storage::ws(#n, out); \
1142  ::hobbes::storage::w((uint32_t)Enum::tag_##n, out); \
1143  ::hobbes::storage::store< t >::encode(out);
1144 
1145 #define HSTORE_VARIANT_VDECL(n, t) virtual R n(const t & x) const = 0;
1146 #define HSTORE_VARIANT_VCASE(n, t) case Enum::tag_##n: return v. n (this->n##_data);
1147 #define HSTORE_VARIANT_EQCASE(n, t) case Enum::tag_##n: return (this->n##_data == rhs.n##_data);
1148 
1149 #define DEFINE_HSTORE_VARIANT(T, CTORS...) \
1150  DEFINE_HSTORE_VARIANT_GEN(T, _HSTORE_MAP(HSTORE_VARIANT_VDECL, CTORS), _HSTORE_MAP(HSTORE_VARIANT_CTOR, CTORS), _HSTORE_MAP(HSTORE_VARIANT_PCOPY, CTORS), _HSTORE_MAP(HSTORE_VARIANT_PDESTROY, CTORS), _HSTORE_MAP(HSTORE_VARIANT_SUCC, CTORS), _HSTORE_MAP(HSTORE_VARIANT_CTOR_ENCODE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_CTOR_STR, CTORS), _HSTORE_MAP(HSTORE_VARIANT_SIZE_CASE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_WRITE_CASE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_VCASE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_EQCASE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_CTOR_TAG, CTORS), _HSTORE_MAP(HSTORE_VARIANT_CTOR_OPAQUEDATA, CTORS))
1151 
1152 // (with explicit ctor names)
1153 #define HSTORE_VARIANT_LBL_CTOR(n, lbl, t) static SelfT n(const t & x) { SelfT r; r.tag = Enum::tag_##n; new (r.data) t(x); return r; }
1154 #define HSTORE_VARIANT_LBL_CTOR_STR(n, lbl, t) + "," #n ":" + ::hobbes::storage::store< t >::describe()
1155 #define HSTORE_VARIANT_LBL_CTOR_TAG(n, lbl, t) tag_##n,
1156 #define HSTORE_VARIANT_LBL_SIZE_CASE(n, lbl, t) case Enum::tag_##n: return sizeof(int) + ::hobbes::storage::store< t >::size(this->n##_data);
1157 #define HSTORE_VARIANT_LBL_WRITE_CASE(n, lbl, t) case Enum::tag_##n: return ::hobbes::storage::store<int>::write(p, (int)this->tag) && ::hobbes::storage::store< t >::write(p, this->n##_data);
1158 #define HSTORE_VARIANT_LBL_PCOPY(n, lbl, t) case Enum::tag_##n: new (this->data) t(rhs.n##_data); break;
1159 #define HSTORE_VARIANT_LBL_PDESTROY(n, lbl, t) case Enum::tag_##n: { typedef t __DT; ((__DT*)&this->n##_data)->~__DT(); } break;
1160 #define HSTORE_VARIANT_LBL_SUCC(n, lbl, t) +1
1161 #define HSTORE_VARIANT_LBL_CTOR_OPAQUEDATA(n, lbl, t) t n##_data;
1162 #define HSTORE_VARIANT_LBL_CTOR_ENCODE(n, lbl, t) \
1163  ::hobbes::storage::ws(#lbl, out); \
1164  ::hobbes::storage::w((uint32_t)Enum::tag_##n, out); \
1165  ::hobbes::storage::store< t >::encode(out);
1166 
1167 #define HSTORE_VARIANT_LBL_VDECL(n, lbl, t) virtual R n(const t & x) const = 0;
1168 #define HSTORE_VARIANT_LBL_VCASE(n, lbl, t) case Enum::tag_##n: return v. n (this->n##_data);
1169 #define HSTORE_VARIANT_LBL_EQCASE(n, lbl, t) case Enum::tag_##n: return (this->n##_data == rhs.n##_data);
1170 
1171 #define DEFINE_HSTORE_VARIANT_WITH_LABELS(T, CTORS...) \
1172  DEFINE_HSTORE_VARIANT_GEN(T, _HSTORE_MAP(HSTORE_VARIANT_LBL_VDECL, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_CTOR, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_PCOPY, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_PDESTROY, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_SUCC, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_CTOR_ENCODE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_CTOR_STR, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_SIZE_CASE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_WRITE_CASE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_VCASE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_EQCASE, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_CTOR_TAG, CTORS), _HSTORE_MAP(HSTORE_VARIANT_LBL_CTOR_OPAQUEDATA, CTORS))
1173 
1174 template <typename T>
1175  struct store<T, typename T::is_hstore_variant> {
1176  static void encode(bytes* out) { T::encode(out); }
1177  static std::string describe() { return T::describe(); }
1178  static size_t size(const T& x) { return x.wireSize(); }
1179  static bool write(wpipe& p, const T& x) { return x.write(p); }
1180  };
1181 
1182 // store recursive types
1183 struct recursion {
1184  void* value;
1185  recursion(void* x) : value(x) { }
1186 };
1187 
1188 template <typename T>
1189  struct recursive {
1190  typedef T value_type;
1191  value_type value;
1192  recursive(const value_type& x) : value(x) { }
1193  };
1194 
1195 template <>
1196  struct store<recursion> {
1197  static void encode(bytes* out) {
1198  w(_HSTORE_TYCTOR_TVAR, out);
1199  ws("x", out);
1200  }
1201  static std::string describe() {
1202  return "x";
1203  }
1204 
1205  typedef size_t (*RecSizeF)(const recursion&);
1206  static RecSizeF& sizeF() {
1207  thread_local RecSizeF fn = 0;
1208  return fn;
1209  }
1210  static size_t size(const recursion& x) {
1211  return sizeF()(x);
1212  }
1213 
1214  typedef bool (*RecWriteF)(wpipe&, const recursion&);
1215  static RecWriteF& writeF() {
1216  thread_local RecWriteF fn = 0;
1217  return fn;
1218  }
1219  static bool write(wpipe& p, const recursion& x) {
1220  return writeF()(p, x);
1221  }
1222  };
1223 
1224 template <typename T>
1225  struct store<recursive<T>> {
1226  static void encode(bytes* out) {
1228  ws("x", out);
1229  store<T>::encode(out);
1230  }
1231  static std::string describe() {
1232  return "^x." + store<T>::describe();
1233  }
1234  static size_t recSize(const recursion& x) {
1235  return size(*((recursive<T>*)x.value));
1236  }
1237  static size_t size(const recursive<T>& x) {
1238  typedef typename store<recursion>::RecSizeF RSF;
1239  RSF sf = store<recursion>::sizeF();
1241  size_t r = store<T>::size(x.value);
1242  store<recursion>::sizeF() = sf;
1243  return r;
1244  }
1245 
1246  static bool recWrite(wpipe& p, const recursion& x) {
1247  return write(p, *((recursive<T>*)x.value));
1248  }
1249  static bool write(wpipe& p, const recursive<T>& x) {
1250  typedef typename store<recursion>::RecWriteF RWF;
1251  RWF sf = store<recursion>::writeF();
1253  bool r = store<T>::write(p, x.value);
1254  store<recursion>::writeF() = sf;
1255  return r;
1256  }
1257  };
1258 
1259 // store unit
1260 struct unit {
1261  unit() { }
1262  bool operator==(const unit&) const { return true; }
1263  bool operator< (const unit&) const { return false; }
1264 };
1265 
1266 template <>
1267  struct store<unit> {
1268  static void encode(bytes* out) { encode_primty("unit", out); }
1269  static std::string describe() { return "()"; }
1270  static size_t size(const unit&) { return 0; }
1271  static bool write(wpipe&, const unit&) { return true; }
1272  };
1273 
1274 // store opaque type aliases
1275 #define DEFINE_HSTORE_TYPE_ALIAS(_ATY, _REPTY) \
1276  struct _ATY { \
1277  typedef void is_hstore_alias; \
1278  typedef _REPTY type; \
1279  static const char* name() { return #_ATY; } \
1280  inline operator _REPTY() { return this->value; } \
1281  _REPTY value; \
1282  _ATY() : value() { } \
1283  _ATY(const _REPTY& x) : value(x) { } \
1284  _ATY(const _ATY& x) : value(x.value) { } \
1285  _ATY& operator=(const _ATY& x) { this->value = x.value; return *this; } \
1286  }
1287 
1288 template <typename T>
1289  struct store<T, typename T::is_hstore_alias> {
1290  static void encode(bytes* out) { w(_HSTORE_TYCTOR_PRIM, out); ws(T::name(), out); w((bool)true, out); store<typename T::type>::encode(out); }
1291  static std::string describe() { return T::name(); }
1292  static size_t size(const T& x) { return store<typename T::type>::size(x.value); }
1293  static bool write(wpipe& p, const T& x) { return store<typename T::type>::write(p, x.value); }
1294  };
1295 
1296 // store fixed-length arrays
1297 template <typename T, size_t N>
1299  static void encode(bytes* out) {
1300  w(_HSTORE_TYCTOR_FIXEDARR, out);
1301  store<T>::encode(out);
1302  w(_HSTORE_TYCTOR_SIZE, out);
1303  w((long)N, out);
1304  }
1305 
1306  static std::string describe() {
1307  char nf[32];
1308  snprintf(nf, sizeof(nf), "%lld", (unsigned long long)N);
1309  return "[:" + store<T>::describe() + "|" + std::string(nf) + ":]";
1310  }
1311  };
1312 
1313 template <typename T, size_t N>
1315  static size_t size(const T (&v)[N]) {
1316  return sizeof(T)*N;
1317  }
1318  static bool write(wpipe& p, const T (&v)[N]) {
1319  return p.write((const uint8_t*)v, sizeof(T)*N);
1320  }
1321  };
1322 
1323 template <typename T, size_t N>
1325  static size_t size(const T (&v)[N]) {
1326  size_t s = 0;
1327  for (size_t i = 0; i < N; ++i) {
1328  s += store<T>::size(v[i]);
1329  }
1330  return s;
1331  }
1332  static bool write(wpipe& p, const T (&v)[N]) {
1333  for (size_t i = 0; i < N; ++i) {
1334  if (!store<T>::write(p, v[i])) {
1335  return false;
1336  }
1337  }
1338  return true;
1339  }
1340  };
1341 
1342 template <typename T, size_t N>
1343  struct store<T[N], typename store<T>::can_memcpy> : public fixedArrTyDesc<T,N>, fixedArrMemcpyWrite<T,N> { };
1344 
1345 template <typename T, size_t N>
1346  struct store<T[N], typename cannot_memcpy<T>::type> : public fixedArrTyDesc<T,N>, fixedArrIterWrite<T,N> { };
1347 
1348 // support storage of vectors
1349 template <typename T>
1350  struct store<std::vector<T>, typename store<T>::can_memcpy> {
1351  static void encode(bytes* out) { w(_HSTORE_TYCTOR_ARR, out); store<T>::encode(out); }
1352  static std::string describe() { return "[" + store<T>::describe() + "]"; }
1353  static size_t size(const std::vector<T>& xs) { return sizeof(size_t) + (xs.size() * sizeof(T)); }
1354  static bool write(wpipe& p, const std::vector<T>& xs) { size_t n = xs.size(); return store<size_t>::write(p, n) && p.write((const uint8_t*)&xs[0], n * sizeof(T)); }
1355  };
1356 
1357 template <typename T>
1358  struct store<std::vector<T>, typename cannot_memcpy<T>::type> {
1359  static void encode(bytes* out) { w(_HSTORE_TYCTOR_ARR, out); store<T>::encode(out); }
1360  static std::string describe() { return "[" + store<T>::describe() + "]"; }
1361  static size_t size(const std::vector<T>& xs) {
1362  size_t t = sizeof(size_t);
1363  for (const auto& x : xs) {
1364  t += store<T>::size(x);
1365  }
1366  return t;
1367  }
1368  static bool write(wpipe& p, const std::vector<T>& xs) {
1369  size_t n = xs.size();
1370  if (!store<size_t>::write(p, n)) {
1371  return false;
1372  }
1373  for (const auto& x : xs) {
1374  if (!store<T>::write(p, x)) {
1375  return false;
1376  }
1377  }
1378  return true;
1379  }
1380  };
1381 
1382 // support storage of strings
1383 template <>
1384  struct store<const char*> {
1385  static void encode(bytes* out) { w(_HSTORE_TYCTOR_ARR, out); store<char>::encode(out); }
1386  static std::string describe() { return "[char]"; }
1387  static size_t size(const char* s) { return sizeof(size_t) + strlen(s); }
1388  static bool write(wpipe& p, const char* s) { size_t n = strlen(s); return store<size_t>::write(p, n) && p.write((const uint8_t*)s, n); }
1389  };
1390 template <>
1391  struct store<std::string> {
1392  static void encode(bytes* out) { w(_HSTORE_TYCTOR_ARR, out); store<char>::encode(out); }
1393  static std::string describe() { return "[char]"; }
1394  static size_t size(const std::string& s) { return sizeof(size_t) + s.size(); }
1395  static bool write(wpipe& p, const std::string& s) { size_t n = s.size(); return store<size_t>::write(p, n) && p.write((const uint8_t*)s.data(), n); }
1396  };
1397 
1398 // make a function for describing store payload types
1399 template <typename ... Ts>
1401  typedef void head_type;
1402  static const size_t count = 0;
1403  static void encode(size_t,bytes*) {}
1404  static std::string describe() { return ""; }
1405  };
1406 template <typename T, typename ... Ts>
1407  struct hstore_payload_types<T,Ts...> {
1408  typedef T head_type;
1409  static const size_t count = 1 + hstore_payload_types<Ts...>::count;
1410  static void encode(size_t f, bytes* out) {
1411  char fn[32];
1412  snprintf(fn, sizeof(fn), ".f%lld", (unsigned long long)f);
1413  ws(fn, out);
1414  w((int)-1, out);
1415  store<T>::encode(out);
1417  }
1418  static std::string describe() {
1420  }
1421  };
1422 template <typename ... Ts>
1424  return hstore_payload_types<Ts...>();
1425  }
1426 
1427 typedef void (*tydescfn)(bytes*,std::string*);
1428 template <typename TyList>
1429  void makeTyDescF(bytes* e, std::string* d) {
1430  if (e) {
1431  size_t localArity = TyList::count;
1432 
1433  // don't bother to entuple log argument lists if there's just one argument
1434  switch (localArity) {
1435  case 0:
1436  encode_primty("unit", e);
1437  break;
1438  case 1:
1440  break;
1441  default:
1443  w(localArity, e);
1444  TyList::encode(0, e);
1445  break;
1446  }
1447  }
1448 
1449  if (d) {
1450  *d = TyList::describe().substr(1);
1451  }
1452  }
1453 
1454 // compile-time strings -- used to uniquely identify log statements
1455 template <size_t N>
1456  constexpr char at(size_t i, const char (&s)[N]) {
1457  return (i < N) ? s[i] : '\0';
1458  }
1459 template <size_t N>
1460  constexpr size_t at8S(size_t i, size_t k, const char (&s)[N]) {
1461  return (k==8) ? 0 : ((((size_t)at(i+k,s))<<(8*k))|at8S(i,k+1,s));
1462  }
1463 template <size_t N>
1464  constexpr size_t at8(size_t i, const char (&s)[N]) {
1465  return at8S(i, 0, s);
1466  }
1467 template <size_t... pcs>
1468  struct strpack {
1469  static std::string str() {
1470  constexpr size_t msg[] = {pcs...};
1471  static_assert(msg[(sizeof(msg)/sizeof(msg[0]))-1] == 0, "compile-time string larger than internal max limit (this limit can be bumped in storage.H)");
1472  return std::string((const char*)msg);
1473  }
1474  };
1475 #define _HSTORE_TSTR32(i,s) ::hobbes::storage::at8(i+(8*0),s),::hobbes::storage::at8(i+(8*1),s),::hobbes::storage::at8(i+(8*2),s),::hobbes::storage::at8(i+(8*3),s)
1476 #define _HSTORE_TSTR128(i,s) _HSTORE_TSTR32(i+(32*0),s),_HSTORE_TSTR32(i+(32*1),s),_HSTORE_TSTR32(i+(32*2),s),_HSTORE_TSTR32(i+(32*3),s)
1477 #define _HSTORE_TSTR512(i,s) _HSTORE_TSTR128(i+(128*0),s),_HSTORE_TSTR128(i+(128*1),s),_HSTORE_TSTR128(i+(128*2),s),_HSTORE_TSTR128(i+(128*3),s)
1478 #define _HSTORE_TSTR1024(i,s) _HSTORE_TSTR512(i+(512*0),s),_HSTORE_TSTR512(i+(512*1),s)
1479 #define _HSTORE_TSTR(s) ::hobbes::storage::strpack<_HSTORE_TSTR1024(0,s)>
1480 
1481 template <uint32_t* X>
1482  uint32_t forceRegistration() { return *X; }
1483 template <typename Group, Group* G, typename File, uint32_t Line, typename StmtName, size_t Flags, typename FormatStr, typename ArgTyList>
1485  static uint32_t id;
1486  StorageStatement() { forceRegistration<&id>(); }
1487  };
1488 template <typename Group, Group* G, typename File, uint32_t Line, typename StmtName, size_t Flags, typename FormatStr, typename ArgTyList>
1489  uint32_t StorageStatement<Group, G, File, Line, StmtName, Flags, FormatStr, ArgTyList>::id = G->allocateStorageStatement(File::str(), Line, StmtName::str(), Flags, FormatStr::str(), &makeTyDescF<ArgTyList>);
1490 
1491 // allow static registration of storage points
1495 };
1496 
1497 template <typename Name, CommitMethod cm>
1499  struct StmtData {
1501  uint64_t flags;
1502  std::string fmtstr;
1503  std::string file;
1504  uint32_t line;
1505  uint32_t id;
1506  };
1507  typedef std::map<std::string, StmtData> StorageStatements;
1508 
1509  StorageStatements* statements;
1511  size_t mempages;
1513 
1514  static thread_local wpipe* pipe;
1515 
1517  delete this->statements;
1518  delete this->pipe;
1519  }
1520 
1521  void prepareMeta(bytes* meta) {
1522  if (!this->statements) return; // if nothing to record, nothing to prepare
1523 
1524  w(HSTORE_VERSION, meta);
1525  w((int)this->qos, meta);
1526  w((int)cm, meta);
1527 
1528  // write the count of storage statements, then each statement's static data
1529  w((uint32_t)this->statements->size(), meta);
1530  for (auto s : *this->statements) {
1531  ws(s.first, meta);
1532  w (s.second.flags, meta);
1533  ws(s.second.fmtstr, meta);
1534  ws(s.second.file, meta);
1535  w (s.second.line, meta);
1536  w (s.second.id, meta);
1537 
1538  bytes td;
1539  s.second.tdesc(&td,0);
1540  ws(td, meta);
1541  }
1542  }
1543 
1544  wpipe& out() {
1545  if (_HSTORE_LIKELY(this->pipe != 0)) {
1546  return *this->pipe;
1547  }
1548 
1549  // allocate a shared memory queue for this group, register it with the group server
1550  bytes meta;
1551  prepareMeta(&meta);
1552 
1553  size_t pagesz = sysconf(_SC_PAGESIZE);
1554  size_t pagec = 1 + (meta.size() / pagesz) + std::max<size_t>(this->mempages, 10);
1555 
1556  this->pipe = new wpipe(new writer(meta, sharedMemName(Name::str()), pagesz, pagec), this->qos);
1557  registerSHMAlloc(&this->mqserver, Name::str());
1558  return *this->pipe;
1559  }
1560 
1561  inline void init() {
1562  out();
1563  }
1564 
1565  inline void commit() {
1566  out().commit();
1567  }
1568 
1569  uint32_t allocateStorageStatement(const std::string& file, uint32_t line, const std::string& name, size_t flags, const std::string& fmtstr, tydescfn tdesc) {
1570  if (!this->statements) {
1571  this->statements = new StorageStatements();
1572  }
1573 
1574  auto s = this->statements->find(name);
1575  if (s != this->statements->end()) {
1576  // this statement has already been added
1577  // make sure that all identifying parameters are consistent
1578  bytes ety, xty;
1579  s->second.tdesc(&ety,0);
1580  tdesc(&xty,0);
1581  if (ety != xty) {
1582  std::string etd, xtd;
1583  s->second.tdesc(0,&etd);
1584  tdesc(0,&xtd);
1585 
1586  std::cerr
1587  << "fatal error: incompatible types for store statement '" << name << "' between:\n"
1588  << " " << s->second.file << ":" << s->second.line << " (" << xtd << ")\n"
1589  << "and\n"
1590  << " " << file << ":" << line << " (" << etd << ")\n"
1591  << std::endl;
1592 
1593  exit(-1);
1594  }
1595 
1596  if (s->second.fmtstr != fmtstr) {
1597  std::cerr
1598  << "fatal error: incompatible format strings for store statement '" << name << "' between:\n"
1599  << " " << s->second.file << ":" << s->second.line << " (" << s->second.fmtstr << ")\n"
1600  << "and\n"
1601  << " " << file << ":" << line << " (" << fmtstr << ")\n"
1602  << std::endl;
1603 
1604  exit(-1);
1605  }
1606 
1607  if (s->second.flags != flags) {
1608  std::cerr
1609  << "fatal error: incompatible storage flags for store statement '" << name << "' between:\n"
1610  << " " << s->second.file << ":" << s->second.line << " (" << s->second.flags << ")\n"
1611  << "and\n"
1612  << " " << file << ":" << line << " (" << flags << ")\n"
1613  << std::endl;
1614 
1615  exit(-1);
1616  }
1617 
1618  return s->second.id;
1619  }
1620 
1621  // this must be a new statement, give it a new ID and put it in the group list
1622  StmtData d;
1623  d.tdesc = tdesc;
1624  d.flags = flags;
1625  d.fmtstr = fmtstr;
1626  d.file = file;
1627  d.line = line;
1628  d.id = (uint32_t)this->statements->size();
1629 
1630  (*this->statements)[name] = d;
1631  return d.id;
1632  }
1633 };
1634 template <typename Name, CommitMethod cm>
1635  thread_local wpipe* StorageGroup<Name, cm>::pipe = 0;
1636 
1637 // write a storage statement with payload into a pipe
1638 template <typename ... Ts>
1640  static size_t size(const Ts&...) { return 0; }
1641  static bool write(wpipe&, const Ts&...) { return true; }
1642  };
1643 template <typename T, typename ... Ts>
1644  struct serialize_values<T, Ts...> {
1645  static size_t size(const T& x, const Ts&... xs) {
1646  return store<T>::size(x) + serialize_values<Ts...>::size(xs...);
1647  }
1648  static bool write(wpipe& p, const T& x, const Ts&... xs) {
1649  return store<T>::write(p, x) && serialize_values<Ts...>::write(p, xs...);
1650  }
1651  };
1652 
1653 template <typename GName, typename ... Ts>
1654  inline bool write(StorageGroup<GName, AutoCommit>* g, uint32_t id, const Ts&... xs) {
1655  wpipe& p = g->out();
1656  if (!p.hasSpaceFor(sizeof(uint32_t) + serialize_values<Ts...>::size(xs...))) {
1657  g->commit();
1658  }
1659  return store<uint32_t>::write(p, id) &&
1661  }
1662 template <typename GName, typename ... Ts>
1663  inline bool write(StorageGroup<GName, ManualCommit>* g, uint32_t id, const Ts&... xs) {
1664  wpipe& p = g->out();
1665  return store<uint32_t>::write(p, id) &&
1667  }
1668 
1669 // validate arity in log format strings (prevent format strings referring to payload variables that don't exist)
1670 template <size_t N>
1671  static constexpr size_t readInt(const char (&fmt)[N], size_t i, size_t e, size_t n) {
1672  return (i == e) ? n : readInt(fmt, i+1, e, (n*10)+(fmt[i]-'0'));
1673  }
1674 
1675 static constexpr size_t maxV(size_t x, size_t y) {
1676  return (x < y) ? y : x;
1677 }
1678 
1679 template <size_t N>
1680  static constexpr size_t maxVarRefS(const char (&fmt)[N], size_t i, size_t s, size_t vri, size_t maxvr) {
1681  return (i >= N) ? maxvr
1682  :(s == 0) ? ((fmt[i] == '\\') ? maxVarRefS(fmt, i+2, 0, 0, maxvr)
1683  :(fmt[i] == '$') ? maxVarRefS(fmt, i+1, 1, i+1, maxvr)
1684  : maxVarRefS(fmt, i+1, 0, 0, maxvr))
1685  : ((fmt[i] >= '0' && fmt[i] <= '9') ?
1686  maxVarRefS(fmt, i+1, 1, vri, maxvr)
1687  : maxVarRefS(fmt, i+1, 0, 0, maxV(maxvr, 1+readInt(fmt, vri, i, 0))));
1688  }
1689 
1690 template <size_t N>
1691  static constexpr size_t maxVarRef(const char (&fmt)[N]) {
1692  return maxVarRefS(fmt, 0, 0, 0, 0);
1693  }
1694 
1695 // create statement groups
1696 #define DECLARE_STORAGE_GROUP(NAME, cm) extern ::hobbes::storage::StorageGroup<_HSTORE_TSTR(#NAME),cm> NAME
1697 #define DEFINE_STORAGE_GROUP(NAME, pagec, qos, cm) ::hobbes::storage::StorageGroup<_HSTORE_TSTR(#NAME),cm> NAME = { 0, qos, pagec, -1 }
1698 
1699 // record some data
1700 #define APPLY_HSTORE_STMT(GROUP, NAME, FLAGS, FMTSTR, ARGS...) \
1701  ::hobbes::storage::write(&GROUP, ({static_assert(::hobbes::storage::maxVarRef(FMTSTR) <= decltype(::hobbes::storage::makePayloadTypes(ARGS))::count, "Log format string and payload arity mismatch"); ::hobbes::storage::StorageStatement<decltype(GROUP),&GROUP,_HSTORE_TSTR(__FILE__),__LINE__,_HSTORE_TSTR(#NAME),FLAGS,_HSTORE_TSTR(FMTSTR),decltype(::hobbes::storage::makePayloadTypes(ARGS))>::id;}), ## ARGS)
1702 
1703 #define HSTORE(GROUP, NAME, ARGS...) APPLY_HSTORE_STMT(GROUP, NAME, 0, "", ## ARGS)
1704 #define HLOG(GROUP, NAME, FMTSTR, ARGS...) APPLY_HSTORE_STMT(GROUP, NAME, 1, FMTSTR, ## ARGS)
1705 
1706 // run a process to read transaction data
1707 struct statement {
1708  std::string name;
1709  uint64_t flags;
1710  std::string fmtstr;
1711  std::string file;
1712  uint32_t line;
1713  uint32_t id;
1714  bytes type;
1715 
1716  inline bool isLog() const { return (this->flags & 1) == 1; }
1717 };
1718 typedef std::vector<statement> statements;
1719 
1720 inline size_t rs(const reader::MetaData& md, size_t o, size_t n, uint8_t* b) {
1721  if (o + n <= md.second) {
1722  memcpy(b, md.first + o, n);
1723  return o + n;
1724  } else {
1725  return md.second;
1726  }
1727 }
1728 
1729 template <typename T>
1730  size_t r(const reader::MetaData& md, size_t o, T* t) {
1731  return rs(md, o, sizeof(T), (uint8_t*)t);
1732  }
1733 
1734 inline size_t rs(const reader::MetaData& md, size_t o, std::string* s) {
1735  size_t n = 0;
1736  o = r(md, o, &n);
1737  s->resize(n);
1738  return rs(md, o, n, (uint8_t*)s->data());
1739 }
1740 
1741 inline size_t rs(const reader::MetaData& md, size_t o, bytes* s) {
1742  size_t n = 0;
1743  o = r(md, o, &n);
1744  s->resize(n);
1745  return rs(md, o, n, &(*s)[0]);
1746 }
1747 
1749 public:
1750  Transaction(const uint8_t* data, size_t datasz) : data(data), datasz(datasz), i(0) {
1751  }
1752 
1753  bool canRead(size_t x) const {
1754  return (this->i+x) <= this->datasz;
1755  }
1756 
1757  const uint8_t* ptr() const {
1758  return this->data + this->i;
1759  }
1760 
1761  void skip(size_t d) {
1762  this->i += d;
1763  }
1764 
1765  template <typename T>
1766  const T* read() {
1767  auto p = (const T*)ptr();
1768  skip(sizeof(T));
1769  return p;
1770  }
1771 
1772  size_t size() const {
1773  return this->datasz;
1774  }
1775 private:
1776  const uint8_t* data;
1777  size_t datasz;
1778  size_t i;
1779 };
1780 
1781 inline void runReadProcess(const QueueConnection& qc, const std::function<std::function<void(Transaction&)>(PipeQOS, CommitMethod, const statements&)>& initF) {
1782  reader rd(qc);
1783  rpipe p(&rd);
1784 
1785  // initialize
1786  reader::MetaData md = rd.meta();
1787 
1788  uint32_t hstoreVersion = 0;
1789  size_t o = r(md, 0, &hstoreVersion);
1790  if (hstoreVersion != HSTORE_VERSION) {
1791  throw std::runtime_error("Can't read storage data from incompatible process");
1792  }
1793 
1794  // read group flags/settings
1795  int qos, cm;
1796  o = r(md, o, &qos);
1797  o = r(md, o, &cm);
1798 
1799  // read all storage statements
1800  uint32_t n = 0;
1801  o = r(md, o, &n);
1802  statements ss;
1803  ss.reserve(n);
1804  for (size_t i = 0; i < n; ++i) {
1805  statement s;
1806  o = rs(md, o, &s.name);
1807  o = r (md, o, &s.flags);
1808  o = rs(md, o, &s.fmtstr);
1809  o = rs(md, o, &s.file);
1810  o = r (md, o, &s.line);
1811  o = r (md, o, &s.id);
1812  o = rs(md, o, &s.type);
1813  ss.push_back(s);
1814  }
1815 
1816  auto txnF = initF((PipeQOS)qos, (CommitMethod)cm, ss);
1817 
1818  // read transactions and call back into user code
1819  bytes txn;
1820  while (true) {
1821  static const size_t blockSize = 1024;
1822 
1823  size_t i = txn.size();
1824  txn.resize(txn.size() + blockSize);
1825 
1826  uint8_t txnFlag = 0;
1827  txn.resize(txn.size() - (blockSize - p.read(&txn[i], blockSize, &txnFlag)));
1828 
1829  switch (txnFlag) {
1831  txn.clear();
1832  break;
1834  Transaction txnr(&txn[0], txn.size());
1835  txnF(txnr);
1836  txn.clear();
1837  break;
1838  }
1839  default:
1840  break;
1841  }
1842  }
1843 }
1844 
1845 }}
1846 
1847 #endif
static bool write(wpipe &, const std::tuple< Ts... > &)
Definition: storage.H:831
size_t datasz
Definition: storage.H:514
size_t pagesz
Definition: storage.H:515
recursive(const value_type &x)
Definition: storage.H:1192
uint32_t nextIndex(volatile uint32_t *i) const
Definition: storage.H:293
static void encode(size_t f, bytes *out)
Definition: storage.H:796
static bool write(wpipe &p, const std::pair< U, V > &x)
Definition: storage.H:997
static thread_local wpipe * pipe
Definition: storage.H:1514
#define _HSTORE_TYCTOR_RECURSIVE
Definition: storage.H:723
void type
Definition: storage.H:778
int mqconnect(const std::string &fileName)
Definition: storage.H:117
Definition: storage.H:1298
#define _HSTORE_PAGE_STATE_ROLLBACK
Definition: storage.H:406
volatile uint32_t * waitState() const
Definition: storage.H:289
size_t valsz
Definition: storage.H:270
static size_t size(const recursive< T > &x)
Definition: storage.H:1237
std::string sharedMemName(const std::string &groupName, const ProcThread &pt)
Definition: storage.H:219
std::string file
Definition: storage.H:1711
const uint8_t * page
Definition: storage.H:663
const T * read()
Definition: storage.H:1766
Definition: storage.H:415
uint32_t ri
Definition: storage.H:277
uint32_t wstate
Definition: storage.H:276
void mqwrite(int fd, const uint8_t *x, size_t len)
Definition: storage.H:103
void w(const T &x, bytes *out)
Definition: storage.H:729
static bool write(wpipe &p, const T &x)
Definition: storage.H:1042
uint8_t * next()
Definition: storage.H:355
int shmfd
Definition: storage.H:286
void commit()
Definition: storage.H:1565
static bool write(wpipe &p, const T &x, const Ts &... xs)
Definition: storage.H:1648
QueueConnection consumeGroup(const std::string &gname, const ProcThread &pt)
Definition: storage.H:566
static size_t size(const std::pair< U, V > &x)
Definition: storage.H:994
static long sys_futex(volatile uint32_t *p, int op, int v, struct timespec *timeout, void *p2, int v2)
Definition: storage.H:61
static bool write(wpipe &p, const T &x, const Ts &... xs)
Definition: storage.H:811
uint8_t * value(size_t i) const
Definition: storage.H:581
static size_t size(const T &x)
Definition: storage.H:972
static bool write(wpipe &p, const T(&v)[N])
Definition: storage.H:1318
#define _HSTORE_STATE_UNBLOCKED
Definition: storage.H:245
void ws(const char *x, bytes *out)
Definition: storage.H:732
static bool write(wpipe &p, const std::vector< T > &xs)
Definition: storage.H:1368
uint32_t offset
Definition: storage.H:665
void encode(const PrimitivePtr &, std::ostream &)
Definition: expr.C:1674
static std::string describe()
Definition: storage.H:1404
static bool write(wpipe &p, const recursion &x)
Definition: storage.H:1219
static void waitForUpdate(volatile uint32_t *p, int eqV)
Definition: storage.H:64
_HSTORE_DEFINE_PRIMTYS(bool, "bool")
size_t count
Definition: storage.H:260
Definition: storage.H:726
static size_t size(const std::tuple< Ts... > &t)
Definition: storage.H:817
static constexpr size_t maxVarRef(const char(&fmt)[N])
Definition: storage.H:1691
static std::string describe()
Definition: storage.H:1269
int mqlisten(const std::string &fileName)
Definition: storage.H:150
Definition: storage.H:571
bool operator==(const unit &) const
Definition: storage.H:1262
static size_t size(const std::tuple< Ts... > &t)
Definition: storage.H:854
T align(T x, T m)
Definition: storage.H:84
void push()
Definition: storage.H:388
hstore_payload_types< Ts... > makePayloadTypes(const Ts &...)
Definition: storage.H:1423
#define _HSTORE_TYCTOR_SIZE
Definition: storage.H:722
size_t metasz
Definition: storage.H:272
static std::string describe()
Definition: storage.H:1306
uint8_t * value(size_t i) const
Definition: storage.H:292
const pqueue_config & config() const
Definition: storage.H:605
static std::string describe()
Definition: storage.H:850
std::string fmtstr
Definition: storage.H:1502
static void encode(bytes *out)
Definition: storage.H:1268
static void encode(bytes *out)
Definition: storage.H:936
~reader()
Definition: storage.H:601
const uint8_t * data
Definition: storage.H:1776
static std::string describe()
Definition: storage.H:788
static size_t size(const std::tuple< Ts... > &)
Definition: storage.H:828
static void encode(bytes *out)
Definition: storage.H:1351
static size_t size(const std::vector< T > &xs)
Definition: storage.H:1353
ProcThread thisProcThread()
Definition: storage.H:207
uint32_t line
Definition: storage.H:1504
static size_t recSize(const recursion &x)
Definition: storage.H:1234
Definition: pattern.H:281
std::pair< std::string, std::string > pair
Definition: str.H:220
Definition: storage.H:1468
pqueue_config(size_t valuesz, size_t count, uint32_t *wstate, uint32_t *ri, uint32_t *wi, uint8_t *data)
Definition: storage.H:254
size_t size() const
Definition: storage.H:1772
#define _HSTORE_STATE_READER_WAITING
Definition: storage.H:246
value_type value
Definition: storage.H:1191
static void encode(bytes *out)
Definition: storage.H:1226
static size_t size(const T &)
Definition: storage.H:1041
recursion(void *x)
Definition: storage.H:1185
static std::string describe()
Definition: storage.H:1177
std::string tempDir()
Definition: storage.H:178
bool write(StorageGroup< GName, AutoCommit > *g, uint32_t id, const Ts &... xs)
Definition: storage.H:1654
static std::string describe()
Definition: storage.H:937
static void encode(bytes *out)
Definition: storage.H:839
writer * wq
Definition: storage.H:417
volatile uint32_t * wstate
Definition: storage.H:261
static size_t size(const std::string &s)
Definition: storage.H:1394
Definition: boot.H:7
pqueue_config()
Definition: storage.H:250
static size_t size(const T &x)
Definition: storage.H:1292
StorageStatement()
Definition: storage.H:1486
int mqserver
Definition: storage.H:1512
static RecSizeF & sizeF()
Definition: storage.H:1206
Definition: storage.H:1484
volatile uint32_t * readerIndex
Definition: storage.H:262
Definition: storage.H:410
Definition: storage.H:785
static std::string describe()
Definition: storage.H:1418
Definition: storage.H:1639
static uint32_t id
Definition: storage.H:1485
static bool write(wpipe &p, const recursive< T > &x)
Definition: storage.H:1249
volatile uint32_t * writerIndex
Definition: storage.H:263
static void encode(bytes *out)
Definition: storage.H:1039
static size_t size(const std::vector< T > &xs)
Definition: storage.H:1361
Definition: storage.H:275
llvm::Value * offset(llvm::IRBuilder<> *b, llvm::Value *p, llvm::Value *o0)
Definition: llvm.H:419
#define _HSTORE_PAGE_STATE_TENTATIVE
Definition: storage.H:403
bool isLog() const
Definition: storage.H:1716
bool reliable() const
Definition: storage.H:427
std::string shmname
Definition: storage.H:285
void prepareMeta(bytes *meta)
Definition: storage.H:1521
bool write(StorageGroup< GName, ManualCommit > *g, uint32_t id, const Ts &... xs)
Definition: storage.H:1663
bool write(const uint8_t *src, size_t sz)
Definition: storage.H:472
std::vector< statement > statements
Definition: storage.H:1718
uint32_t wi
Definition: storage.H:278
static bool write(wpipe &p, const std::tuple< Ts... > &t)
Definition: storage.H:821
static void encode(bytes *out)
Definition: storage.H:970
std::map< std::string, StmtData > StorageStatements
Definition: storage.H:1507
Definition: storage.H:1498
uint32_t forceRegistration()
Definition: storage.H:1482
void markPage(uint8_t c)
Definition: storage.H:423
bytes type
Definition: storage.H:1714
CommitMethod
Definition: storage.H:1492
static size_t size(const T(&v)[N])
Definition: storage.H:1315
uint32_t id
Definition: storage.H:1505
static size_t size(const unit &)
Definition: storage.H:1270
static bool write(wpipe &, const unit &)
Definition: storage.H:1271
static std::string describe()
Definition: storage.H:1291
std::vector< uint8_t > bytes
Definition: storage.H:77
rpipe(reader *rq)
Definition: storage.H:667
size_t datasz
Definition: storage.H:1777
std::string file
Definition: storage.H:1503
tydescfn tdesc
Definition: storage.H:1500
Definition: storage.H:1260
static bool write(wpipe &p, const T &x)
Definition: storage.H:1179
uint32_t allocateStorageStatement(const std::string &file, uint32_t line, const std::string &name, size_t flags, const std::string &fmtstr, tydescfn tdesc)
Definition: storage.H:1569
static constexpr size_t maxV(size_t x, size_t y)
Definition: storage.H:1675
StorageStatements * statements
Definition: storage.H:1509
T select(const std::vector< T > &xs, I i)
Definition: array.H:92
#define _HSTORE_PAGE_STATE_COMMIT
Definition: storage.H:405
static size_t size(const Ts &...)
Definition: storage.H:1640
#define _HSTORE_TYCTOR_STRUCT
Definition: storage.H:721
static constexpr size_t readInt(const char(&fmt)[N], size_t i, size_t e, size_t n)
Definition: storage.H:1671
uint32_t unused
Definition: storage.H:279
writer(const bytes &meta, const std::string &shmname, size_t qvalsz, size_t count)
Definition: storage.H:295
bool stepPage()
Definition: storage.H:431
static bool write(wpipe &p, const T &x)
Definition: storage.H:1293
Definition: storage.H:268
static size_t size(const T &x, const Ts &... xs)
Definition: storage.H:1645
static bool write(wpipe &p, const T(&v)[N])
Definition: storage.H:1332
Definition: storage.H:1189
Definition: storage.H:283
static void encode(bytes *out)
Definition: storage.H:1290
static size_t size(const recursion &x)
Definition: storage.H:1210
void runReadProcess(const QueueConnection &qc, const std::function< std::function< void(Transaction &)>(PipeQOS, CommitMethod, const statements &)> &initF)
Definition: storage.H:1781
Definition: storage.H:411
#define _HSTORE_TYCTOR_TVAR
Definition: storage.H:717
static bool write(wpipe &, const Ts &...)
Definition: storage.H:790
Definition: storage.H:777
uint32_t line
Definition: storage.H:1712
void commit()
Definition: storage.H:458
static std::string str()
Definition: storage.H:1469
static RecWriteF & writeF()
Definition: storage.H:1215
int connectGroupHost(const std::string &groupName)
Definition: storage.H:196
static bool write(wpipe &p, const std::tuple< Ts... > &t)
Definition: storage.H:857
static size_t size(const T &)
Definition: storage.H:938
static size_t size(const T &x, const Ts &... xs)
Definition: storage.H:808
std::string name
Definition: storage.H:1708
static void encode(bytes *out)
Definition: storage.H:1392
size_t read(uint8_t *dst, size_t sz, uint8_t *state)
Definition: storage.H:674
Transaction(const uint8_t *data, size_t datasz)
Definition: storage.H:1750
static constexpr size_t maxVarRefS(const char(&fmt)[N], size_t i, size_t s, size_t vri, size_t maxvr)
Definition: storage.H:1680
uint8_t * data
Definition: storage.H:513
volatile uint32_t * readIndex() const
Definition: storage.H:290
void init()
Definition: storage.H:1561
volatile uint32_t * readIndex() const
Definition: storage.H:579
pqueue_config cfg
Definition: storage.H:576
#define HSTORE_VERSION
Definition: storage.H:75
constexpr size_t at8(size_t i, const char(&s)[N])
Definition: storage.H:1464
static bool recWrite(wpipe &p, const recursion &x)
Definition: storage.H:1246
uint8_t * next()
Definition: storage.H:614
#define _HSTORE_TYCTOR_ARR
Definition: storage.H:719
Definition: storage.H:1707
uint32_t nextIndex(volatile uint32_t *i) const
Definition: storage.H:582
QueueConnection consumeQueue(const std::string &shmname)
Definition: storage.H:518
volatile uint32_t * waitState() const
Definition: storage.H:578
int shfd
Definition: storage.H:512
void makeTyDescF(bytes *e, std::string *d)
Definition: storage.H:1429
uint8_t * page
Definition: storage.H:418
size_t mempages
Definition: storage.H:1511
Definition: storage.H:1499
const uint8_t * ptr() const
Definition: storage.H:1757
size_t r(const reader::MetaData &md, size_t o, T *t)
Definition: storage.H:1730
~writer()
Definition: storage.H:349
#define _HSTORE_UNLIKELY(x)
Definition: storage.H:81
wpipe & out()
Definition: storage.H:1544
Definition: storage.H:511
void registerSHMAlloc(int *mqserver, const std::string &groupName)
Definition: storage.H:229
size_t pagesz
Definition: storage.H:419
uint32_t ready
Definition: storage.H:269
static void encode(size_t, bytes *)
Definition: storage.H:1403
Definition: storage.H:660
RunMode config(int argc, const char **argv)
Definition: main.C:91
PipeQOS qos
Definition: storage.H:1510
uint32_t state
Definition: regex.C:372
static bool write(wpipe &p, const T &x)
Definition: storage.H:939
static void encode(bytes *out)
Definition: storage.H:1197
Definition: storage.H:1494
static void encode(size_t, bytes *)
Definition: storage.H:787
Definition: storage.H:1748
static std::string describe()
Definition: storage.H:1386
constexpr size_t at8S(size_t i, size_t k, const char(&s)[N])
Definition: storage.H:1460
size_t rs(const reader::MetaData &md, size_t o, size_t n, uint8_t *b)
Definition: storage.H:1720
ExprPtr fn(const str::seq &vns, const ExprPtr &b, const LexicalAnnotation &la)
Definition: expr.H:837
#define out
Definition: netio.H:19
static bool write(wpipe &p, const std::vector< T > &xs)
Definition: storage.H:1354
static std::string describe()
Definition: storage.H:1040
static void encode(bytes *out)
Definition: storage.H:979
static std::string describe()
Definition: storage.H:991
void skip(size_t d)
Definition: storage.H:1761
MetaData meta() const
Definition: storage.H:609
size_t metasz
Definition: storage.H:575
int makeGroupHost(const std::string &groupName)
Definition: storage.H:200
reader(const QueueConnection &qc)
Definition: storage.H:584
static size_t size(const T(&v)[N])
Definition: storage.H:1325
static size_t size(const Ts &...)
Definition: storage.H:789
static size_t size(const char *s)
Definition: storage.H:1387
Definition: storage.H:1493
uint64_t flags
Definition: storage.H:1501
static void encode(size_t f, bytes *out)
Definition: storage.H:1410
PipeQOS qos
Definition: storage.H:421
static std::string describe()
Definition: storage.H:805
std::pair< uint64_t, uint64_t > ProcThread
Definition: storage.H:205
static void encode(bytes *out)
Definition: storage.H:1385
static std::string describe()
Definition: storage.H:971
static bool write(wpipe &, const Ts &...)
Definition: storage.H:1641
void(* tydescfn)(bytes *, std::string *)
Definition: storage.H:1427
uint64_t flags
Definition: storage.H:1709
Definition: storage.H:816
static bool write(wpipe &p, const T &x)
Definition: storage.H:973
uint32_t offset
Definition: storage.H:420
uint8_t * pollNext()
Definition: storage.H:380
size_t count
Definition: storage.H:271
const uint8_t * metad
Definition: storage.H:574
reader * rq
Definition: storage.H:662
static void uxchg(volatile uint32_t *px, uint32_t nx)
Definition: storage.H:92
bool canRead(size_t x) const
Definition: storage.H:1753
T value_type
Definition: storage.H:1190
volatile uint32_t * writeIndex() const
Definition: storage.H:580
constexpr char at(size_t i, const char(&s)[N])
Definition: storage.H:1456
unit()
Definition: storage.H:1261
#define _HSTORE_PAGE_STATE_CONT
Definition: storage.H:404
~StorageGroup()
Definition: storage.H:1516
Definition: storage.H:1324
void head_type
Definition: storage.H:1401
size_t i
Definition: storage.H:1778
size_t valuesz
Definition: storage.H:259
static std::string describe()
Definition: storage.H:1201
int shfd
Definition: storage.H:573
#define xchg
Definition: storage.H:100
static void encode(bytes *out)
Definition: storage.H:1299
static bool write(wpipe &p, const char *s)
Definition: storage.H:1388
static size_t size(const T &x)
Definition: storage.H:1178
void pop()
Definition: storage.H:649
MonoTypePtr tuple(const MonoTypes &mtys=MonoTypes())
Definition: type.H:1068
uint32_t id
Definition: storage.H:1713
#define _HSTORE_TYCTOR_PRIM
Definition: storage.H:716
#define _HSTORE_TYCTOR_FIXEDARR
Definition: storage.H:718
#define _HSTORE_LIKELY(x)
Definition: storage.H:80
std::pair< const uint8_t *, size_t > MetaData
Definition: storage.H:608
LexicalAnnotation m(const YYLTYPE &p)
Definition: hexpr.parse.C:127
size_t pagesz
Definition: storage.H:664
static std::string describe()
Definition: storage.H:1393
void encode_primty(const char *tn, bytes *out)
Definition: storage.H:746
pqueue_config cfg
Definition: storage.H:287
wpipe(writer *wq, PipeQOS qos=Reliable)
Definition: storage.H:451
static void wakeN(volatile uint32_t *p, int c)
Definition: storage.H:67
std::string fmtstr
Definition: storage.H:1710
const pqueue_config & config() const
Definition: storage.H:353
static void encode(bytes *out)
Definition: storage.H:1176
volatile uint32_t * writeIndex() const
Definition: storage.H:291
bool hasSpaceFor(size_t sz) const
Definition: storage.H:467
#define _HSTORE_STATE_WRITER_WAITING
Definition: storage.H:247
Definition: jitcc.H:21
uint8_t * data
Definition: storage.H:264
uint8_t * pollNext()
Definition: storage.H:640
Definition: storage.H:1400
Definition: storage.H:249
Definition: storage.H:1183
PipeQOS
Definition: storage.H:409
static std::string describe()
Definition: storage.H:1231
static bool write(wpipe &p, const std::string &s)
Definition: storage.H:1395
Definition: storage.H:1314
void * value
Definition: storage.H:1184