#!/usr/bin/env python3 # Copyright (c) 2020 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. import argparse import json import logging import math import os import re import struct import sys import time import subprocess PATH_BASE_CONTRIB_SIGNET = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) PATH_BASE_TEST_FUNCTIONAL = os.path.abspath(os.path.join(PATH_BASE_CONTRIB_SIGNET, "..", "..", "test", "functional")) sys.path.insert(0, PATH_BASE_TEST_FUNCTIONAL) from test_framework.blocktools import get_witness_script, script_BIP34_coinbase_height # noqa: E402 from test_framework.messages import CBlock, CBlockHeader, COutPoint, CTransaction, CTxIn, CTxInWitness, CTxOut, from_binary, from_hex, ser_string, ser_uint256, tx_from_hex # noqa: E402 from test_framework.psbt import PSBT, PSBTMap, PSBT_GLOBAL_UNSIGNED_TX, PSBT_IN_FINAL_SCRIPTSIG, PSBT_IN_FINAL_SCRIPTWITNESS, PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_SIGHASH_TYPE # noqa: E402 from test_framework.script import CScript, CScriptOp # noqa: E402 logging.basicConfig( format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') SIGNET_HEADER = b"\xec\xc7\xda\xa2" PSBT_SIGNET_BLOCK = b"\xfc\x06signetb" # proprietary PSBT global field holding the block being signed RE_MULTIMINER = re.compile(r"^(\d+)(-(\d+))?/(\d+)$") def signet_txs(block, challenge): # assumes signet solution has not been added yet so does not need # to be removed txs = block.vtx[:] txs[0] = CTransaction(txs[0]) txs[0].vout[-1].scriptPubKey += CScriptOp.encode_op_pushdata(SIGNET_HEADER) hashes = [] for tx in txs: tx.rehash() hashes.append(ser_uint256(tx.sha256)) mroot = block.get_merkle_root(hashes) sd = b"" sd += block.nVersion.to_bytes(4, "little", signed=True) sd += ser_uint256(block.hashPrevBlock) sd += ser_uint256(mroot) sd += block.nTime.to_bytes(4, "little") to_spend = CTransaction() to_spend.version = 0 to_spend.nLockTime = 0 to_spend.vin = [CTxIn(COutPoint(0, 0xFFFFFFFF), b"\x00" + CScriptOp.encode_op_pushdata(sd), 0)] to_spend.vout = [CTxOut(0, challenge)] to_spend.rehash() spend = CTransaction() spend.version = 0 spend.nLockTime = 0 spend.vin = [CTxIn(COutPoint(to_spend.sha256, 0), b"", 0)] spend.vout = [CTxOut(0, b"\x6a")] return spend, to_spend def decode_psbt(b64psbt): psbt = PSBT.from_base64(b64psbt) assert len(psbt.tx.vin) == 1 assert len(psbt.tx.vout) == 1 assert PSBT_SIGNET_BLOCK in psbt.g.map scriptSig = psbt.i[0].map.get(PSBT_IN_FINAL_SCRIPTSIG, b"") scriptWitness = psbt.i[0].map.get(PSBT_IN_FINAL_SCRIPTWITNESS, b"\x00") return from_binary(CBlock, psbt.g.map[PSBT_SIGNET_BLOCK]), ser_string(scriptSig) + scriptWitness def finish_block(block, signet_solution, grind_cmd): block.vtx[0].vout[-1].scriptPubKey += CScriptOp.encode_op_pushdata(SIGNET_HEADER + signet_solution) block.vtx[0].rehash() block.hashMerkleRoot = block.calc_merkle_root() if grind_cmd is None: block.solve() else: headhex = CBlockHeader.serialize(block).hex() cmd = grind_cmd.split(" ") + [headhex] newheadhex = subprocess.run(cmd, stdout=subprocess.PIPE, input=b"", check=True).stdout.strip() newhead = from_hex(CBlockHeader(), newheadhex.decode('utf8')) block.nNonce = newhead.nNonce block.rehash() return block def generate_psbt(tmpl, reward_spk, *, blocktime=None, poolid=None): signet_spk = tmpl["signet_challenge"] signet_spk_bin = bytes.fromhex(signet_spk) scriptSig = script_BIP34_coinbase_height(tmpl["height"]) if poolid is not None: scriptSig = CScript(b"" + scriptSig + CScriptOp.encode_op_pushdata(poolid)) cbtx = CTransaction() cbtx.vin = [CTxIn(COutPoint(0, 0xffffffff), scriptSig, 0xffffffff)] cbtx.vout = [CTxOut(tmpl["coinbasevalue"], reward_spk)] cbtx.vin[0].nSequence = 2**32-2 cbtx.rehash() block = CBlock() block.nVersion = tmpl["version"] block.hashPrevBlock = int(tmpl["previousblockhash"], 16) block.nTime = tmpl["curtime"] if blocktime is None else blocktime if block.nTime < tmpl["mintime"]: block.nTime = tmpl["mintime"] block.nBits = int(tmpl["bits"], 16) block.nNonce = 0 block.vtx = [cbtx] + [tx_from_hex(t["data"]) for t in tmpl["transactions"]] witnonce = 0 witroot = block.calc_witness_merkle_root() cbwit = CTxInWitness() cbwit.scriptWitness.stack = [ser_uint256(witnonce)] block.vtx[0].wit.vtxinwit = [cbwit] block.vtx[0].vout.append(CTxOut(0, bytes(get_witness_script(witroot, witnonce)))) signme, spendme = signet_txs(block, signet_spk_bin) psbt = PSBT() psbt.g = PSBTMap( {PSBT_GLOBAL_UNSIGNED_TX: signme.serialize(), PSBT_SIGNET_BLOCK: block.serialize() } ) psbt.i = [ PSBTMap( {PSBT_IN_NON_WITNESS_UTXO: spendme.serialize(), PSBT_IN_SIGHASH_TYPE: bytes([1,0,0,0])}) ] psbt.o = [ PSBTMap() ] return psbt.to_base64() def get_poolid(args): if args.poolid is not None: return args.poolid.encode('utf8') elif args.poolnum is not None: return b"/signet:%d/" % (args.poolnum) else: return None def get_reward_addr_spk(args, height): assert args.address is not None or args.descriptor is not None if hasattr(args, "reward_spk"): return args.address, args.reward_spk if args.address is not None: reward_addr = args.address elif '*' not in args.descriptor: reward_addr = args.address = json.loads(args.bcli("deriveaddresses", args.descriptor))[0] else: remove = [k for k in args.derived_addresses.keys() if k+20 <= height] for k in remove: del args.derived_addresses[k] if height not in args.derived_addresses: addrs = json.loads(args.bcli("deriveaddresses", args.descriptor, "[%d,%d]" % (height, height+20))) for k, a in enumerate(addrs): args.derived_addresses[height+k] = a reward_addr = args.derived_addresses[height] reward_spk = bytes.fromhex(json.loads(args.bcli("getaddressinfo", reward_addr))["scriptPubKey"]) if args.address is not None: # will always be the same, so cache args.reward_spk = reward_spk return reward_addr, reward_spk def do_genpsbt(args): poolid = get_poolid(args) tmpl = json.load(sys.stdin) _, reward_spk = get_reward_addr_spk(args, tmpl["height"]) psbt = generate_psbt(tmpl, reward_spk, poolid=poolid) print(psbt) def do_solvepsbt(args): block, signet_solution = decode_psbt(sys.stdin.read()) block = finish_block(block, signet_solution, args.grind_cmd) print(block.serialize().hex()) def nbits_to_target(nbits): shift = (nbits >> 24) & 0xff return (nbits & 0x00ffffff) * 2**(8*(shift - 3)) def target_to_nbits(target): tstr = "{0:x}".format(target) if len(tstr) < 6: tstr = ("000000"+tstr)[-6:] if len(tstr) % 2 != 0: tstr = "0" + tstr if int(tstr[0],16) >= 0x8: # avoid "negative" tstr = "00" + tstr fix = int(tstr[:6], 16) sz = len(tstr)//2 if tstr[6:] != "0"*(sz*2-6): fix += 1 return int("%02x%06x" % (sz,fix), 16) def seconds_to_hms(s): if s == 0: return "0s" neg = (s < 0) if neg: s = -s out = "" if s % 60 > 0: out = "%ds" % (s % 60) s //= 60 if s % 60 > 0: out = "%dm%s" % (s % 60, out) s //= 60 if s > 0: out = "%dh%s" % (s, out) if neg: out = "-" + out return out class Generate: INTERVAL = 600.0*2016/2015 # 10 minutes, adjusted for the off-by-one bug def __init__(self, multiminer=None, ultimate_target=None, poisson=False, max_interval=1800, standby_delay=0, backup_delay=0, set_block_time=None, poolid=None): if multiminer is None: multiminer = (0, 1, 1) (self.multi_low, self.multi_high, self.multi_period) = multiminer self.ultimate_target = ultimate_target self.poisson = poisson self.max_interval = max_interval self.standby_delay = standby_delay self.backup_delay = backup_delay self.set_block_time = set_block_time self.poolid = poolid def next_block_delta(self, last_nbits, last_hash): # strategy: # 1) work out how far off our desired target we are # 2) cap it to a factor of 4 since that's the best we can do in a single retarget period # 3) use that to work out the desired average interval in this retarget period # 4) if doing poisson, use the last hash to pick a uniformly random number in [0,1), and work out a random multiplier to vary the average by # 5) cap the resulting interval between 1 second and 1 hour to avoid extremes current_target = nbits_to_target(last_nbits) retarget_factor = self.ultimate_target / current_target retarget_factor = max(0.25, min(retarget_factor, 4.0)) avg_interval = self.INTERVAL * retarget_factor if self.poisson: det_rand = int(last_hash[-8:], 16) * 2**-32 this_interval_variance = -math.log1p(-det_rand) else: this_interval_variance = 1 this_interval = avg_interval * this_interval_variance this_interval = max(1, min(this_interval, self.max_interval)) return this_interval def next_block_is_mine(self, last_hash): det_rand = int(last_hash[-16:-8], 16) return self.multi_low <= (det_rand % self.multi_period) < self.multi_high def next_block_time(self, now, bestheader, is_first_block): if self.set_block_time is not None: logging.debug("Setting start time to %d", self.set_block_time) self.mine_time = self.set_block_time self.action_time = now self.is_mine = True elif bestheader["height"] == 0: time_delta = self.INTERVAL * 100 # plenty of time to mine 100 blocks logging.info("Backdating time for first block to %d minutes ago" % (time_delta/60)) self.mine_time = now - time_delta self.action_time = now self.is_mine = True else: time_delta = self.next_block_delta(int(bestheader["bits"], 16), bestheader["hash"]) self.mine_time = bestheader["time"] + time_delta self.is_mine = self.next_block_is_mine(bestheader["hash"]) self.action_time = self.mine_time if not self.is_mine: self.action_time += self.backup_delay if self.standby_delay > 0: self.action_time += self.standby_delay elif is_first_block: # for non-standby, always mine immediately on startup, # even if the next block shouldn't be ours self.action_time = now # don't want fractional times so round down self.mine_time = int(self.mine_time) self.action_time = int(self.action_time) # can't mine a block 2h in the future; 1h55m for some safety self.action_time = max(self.action_time, self.mine_time - 6900) def gbt(self, bcli, bestblockhash, now): tmpl = json.loads(bcli("getblocktemplate", '{"rules":["signet","segwit"]}')) if tmpl["previousblockhash"] != bestblockhash: logging.warning("GBT based off unexpected block (%s not %s), retrying", tmpl["previousblockhash"], bci["bestblockhash"]) time.sleep(1) return None if tmpl["mintime"] > self.mine_time: logging.info("Updating block time from %d to %d", self.mine_time, tmpl["mintime"]) self.mine_time = tmpl["mintime"] if self.mine_time > now: logging.error("GBT mintime is in the future: %d is %d seconds later than %d", self.mine_time, (self.mine_time-now), now) return None return tmpl def mine(self, bcli, grind_cmd, tmpl, reward_spk): psbt = generate_psbt(tmpl, reward_spk, blocktime=self.mine_time, poolid=self.poolid) input_stream = os.linesep.join([psbt, "true", "ALL"]).encode('utf8') psbt_signed = json.loads(bcli("-stdin", "walletprocesspsbt", input=input_stream)) if not psbt_signed.get("complete",False): logging.debug("Generated PSBT: %s" % (psbt,)) sys.stderr.write("PSBT signing failed\n") return None block, signet_solution = decode_psbt(psbt_signed["psbt"]) return finish_block(block, signet_solution, grind_cmd) def do_generate(args): if args.set_block_time is not None: max_blocks = 1 elif args.max_blocks is not None: if args.max_blocks < 1: logging.error("--max_blocks must specify a positive integer") return 1 max_blocks = args.max_blocks elif args.ongoing: max_blocks = None else: max_blocks = 1 if args.set_block_time is not None and args.set_block_time < 0: args.set_block_time = time.time() logging.info("Treating negative block time as current time (%d)" % (args.set_block_time)) if args.min_nbits: args.nbits = "1e0377ae" logging.info("Using nbits=%s" % (args.nbits)) if args.set_block_time is None: if args.nbits is None or len(args.nbits) != 8: logging.error("Must specify --nbits (use calibrate command to determine value)") return 1 if args.multiminer is None: my_blocks = (0,1,1) else: if not args.ongoing: logging.error("Cannot specify --multiminer without --ongoing") return 1 m = RE_MULTIMINER.match(args.multiminer) if m is None: logging.error("--multiminer argument must be k/m or j-k/m") return 1 start,_,stop,total = m.groups() if stop is None: stop = start start, stop, total = map(int, (start, stop, total)) if stop < start or start <= 0 or total < stop or total == 0: logging.error("Inconsistent values for --multiminer") return 1 my_blocks = (start-1, stop, total) if args.max_interval < 960: logging.error("--max-interval must be at least 960 (16 minutes)") return 1 poolid = get_poolid(args) ultimate_target = nbits_to_target(int(args.nbits,16)) gen = Generate(multiminer=my_blocks, ultimate_target=ultimate_target, poisson=args.poisson, max_interval=args.max_interval, standby_delay=args.standby_delay, backup_delay=args.backup_delay, set_block_time=args.set_block_time, poolid=poolid) mined_blocks = 0 bestheader = {"hash": None} lastheader = None while max_blocks is None or mined_blocks < max_blocks: # current status? bci = json.loads(args.bcli("getblockchaininfo")) if bestheader["hash"] != bci["bestblockhash"]: bestheader = json.loads(args.bcli("getblockheader", bci["bestblockhash"])) if lastheader is None: lastheader = bestheader["hash"] elif bestheader["hash"] != lastheader: next_delta = gen.next_block_delta(int(bestheader["bits"], 16), bestheader["hash"]) next_delta += bestheader["time"] - time.time() next_is_mine = gen.next_block_is_mine(bestheader["hash"]) logging.info("Received new block at height %d; next in %s (%s)", bestheader["height"], seconds_to_hms(next_delta), ("mine" if next_is_mine else "backup")) lastheader = bestheader["hash"] # when is the next block due to be mined? now = time.time() gen.next_block_time(now, bestheader, (mined_blocks == 0)) # ready to go? otherwise sleep and check for new block if now < gen.action_time: sleep_for = min(gen.action_time - now, 60) if gen.mine_time < now: # someone else might have mined the block, # so check frequently, so we don't end up late # mining the next block if it's ours sleep_for = min(20, sleep_for) minestr = "mine" if gen.is_mine else "backup" logging.debug("Sleeping for %s, next block due in %s (%s)" % (seconds_to_hms(sleep_for), seconds_to_hms(gen.mine_time - now), minestr)) time.sleep(sleep_for) continue # gbt tmpl = gen.gbt(args.bcli, bci["bestblockhash"], now) if tmpl is None: continue logging.debug("GBT template: %s", tmpl) # address for reward reward_addr, reward_spk = get_reward_addr_spk(args, tmpl["height"]) # mine block logging.debug("Mining block delta=%s start=%s mine=%s", seconds_to_hms(gen.mine_time-bestheader["time"]), gen.mine_time, gen.is_mine) mined_blocks += 1 block = gen.mine(args.bcli, args.grind_cmd, tmpl, reward_spk) if block is None: return 1 # submit block r = args.bcli("-stdin", "submitblock", input=block.serialize().hex().encode('utf8')) # report bstr = "block" if gen.is_mine else "backup block" next_delta = gen.next_block_delta(block.nBits, block.hash) next_delta += block.nTime - time.time() next_is_mine = gen.next_block_is_mine(block.hash) logging.debug("Block hash %s payout to %s", block.hash, reward_addr) logging.info("Mined %s at height %d; next in %s (%s)", bstr, tmpl["height"], seconds_to_hms(next_delta), ("mine" if next_is_mine else "backup")) if r != "": logging.warning("submitblock returned %s for height %d hash %s", r, tmpl["height"], block.hash) lastheader = block.hash def do_calibrate(args): if args.nbits is not None and args.seconds is not None: sys.stderr.write("Can only specify one of --nbits or --seconds\n") return 1 if args.nbits is not None and len(args.nbits) != 8: sys.stderr.write("Must specify 8 hex digits for --nbits\n") return 1 TRIALS = 600 # gets variance down pretty low TRIAL_BITS = 0x1e3ea75f # takes about 5m to do 600 trials header = CBlockHeader() header.nBits = TRIAL_BITS targ = nbits_to_target(header.nBits) start = time.time() count = 0 for i in range(TRIALS): header.nTime = i header.nNonce = 0 headhex = header.serialize().hex() cmd = args.grind_cmd.split(" ") + [headhex] newheadhex = subprocess.run(cmd, stdout=subprocess.PIPE, input=b"", check=True).stdout.strip() avg = (time.time() - start) * 1.0 / TRIALS if args.nbits is not None: want_targ = nbits_to_target(int(args.nbits,16)) want_time = avg*targ/want_targ else: want_time = args.seconds if args.seconds is not None else 25 want_targ = int(targ*(avg/want_time)) print("nbits=%08x for %ds average mining time" % (target_to_nbits(want_targ), want_time)) return 0 def bitcoin_cli(basecmd, args, **kwargs): cmd = basecmd + ["-signet"] + args logging.debug("Calling bitcoin-cli: %r", cmd) out = subprocess.run(cmd, stdout=subprocess.PIPE, **kwargs, check=True).stdout if isinstance(out, bytes): out = out.decode('utf8') return out.strip() def main(): parser = argparse.ArgumentParser() parser.add_argument("--cli", default="bitcoin-cli", type=str, help="bitcoin-cli command") parser.add_argument("--debug", action="store_true", help="Print debugging info") parser.add_argument("--quiet", action="store_true", help="Only print warnings/errors") cmds = parser.add_subparsers(help="sub-commands") genpsbt = cmds.add_parser("genpsbt", help="Generate a block PSBT for signing") genpsbt.set_defaults(fn=do_genpsbt) solvepsbt = cmds.add_parser("solvepsbt", help="Solve a signed block PSBT") solvepsbt.set_defaults(fn=do_solvepsbt) generate = cmds.add_parser("generate", help="Mine blocks") generate.set_defaults(fn=do_generate) howmany = generate.add_mutually_exclusive_group() howmany.add_argument("--ongoing", action="store_true", help="Keep mining blocks") howmany.add_argument("--max-blocks", default=None, type=int, help="Max blocks to mine (default=1)") howmany.add_argument("--set-block-time", default=None, type=int, help="Set block time (unix timestamp); implies --max-blocks=1") nbit_target = generate.add_mutually_exclusive_group() nbit_target.add_argument("--nbits", default=None, type=str, help="Target nBits (specify difficulty)") nbit_target.add_argument("--min-nbits", action="store_true", help="Target minimum nBits (use min difficulty)") generate.add_argument("--poisson", action="store_true", help="Simulate randomised block times") generate.add_argument("--multiminer", default=None, type=str, help="Specify which set of blocks to mine (eg: 1-40/100 for the first 40%%, 2/3 for the second 3rd)") generate.add_argument("--backup-delay", default=300, type=int, help="Seconds to delay before mining blocks reserved for other miners (default=300)") generate.add_argument("--standby-delay", default=0, type=int, help="Seconds to delay before mining blocks (default=0)") generate.add_argument("--max-interval", default=1800, type=int, help="Maximum interblock interval (seconds)") calibrate = cmds.add_parser("calibrate", help="Calibrate difficulty") calibrate.set_defaults(fn=do_calibrate) calibrate_by = calibrate.add_mutually_exclusive_group() calibrate_by.add_argument("--nbits", type=str, default=None) calibrate_by.add_argument("--seconds", type=int, default=None) for sp in [genpsbt, generate]: payto = sp.add_mutually_exclusive_group(required=True) payto.add_argument("--address", default=None, type=str, help="Address for block reward payment") payto.add_argument("--descriptor", default=None, type=str, help="Descriptor for block reward payment") pool = sp.add_mutually_exclusive_group() pool.add_argument("--poolnum", default=None, type=int, help="Identify blocks that you mine") pool.add_argument("--poolid", default=None, type=str, help="Identify blocks that you mine (eg: /signet:1/)") for sp in [solvepsbt, generate, calibrate]: sp.add_argument("--grind-cmd", default=None, type=str, required=(sp==calibrate), help="Command to grind a block header for proof-of-work") args = parser.parse_args(sys.argv[1:]) args.bcli = lambda *a, input=b"", **kwargs: bitcoin_cli(args.cli.split(" "), list(a), input=input, **kwargs) if hasattr(args, "address") and hasattr(args, "descriptor"): args.derived_addresses = {} if args.debug: logging.getLogger().setLevel(logging.DEBUG) elif args.quiet: logging.getLogger().setLevel(logging.WARNING) else: logging.getLogger().setLevel(logging.INFO) if hasattr(args, "fn"): return args.fn(args) else: logging.error("Must specify command") return 1 if __name__ == "__main__": main()