Compare ASMaps with respect to specific addresses

Introduce diff_addrs subcommand as means for a Bitcoin-centric
comparison of two ASMaps.

In addition to two ASMaps, the subcommand reads addresses from
a (getnodeaddresses-compatible) file, and provides information on
addresses that have mismatching ASN according to the two ASMaps.
This commit is contained in:
virtu 2024-06-05 14:23:08 +02:00
parent 9efc2af3be
commit 5215c925d1

View file

@ -6,7 +6,9 @@
import argparse
import sys
import ipaddress
import json
import math
from collections import defaultdict
import asmap
@ -113,6 +115,18 @@ def main():
parser_diff.add_argument('infile2', type=argparse.FileType('rb'),
help="second file to compare (text or binary)")
parser_diff_addrs = subparsers.add_parser("diff_addrs",
help="compute difference between two asmap files for a set of addresses")
parser_diff_addrs.add_argument('-s', '--show-addresses', dest="show_addresses", default=False, action="store_true",
help="include reassigned addresses in the output")
parser_diff_addrs.add_argument("infile1", type=argparse.FileType("rb"),
help="first file to compare (text or binary)")
parser_diff_addrs.add_argument("infile2", type=argparse.FileType("rb"),
help="second file to compare (text or binary)")
parser_diff_addrs.add_argument("addrs_file", type=argparse.FileType("r"),
help="address file containing getnodeaddresses output to use in the comparison "
"(make sure to set the count parameter to zero to get all node addresses, "
"e.g. 'bitcoin-cli getnodeaddresses 0 > addrs.json')")
args = parser.parse_args()
if args.subcommand is None:
parser.print_help()
@ -148,6 +162,33 @@ def main():
f"# {ipv4_changed}{ipv4_change_str} IPv4 addresses changed; "
f"{ipv6_changed}{ipv6_change_str} IPv6 addresses changed"
)
elif args.subcommand == "diff_addrs":
state1 = load_file(args.infile1)
state2 = load_file(args.infile2)
address_info = json.load(args.addrs_file)
addrs = {a["address"] for a in address_info if a["network"] in ["ipv4", "ipv6"]}
reassignments = defaultdict(list)
for addr in addrs:
net = ipaddress.ip_network(addr)
prefix = asmap.net_to_prefix(net)
old_asn = state1.lookup(prefix)
new_asn = state2.lookup(prefix)
if new_asn != old_asn:
reassignments[(old_asn, new_asn)].append(addr)
reassignments = sorted(reassignments.items(), key=lambda item: len(item[1]), reverse=True)
num_reassignment_type = defaultdict(int)
for (old_asn, new_asn), reassigned_addrs in reassignments:
num_reassigned = len(reassigned_addrs)
num_reassignment_type[(bool(old_asn), bool(new_asn))] += num_reassigned
old_asn_str = f"AS{old_asn}" if old_asn else "unassigned"
new_asn_str = f"AS{new_asn}" if new_asn else "unassigned"
opt = ": " + ", ".join(reassigned_addrs) if args.show_addresses else ""
print(f"{num_reassigned} address(es) reassigned from {old_asn_str} to {new_asn_str}{opt}")
num_reassignments = sum(len(addrs) for _, addrs in reassignments)
share = num_reassignments / len(addrs) if len(addrs) > 0 else 0
print(f"Summary: {num_reassignments:,} ({share:.2%}) of {len(addrs):,} addresses were reassigned "
f"(migrations={num_reassignment_type[True, True]}, assignments={num_reassignment_type[False, True]}, "
f"unassignments={num_reassignment_type[True, False]})")
else:
parser.print_help()
sys.exit("No command provided.")