test: add type annotations to util.get_rpc_proxy

Remove proxy.url assignment:
error: "AuthServiceProxy" has no attribute "url"
This commit is contained in:
fanquake 2021-05-28 12:03:37 +08:00
parent 1488f55fa5
commit fbeb8c43bc
No known key found for this signature in database
GPG key ID: 2EEB9F5CC09526C1
3 changed files with 19 additions and 17 deletions

View file

@ -10,6 +10,7 @@ testing.
import os import os
from .authproxy import AuthServiceProxy
REFERENCE_FILENAME = 'rpc_interface.txt' REFERENCE_FILENAME = 'rpc_interface.txt'
@ -19,16 +20,17 @@ class AuthServiceProxyWrapper():
An object that wraps AuthServiceProxy to record specific RPC calls. An object that wraps AuthServiceProxy to record specific RPC calls.
""" """
def __init__(self, auth_service_proxy_instance, coverage_logfile=None): def __init__(self, auth_service_proxy_instance: AuthServiceProxy, rpc_url: str, coverage_logfile: str=None):
""" """
Kwargs: Kwargs:
auth_service_proxy_instance (AuthServiceProxy): the instance auth_service_proxy_instance: the instance being wrapped.
being wrapped. rpc_url: url of the RPC instance being wrapped
coverage_logfile (str): if specified, write each service_name coverage_logfile: if specified, write each service_name
out to a file when called. out to a file when called.
""" """
self.auth_service_proxy_instance = auth_service_proxy_instance self.auth_service_proxy_instance = auth_service_proxy_instance
self.rpc_url = rpc_url
self.coverage_logfile = coverage_logfile self.coverage_logfile = coverage_logfile
def __getattr__(self, name): def __getattr__(self, name):
@ -36,7 +38,7 @@ class AuthServiceProxyWrapper():
if not isinstance(return_val, type(self.auth_service_proxy_instance)): if not isinstance(return_val, type(self.auth_service_proxy_instance)):
# If proxy getattr returned an unwrapped value, do the same here. # If proxy getattr returned an unwrapped value, do the same here.
return return_val return return_val
return AuthServiceProxyWrapper(return_val, self.coverage_logfile) return AuthServiceProxyWrapper(return_val, self.rpc_url, self.coverage_logfile)
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
""" """
@ -57,6 +59,7 @@ class AuthServiceProxyWrapper():
def __truediv__(self, relative_uri): def __truediv__(self, relative_uri):
return AuthServiceProxyWrapper(self.auth_service_proxy_instance / relative_uri, return AuthServiceProxyWrapper(self.auth_service_proxy_instance / relative_uri,
self.rpc_url,
self.coverage_logfile) self.coverage_logfile)
def get_request(self, *args, **kwargs): def get_request(self, *args, **kwargs):
@ -74,18 +77,18 @@ def get_filename(dirname, n_node):
dirname, "coverage.pid%s.node%s.txt" % (pid, str(n_node))) dirname, "coverage.pid%s.node%s.txt" % (pid, str(n_node)))
def write_all_rpc_commands(dirname, node): def write_all_rpc_commands(dirname: str, node: AuthServiceProxy) -> bool:
""" """
Write out a list of all RPC functions available in `bitcoin-cli` for Write out a list of all RPC functions available in `bitcoin-cli` for
coverage comparison. This will only happen once per coverage coverage comparison. This will only happen once per coverage
directory. directory.
Args: Args:
dirname (str): temporary test dir dirname: temporary test dir
node (AuthServiceProxy): client node: client
Returns: Returns:
bool. if the RPC interface file was written. if the RPC interface file was written.
""" """
filename = os.path.join(dirname, REFERENCE_FILENAME) filename = os.path.join(dirname, REFERENCE_FILENAME)

View file

@ -258,7 +258,7 @@ class TestNode():
return return
self.rpc = rpc self.rpc = rpc
self.rpc_connected = True self.rpc_connected = True
self.url = self.rpc.url self.url = self.rpc.rpc_url
return return
except JSONRPCException as e: # Initialization phase except JSONRPCException as e: # Initialization phase
# -28 RPC in warmup # -28 RPC in warmup

View file

@ -286,15 +286,15 @@ class PortSeed:
n = None n = None
def get_rpc_proxy(url, node_number, *, timeout=None, coveragedir=None): def get_rpc_proxy(url: str, node_number: int, *, timeout: int=None, coveragedir: str=None) -> coverage.AuthServiceProxyWrapper:
""" """
Args: Args:
url (str): URL of the RPC server to call url: URL of the RPC server to call
node_number (int): the node number (or id) that this calls to node_number: the node number (or id) that this calls to
Kwargs: Kwargs:
timeout (int): HTTP timeout in seconds timeout: HTTP timeout in seconds
coveragedir (str): Directory coveragedir: Directory
Returns: Returns:
AuthServiceProxy. convenience object for making RPC calls. AuthServiceProxy. convenience object for making RPC calls.
@ -305,11 +305,10 @@ def get_rpc_proxy(url, node_number, *, timeout=None, coveragedir=None):
proxy_kwargs['timeout'] = int(timeout) proxy_kwargs['timeout'] = int(timeout)
proxy = AuthServiceProxy(url, **proxy_kwargs) proxy = AuthServiceProxy(url, **proxy_kwargs)
proxy.url = url # store URL on proxy for info
coverage_logfile = coverage.get_filename(coveragedir, node_number) if coveragedir else None coverage_logfile = coverage.get_filename(coveragedir, node_number) if coveragedir else None
return coverage.AuthServiceProxyWrapper(proxy, coverage_logfile) return coverage.AuthServiceProxyWrapper(proxy, url, coverage_logfile)
def p2p_port(n): def p2p_port(n):