Fabcoin Core  0.16.2
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2017 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <chain.h>
6 #include <chainparams.h>
7 #include <streams.h>
9 #include <validation.h>
10 #include <util.h>
11 #include <rpc/server.h>
12 
13 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
14 
15 static const char *MSG_HASHBLOCK = "hashblock";
16 static const char *MSG_HASHTX = "hashtx";
17 static const char *MSG_RAWBLOCK = "rawblock";
18 static const char *MSG_RAWTX = "rawtx";
19 
20 // Internal function to send multipart message
21 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
22 {
23  va_list args;
24  va_start(args, size);
25 
26  while (1)
27  {
28  zmq_msg_t msg;
29 
30  int rc = zmq_msg_init_size(&msg, size);
31  if (rc != 0)
32  {
33  zmqError("Unable to initialize ZMQ msg");
34  va_end(args);
35  return -1;
36  }
37 
38  void *buf = zmq_msg_data(&msg);
39  memcpy(buf, data, size);
40 
41  data = va_arg(args, const void*);
42 
43  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
44  if (rc == -1)
45  {
46  zmqError("Unable to send ZMQ msg");
47  zmq_msg_close(&msg);
48  va_end(args);
49  return -1;
50  }
51 
52  zmq_msg_close(&msg);
53 
54  if (!data)
55  break;
56 
57  size = va_arg(args, size_t);
58  }
59  va_end(args);
60  return 0;
61 }
62 
64 {
65  assert(!psocket);
66 
67  // check if address is being used by other publish notifier
68  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
69 
70  if (i==mapPublishNotifiers.end())
71  {
72  psocket = zmq_socket(pcontext, ZMQ_PUB);
73  if (!psocket)
74  {
75  zmqError("Failed to create socket");
76  return false;
77  }
78 
79  int rc = zmq_bind(psocket, address.c_str());
80  if (rc!=0)
81  {
82  zmqError("Failed to bind address");
83  zmq_close(psocket);
84  return false;
85  }
86 
87  // register this notifier for the address, so it can be reused for other publish notifier
88  mapPublishNotifiers.insert(std::make_pair(address, this));
89  return true;
90  }
91  else
92  {
93  LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
94 
95  psocket = i->second->psocket;
96  mapPublishNotifiers.insert(std::make_pair(address, this));
97 
98  return true;
99  }
100 }
101 
103 {
104  assert(psocket);
105 
106  int count = mapPublishNotifiers.count(address);
107 
108  // remove this notifier from the list of publishers using this address
109  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
110  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
111 
112  for (iterator it = iterpair.first; it != iterpair.second; ++it)
113  {
114  if (it->second==this)
115  {
116  mapPublishNotifiers.erase(it);
117  break;
118  }
119  }
120 
121  if (count == 1)
122  {
123  LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
124  int linger = 0;
125  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
126  zmq_close(psocket);
127  }
128 
129  psocket = nullptr;
130 }
131 
132 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
133 {
134  assert(psocket);
135 
136  /* send three parts, command & data & a LE 4byte sequence number */
137  unsigned char msgseq[sizeof(uint32_t)];
138  WriteLE32(&msgseq[0], nSequence);
139  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
140  if (rc == -1)
141  return false;
142 
143  /* increment memory only sequence number after sending */
144  nSequence++;
145 
146  return true;
147 }
148 
150 {
151  uint256 hash = pindex->GetBlockHash();
152  LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
153  char data[32];
154  for (unsigned int i = 0; i < 32; i++)
155  data[31 - i] = hash.begin()[i];
156  return SendMessage(MSG_HASHBLOCK, data, 32);
157 }
158 
160 {
161  uint256 hash = transaction.GetHash();
162  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
163  char data[32];
164  for (unsigned int i = 0; i < 32; i++)
165  data[31 - i] = hash.begin()[i];
166  return SendMessage(MSG_HASHTX, data, 32);
167 }
168 
170 {
171  LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
172 
173  const Consensus::Params& consensusParams = Params().GetConsensus();
174  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
175  {
176  LOCK(cs_main);
177  CBlock block;
178  if(!ReadBlockFromDisk(block, pindex, consensusParams))
179  {
180  zmqError("Can't read block from disk");
181  return false;
182  }
183 
184  ss << block;
185  }
186 
187  return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
188 }
189 
191 {
192  uint256 hash = transaction.GetHash();
193  LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
194  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
195  ss << transaction;
196  return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
197 }
bool NotifyTransaction(const CTransaction &transaction) override
uint32_t nSequence
upcounting per message sequence number
const_iterator begin() const
Definition: streams.h:233
Definition: block.h:155
std::string GetHex() const
Definition: uint256.cpp:21
size_t count
Definition: ExecStats.cpp:37
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:60
CCriticalSection cs_main
Definition: validation.cpp:77
assert(len-trim+(2 *lenIndices)<=WIDTH)
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:146
unsigned char * begin()
Definition: uint256.h:65
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool ReadBlockFromDisk(Block &block, const CDiskBlockPos &pos, const Consensus::Params &consensusParams)
Functions for disk access for blocks.
#define LOCK(cs)
Definition: sync.h:175
bool SendMessage(const char *command, const void *data, size_t size)
size_type size() const
Definition: streams.h:237
bool NotifyBlock(const CBlockIndex *pindex) override
Parameters that influence chain consensus.
Definition: params.h:39
#define LogPrint(category,...)
Definition: util.h:164
256-bit opaque blob.
Definition: uint256.h:132
uint8_t const size_t const size
Definition: sha3.h:20
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:177
const CChainParams & Params()
Return the currently selected parameters.
int RPCSerializationFlags()
Definition: server.cpp:591
void * memcpy(void *a, const void *b, size_t c)
const uint256 & GetHash() const
Definition: transaction.h:325
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:275
uint8_t const * data
Definition: sha3.h:19
void zmqError(const char *str)
uint256 GetBlockHash() const
Definition: chain.h:324
bool Initialize(void *pcontext) override