|
|
""" |
|
|
RPC Node Collectors |
|
|
Fetches blockchain data from RPC endpoints (Infura, Alchemy, Ankr, etc.) |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timezone |
|
|
from typing import Dict, List, Optional, Any |
|
|
from utils.api_client import get_client |
|
|
from utils.logger import setup_logger, log_api_request, log_error |
|
|
|
|
|
logger = setup_logger("rpc_collector") |
|
|
|
|
|
|
|
|
async def get_eth_block_number(provider: str, rpc_url: str, api_key: Optional[str] = None) -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch latest Ethereum block number from RPC endpoint |
|
|
|
|
|
Args: |
|
|
provider: Provider name (e.g., "Infura", "Alchemy") |
|
|
rpc_url: RPC endpoint URL |
|
|
api_key: Optional API key to append to URL |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, success, error |
|
|
""" |
|
|
category = "rpc_nodes" |
|
|
endpoint = "eth_blockNumber" |
|
|
|
|
|
logger.info(f"Fetching block number from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
|
|
|
|
|
|
url = f"{rpc_url}/{api_key}" if api_key else rpc_url |
|
|
|
|
|
|
|
|
payload = { |
|
|
"jsonrpc": "2.0", |
|
|
"method": "eth_blockNumber", |
|
|
"params": [], |
|
|
"id": 1 |
|
|
} |
|
|
|
|
|
headers = {"Content-Type": "application/json"} |
|
|
|
|
|
|
|
|
response = await client.post(url, json=payload, headers=headers, timeout=10) |
|
|
|
|
|
|
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
endpoint, |
|
|
response.get("response_time_ms", 0), |
|
|
"success" if response["success"] else "error", |
|
|
response.get("status_code") |
|
|
) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": response.get("error_type") |
|
|
} |
|
|
|
|
|
|
|
|
data = response["data"] |
|
|
|
|
|
|
|
|
block_data = None |
|
|
if isinstance(data, dict) and "result" in data: |
|
|
hex_block = data["result"] |
|
|
block_number = int(hex_block, 16) if hex_block else 0 |
|
|
block_data = { |
|
|
"block_number": block_number, |
|
|
"hex": hex_block, |
|
|
"chain": "ethereum" |
|
|
} |
|
|
|
|
|
logger.info(f"{provider} - {endpoint} - Block: {block_data.get('block_number', 'N/A')}") |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": block_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def get_eth_gas_price(provider: str, rpc_url: str, api_key: Optional[str] = None) -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch current gas price from RPC endpoint |
|
|
|
|
|
Args: |
|
|
provider: Provider name |
|
|
rpc_url: RPC endpoint URL |
|
|
api_key: Optional API key |
|
|
|
|
|
Returns: |
|
|
Dict with gas price data |
|
|
""" |
|
|
category = "rpc_nodes" |
|
|
endpoint = "eth_gasPrice" |
|
|
|
|
|
logger.info(f"Fetching gas price from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
url = f"{rpc_url}/{api_key}" if api_key else rpc_url |
|
|
|
|
|
payload = { |
|
|
"jsonrpc": "2.0", |
|
|
"method": "eth_gasPrice", |
|
|
"params": [], |
|
|
"id": 1 |
|
|
} |
|
|
|
|
|
headers = {"Content-Type": "application/json"} |
|
|
response = await client.post(url, json=payload, headers=headers, timeout=10) |
|
|
|
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
endpoint, |
|
|
response.get("response_time_ms", 0), |
|
|
"success" if response["success"] else "error", |
|
|
response.get("status_code") |
|
|
) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": response.get("error_type") |
|
|
} |
|
|
|
|
|
data = response["data"] |
|
|
gas_data = None |
|
|
|
|
|
if isinstance(data, dict) and "result" in data: |
|
|
hex_gas = data["result"] |
|
|
gas_wei = int(hex_gas, 16) if hex_gas else 0 |
|
|
gas_gwei = gas_wei / 1e9 |
|
|
|
|
|
gas_data = { |
|
|
"gas_price_wei": gas_wei, |
|
|
"gas_price_gwei": round(gas_gwei, 2), |
|
|
"hex": hex_gas, |
|
|
"chain": "ethereum" |
|
|
} |
|
|
|
|
|
logger.info(f"{provider} - {endpoint} - Gas: {gas_data.get('gas_price_gwei', 'N/A')} Gwei") |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": gas_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def get_eth_chain_id(provider: str, rpc_url: str, api_key: Optional[str] = None) -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch chain ID from RPC endpoint |
|
|
|
|
|
Args: |
|
|
provider: Provider name |
|
|
rpc_url: RPC endpoint URL |
|
|
api_key: Optional API key |
|
|
|
|
|
Returns: |
|
|
Dict with chain ID data |
|
|
""" |
|
|
category = "rpc_nodes" |
|
|
endpoint = "eth_chainId" |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
url = f"{rpc_url}/{api_key}" if api_key else rpc_url |
|
|
|
|
|
payload = { |
|
|
"jsonrpc": "2.0", |
|
|
"method": "eth_chainId", |
|
|
"params": [], |
|
|
"id": 1 |
|
|
} |
|
|
|
|
|
headers = {"Content-Type": "application/json"} |
|
|
response = await client.post(url, json=payload, headers=headers, timeout=10) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg |
|
|
} |
|
|
|
|
|
data = response["data"] |
|
|
chain_data = None |
|
|
|
|
|
if isinstance(data, dict) and "result" in data: |
|
|
hex_chain = data["result"] |
|
|
chain_id = int(hex_chain, 16) if hex_chain else 0 |
|
|
|
|
|
|
|
|
chain_names = { |
|
|
1: "Ethereum Mainnet", |
|
|
3: "Ropsten", |
|
|
4: "Rinkeby", |
|
|
5: "Goerli", |
|
|
11155111: "Sepolia", |
|
|
56: "BSC Mainnet", |
|
|
97: "BSC Testnet", |
|
|
137: "Polygon Mainnet", |
|
|
80001: "Mumbai Testnet" |
|
|
} |
|
|
|
|
|
chain_data = { |
|
|
"chain_id": chain_id, |
|
|
"chain_name": chain_names.get(chain_id, f"Unknown (ID: {chain_id})"), |
|
|
"hex": hex_chain |
|
|
} |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": chain_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": str(e), |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def collect_infura_data(api_key: Optional[str] = None) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Collect data from Infura RPC endpoints |
|
|
|
|
|
Args: |
|
|
api_key: Infura project ID |
|
|
|
|
|
Returns: |
|
|
List of results from Infura endpoints |
|
|
""" |
|
|
provider = "Infura" |
|
|
rpc_url = "https://mainnet.infura.io/v3" |
|
|
|
|
|
if not api_key: |
|
|
logger.warning(f"{provider} - No API key provided, skipping") |
|
|
return [{ |
|
|
"provider": provider, |
|
|
"category": "rpc_nodes", |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": "API key required", |
|
|
"error_type": "missing_api_key" |
|
|
}] |
|
|
|
|
|
logger.info(f"Starting {provider} data collection") |
|
|
|
|
|
results = await asyncio.gather( |
|
|
get_eth_block_number(provider, rpc_url, api_key), |
|
|
get_eth_gas_price(provider, rpc_url, api_key), |
|
|
get_eth_chain_id(provider, rpc_url, api_key), |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
processed = [] |
|
|
for result in results: |
|
|
if isinstance(result, Exception): |
|
|
logger.error(f"{provider} - Collector failed: {str(result)}") |
|
|
processed.append({ |
|
|
"provider": provider, |
|
|
"category": "rpc_nodes", |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": str(result), |
|
|
"error_type": "exception" |
|
|
}) |
|
|
else: |
|
|
processed.append(result) |
|
|
|
|
|
successful = sum(1 for r in processed if r.get("success", False)) |
|
|
logger.info(f"{provider} - Collection complete: {successful}/{len(processed)} successful") |
|
|
|
|
|
return processed |
|
|
|
|
|
|
|
|
async def collect_alchemy_data(api_key: Optional[str] = None) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Collect data from Alchemy RPC endpoints |
|
|
|
|
|
Args: |
|
|
api_key: Alchemy API key |
|
|
|
|
|
Returns: |
|
|
List of results from Alchemy endpoints |
|
|
""" |
|
|
provider = "Alchemy" |
|
|
rpc_url = "https://eth-mainnet.g.alchemy.com/v2" |
|
|
|
|
|
if not api_key: |
|
|
logger.warning(f"{provider} - No API key provided, using free tier") |
|
|
|
|
|
api_key = "demo" |
|
|
|
|
|
logger.info(f"Starting {provider} data collection") |
|
|
|
|
|
results = await asyncio.gather( |
|
|
get_eth_block_number(provider, rpc_url, api_key), |
|
|
get_eth_gas_price(provider, rpc_url, api_key), |
|
|
get_eth_chain_id(provider, rpc_url, api_key), |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
processed = [] |
|
|
for result in results: |
|
|
if isinstance(result, Exception): |
|
|
logger.error(f"{provider} - Collector failed: {str(result)}") |
|
|
processed.append({ |
|
|
"provider": provider, |
|
|
"category": "rpc_nodes", |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": str(result), |
|
|
"error_type": "exception" |
|
|
}) |
|
|
else: |
|
|
processed.append(result) |
|
|
|
|
|
successful = sum(1 for r in processed if r.get("success", False)) |
|
|
logger.info(f"{provider} - Collection complete: {successful}/{len(processed)} successful") |
|
|
|
|
|
return processed |
|
|
|
|
|
|
|
|
async def collect_ankr_data() -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Collect data from Ankr public RPC endpoints (no key required) |
|
|
|
|
|
Returns: |
|
|
List of results from Ankr endpoints |
|
|
""" |
|
|
provider = "Ankr" |
|
|
rpc_url = "https://rpc.ankr.com/eth" |
|
|
|
|
|
logger.info(f"Starting {provider} data collection") |
|
|
|
|
|
results = await asyncio.gather( |
|
|
get_eth_block_number(provider, rpc_url), |
|
|
get_eth_gas_price(provider, rpc_url), |
|
|
get_eth_chain_id(provider, rpc_url), |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
processed = [] |
|
|
for result in results: |
|
|
if isinstance(result, Exception): |
|
|
logger.error(f"{provider} - Collector failed: {str(result)}") |
|
|
processed.append({ |
|
|
"provider": provider, |
|
|
"category": "rpc_nodes", |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": str(result), |
|
|
"error_type": "exception" |
|
|
}) |
|
|
else: |
|
|
processed.append(result) |
|
|
|
|
|
successful = sum(1 for r in processed if r.get("success", False)) |
|
|
logger.info(f"{provider} - Collection complete: {successful}/{len(processed)} successful") |
|
|
|
|
|
return processed |
|
|
|
|
|
|
|
|
async def collect_public_rpc_data() -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Collect data from free public RPC endpoints |
|
|
|
|
|
Returns: |
|
|
List of results from public endpoints |
|
|
""" |
|
|
logger.info("Starting public RPC data collection") |
|
|
|
|
|
public_rpcs = [ |
|
|
("Cloudflare", "https://cloudflare-eth.com"), |
|
|
("PublicNode", "https://ethereum.publicnode.com"), |
|
|
("LlamaNodes", "https://eth.llamarpc.com"), |
|
|
] |
|
|
|
|
|
all_results = [] |
|
|
|
|
|
for provider, rpc_url in public_rpcs: |
|
|
results = await asyncio.gather( |
|
|
get_eth_block_number(provider, rpc_url), |
|
|
get_eth_gas_price(provider, rpc_url), |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
for result in results: |
|
|
if isinstance(result, Exception): |
|
|
logger.error(f"{provider} - Collector failed: {str(result)}") |
|
|
all_results.append({ |
|
|
"provider": provider, |
|
|
"category": "rpc_nodes", |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": str(result), |
|
|
"error_type": "exception" |
|
|
}) |
|
|
else: |
|
|
all_results.append(result) |
|
|
|
|
|
successful = sum(1 for r in all_results if r.get("success", False)) |
|
|
logger.info(f"Public RPC collection complete: {successful}/{len(all_results)} successful") |
|
|
|
|
|
return all_results |
|
|
|
|
|
|
|
|
async def collect_rpc_data( |
|
|
infura_key: Optional[str] = None, |
|
|
alchemy_key: Optional[str] = None |
|
|
) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Main function to collect RPC data from all sources |
|
|
|
|
|
Args: |
|
|
infura_key: Infura project ID |
|
|
alchemy_key: Alchemy API key |
|
|
|
|
|
Returns: |
|
|
List of results from all RPC collectors |
|
|
""" |
|
|
logger.info("Starting RPC data collection from all sources") |
|
|
|
|
|
|
|
|
all_results = [] |
|
|
|
|
|
|
|
|
if infura_key: |
|
|
infura_results = await collect_infura_data(infura_key) |
|
|
all_results.extend(infura_results) |
|
|
|
|
|
|
|
|
alchemy_results = await collect_alchemy_data(alchemy_key) |
|
|
all_results.extend(alchemy_results) |
|
|
|
|
|
|
|
|
ankr_results = await collect_ankr_data() |
|
|
all_results.extend(ankr_results) |
|
|
|
|
|
|
|
|
public_results = await collect_public_rpc_data() |
|
|
all_results.extend(public_results) |
|
|
|
|
|
|
|
|
successful = sum(1 for r in all_results if r.get("success", False)) |
|
|
logger.info(f"RPC data collection complete: {successful}/{len(all_results)} successful") |
|
|
|
|
|
return all_results |
|
|
|
|
|
|
|
|
class RPCNodeCollector: |
|
|
""" |
|
|
RPC Node Collector class for WebSocket streaming interface |
|
|
Wraps the standalone RPC node collection functions |
|
|
""" |
|
|
|
|
|
def __init__(self, config: Any = None): |
|
|
""" |
|
|
Initialize the RPC node collector |
|
|
|
|
|
Args: |
|
|
config: Configuration object (optional, for compatibility) |
|
|
""" |
|
|
self.config = config |
|
|
self.logger = logger |
|
|
|
|
|
async def collect(self) -> Dict[str, Any]: |
|
|
""" |
|
|
Collect RPC node data from all sources |
|
|
|
|
|
Returns: |
|
|
Dict with aggregated RPC node data |
|
|
""" |
|
|
import os |
|
|
infura_key = os.getenv("INFURA_API_KEY") |
|
|
alchemy_key = os.getenv("ALCHEMY_API_KEY") |
|
|
results = await collect_rpc_data(infura_key, alchemy_key) |
|
|
|
|
|
|
|
|
aggregated = { |
|
|
"nodes": [], |
|
|
"active_nodes": 0, |
|
|
"total_nodes": 0, |
|
|
"average_latency": 0, |
|
|
"events": [], |
|
|
"block_number": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat() |
|
|
} |
|
|
|
|
|
total_latency = 0 |
|
|
latency_count = 0 |
|
|
|
|
|
for result in results: |
|
|
aggregated["total_nodes"] += 1 |
|
|
|
|
|
if result.get("success"): |
|
|
aggregated["active_nodes"] += 1 |
|
|
provider = result.get("provider", "unknown") |
|
|
response_time = result.get("response_time_ms", 0) |
|
|
data = result.get("data", {}) |
|
|
|
|
|
|
|
|
if response_time: |
|
|
total_latency += response_time |
|
|
latency_count += 1 |
|
|
|
|
|
|
|
|
node_info = { |
|
|
"provider": provider, |
|
|
"response_time_ms": response_time, |
|
|
"status": "active", |
|
|
"data": data |
|
|
} |
|
|
|
|
|
|
|
|
if "result" in data and isinstance(data["result"], str): |
|
|
try: |
|
|
block_number = int(data["result"], 16) |
|
|
node_info["block_number"] = block_number |
|
|
if aggregated["block_number"] is None or block_number > aggregated["block_number"]: |
|
|
aggregated["block_number"] = block_number |
|
|
except: |
|
|
pass |
|
|
|
|
|
aggregated["nodes"].append(node_info) |
|
|
|
|
|
|
|
|
if latency_count > 0: |
|
|
aggregated["average_latency"] = total_latency / latency_count |
|
|
|
|
|
return aggregated |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
async def main(): |
|
|
import os |
|
|
|
|
|
infura_key = os.getenv("INFURA_API_KEY") |
|
|
alchemy_key = os.getenv("ALCHEMY_API_KEY") |
|
|
|
|
|
results = await collect_rpc_data(infura_key, alchemy_key) |
|
|
|
|
|
print("\n=== RPC Data Collection Results ===") |
|
|
for result in results: |
|
|
print(f"\nProvider: {result['provider']}") |
|
|
print(f"Success: {result['success']}") |
|
|
if result['success']: |
|
|
print(f"Response Time: {result.get('response_time_ms', 0):.2f}ms") |
|
|
data = result.get('data', {}) |
|
|
if data: |
|
|
print(f"Data: {data}") |
|
|
else: |
|
|
print(f"Error: {result.get('error', 'Unknown')}") |
|
|
|
|
|
asyncio.run(main()) |
|
|
|