From ef3281750d9698ed58e07e72232d1604652f5fff Mon Sep 17 00:00:00 2001 From: Suhas Daftuar Date: Thu, 30 Apr 2015 16:40:22 -0400 Subject: [PATCH 1/4] Fix mininode disconnections to work with select --- qa/rpc-tests/mininode.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/qa/rpc-tests/mininode.py b/qa/rpc-tests/mininode.py index c5c1bcfbbe..d1a57b54fa 100755 --- a/qa/rpc-tests/mininode.py +++ b/qa/rpc-tests/mininode.py @@ -37,6 +37,11 @@ MY_SUBVERSION = "/python-mininode-tester:0.0.1/" MAX_INV_SZ = 50000 +# Keep our own socket map for asyncore, so that we can track disconnects +# ourselves (to workaround an issue with closing an asyncore socket when +# using select) +mininode_socket_map = dict() + # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() @@ -1076,7 +1081,7 @@ class NodeConn(asyncore.dispatcher): } def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"): - asyncore.dispatcher.__init__(self) + asyncore.dispatcher.__init__(self, map=mininode_socket_map) self.log = logging.getLogger("NodeConn(%s:%d)" % (dstaddr, dstport)) self.dstaddr = dstaddr self.dstport = dstport @@ -1140,14 +1145,10 @@ class NodeConn(asyncore.dispatcher): return True def writable(self): - if self.disconnect: - self.handle_close() - return False - else: - self.sendbufLock.acquire() - length = len(self.sendbuf) - self.sendbufLock.release() - return (length > 0) + self.sendbufLock.acquire() + length = len(self.sendbuf) + self.sendbufLock.release() + return (length > 0) def handle_write(self): self.sendbufLock.acquire() @@ -1229,12 +1230,20 @@ class NodeConn(asyncore.dispatcher): def disconnect_node(self): self.disconnect = True - self.send_message(self.messagemap['ping']()) class NetworkThread(Thread): def run(self): - asyncore.loop(0.1, True) + while mininode_socket_map: + # We check for whether to disconnect outside of the asyncore + # loop to workaround the behavior of asyncore when using + # select + disconnected = [] + for fd, obj in mininode_socket_map.items(): + if obj.disconnect: + disconnected.append(obj) + [ obj.handle_close() for obj in disconnected ] + asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) # An exception we can raise if we detect a potential disconnect From 5487975ca3e26e959a815679ba326fb33d6baf8d Mon Sep 17 00:00:00 2001 From: Suhas Daftuar Date: Thu, 30 Apr 2015 16:40:36 -0400 Subject: [PATCH 2/4] Don't run invalidblockrequest.py in travis until race condition is fixed --- qa/pull-tester/rpc-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/pull-tester/rpc-tests.sh b/qa/pull-tester/rpc-tests.sh index ae27a94b8d..840addb251 100755 --- a/qa/pull-tester/rpc-tests.sh +++ b/qa/pull-tester/rpc-tests.sh @@ -31,7 +31,7 @@ testScripts=( 'merkle_blocks.py' # 'forknotify.py' 'maxblocksinflight.py' - 'invalidblockrequest.py' +# 'invalidblockrequest.py' ); if [ "x${ENABLE_BITCOIND}${ENABLE_UTILS}${ENABLE_WALLET}" = "x111" ]; then for (( i = 0; i < ${#testScripts[@]}; i++ )) From 574db4816fd340b57970760ec30f4c8d6234187f Mon Sep 17 00:00:00 2001 From: Suhas Daftuar Date: Fri, 1 May 2015 14:47:21 -0400 Subject: [PATCH 3/4] Fix potential race conditions in p2p testing framework Previously, each NodeConnCB had its own lock to synchronize data structures used by the testing thread and the networking thread, and NodeConn provided a separate additional lock for synchronizing access to each send buffer. This commit replaces those locks with a single global lock (mininode_lock) that we use to synchronize access to all data structures shared by the two threads. Updates comptool and maxblocksinflight to use the new synchronization semantics, eliminating previous race conditions within comptool, and re-enables invalidblockrequest.py in travis. --- qa/pull-tester/rpc-tests.sh | 2 +- qa/rpc-tests/comptool.py | 83 +++++++++++++++++-------------- qa/rpc-tests/maxblocksinflight.py | 9 ++-- qa/rpc-tests/mininode.py | 44 ++++++++-------- 4 files changed, 75 insertions(+), 63 deletions(-) diff --git a/qa/pull-tester/rpc-tests.sh b/qa/pull-tester/rpc-tests.sh index 840addb251..ae27a94b8d 100755 --- a/qa/pull-tester/rpc-tests.sh +++ b/qa/pull-tester/rpc-tests.sh @@ -31,7 +31,7 @@ testScripts=( 'merkle_blocks.py' # 'forknotify.py' 'maxblocksinflight.py' -# 'invalidblockrequest.py' + 'invalidblockrequest.py' ); if [ "x${ENABLE_BITCOIND}${ENABLE_UTILS}${ENABLE_WALLET}" = "x111" ]; then for (( i = 0; i < ${#testScripts[@]}; i++ )) diff --git a/qa/rpc-tests/comptool.py b/qa/rpc-tests/comptool.py index 6125bae51e..25cffe461f 100755 --- a/qa/rpc-tests/comptool.py +++ b/qa/rpc-tests/comptool.py @@ -25,6 +25,8 @@ generator that returns TestInstance objects. See below for definition. # on_getheaders: provide headers via BlockStore # on_getdata: provide blocks via BlockStore +global mininode_lock + class TestNode(NodeConnCB): def __init__(self, block_store, tx_store): @@ -148,10 +150,11 @@ class TestManager(object): max_tries = 10 / sleep_time # Wait at most 10 seconds while max_tries > 0: done = True - for c in self.connections: - if c.cb.verack_received is False: - done = False - break + with mininode_lock: + for c in self.connections: + if c.cb.verack_received is False: + done = False + break if done: break time.sleep(sleep_time) @@ -161,10 +164,11 @@ class TestManager(object): while received_pongs is not True: time.sleep(0.05) received_pongs = True - for c in self.connections: - if c.cb.received_ping_response(counter) is not True: - received_pongs = False - break + with mininode_lock: + for c in self.connections: + if c.cb.received_ping_response(counter) is not True: + received_pongs = False + break # sync_blocks: Wait for all connections to request the blockhash given # then send get_headers to find out the tip of each node, and synchronize @@ -173,8 +177,9 @@ class TestManager(object): # Wait for nodes to request block (50ms sleep * 20 tries * num_blocks) max_tries = 20*num_blocks while max_tries > 0: - results = [ blockhash in c.cb.block_request_map and - c.cb.block_request_map[blockhash] for c in self.connections ] + with mininode_lock: + results = [ blockhash in c.cb.block_request_map and + c.cb.block_request_map[blockhash] for c in self.connections ] if False not in results: break time.sleep(0.05) @@ -199,8 +204,9 @@ class TestManager(object): # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events) max_tries = 20*num_events while max_tries > 0: - results = [ txhash in c.cb.tx_request_map and - c.cb.tx_request_map[txhash] for c in self.connections ] + with mininode_lock: + results = [ txhash in c.cb.tx_request_map and + c.cb.tx_request_map[txhash] for c in self.connections ] if False not in results: break time.sleep(0.05) @@ -221,19 +227,21 @@ class TestManager(object): self.ping_counter += 1 # Sort inv responses from each node - [ c.cb.lastInv.sort() for c in self.connections ] + with mininode_lock: + [ c.cb.lastInv.sort() for c in self.connections ] # Verify that the tip of each connection all agree with each other, and # with the expected outcome (if given) def check_results(self, blockhash, outcome): - for c in self.connections: - if outcome is None: - if c.cb.bestblockhash != self.connections[0].cb.bestblockhash: + with mininode_lock: + for c in self.connections: + if outcome is None: + if c.cb.bestblockhash != self.connections[0].cb.bestblockhash: + return False + elif ((c.cb.bestblockhash == blockhash) != outcome): + # print c.cb.bestblockhash, blockhash, outcome return False - elif ((c.cb.bestblockhash == blockhash) != outcome): - # print c.cb.bestblockhash, blockhash, outcome - return False - return True + return True # Either check that the mempools all agree with each other, or that # txhash's presence in the mempool matches the outcome specified. @@ -242,16 +250,17 @@ class TestManager(object): # perhaps it would be useful to add the ability to check explicitly that # a particular tx's existence in the mempool is the same across all nodes. def check_mempool(self, txhash, outcome): - for c in self.connections: - if outcome is None: - # Make sure the mempools agree with each other - if c.cb.lastInv != self.connections[0].cb.lastInv: - # print c.rpc.getrawmempool() + with mininode_lock: + for c in self.connections: + if outcome is None: + # Make sure the mempools agree with each other + if c.cb.lastInv != self.connections[0].cb.lastInv: + # print c.rpc.getrawmempool() + return False + elif ((txhash in c.cb.lastInv) != outcome): + # print c.rpc.getrawmempool(), c.cb.lastInv return False - elif ((txhash in c.cb.lastInv) != outcome): - # print c.rpc.getrawmempool(), c.cb.lastInv - return False - return True + return True def run(self): # Wait until verack is received @@ -272,9 +281,10 @@ class TestManager(object): block = b_or_t block_outcome = outcome # Add to shared block_store, set as current block - self.block_store.add_block(block) - for c in self.connections: - c.cb.block_request_map[block.sha256] = False + with mininode_lock: + self.block_store.add_block(block) + for c in self.connections: + c.cb.block_request_map[block.sha256] = False # Either send inv's to each node and sync, or add # to invqueue for later inv'ing. if (test_instance.sync_every_block): @@ -288,10 +298,11 @@ class TestManager(object): assert(isinstance(b_or_t, CTransaction)) tx = b_or_t tx_outcome = outcome - # Add to shared tx store - self.tx_store.add_transaction(tx) - for c in self.connections: - c.cb.tx_request_map[tx.sha256] = False + # Add to shared tx store and clear map entry + with mininode_lock: + self.tx_store.add_transaction(tx) + for c in self.connections: + c.cb.tx_request_map[tx.sha256] = False # Again, either inv to all nodes or save for later if (test_instance.sync_every_tx): [ c.cb.send_inv(tx) for c in self.connections ] diff --git a/qa/rpc-tests/maxblocksinflight.py b/qa/rpc-tests/maxblocksinflight.py index 94535822d8..87c80cd97e 100755 --- a/qa/rpc-tests/maxblocksinflight.py +++ b/qa/rpc-tests/maxblocksinflight.py @@ -61,10 +61,11 @@ class TestManager(NodeConnCB): time.sleep(2) total_requests = 0 - for key in self.blockReqCounts: - total_requests += self.blockReqCounts[key] - if self.blockReqCounts[key] > 1: - raise AssertionError("Error, test failed: block %064x requested more than once" % key) + with mininode_lock: + for key in self.blockReqCounts: + total_requests += self.blockReqCounts[key] + if self.blockReqCounts[key] > 1: + raise AssertionError("Error, test failed: block %064x requested more than once" % key) if total_requests > MAX_REQUESTS: raise AssertionError("Error, too many blocks (%d) requested" % total_requests) print "Round %d: success (total requests: %d)" % (count, total_requests) diff --git a/qa/rpc-tests/mininode.py b/qa/rpc-tests/mininode.py index d1a57b54fa..b7d78e74fa 100755 --- a/qa/rpc-tests/mininode.py +++ b/qa/rpc-tests/mininode.py @@ -26,7 +26,7 @@ import sys import random import cStringIO import hashlib -from threading import Lock +from threading import RLock from threading import Thread import logging import copy @@ -42,6 +42,14 @@ MAX_INV_SZ = 50000 # using select) mininode_socket_map = dict() +# One lock for synchronizing all data access between the networking thread (see +# NetworkThread below) and the thread running the test logic. For simplicity, +# NodeConn acquires this lock whenever delivering a message to to a NodeConnCB, +# and whenever adding anything to the send buffer (in send_message()). This +# lock should be acquired in the thread running the test logic to synchronize +# access to any data shared with the NodeConnCB or NodeConn. +mininode_lock = RLock() + # Serialization/deserialization tools def sha256(s): return hashlib.new('sha256', s).digest() @@ -980,10 +988,6 @@ class msg_reject(object): # Reimplement the on_* functions to provide handling for events class NodeConnCB(object): def __init__(self): - # Acquire on all callbacks -- overkill for now since asyncore is - # single-threaded, but may be useful for synchronizing access to - # member variables in derived classes. - self.cbLock = Lock() self.verack_received = False # Derived classes should call this function once to set the message map @@ -1009,7 +1013,7 @@ class NodeConnCB(object): } def deliver(self, conn, message): - with self.cbLock: + with mininode_lock: try: self.cbmap[message.command](conn, message) except: @@ -1094,7 +1098,6 @@ class NodeConn(asyncore.dispatcher): self.state = "connecting" self.network = net self.cb = callback - self.sendbufLock = Lock() # for protecting the sendbuffer self.disconnect = False # stuff version msg into sendbuf @@ -1145,20 +1148,18 @@ class NodeConn(asyncore.dispatcher): return True def writable(self): - self.sendbufLock.acquire() - length = len(self.sendbuf) - self.sendbufLock.release() + with mininode_lock: + length = len(self.sendbuf) return (length > 0) def handle_write(self): - self.sendbufLock.acquire() - try: - sent = self.send(self.sendbuf) - except: - self.handle_close() - return - self.sendbuf = self.sendbuf[sent:] - self.sendbufLock.release() + with mininode_lock: + try: + sent = self.send(self.sendbuf) + except: + self.handle_close() + return + self.sendbuf = self.sendbuf[sent:] def got_data(self): while True: @@ -1202,7 +1203,6 @@ class NodeConn(asyncore.dispatcher): def send_message(self, message, pushbuf=False): if self.state != "connected" and not pushbuf: return - self.sendbufLock.acquire() self.show_debug_msg("Send %s" % repr(message)) command = message.command data = message.serialize() @@ -1215,9 +1215,9 @@ class NodeConn(asyncore.dispatcher): h = sha256(th) tmsg += h[:4] tmsg += data - self.sendbuf += tmsg - self.last_sent = time.time() - self.sendbufLock.release() + with mininode_lock: + self.sendbuf += tmsg + self.last_sent = time.time() def got_message(self, message): if message.command == "version": From 2a22d4be9b27c63afa7f1a425227e6637cda8750 Mon Sep 17 00:00:00 2001 From: Suhas Daftuar Date: Fri, 1 May 2015 14:47:44 -0400 Subject: [PATCH 4/4] Fix comptool send_message call when MAX_INV_SZ reached --- qa/rpc-tests/comptool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/rpc-tests/comptool.py b/qa/rpc-tests/comptool.py index 25cffe461f..23a979250c 100755 --- a/qa/rpc-tests/comptool.py +++ b/qa/rpc-tests/comptool.py @@ -313,7 +313,7 @@ class TestManager(object): invqueue.append(CInv(1, tx.sha256)) # Ensure we're not overflowing the inv queue if len(invqueue) == MAX_INV_SZ: - [ c.sb.send_message(msg_inv(invqueue)) for c in self.connections ] + [ c.send_message(msg_inv(invqueue)) for c in self.connections ] invqueue = [] # Do final sync if we weren't syncing on every block or every tx.