script, test: fix python linter E275 errors with flake8 5.0.4

This commit is contained in:
Jon Atack 2022-10-04 15:18:42 +02:00 committed by jonatack
parent cb552c5f21
commit 459cb637ac
28 changed files with 93 additions and 93 deletions

View file

@ -78,7 +78,7 @@ def get_block_hashes(settings, max_blocks_per_call=10000):
if rpc.response_is_error(resp_obj):
print('JSON-RPC: error at height', height+x, ': ', resp_obj['error'], file=sys.stderr)
sys.exit(1)
assert(resp_obj['id'] == x) # assume replies are in-sequence
assert resp_obj['id'] == x # assume replies are in-sequence
if settings['rev_hash_bytes'] == 'true':
resp_obj['result'] = bytes.fromhex(resp_obj['result'])[::-1].hex()
print(resp_obj['result'])

View file

@ -70,13 +70,13 @@ def name_to_bip155(addr):
if i == 0 or i == (len(addr)-1): # skip empty component at beginning or end
continue
x += 1 # :: skips to suffix
assert(x < 2)
assert x < 2
else: # two bytes per component
val = int(comp, 16)
sub[x].append(val >> 8)
sub[x].append(val & 0xff)
nullbytes = 16 - len(sub[0]) - len(sub[1])
assert((x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0))
assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)
addr_bytes = bytes(sub[0] + ([0] * nullbytes) + sub[1])
if addr_bytes[0] == 0xfc:
# Assume that seeds with fc00::/8 addresses belong to CJDNS,

View file

@ -221,7 +221,7 @@ class CoinStatsIndexTest(BitcoinTestFramework):
self.generate(index_node, 1, sync_fun=self.no_op)
res10 = index_node.gettxoutsetinfo('muhash')
assert(res8['txouts'] < res10['txouts'])
assert res8['txouts'] < res10['txouts']
self.log.info("Test that the index works with -reindex")
@ -268,12 +268,12 @@ class CoinStatsIndexTest(BitcoinTestFramework):
res2 = index_node.gettxoutsetinfo(hash_type='muhash', hash_or_height=112)
assert_equal(res["bestblock"], block)
assert_equal(res["muhash"], res2["muhash"])
assert(res["muhash"] != res_invalid["muhash"])
assert res["muhash"] != res_invalid["muhash"]
# Test that requesting reorged out block by hash is still returning correct results
res_invalid2 = index_node.gettxoutsetinfo(hash_type='muhash', hash_or_height=reorg_block)
assert_equal(res_invalid2["muhash"], res_invalid["muhash"])
assert(res["muhash"] != res_invalid2["muhash"])
assert res["muhash"] != res_invalid2["muhash"]
# Add another block, so we don't depend on reconsiderblock remembering which
# blocks were touched by invalidateblock

View file

@ -62,7 +62,7 @@ class FeatureIndexPruneTest(BitcoinTestFramework):
for node in filter_nodes:
assert_greater_than(len(node.getblockfilter(tip)['filter']), 0)
for node in stats_nodes:
assert(node.gettxoutsetinfo(hash_type="muhash", hash_or_height=tip)['muhash'])
assert node.gettxoutsetinfo(hash_type="muhash", hash_or_height=tip)['muhash']
self.mine_batches(500)
self.sync_index(height=700)
@ -80,14 +80,14 @@ class FeatureIndexPruneTest(BitcoinTestFramework):
for node in filter_nodes:
assert_greater_than(len(node.getblockfilter(tip)['filter']), 0)
for node in stats_nodes:
assert(node.gettxoutsetinfo(hash_type="muhash", hash_or_height=tip)['muhash'])
assert node.gettxoutsetinfo(hash_type="muhash", hash_or_height=tip)['muhash']
self.log.info("check if we can access the blockfilter and coinstats of a pruned block")
height_hash = self.nodes[0].getblockhash(2)
for node in filter_nodes:
assert_greater_than(len(node.getblockfilter(height_hash)['filter']), 0)
for node in stats_nodes:
assert(node.gettxoutsetinfo(hash_type="muhash", hash_or_height=height_hash)['muhash'])
assert node.gettxoutsetinfo(hash_type="muhash", hash_or_height=height_hash)['muhash']
# mine and sync index up to a height that will later be the pruneheight
self.generate(self.nodes[0], 51)

View file

@ -357,8 +357,8 @@ class UTXOCacheTracepointTest(BitcoinTestFramework):
"size": event.size
})
# sanity checks only
assert(event.memory > 0)
assert(event.duration > 0)
assert event.memory > 0
assert event.duration > 0
handle_flush_succeeds += 1
bpf["utxocache_flush"].open_perf_buffer(handle_utxocache_flush)

View file

@ -112,7 +112,7 @@ class ValidationTracepointTest(BitcoinTestFramework):
assert_equal(len([tx["vin"] for tx in block["tx"]]), event.inputs)
assert_equal(0, event.sigops) # no sigops in coinbase tx
# only plausibility checks
assert(event.duration > 0)
assert event.duration > 0
del expected_blocks[block_hash]
blocks_checked += 1

View file

@ -10,7 +10,7 @@ import json
def perform_pre_checks():
mock_result_path = os.path.join(os.getcwd(), "mock_result")
if(os.path.isfile(mock_result_path)):
if os.path.isfile(mock_result_path):
with open(mock_result_path, "r", encoding="utf8") as f:
mock_result = f.read()
if mock_result[0]:

View file

@ -10,7 +10,7 @@ import json
def perform_pre_checks():
mock_result_path = os.path.join(os.getcwd(), "mock_result")
if(os.path.isfile(mock_result_path)):
if os.path.isfile(mock_result_path):
with open(mock_result_path, "r", encoding="utf8") as f:
mock_result = f.read()
if mock_result[0]:

View file

@ -49,7 +49,7 @@ class AddrReceiver(P2PInterface):
def on_addr(self, message):
for addr in message.addrs:
self.num_ipv4_received += 1
if(self.test_addr_contents):
if self.test_addr_contents:
# relay_tests checks the content of the addr messages match
# expectations based on the message creation in setup_addr_msg
assert_equal(addr.nServices, 9)

View file

@ -104,7 +104,7 @@ class P2PBlocksOnly(BitcoinTestFramework):
self.nodes[0].setmocktime(int(time.time()) + 60)
conn.sync_send_with_ping()
assert(int(txid, 16) not in conn.get_invs())
assert int(txid, 16) not in conn.get_invs()
def check_p2p_inv_violation(self, peer):
self.log.info("Check that tx-invs from P2P are rejected and result in disconnect")

View file

@ -60,7 +60,7 @@ class AddrTest(BitcoinTestFramework):
# Need to make sure we hit MAX_ADDR_TO_SEND records in the addr response later because
# only a fraction of all known addresses can be cached and returned.
assert(len(self.nodes[0].getnodeaddresses(0)) > int(MAX_ADDR_TO_SEND / (MAX_PCT_ADDR_TO_SEND / 100)))
assert len(self.nodes[0].getnodeaddresses(0)) > int(MAX_ADDR_TO_SEND / (MAX_PCT_ADDR_TO_SEND / 100))
last_response_on_local_bind = None
last_response_on_onion_bind1 = None
@ -85,9 +85,9 @@ class AddrTest(BitcoinTestFramework):
if i > 0:
# Responses from different binds should be unique
assert(last_response_on_local_bind != addr_receiver_onion1.get_received_addrs())
assert(last_response_on_local_bind != addr_receiver_onion2.get_received_addrs())
assert(last_response_on_onion_bind1 != addr_receiver_onion2.get_received_addrs())
assert last_response_on_local_bind != addr_receiver_onion1.get_received_addrs()
assert last_response_on_local_bind != addr_receiver_onion2.get_received_addrs()
assert last_response_on_onion_bind1 != addr_receiver_onion2.get_received_addrs()
# Responses on from the same bind should be the same
assert_equal(last_response_on_local_bind, addr_receiver_local.get_received_addrs())
assert_equal(last_response_on_onion_bind1, addr_receiver_onion1.get_received_addrs())
@ -119,9 +119,9 @@ class AddrTest(BitcoinTestFramework):
addr_receiver_onion2.wait_until(addr_receiver_onion2.addr_received)
# new response is different
assert(set(last_response_on_local_bind) != set(addr_receiver_local.get_received_addrs()))
assert(set(last_response_on_onion_bind1) != set(addr_receiver_onion1.get_received_addrs()))
assert(set(last_response_on_onion_bind2) != set(addr_receiver_onion2.get_received_addrs()))
assert set(last_response_on_local_bind) != set(addr_receiver_local.get_received_addrs())
assert set(last_response_on_onion_bind1) != set(addr_receiver_onion1.get_received_addrs())
assert set(last_response_on_onion_bind2) != set(addr_receiver_onion2.get_received_addrs())
if __name__ == '__main__':
AddrTest().main()

View file

@ -57,7 +57,7 @@ class RejectLowDifficultyHeadersTest(BitcoinTestFramework):
def check_node3_chaintips(num_tips, tip_hash, height):
node3_chaintips = self.nodes[3].getchaintips()
assert(len(node3_chaintips) == num_tips)
assert len(node3_chaintips) == num_tips
assert {
'height': height,
'hash': tip_hash,
@ -69,7 +69,7 @@ class RejectLowDifficultyHeadersTest(BitcoinTestFramework):
for node in self.nodes[1:3]:
chaintips = node.getchaintips()
assert(len(chaintips) == 1)
assert len(chaintips) == 1
assert {
'height': 0,
'hash': '0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206',
@ -89,7 +89,7 @@ class RejectLowDifficultyHeadersTest(BitcoinTestFramework):
'status': 'active',
} in self.nodes[2].getchaintips()
assert(len(self.nodes[2].getchaintips()) == 1)
assert len(self.nodes[2].getchaintips()) == 1
self.log.info("Check that node3 accepted these headers as well")
check_node3_chaintips(2, self.nodes[0].getbestblockhash(), NODE1_BLOCKS_REQUIRED)

View file

@ -36,7 +36,7 @@ def mini_parser(dat_file):
"""
with open(dat_file, 'rb') as f_in:
# This should have at least one message in it
assert(os.fstat(f_in.fileno()).st_size >= TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
assert os.fstat(f_in.fileno()).st_size >= TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE
while True:
tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
if not tmp_header_raw:
@ -44,7 +44,7 @@ def mini_parser(dat_file):
tmp_header = BytesIO(tmp_header_raw)
tmp_header.read(TIME_SIZE) # skip the timestamp field
msgtype = tmp_header.read(MSGTYPE_SIZE).rstrip(b'\x00')
assert(msgtype in MESSAGEMAP)
assert msgtype in MESSAGEMAP
length: int = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little")
data = f_in.read(length)
assert_equal(len(data), length)

View file

@ -370,7 +370,7 @@ class BlockchainTest(BitcoinTestFramework):
# hash_type muhash should return a different UTXO set hash.
res6 = node.gettxoutsetinfo(hash_type='muhash')
assert 'muhash' in res6
assert(res['hash_serialized_2'] != res6['muhash'])
assert res['hash_serialized_2'] != res6['muhash']
# muhash should not be returned unless requested.
for r in [res, res2, res3, res4, res5]:

View file

@ -216,13 +216,13 @@ class RawTransactionsTest(BitcoinTestFramework):
if missing_fields:
raise AssertionError(f"fields {', '.join(missing_fields)} are not in transaction")
assert(len(gottx['vin']) > 0)
assert len(gottx['vin']) > 0
if v == 1:
assert('fee' not in gottx)
assert('prevout' not in gottx['vin'][0])
assert 'fee' not in gottx
assert 'prevout' not in gottx['vin'][0]
if v == 2:
assert(isinstance(gottx['fee'], Decimal))
assert('prevout' in gottx['vin'][0])
assert isinstance(gottx['fee'], Decimal)
assert 'prevout' in gottx['vin'][0]
prevout = gottx['vin'][0]['prevout']
script_pub_key = prevout['scriptPubKey']
@ -235,11 +235,11 @@ class RawTransactionsTest(BitcoinTestFramework):
raise AssertionError(f"fields {', '.join(missing_fields)} are not in transaction")
# check verbosity 2 without blockhash but with txindex
assert('fee' in self.nodes[0].getrawtransaction(txid=tx, verbosity=2))
assert 'fee' in self.nodes[0].getrawtransaction(txid=tx, verbosity=2)
# check that coinbase has no fee or does not throw any errors for verbosity 2
coin_base = self.nodes[1].getblock(block1)['tx'][0]
gottx = self.nodes[1].getrawtransaction(txid=coin_base, verbosity=2, blockhash=block1)
assert('fee' not in gottx)
assert 'fee' not in gottx
def createrawtransaction_tests(self):
self.log.info("Test createrawtransaction")

View file

@ -46,7 +46,7 @@ class ScanblocksTest(BitcoinTestFramework):
self.wait_until(lambda: all(i["synced"] for i in node.getindexinfo().values()))
out = node.scanblocks("start", [f"addr({addr_1})"])
assert(blockhash in out['relevant_blocks'])
assert blockhash in out['relevant_blocks']
assert_equal(height, out['to_height'])
assert_equal(0, out['from_height'])
@ -56,24 +56,24 @@ class ScanblocksTest(BitcoinTestFramework):
# make sure the blockhash is not in the filter result if we set the start_height
# to the just mined block (unlikely to hit a false positive)
assert(blockhash not in node.scanblocks(
"start", [f"addr({addr_1})"], height_new)['relevant_blocks'])
assert blockhash not in node.scanblocks(
"start", [f"addr({addr_1})"], height_new)['relevant_blocks']
# make sure the blockhash is present when using the first mined block as start_height
assert(blockhash in node.scanblocks(
"start", [f"addr({addr_1})"], height)['relevant_blocks'])
assert blockhash in node.scanblocks(
"start", [f"addr({addr_1})"], height)['relevant_blocks']
# also test the stop height
assert(blockhash in node.scanblocks(
"start", [f"addr({addr_1})"], height, height)['relevant_blocks'])
assert blockhash in node.scanblocks(
"start", [f"addr({addr_1})"], height, height)['relevant_blocks']
# use the stop_height to exclude the relevant block
assert(blockhash not in node.scanblocks(
"start", [f"addr({addr_1})"], 0, height - 1)['relevant_blocks'])
assert blockhash not in node.scanblocks(
"start", [f"addr({addr_1})"], 0, height - 1)['relevant_blocks']
# make sure the blockhash is present when using the first mined block as start_height
assert(blockhash in node.scanblocks(
"start", [{"desc": f"pkh({parent_key}/*)", "range": [0, 100]}], height)['relevant_blocks'])
assert blockhash in node.scanblocks(
"start", [{"desc": f"pkh({parent_key}/*)", "range": [0, 100]}], height)['relevant_blocks']
# check that false-positives are included in the result now; note that
# finding a false-positive at runtime would take too long, hence we simply
@ -89,10 +89,10 @@ class ScanblocksTest(BitcoinTestFramework):
false_positive_hash = bip158_basic_element_hash(false_positive_spk, 1, genesis_blockhash)
assert_equal(genesis_coinbase_hash, false_positive_hash)
assert(genesis_blockhash in node.scanblocks(
"start", [{"desc": f"raw({genesis_coinbase_spk.hex()})"}], 0, 0)['relevant_blocks'])
assert(genesis_blockhash in node.scanblocks(
"start", [{"desc": f"raw({false_positive_spk.hex()})"}], 0, 0)['relevant_blocks'])
assert genesis_blockhash in node.scanblocks(
"start", [{"desc": f"raw({genesis_coinbase_spk.hex()})"}], 0, 0)['relevant_blocks']
assert genesis_blockhash in node.scanblocks(
"start", [{"desc": f"raw({false_positive_spk.hex()})"}], 0, 0)['relevant_blocks']
# TODO: after an "accurate" mode for scanblocks is implemented (e.g. PR #26325)
# check here that it filters out the false-positive

View file

@ -139,7 +139,7 @@ class EllipticCurve:
See https://en.wikibooks.org/wiki/Cryptography/Prime_Curve/Jacobian_Coordinates - Point Addition (with affine point)"""
x1, y1, z1 = p1
x2, y2, z2 = p2
assert(z2 == 1)
assert z2 == 1
# Adding to the point at infinity is a no-op
if z1 == 0:
return p2
@ -262,7 +262,7 @@ class ECPubKey():
return self.valid
def get_bytes(self):
assert(self.valid)
assert self.valid
p = SECP256K1.affine(self.p)
if p is None:
return None
@ -276,7 +276,7 @@ class ECPubKey():
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA verifier algorithm"""
assert(self.valid)
assert self.valid
# Extract r and s from the DER formatted signature. Return false for
# any DER encoding errors.
@ -349,7 +349,7 @@ class ECKey():
def set(self, secret, compressed):
"""Construct a private key object with given 32-byte secret and compressed flag."""
assert(len(secret) == 32)
assert len(secret) == 32
secret = int.from_bytes(secret, 'big')
self.valid = (secret > 0 and secret < SECP256K1_ORDER)
if self.valid:
@ -362,7 +362,7 @@ class ECKey():
def get_bytes(self):
"""Retrieve the 32-byte representation of this key."""
assert(self.valid)
assert self.valid
return self.secret.to_bytes(32, 'big')
@property
@ -375,7 +375,7 @@ class ECKey():
def get_pubkey(self):
"""Compute an ECPubKey object for this secret key."""
assert(self.valid)
assert self.valid
ret = ECPubKey()
p = SECP256K1.mul([(SECP256K1_G, self.secret)])
ret.p = p
@ -388,7 +388,7 @@ class ECKey():
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA signer algorithm."""
assert(self.valid)
assert self.valid
z = int.from_bytes(msg, 'big')
# Note: no RFC6979 by default, but a simple random nonce (some tests rely on distinct transactions for the same operation)
if rfc6979:

View file

@ -824,10 +824,10 @@ def taproot_tree_helper(scripts):
if len(scripts) == 1:
# One entry: treat as a leaf
script = scripts[0]
assert(not callable(script))
assert not callable(script)
if isinstance(script, list):
return taproot_tree_helper(script)
assert(isinstance(script, tuple))
assert isinstance(script, tuple)
version = LEAF_VERSION_TAPSCRIPT
name = script[0]
code = script[1]

View file

@ -31,7 +31,7 @@ def siphash_round(v0, v1, v2, v3):
def siphash(k0, k1, data):
assert(type(data) == bytes)
assert type(data) == bytes
v0 = 0x736f6d6570736575 ^ k0
v1 = 0x646f72616e646f6d ^ k1
v2 = 0x6c7967656e657261 ^ k0
@ -61,5 +61,5 @@ def siphash(k0, k1, data):
def siphash256(k0, k1, num):
assert(type(num) == int)
assert type(num) == int
return siphash(k0, k1, num.to_bytes(32, 'little'))

View file

@ -195,7 +195,7 @@ class AvoidReuseTest(BitcoinTestFramework):
# getbalances should show no used, 10 btc trusted
assert_balances(self.nodes[1], mine={"used": 0, "trusted": 10})
# node 0 should not show a used entry, as it does not enable avoid_reuse
assert("used" not in self.nodes[0].getbalances()["mine"])
assert "used" not in self.nodes[0].getbalances()["mine"]
self.nodes[1].sendtoaddress(retaddr, 5)
self.generate(self.nodes[0], 1)

View file

@ -193,18 +193,18 @@ class BackwardsCompatibilityTest(BitcoinTestFramework):
assert_equal(txs[1]["txid"], tx1_id)
assert_equal(txs[2]["walletconflicts"], [tx1_id])
assert_equal(txs[1]["replaced_by_txid"], tx2_id)
assert not(txs[1]["abandoned"])
assert not txs[1]["abandoned"]
assert_equal(txs[1]["confirmations"], -1)
assert_equal(txs[2]["blockindex"], 1)
assert txs[3]["abandoned"]
assert_equal(txs[4]["walletconflicts"], [tx3_id])
assert_equal(txs[3]["replaced_by_txid"], tx4_id)
assert not(hasattr(txs[3], "blockindex"))
assert not hasattr(txs[3], "blockindex")
elif wallet_name == "w2":
assert(info['private_keys_enabled'] == False)
assert info['private_keys_enabled'] == False
assert info['keypoolsize'] == 0
else:
assert(info['private_keys_enabled'] == True)
assert info['private_keys_enabled'] == True
assert info['keypoolsize'] == 0
else:
for node in legacy_nodes:

View file

@ -557,7 +557,7 @@ def test_unconfirmed_not_spendable(self, rbf_node, rbf_node_address):
def test_bumpfee_metadata(self, rbf_node, dest_address):
self.log.info('Test that bumped txn metadata persists to new txn record')
assert(rbf_node.getbalance() < 49)
assert rbf_node.getbalance() < 49
self.generatetoaddress(rbf_node, 101, rbf_node.getnewaddress())
rbfid = rbf_node.sendtoaddress(dest_address, 49, "comment value", "to value")
bumped_tx = rbf_node.bumpfee(rbfid)

View file

@ -36,7 +36,7 @@ def check_implicit_transactions(implicit_keys, implicit_node):
pubkey = implicit_keys[a]
for b in address_types:
b_address = key_to_address(pubkey, b)
assert(('receive', b_address) in tuple((tx['category'], tx['address']) for tx in txs))
assert ('receive', b_address) in tuple((tx['category'], tx['address']) for tx in txs)
class ImplicitSegwitTest(BitcoinTestFramework):
def add_options(self, parser):

View file

@ -487,8 +487,8 @@ class ImportDescriptorsTest(BitcoinTestFramework):
assert_equal(addr, 'bcrt1qp8s25ckjl7gr6x2q3dx3tn2pytwp05upkjztk6ey857tt50r5aeqn6mvr9') # Derived at m/84'/0'/0'/1
change_addr = wmulti_pub.getrawchangeaddress('bech32')
assert_equal(change_addr, 'bcrt1qzxl0qz2t88kljdnkzg4n4gapr6kte26390gttrg79x66nt4p04fssj53nl')
assert(send_txid in self.nodes[0].getrawmempool(True))
assert(send_txid in (x['txid'] for x in wmulti_pub.listunspent(0)))
assert send_txid in self.nodes[0].getrawmempool(True)
assert send_txid in (x['txid'] for x in wmulti_pub.listunspent(0))
assert_equal(wmulti_pub.getwalletinfo()['keypoolsize'], 999)
# generate some utxos for next tests

View file

@ -94,11 +94,11 @@ class ReorgsRestoreTest(BitcoinTestFramework):
tx_after_reorg = self.nodes[1].gettransaction(txid)
# Check that normal confirmed tx is confirmed again but with different blockhash
assert_equal(tx_after_reorg["confirmations"], 2)
assert(tx_before_reorg["blockhash"] != tx_after_reorg["blockhash"])
assert tx_before_reorg["blockhash"] != tx_after_reorg["blockhash"]
conflicted_after_reorg = self.nodes[1].gettransaction(conflicted_txid)
# Check that conflicted tx is confirmed again with blockhash different than previously conflicting tx
assert_equal(conflicted_after_reorg["confirmations"], 1)
assert(conflicting["blockhash"] != conflicted_after_reorg["blockhash"])
assert conflicting["blockhash"] != conflicted_after_reorg["blockhash"]
if __name__ == '__main__':
ReorgsRestoreTest().main()

View file

@ -279,11 +279,11 @@ class WalletSendTest(BitcoinTestFramework):
self.log.info("Don't broadcast...")
res = self.test_send(from_wallet=w0, to_wallet=w1, amount=1, add_to_wallet=False)
assert(res["hex"])
assert res["hex"]
self.log.info("Return PSBT...")
res = self.test_send(from_wallet=w0, to_wallet=w1, amount=1, psbt=True)
assert(res["psbt"])
assert res["psbt"]
self.log.info("Create transaction that spends to address, but don't broadcast...")
self.test_send(from_wallet=w0, to_wallet=w1, amount=1, add_to_wallet=False)

View file

@ -174,7 +174,7 @@ class WalletSignerTest(BitcoinTestFramework):
mock_psbt_signed = mock_wallet.walletprocesspsbt(psbt=mock_psbt, sign=True, sighashtype="ALL", bip32derivs=True)
mock_psbt_final = mock_wallet.finalizepsbt(mock_psbt_signed["psbt"])
mock_tx = mock_psbt_final["hex"]
assert(mock_wallet.testmempoolaccept([mock_tx])[0]["allowed"])
assert mock_wallet.testmempoolaccept([mock_tx])[0]["allowed"]
# # Create a new wallet and populate with specific public keys, in order
# # to work with the mock signed PSBT.
@ -203,7 +203,7 @@ class WalletSignerTest(BitcoinTestFramework):
# assert_equal(result[1], {'success': True})
assert_equal(hww.getwalletinfo()["txcount"], 1)
assert(hww.testmempoolaccept([mock_tx])[0]["allowed"])
assert hww.testmempoolaccept([mock_tx])[0]["allowed"]
with open(os.path.join(self.nodes[1].cwd, "mock_psbt"), "w", encoding="utf8") as f:
f.write(mock_psbt_signed["psbt"])
@ -212,13 +212,13 @@ class WalletSignerTest(BitcoinTestFramework):
# Don't broadcast transaction yet so the RPC returns the raw hex
res = hww.send(outputs={dest:0.5},options={"add_to_wallet": False})
assert(res["complete"])
assert res["complete"]
assert_equal(res["hex"], mock_tx)
self.log.info('Test sendall using hww1')
res = hww.sendall(recipients=[{dest:0.5}, hww.getrawchangeaddress()],options={"add_to_wallet": False})
assert(res["complete"])
assert res["complete"]
assert_equal(res["hex"], mock_tx)
# Broadcast transaction so we can bump the fee
hww.sendrawtransaction(res["hex"])

View file

@ -244,7 +244,7 @@ class WalletTaprootTest(BitcoinTestFramework):
desc_pub = self.make_desc(pattern, privmap, keys, True)
assert_equal(self.nodes[0].getdescriptorinfo(desc)['descriptor'], desc_pub)
result = addr_gen.importdescriptors([{"desc": desc_pub, "active": True, "timestamp": "now"}])
assert(result[0]['success'])
assert result[0]['success']
address_type = "bech32m" if "tr" in pattern else "bech32"
for i in range(4):
addr_g = addr_gen.getnewaddress(address_type=address_type)
@ -260,9 +260,9 @@ class WalletTaprootTest(BitcoinTestFramework):
# tr descriptors can be imported
result = privs_tr_enabled.importdescriptors([{"desc": desc, "timestamp": "now"}])
assert(result[0]["success"])
assert result[0]['success']
result = pubs_tr_enabled.importdescriptors([{"desc": desc_pub, "timestamp": "now"}])
assert(result[0]["success"])
assert result[0]["success"]
# Cleanup
privs_tr_enabled.unloadwallet()
@ -284,9 +284,9 @@ class WalletTaprootTest(BitcoinTestFramework):
assert_equal(self.nodes[0].getdescriptorinfo(desc_pay)['descriptor'], desc_pay_pub)
assert_equal(self.nodes[0].getdescriptorinfo(desc_change)['descriptor'], desc_change_pub)
result = rpc_online.importdescriptors([{"desc": desc_pay, "active": True, "timestamp": "now"}])
assert(result[0]['success'])
assert result[0]['success']
result = rpc_online.importdescriptors([{"desc": desc_change, "active": True, "timestamp": "now", "internal": True}])
assert(result[0]['success'])
assert result[0]['success']
address_type = "bech32m" if "tr" in pattern else "bech32"
for i in range(4):
addr_g = rpc_online.getnewaddress(address_type=address_type)
@ -302,12 +302,12 @@ class WalletTaprootTest(BitcoinTestFramework):
# Increase fee_rate to compensate for the wallet's inability to estimate fees for script path spends.
res = rpc_online.sendtoaddress(address=self.boring.getnewaddress(), amount=Decimal(ret_amnt) / 100000000, subtractfeefromamount=True, fee_rate=200)
self.generatetoaddress(self.nodes[0], 1, self.boring.getnewaddress(), sync_fun=self.no_op)
assert(rpc_online.gettransaction(res)["confirmations"] > 0)
assert rpc_online.gettransaction(res)["confirmations"] > 0
# Cleanup
txid = rpc_online.sendall(recipients=[self.boring.getnewaddress()])["txid"]
self.generatetoaddress(self.nodes[0], 1, self.boring.getnewaddress(), sync_fun=self.no_op)
assert(rpc_online.gettransaction(txid)["confirmations"] > 0)
assert rpc_online.gettransaction(txid)["confirmations"] > 0
rpc_online.unloadwallet()
def do_test_psbt(self, comment, pattern, privmap, treefn, keys_pay, keys_change):
@ -329,16 +329,16 @@ class WalletTaprootTest(BitcoinTestFramework):
assert_equal(self.nodes[0].getdescriptorinfo(desc_pay)['descriptor'], desc_pay_pub)
assert_equal(self.nodes[0].getdescriptorinfo(desc_change)['descriptor'], desc_change_pub)
result = psbt_online.importdescriptors([{"desc": desc_pay_pub, "active": True, "timestamp": "now"}])
assert(result[0]['success'])
assert result[0]['success']
result = psbt_online.importdescriptors([{"desc": desc_change_pub, "active": True, "timestamp": "now", "internal": True}])
assert(result[0]['success'])
assert result[0]['success']
result = psbt_offline.importdescriptors([{"desc": desc_pay, "active": True, "timestamp": "now"}])
assert(result[0]['success'])
assert result[0]['success']
result = psbt_offline.importdescriptors([{"desc": desc_change, "active": True, "timestamp": "now", "internal": True}])
assert(result[0]['success'])
assert result[0]['success']
for key in keys_pay + keys_change:
result = key_only_wallet.importdescriptors([{"desc": descsum_create(f"wpkh({key['xprv']}/*)"), "timestamp":"now"}])
assert(result[0]["success"])
assert result[0]["success"]
address_type = "bech32m" if "tr" in pattern else "bech32"
for i in range(4):
addr_g = psbt_online.getnewaddress(address_type=address_type)
@ -375,7 +375,7 @@ class WalletTaprootTest(BitcoinTestFramework):
txid = self.nodes[0].sendrawtransaction(rawtx)
self.generatetoaddress(self.nodes[0], 1, self.boring.getnewaddress(), sync_fun=self.no_op)
assert(psbt_online.gettransaction(txid)['confirmations'] > 0)
assert psbt_online.gettransaction(txid)['confirmations'] > 0
# Cleanup
psbt = psbt_online.sendall(recipients=[self.boring.getnewaddress()], options={"psbt": True})["psbt"]
@ -383,7 +383,7 @@ class WalletTaprootTest(BitcoinTestFramework):
rawtx = self.nodes[0].finalizepsbt(res['psbt'])['hex']
txid = self.nodes[0].sendrawtransaction(rawtx)
self.generatetoaddress(self.nodes[0], 1, self.boring.getnewaddress(), sync_fun=self.no_op)
assert(psbt_online.gettransaction(txid)['confirmations'] > 0)
assert psbt_online.gettransaction(txid)['confirmations'] > 0
psbt_online.unloadwallet()
psbt_offline.unloadwallet()