PRCYCoin  2.0.0.7rc1
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include "chainparams.h"
6 #include "zmqpublishnotifier.h"
7 #include "main.h"
8 #include "util.h"
9 #include "crypto/common.h"
10 
11 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
12 
13 static const char *MSG_HASHBLOCK = "hashblock";
14 static const char *MSG_HASHTX = "hashtx";
15 static const char *MSG_HASHTXLOCK = "hashtxlock";
16 static const char *MSG_RAWBLOCK = "rawblock";
17 static const char *MSG_RAWTX = "rawtx";
18 static const char *MSG_RAWTXLOCK = "rawtxlock";
19 
20 // Internal function to send multipart message
21 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
22 {
23  va_list args;
24  va_start(args, size);
25 
26  while (1)
27  {
28  zmq_msg_t msg;
29 
30  int rc = zmq_msg_init_size(&msg, size);
31  if (rc != 0)
32  {
33  zmqError("Unable to initialize ZMQ msg");
34  return -1;
35  }
36 
37  void *buf = zmq_msg_data(&msg);
38  memcpy(buf, data, size);
39 
40  data = va_arg(args, const void*);
41 
42  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
43  if (rc == -1)
44  {
45  zmqError("Unable to send ZMQ msg");
46  zmq_msg_close(&msg);
47  return -1;
48  }
49 
50  zmq_msg_close(&msg);
51 
52  if (!data)
53  break;
54 
55  size = va_arg(args, size_t);
56  }
57  return 0;
58 }
59 
61 {
62  assert(!psocket);
63 
64  // check if address is being used by other publish notifier
65  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
66 
67  if (i==mapPublishNotifiers.end())
68  {
69  psocket = zmq_socket(pcontext, ZMQ_PUB);
70  if (!psocket)
71  {
72  zmqError("Failed to create socket");
73  return false;
74  }
75 
76  int rc = zmq_bind(psocket, address.c_str());
77  if (rc!=0)
78  {
79  zmqError("Failed to bind address");
80  zmq_close(psocket);
81  return false;
82  }
83 
84  // register this notifier for the address, so it can be reused for other publish notifier
85  mapPublishNotifiers.insert(std::make_pair(address, this));
86  return true;
87  }
88  else
89  {
90  LogPrint(BCLog::ZMQ, "Reusing socket for address %s\n", address);
91 
92  psocket = i->second->psocket;
93  mapPublishNotifiers.insert(std::make_pair(address, this));
94 
95  return true;
96  }
97 }
98 
100 {
101  assert(psocket);
102 
103  int count = mapPublishNotifiers.count(address);
104 
105  // remove this notifier from the list of publishers using this address
106  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
107  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
108 
109  for (iterator it = iterpair.first; it != iterpair.second; ++it)
110  {
111  if (it->second==this)
112  {
113  mapPublishNotifiers.erase(it);
114  break;
115  }
116  }
117 
118  if (count == 1)
119  {
120  LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
121  int linger = 0;
122  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
123  zmq_close(psocket);
124  }
125 
126  psocket = 0;
127 }
128 
129 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
130 {
131  assert(psocket);
132 
133  /* send three parts, command & data & a LE 4byte sequence number */
134  unsigned char msgseq[sizeof(uint32_t)];
135  WriteLE32(&msgseq[0], nSequence);
136  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
137  if (rc == -1)
138  return false;
139 
140  /* increment memory only sequence number after sending */
141  nSequence++;
142 
143  return true;
144 }
145 
147 {
148  uint256 hash = pindex->GetBlockHash();
149  LogPrint(BCLog::ZMQ, "Publish hashblock %s\n", hash.GetHex());
150  char data[32];
151  for (unsigned int i = 0; i < 32; i++)
152  data[31 - i] = hash.begin()[i];
153  return SendMessage(MSG_HASHBLOCK, data, 32);
154 }
155 
157 {
158  uint256 hash = transaction.GetHash();
159  LogPrint(BCLog::ZMQ, "Publish hashtx %s\n", hash.GetHex());
160  char data[32];
161  for (unsigned int i = 0; i < 32; i++)
162  data[31 - i] = hash.begin()[i];
163  return SendMessage(MSG_HASHTX, data, 32);
164 }
165 
167 {
168  uint256 hash = transaction.GetHash();
169  LogPrint(BCLog::ZMQ, "Publish hashtxlock %s\n", hash.GetHex());
170  char data[32];
171  for (unsigned int i = 0; i < 32; i++)
172  data[31 - i] = hash.begin()[i];
173  return SendMessage(MSG_HASHTXLOCK, data, 32);
174 }
175 
177 {
178  LogPrint(BCLog::ZMQ, "Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
179 
180 // XX42 const Consensus::Params& consensusParams = Params().GetConsensus();
181  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
182  {
183  LOCK(cs_main);
184  CBlock block;
185 // XX42 if(!ReadBlockFromDisk(block, pindex, consensusParams))
186  if(!ReadBlockFromDisk(block, pindex))
187  {
188  zmqError("Can't read block from disk");
189  return false;
190  }
191 
192  ss << block;
193  }
194 
195  return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
196 }
197 
199 {
200  uint256 hash = transaction.GetHash();
201  LogPrint(BCLog::ZMQ, "Publish rawtx %s\n", hash.GetHex());
202  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
203  ss << transaction;
204  return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
205 }
206 
208 {
209  uint256 hash = transaction.GetHash();
210  LogPrint(BCLog::ZMQ, "Publish rawtxlock %s\n", hash.GetHex());
211  CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
212  ss << transaction;
213  return SendMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size());
214 }
CZMQPublishHashTransactionNotifier::NotifyTransaction
bool NotifyTransaction(const CTransaction &transaction)
Definition: zmqpublishnotifier.cpp:156
BCLog::ZMQ
@ ZMQ
Definition: logging.h:45
CDataStream::begin
const_iterator begin() const
Definition: streams.h:116
base_uint::GetHex
std::string GetHex() const
Definition: arith_uint256.cpp:155
CZMQAbstractPublishNotifier::SendMessage
bool SendMessage(const char *command, const void *data, size_t size)
Definition: zmqpublishnotifier.cpp:129
base_uint::begin
unsigned char * begin()
Definition: arith_uint256.h:240
CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock
bool NotifyTransactionLock(const CTransaction &transaction)
Definition: zmqpublishnotifier.cpp:207
CZMQPublishHashBlockNotifier::NotifyBlock
bool NotifyBlock(const CBlockIndex *pindex)
Definition: zmqpublishnotifier.cpp:146
memcpy
void * memcpy(void *a, const void *b, size_t c)
Definition: glibc_compat.cpp:15
chainparams.h
SER_NETWORK
@ SER_NETWORK
Definition: serialize.h:159
CTransaction
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:269
CZMQAbstractPublishNotifier::nSequence
uint32_t nSequence
Definition: zmqpublishnotifier.h:15
cs_main
RecursiveMutex cs_main
Global state.
Definition: main.cpp:65
CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock
bool NotifyTransactionLock(const CTransaction &transaction)
Definition: zmqpublishnotifier.cpp:166
CZMQAbstractPublishNotifier::Shutdown
void Shutdown()
Definition: zmqpublishnotifier.cpp:99
CZMQAbstractNotifier::psocket
void * psocket
Definition: zmqabstractnotifier.h:40
zmqError
void zmqError(const char *str)
Definition: zmqnotificationinterface.cpp:13
uint256
256-bit unsigned big integer.
Definition: uint256.h:38
LogPrint
#define LogPrint(category,...)
Definition: logging.h:162
CBlockIndex::GetBlockHash
uint256 GetBlockHash() const
Definition: chain.h:359
CZMQAbstractNotifier::address
std::string address
Definition: zmqabstractnotifier.h:42
zmqpublishnotifier.h
CDataStream::size
size_type size() const
Definition: streams.h:120
CBlock
Definition: block.h:142
common.h
CZMQPublishRawBlockNotifier::NotifyBlock
bool NotifyBlock(const CBlockIndex *pindex)
Definition: zmqpublishnotifier.cpp:176
main.h
LOCK
#define LOCK(cs)
Definition: sync.h:182
ReadBlockFromDisk
bool ReadBlockFromDisk(CBlock &block, const CDiskBlockPos &pos)
Definition: main.cpp:2101
CZMQAbstractPublishNotifier::Initialize
bool Initialize(void *pcontext)
Definition: zmqpublishnotifier.cpp:60
CZMQPublishRawTransactionNotifier::NotifyTransaction
bool NotifyTransaction(const CTransaction &transaction)
Definition: zmqpublishnotifier.cpp:198
CDataStream
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:34
CTransaction::GetHash
const uint256 & GetHash() const
Definition: transaction.h:342
CBlockIndex
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:162