diff --git a/qa/rpc-tests/comptool.py b/qa/rpc-tests/comptool.py index 6125bae51e..23a979250c 100755 --- a/qa/rpc-tests/comptool.py +++ b/qa/rpc-tests/comptool.py @@ -25,6 +25,8 @@ generator that returns TestInstance objects. See below for definition. # on_getheaders: provide headers via BlockStore # on_getdata: provide blocks via BlockStore +global mininode_lock + class TestNode(NodeConnCB): def __init__(self, block_store, tx_store): @@ -148,10 +150,11 @@ class TestManager(object): max_tries = 10 / sleep_time # Wait at most 10 seconds while max_tries > 0: done = True - for c in self.connections: - if c.cb.verack_received is False: - done = False - break + with mininode_lock: + for c in self.connections: + if c.cb.verack_received is False: + done = False + break if done: break time.sleep(sleep_time) @@ -161,10 +164,11 @@ class TestManager(object): while received_pongs is not True: time.sleep(0.05) received_pongs = True - for c in self.connections: - if c.cb.received_ping_response(counter) is not True: - received_pongs = False - break + with mininode_lock: + for c in self.connections: + if c.cb.received_ping_response(counter) is not True: + received_pongs = False + break # sync_blocks: Wait for all connections to request the blockhash given # then send get_headers to find out the tip of each node, and synchronize @@ -173,8 +177,9 @@ class TestManager(object): # Wait for nodes to request block (50ms sleep * 20 tries * num_blocks) max_tries = 20*num_blocks while max_tries > 0: - results = [ blockhash in c.cb.block_request_map and - c.cb.block_request_map[blockhash] for c in self.connections ] + with mininode_lock: + results = [ blockhash in c.cb.block_request_map and + c.cb.block_request_map[blockhash] for c in self.connections ] if False not in results: break time.sleep(0.05) @@ -199,8 +204,9 @@ class TestManager(object): # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events) max_tries = 20*num_events while max_tries > 0: - results = [ txhash in c.cb.tx_request_map and - c.cb.tx_request_map[txhash] for c in self.connections ] + with mininode_lock: + results = [ txhash in c.cb.tx_request_map and + c.cb.tx_request_map[txhash] for c in self.connections ] if False not in results: break time.sleep(0.05) @@ -221,19 +227,21 @@ class TestManager(object): self.ping_counter += 1 # Sort inv responses from each node - [ c.cb.lastInv.sort() for c in self.connections ] + with mininode_lock: + [ c.cb.lastInv.sort() for c in self.connections ] # Verify that the tip of each connection all agree with each other, and # with the expected outcome (if given) def check_results(self, blockhash, outcome): - for c in self.connections: - if outcome is None: - if c.cb.bestblockhash != self.connections[0].cb.bestblockhash: + with mininode_lock: + for c in self.connections: + if outcome is None: + if c.cb.bestblockhash != self.connections[0].cb.bestblockhash: + return False + elif ((c.cb.bestblockhash == blockhash) != outcome): + # print c.cb.bestblockhash, blockhash, outcome return False - elif ((c.cb.bestblockhash == blockhash) != outcome): - # print c.cb.bestblockhash, blockhash, outcome - return False - return True + return True # Either check that the mempools all agree with each other, or that # txhash's presence in the mempool matches the outcome specified. @@ -242,16 +250,17 @@ class TestManager(object): # perhaps it would be useful to add the ability to check explicitly that # a particular tx's existence in the mempool is the same across all nodes. def check_mempool(self, txhash, outcome): - for c in self.connections: - if outcome is None: - # Make sure the mempools agree with each other - if c.cb.lastInv != self.connections[0].cb.lastInv: - # print c.rpc.getrawmempool() + with mininode_lock: + for c in self.connections: + if outcome is None: + # Make sure the mempools agree with each other + if c.cb.lastInv != self.connections[0].cb.lastInv: + # print c.rpc.getrawmempool() + return False + elif ((txhash in c.cb.lastInv) != outcome): + # print c.rpc.getrawmempool(), c.cb.lastInv return False - elif ((txhash in c.cb.lastInv) != outcome): - # print c.rpc.getrawmempool(), c.cb.lastInv - return False - return True + return True def run(self): # Wait until verack is received @@ -272,9 +281,10 @@ class TestManager(object): block = b_or_t block_outcome = outcome # Add to shared block_store, set as current block - self.block_store.add_block(block) - for c in self.connections: - c.cb.block_request_map[block.sha256] = False + with mininode_lock: + self.block_store.add_block(block) + for c in self.connections: + c.cb.block_request_map[block.sha256] = False # Either send inv's to each node and sync, or add # to invqueue for later inv'ing. if (test_instance.sync_every_block): @@ -288,10 +298,11 @@ class TestManager(object): assert(isinstance(b_or_t, CTransaction)) tx = b_or_t tx_outcome = outcome - # Add to shared tx store - self.tx_store.add_transaction(tx) - for c in self.connections: - c.cb.tx_request_map[tx.sha256] = False + # Add to shared tx store and clear map entry + with mininode_lock: + self.tx_store.add_transaction(tx) + for c in self.connections: + c.cb.tx_request_map[tx.sha256] = False # Again, either inv to all nodes or save for later if (test_instance.sync_every_tx): [ c.cb.send_inv(tx) for c in self.connections ] @@ -302,7 +313,7 @@ class TestManager(object): invqueue.append(CInv(1, tx.sha256)) # Ensure we're not overflowing the inv queue if len(invqueue) == MAX_INV_SZ: - [ c.sb.send_message(msg_inv(invqueue)) for c in self.connections ] + [ c.send_message(msg_inv(invqueue)) for c in self.connections ] invqueue = [] # Do final sync if we weren't syncing on every block or every tx. diff --git a/qa/rpc-tests/maxblocksinflight.py b/qa/rpc-tests/maxblocksinflight.py index 94535822d8..87c80cd97e 100755 --- a/qa/rpc-tests/maxblocksinflight.py +++ b/qa/rpc-tests/maxblocksinflight.py @@ -61,10 +61,11 @@ class TestManager(NodeConnCB): time.sleep(2) total_requests = 0 - for key in self.blockReqCounts: - total_requests += self.blockReqCounts[key] - if self.blockReqCounts[key] > 1: - raise AssertionError("Error, test failed: block %064x requested more than once" % key) + with mininode_lock: + for key in self.blockReqCounts: + total_requests += self.blockReqCounts[key] + if self.blockReqCounts[key] > 1: + raise AssertionError("Error, test failed: block %064x requested more than once" % key) if total_requests > MAX_REQUESTS: raise AssertionError("Error, too many blocks (%d) requested" % total_requests) print "Round %d: success (total requests: %d)" % (count, total_requests) diff --git a/qa/rpc-tests/mininode.py b/qa/rpc-tests/mininode.py index c5c1bcfbbe..b7d78e74fa 100755 --- a/qa/rpc-tests/mininode.py +++ b/qa/rpc-tests/mininode.py @@ -26,7 +26,7 @@ import sys import random import cStringIO import hashlib -from threading import Lock +from threading import RLock from threading import Thread import logging import copy @@ -37,6 +37,19 @@ MY_SUBVERSION = "/python-mininode-tester:0.0.1/" MAX_INV_SZ = 50000 +# Keep our own socket map for asyncore, so that we can track disconnects +# ourselves (to workaround an issue with closing an asyncore socket when +# using select) +mininode_socket_map = dict() + +# One lock for synchronizing all data access between the networking thread (see +# NetworkThread below) and the thread running the test logic. For simplicity, +# NodeConn acquires this lock whenever delivering a message to to a NodeConnCB, +# and whenever adding anything to the send buffer (in send_message()). This +# lock should be acquired in the thread running the test logic to synchronize +# access to any data shared with the NodeConnCB or NodeConn. +mininode_lock = RLock() + # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() @@ -975,10 +988,6 @@ class msg_reject(object): # Reimplement the on_* functions to provide handling for events class NodeConnCB(object): def __init__(self): - # Acquire on all callbacks -- overkill for now since asyncore is - # single-threaded, but may be useful for synchronizing access to - # member variables in derived classes. - self.cbLock = Lock() self.verack_received = False # Derived classes should call this function once to set the message map @@ -1004,7 +1013,7 @@ class NodeConnCB(object): } def deliver(self, conn, message): - with self.cbLock: + with mininode_lock: try: self.cbmap[message.command](conn, message) except: @@ -1076,7 +1085,7 @@ class NodeConn(asyncore.dispatcher): } def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"): - asyncore.dispatcher.__init__(self) + asyncore.dispatcher.__init__(self, map=mininode_socket_map) self.log = logging.getLogger("NodeConn(%s:%d)" % (dstaddr, dstport)) self.dstaddr = dstaddr self.dstport = dstport @@ -1089,7 +1098,6 @@ class NodeConn(asyncore.dispatcher): self.state = "connecting" self.network = net self.cb = callback - self.sendbufLock = Lock() # for protecting the sendbuffer self.disconnect = False # stuff version msg into sendbuf @@ -1140,24 +1148,18 @@ class NodeConn(asyncore.dispatcher): return True def writable(self): - if self.disconnect: - self.handle_close() - return False - else: - self.sendbufLock.acquire() + with mininode_lock: length = len(self.sendbuf) - self.sendbufLock.release() - return (length > 0) + return (length > 0) def handle_write(self): - self.sendbufLock.acquire() - try: - sent = self.send(self.sendbuf) - except: - self.handle_close() - return - self.sendbuf = self.sendbuf[sent:] - self.sendbufLock.release() + with mininode_lock: + try: + sent = self.send(self.sendbuf) + except: + self.handle_close() + return + self.sendbuf = self.sendbuf[sent:] def got_data(self): while True: @@ -1201,7 +1203,6 @@ class NodeConn(asyncore.dispatcher): def send_message(self, message, pushbuf=False): if self.state != "connected" and not pushbuf: return - self.sendbufLock.acquire() self.show_debug_msg("Send %s" % repr(message)) command = message.command data = message.serialize() @@ -1214,9 +1215,9 @@ class NodeConn(asyncore.dispatcher): h = sha256(th) tmsg += h[:4] tmsg += data - self.sendbuf += tmsg - self.last_sent = time.time() - self.sendbufLock.release() + with mininode_lock: + self.sendbuf += tmsg + self.last_sent = time.time() def got_message(self, message): if message.command == "version": @@ -1229,12 +1230,20 @@ class NodeConn(asyncore.dispatcher): def disconnect_node(self): self.disconnect = True - self.send_message(self.messagemap['ping']()) class NetworkThread(Thread): def run(self): - asyncore.loop(0.1, True) + while mininode_socket_map: + # We check for whether to disconnect outside of the asyncore + # loop to workaround the behavior of asyncore when using + # select + disconnected = [] + for fd, obj in mininode_socket_map.items(): + if obj.disconnect: + disconnected.append(obj) + [ obj.handle_close() for obj in disconnected ] + asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) # An exception we can raise if we detect a potential disconnect