#!/usr/bin/env python3
# Copyright (c) 2018-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Tests some generic aspects of the RPC interface."""

import json
import os
from dataclasses import dataclass
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_greater_than_or_equal
from threading import Thread
from typing import Optional
import subprocess


# JSON-RPC error codes that the assertions below expect in the "error.code"
# field of a response.
RPC_INVALID_ADDRESS_OR_KEY = -5
RPC_INVALID_PARAMETER = -8
RPC_METHOD_NOT_FOUND = -32601
RPC_INVALID_REQUEST = -32600
RPC_PARSE_ERROR = -32700


@dataclass
class BatchOptions:
    """Per-call knobs consumed by format_request() and format_response()."""

    # 1 -> the request carries "version": "1.1"; 2 -> it carries
    # "jsonrpc": "2.0"; None -> no version field is added. Any other value
    # makes format_request() raise NotImplementedError.
    version: Optional[int] = None
    # When True the request is built without an "id" and the expected
    # response carries "id": None.
    notification: bool = False
    # Extra key/value pairs merged into the request last, so they override
    # anything set before them.
    request_fields: Optional[dict] = None
    # Extra key/value pairs merged into the expected response last.
    response_fields: Optional[dict] = None


def format_request(options, idx, fields):
    """Build one JSON-RPC request object as a dict.

    Args:
        options: BatchOptions selecting the version field, whether an "id" is
            included, and any extra request fields.
        idx: Value used for the "id" field when the call is not a notification.
        fields: Base request members (e.g. "method", "params") merged in after
            the version and id fields.

    Returns:
        The request dict.

    Raises:
        NotImplementedError: if options.version is neither None, 1 nor 2.
    """
    request = {}
    if options.version == 1:
        request.update(version="1.1")
    elif options.version == 2:
        request.update(jsonrpc="2.0")
    elif options.version is not None:
        raise NotImplementedError(f"Unknown JSONRPC version {options.version}")
    if not options.notification:
        request.update(id=idx)
    request.update(fields)
    # Merged last so a test can deliberately override any field set above.
    if options.request_fields:
        request.update(options.request_fields)
    return request


def format_response(options, idx, fields):
    """Build the response dict a test expects for the matching request.

    The dict always has "id", "result" and "error" keys: "id" is None for a
    notification and idx otherwise, while "result" and "error" start as None
    and are then overridden by `fields` and by options.response_fields.

    Args:
        options: BatchOptions describing the request that was sent.
        idx: The id the request was (or would have been) sent with.
        fields: Expected "result" or "error" member for this call.

    Returns:
        The expected response dict.
    """
    response = {}
    response.update(id=None if options.notification else idx)
    response.update(result=None, error=None)
    response.update(fields)
    if options.response_fields:
        response.update(options.response_fields)
    return response


def send_raw_rpc(node, raw_body: bytes) -> tuple[object, int]:
    """POST raw_body to "/" through node._request and return its result.

    Callers unpack the return value as (response body, HTTP status code).
    """
    return node._request("POST", "/", raw_body)


def send_json_rpc(node, body: object) -> tuple[object, int]:
    """Serialize body to UTF-8 encoded JSON and send it with send_raw_rpc()."""
    raw = json.dumps(body).encode("utf-8")
    return send_raw_rpc(node, raw)


def expect_http_rpc_status(expected_http_status, expected_rpc_error_code, node, method, params, version=1, notification=False):
    """Send a single RPC call and assert on its HTTP status and RPC error code.

    Args:
        expected_http_status: HTTP status code the reply must have.
        expected_rpc_error_code: Expected value of response["error"]["code"],
            or None to skip inspecting the response body altogether.
        node: Node the request is sent to.
        method: RPC method name.
        params: RPC parameters.
        version: Passed to BatchOptions.version (1, 2 or None).
        notification: Passed to BatchOptions.notification.
    """
    # The request id is always 0 here; it is omitted for notifications.
    req = format_request(BatchOptions(version, notification), 0, {"method": method, "params": params})
    response, status = send_json_rpc(node, req)

    if expected_rpc_error_code is not None:
        assert_equal(response["error"]["code"], expected_rpc_error_code)

    assert_equal(status, expected_http_status)


def test_work_queue_getblock(node, got_exceeded_error):
    """Thread target: call waitfornewblock via the CLI until the queue overflows.

    Args:
        node: Node whose CLI is invoked.
        got_exceeded_error: List used as a flag. It is handed to several
            threads by test_work_queue_exceeded(); the loop in every thread
            stops once any of them has appended to it.
    """
    while not got_exceeded_error:
        try:
            node.cli("waitfornewblock", "500").send_cli()
        except subprocess.CalledProcessError as e:
            # The only CLI failure accepted here is the work-queue overflow.
            assert_equal(e.output, 'error: Server response: Work queue depth exceeded\n')
            got_exceeded_error.append(True)


class RPCInterfaceTest(BitcoinTestFramework):
    """Functional test covering getrpcinfo, batches, HTTP statuses and the work queue."""

    def set_test_params(self):
        """Configure the test: a single node on a clean chain, CLI mode unsupported."""
        self.num_nodes = 1
        self.setup_clean_chain = True
        self.supports_cli = False

    def test_getrpcinfo(self):
        """Check that getrpcinfo reports itself as the only active command."""
        self.log.info("Testing getrpcinfo...")

        info = self.nodes[0].getrpcinfo()
        assert_equal(len(info['active_commands']), 1)

        command = info['active_commands'][0]
        assert_equal(command['method'], 'getrpcinfo')
        assert_greater_than_or_equal(command['duration'], 0)
        assert_equal(info['logpath'], os.path.join(self.nodes[0].chain_path, 'debug.log'))

    def test_batch_request(self, call_options):
        """Send a fixed batch of calls and compare the reply with the expectation.

        Args:
            call_options: Callable taking the 1-based index of a call and
                returning the BatchOptions to build it with, or None to leave
                that call out of the batch.
        """
        calls = [
            # A basic request that will work fine.
            {"method": "getblockcount"},
            # Request that will fail. The whole batch request should still
            # work fine.
            {"method": "invalidmethod"},
            # Another call that should succeed.
            {"method": "getblockhash", "params": [0]},
            # Invalid request format
            {"pizza": "sausage"}
        ]
        # Expected outcome of each entry in `calls`, in the same order.
        results = [
            {"result": 0},
            {"error": {"code": RPC_METHOD_NOT_FOUND, "message": "Method not found"}},
            {"result": "0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206"},
            {"error": {"code": RPC_INVALID_REQUEST, "message": "Missing method"}},
        ]

        request = []
        response = []
        # Indices start at 1 and double as the request ids.
        for idx, (call, result) in enumerate(zip(calls, results), 1):
            options = call_options(idx)
            if options is None:
                continue
            request.append(format_request(options, idx, call))
            response.append(format_response(options, idx, result))

        rpc_response, http_status = send_json_rpc(self.nodes[0], request)
        assert_equal(http_status, 200)
        assert_equal(rpc_response, response)

    def test_batch_requests(self):
        """Run test_batch_request() across version and notification combinations."""
        self.log.info("Testing empty batch request...")
        self.test_batch_request(lambda idx: None)

        self.log.info("Testing basic JSON-RPC 2.0 batch request...")
        self.test_batch_request(lambda idx: BatchOptions(version=2))

        self.log.info("Testing JSON-RPC 2.0 batch with notifications...")
        # idx starts at 1, so only the first call is sent as a notification.
        self.test_batch_request(lambda idx: BatchOptions(version=2, notification=idx < 2))

        self.log.info("Testing JSON-RPC 2.0 batch of ALL notifications...")
        self.test_batch_request(lambda idx: BatchOptions(version=2, notification=True))

        # JSONRPC 1.1 does not support batch requests, but test them for backwards compatibility.
        self.log.info("Testing nonstandard JSON-RPC 1.1 batch request...")
        self.test_batch_request(lambda idx: BatchOptions(version=1))

        self.log.info("Testing nonstandard mixed JSON-RPC 1.1/2.0 batch request...")
        # Odd-numbered calls use version 2, even-numbered calls use version 1.
        self.test_batch_request(lambda idx: BatchOptions(version=2 if idx % 2 else 1))

        self.log.info("Testing nonstandard batch request without version numbers...")
        self.test_batch_request(lambda idx: BatchOptions())

        self.log.info("Testing nonstandard batch request without version numbers or ids...")
        self.test_batch_request(lambda idx: BatchOptions(notification=True))

        self.log.info("Testing nonstandard jsonrpc 1.0 version number is accepted...")
        self.test_batch_request(lambda idx: BatchOptions(request_fields={"jsonrpc": "1.0"}))

        self.log.info("Testing unrecognized jsonrpc version number is accepted...")
        self.test_batch_request(lambda idx: BatchOptions(request_fields={"jsonrpc": "2.1"}))

    def test_http_status_codes(self):
        """Check the HTTP status and RPC error code returned for single requests."""
        self.log.info("Testing HTTP status codes for JSON-RPC 1.1 requests...")
        # OK
        expect_http_rpc_status(200, None, self.nodes[0], "getblockhash", [0])
        # Errors
        expect_http_rpc_status(404, RPC_METHOD_NOT_FOUND, self.nodes[0], "invalidmethod", [])
        expect_http_rpc_status(500, RPC_INVALID_PARAMETER, self.nodes[0], "getblockhash", [42])
        # force-send empty request
        response, status = send_raw_rpc(self.nodes[0], b"")
        assert_equal(response, {"id": None, "result": None, "error": {"code": RPC_PARSE_ERROR, "message": "Parse error"}})
        assert_equal(status, 500)
        # force-send invalidly formatted request
        response, status = send_raw_rpc(self.nodes[0], b"this is bad")
        assert_equal(response, {"id": None, "result": None, "error": {"code": RPC_PARSE_ERROR, "message": "Parse error"}})
        assert_equal(status, 500)

        self.log.info("Testing HTTP status codes for JSON-RPC 2.0 requests...")
        # OK
        expect_http_rpc_status(200, None, self.nodes[0], "getblockhash", [0], 2, False)
        # RPC errors and HTTP errors
        expect_http_rpc_status(404, RPC_METHOD_NOT_FOUND, self.nodes[0], "invalidmethod", [], 2, False)
        expect_http_rpc_status(500, RPC_INVALID_PARAMETER, self.nodes[0], "getblockhash", [42], 2, False)
        # force-send invalidly formatted requests
        # (a numeric and an unknown "jsonrpc" value are both expected to be served normally)
        response, status = send_json_rpc(self.nodes[0], {"jsonrpc": 2, "method": "getblockcount"})
        assert_equal(response, {"error": None, "id": None, "result": 0})
        assert_equal(status, 200)
        response, status = send_json_rpc(self.nodes[0], {"jsonrpc": "3.0", "method": "getblockcount"})
        assert_equal(response, {"error": None, "id": None, "result": 0})
        assert_equal(status, 200)

        self.log.info("Testing HTTP status codes for JSON-RPC 2.0 notifications...")
        # Not notification: id exists
        response, status = send_json_rpc(self.nodes[0], {"jsonrpc": "2.0", "id": None, "method": "getblockcount"})
        assert_equal(response["result"], 0)
        assert_equal(status, 200)

        # Not notification: JSON 1.1
        expect_http_rpc_status(200, None, self.nodes[0], "getblockcount", [], 1)
        # Not notification: has "id" field
        expect_http_rpc_status(200, None, self.nodes[0], "getblockcount", [], 2, False)
        # Remember the height so the effect of the notifications below can be measured.
        block_count = self.nodes[0].getblockcount()
        expect_http_rpc_status(200, None, self.nodes[0], "generatetoaddress", [1, "bcrt1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqdku202"], 2, True)
        # The command was executed even though the request carried no id
        assert_equal(block_count + 1, self.nodes[0].getblockcount())
        expect_http_rpc_status(500, RPC_INVALID_ADDRESS_OR_KEY, self.nodes[0], "generatetoaddress", [1, "invalid_address"], 2, True)
        # Sanity check: command was not executed
        assert_equal(block_count + 1, self.nodes[0].getblockcount())

    def test_work_queue_exceeded(self):
        """Restart the node with a minimal RPC queue and overflow it from three threads."""
        self.log.info("Testing work queue exceeded...")
        # NOTE(review): presumably these flags limit the node to one RPC worker
        # and a queue depth of one - confirm against the node's option docs.
        self.restart_node(0, ['-rpcworkqueue=1', '-rpcthreads=1'])
        # Shared flag list: every thread loops until one of them appends to it.
        got_exceeded_error = []
        threads = []
        for _ in range(3):
            t = Thread(target=test_work_queue_getblock, args=(self.nodes[0], got_exceeded_error))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def run_test(self):
        """Run the sub-tests in order; the work-queue test restarts the node, so it goes last."""
        self.test_getrpcinfo()
        self.test_batch_requests()
        self.test_http_status_codes()
        self.test_work_queue_exceeded()


if __name__ == '__main__':
    RPCInterfaceTest().main()