diff --git a/README.md b/README.md index 7a569b4..2b6317b 100644 --- a/README.md +++ b/README.md @@ -97,13 +97,13 @@ Agent runners are recommended to create a [backup](https://github.com/valory-xyz 1. Use the `trades` command to display information about placed trades by a given address: ```bash - cd trader; poetry run python ../trades.py YOUR_SAFE_ADDRESS; cd .. + cd trader; poetry run python ../trades.py --creator YOUR_SAFE_ADDRESS; cd .. ``` Or restrict the search to specific dates by defining the "from" and "to" dates: ```bash - cd trader; poetry run python ../trades.py YOUR_SAFE_ADDRESS --from-date 2023-08-15:03:50:00 --to-date 2023-08-20:13:45:00; cd .. + cd trader; poetry run python ../trades.py --creator YOUR_SAFE_ADDRESS --from-date 2023-08-15:03:50:00 --to-date 2023-08-20:13:45:00; cd .. ``` 2. Use the `report` command to display a summary of the service status: diff --git a/report.py b/report.py index b819e35..0ef2ac7 100644 --- a/report.py +++ b/report.py @@ -141,7 +141,7 @@ def _get_mech_requests_count( return sum( 1 for mech_request in mech_requests.values() - if mech_request.get("utc_timestamp", 0) > timestamp + if mech_request.get("block_timestamp", 0) > timestamp ) @@ -218,7 +218,7 @@ def _parse_args() -> Any: rpc = file.read().strip() # Prediction market trading - mech_requests = trades.get_mech_requests(rpc, safe_address) + mech_requests = trades.get_mech_requests(safe_address) mech_statistics = trades.get_mech_statistics(mech_requests) trades_json = trades._query_omen_xdai_subgraph(safe_address) _, statistics_table = trades.parse_user( diff --git a/scripts/mech_events.py b/scripts/mech_events.py index 21e5405..3c4de40 100644 --- a/scripts/mech_events.py +++ b/scripts/mech_events.py @@ -25,82 +25,92 @@ import time from dataclasses import dataclass from pathlib import Path -from typing import Any, Dict +from string import Template +from typing import Any, ClassVar, Dict import requests -from eth_utils import to_checksum_address from tqdm import tqdm -from web3 import HTTPProvider, Web3 from web3.datastructures import AttributeDict -from web3.types import BlockParams SCRIPT_PATH = Path(__file__).resolve().parent STORE_PATH = Path(SCRIPT_PATH, "..", ".trader_runner") MECH_EVENTS_JSON_PATH = Path(STORE_PATH, "mech_events.json") -AGENT_MECH_JSON_PATH = Path(SCRIPT_PATH, "..", "contracts", "AgentMech.json") HTTP = "http://" HTTPS = HTTP[:4] + "s" + HTTP[4:] CID_PREFIX = "f01701220" IPFS_ADDRESS = f"{HTTPS}gateway.autonolas.tech/ipfs/" -LATEST_BLOCK_NAME: BlockParams = "latest" -BLOCK_DATA_NUMBER = "number" -BLOCKS_CHUNK_SIZE = 5000 -EXCLUDED_BLOCKS_THRESHOLD = 2 * BLOCKS_CHUNK_SIZE -NUM_EXCLUDED_BLOCKS = 10 -MECH_EVENTS_DB_VERSION = 2 +MECH_EVENTS_DB_VERSION = 3 DEFAULT_MECH_FEE = 10000000000000000 - -# Pair of (Mech contract address, Mech contract deployed on block number). -MECH_CONTRACT_ADDRESSES = [ - # Old Mech contract - ( - to_checksum_address("0xff82123dfb52ab75c417195c5fdb87630145ae81"), - 27939217, - ), - # New Mech contract - ( - to_checksum_address("0x77af31de935740567cf4ff1986d04b2c964a786a"), - 30663133, - ), -] +MECH_SUBGRAPH_URL = "https://api.studio.thegraph.com/query/57238/mech/0.0.2" +SUBGRAPH_HEADERS = { + "Accept": "application/json, multipart/mixed", + "Content-Type": "application/json", +} +QUERY_BATCH_SIZE = 1000 +MECH_EVENTS_SUBGRAPH_QUERY = Template( + """ + query { + ${subgraph_event_set_name}( + where: { + sender: "${sender}" + id_gt: "${id_gt}" + } + first: ${first} + orderBy: id + orderDirection: asc + ) { + id + ipfsHash + requestId + sender + transactionHash + blockNumber + blockTimestamp + } + } + """ +) @dataclass -class MechBaseEvent: +class MechBaseEvent: # pylint: disable=too-many-instance-attributes """Base class for mech's on-chain event representation.""" event_id: str - data: str sender: str transaction_hash: str + ipfs_hash: str block_number: int - utc_timestamp: int + block_timestamp: int ipfs_link: str ipfs_contents: Dict[str, Any] + event_name: ClassVar[str] + subgraph_event_name: ClassVar[str] + def __init__( self, event_id: str, - data: str, sender: str, + ipfs_hash: str, transaction_hash: str, block_number: int, - utc_timestamp: int, + block_timestamp: int, ): # pylint: disable=too-many-arguments """Initializes the MechBaseEvent""" self.event_id = event_id - self.data = data self.sender = sender + self.ipfs_hash = ipfs_hash self.transaction_hash = transaction_hash self.block_number = block_number - self.utc_timestamp = utc_timestamp + self.block_timestamp = block_timestamp self.ipfs_link = "" self.ipfs_contents = {} - self._populate_ipfs_contents(data) + self._populate_ipfs_contents(ipfs_hash) def _populate_ipfs_contents(self, data: str) -> None: - url = f"{IPFS_ADDRESS}{CID_PREFIX}{data}" + url = f"{IPFS_ADDRESS}{data}" for _url in [f"{url}/metadata.json", url]: try: response = requests.get(_url) @@ -117,22 +127,20 @@ class MechRequest(MechBaseEvent): request_id: str fee: int - event_name: str = "Request" - def __init__(self, event: AttributeDict, utc_timestamp: int): - """Initializes the MechRequest""" + event_name: ClassVar[str] = "Request" + subgraph_event_name: ClassVar[str] = "request" - if self.event_name != event["event"]: - raise ValueError("Invalid event to initialize MechRequest") + def __init__(self, event: AttributeDict): + """Initializes the MechRequest""" - args = event["args"] super().__init__( - event_id=args["requestId"], - data=args["data"].hex(), - sender=args["sender"], - transaction_hash=event["transactionHash"].hex(), - block_number=event["blockNumber"], - utc_timestamp=utc_timestamp, + event_id=event["requestId"], + sender=event["sender"], + ipfs_hash=event["ipfsHash"], + transaction_hash=event["transactionHash"], + block_number=int(event["blockNumber"]), + block_timestamp=int(event["blockTimestamp"]), ) self.request_id = self.event_id @@ -163,10 +171,10 @@ def _read_mech_events_data_from_file() -> Dict[str, Any]: last_write_time = 0.0 -def _write_mech_events_data( - mech_events_data: Dict[str, Any], force_write=False +def _write_mech_events_data_to_file( + mech_events_data: Dict[str, Any], force_write: bool = False ) -> None: - global last_write_time + global last_write_time # pylint: disable=global-statement now = time.time() if force_write or (now - last_write_time) >= MINIMUM_WRITE_FILE_DELAY: @@ -175,130 +183,114 @@ def _write_mech_events_data( last_write_time = now +def _query_mech_events_subgraph( + sender: str, event_cls: type[MechBaseEvent] +) -> dict[str, Any]: + """Query the subgraph.""" + + subgraph_event_set_name = f"{event_cls.subgraph_event_name}s" + all_results: dict[str, Any] = {"data": {subgraph_event_set_name: []}} + id_gt = "" + while True: + query = MECH_EVENTS_SUBGRAPH_QUERY.substitute( + subgraph_event_set_name=subgraph_event_set_name, + sender=sender, + id_gt=id_gt, + first=QUERY_BATCH_SIZE, + ) + response = requests.post( + MECH_SUBGRAPH_URL, + headers=SUBGRAPH_HEADERS, + json={"query": query}, + timeout=300, + ) + result_json = response.json() + events = result_json.get("data", {}).get(subgraph_event_set_name, []) + + if not events: + break + + all_results["data"][subgraph_event_set_name].extend(events) + id_gt = events[len(events) - 1]["id"] + + return all_results + + # pylint: disable=too-many-locals def _update_mech_events_db( - rpc: str, - mech_contract_address: str, - event_name: str, - earliest_block: int, sender: str, + event_cls: type[MechBaseEvent], ) -> None: - """Get the mech Request events.""" + """Get the mech Events database.""" print( f"Updating the local Mech events database. This may take a while.\n" - f" Event: {event_name}\n" - f" Mech contract: {mech_contract_address}\n" - f" Sender address: {sender}" + f" Event: {event_cls.event_name}\n" + f" Sender address: {sender}" ) - # Read the current Mech events database - mech_events_data = _read_mech_events_data_from_file() - - # Search for Mech events in the blockchain try: - w3 = Web3(HTTPProvider(rpc)) - with open(AGENT_MECH_JSON_PATH, "r", encoding="utf-8") as file: - contract_data = json.load(file) - - abi = contract_data.get("abi", []) - contract_instance = w3.eth.contract(address=mech_contract_address, abi=abi) - - last_processed_block = ( - mech_events_data.get(sender, {}) - .get(mech_contract_address, {}) - .get(event_name, {}) - .get("last_processed_block", 0) + # Query the subgraph + query = _query_mech_events_subgraph(sender, event_cls) + subgraph_data = query["data"] + + # Read the current Mech events database + mech_events_data = _read_mech_events_data_from_file() + stored_events = mech_events_data.setdefault(sender, {}).setdefault( + event_cls.event_name, {} ) - starting_block = max(earliest_block, last_processed_block + 1) - ending_block = w3.eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER] - - print(f" Starting block: {starting_block}") - print(f" Ending block: {ending_block}") - - # If the script has to process relatively recent blocks, - # this will allow the RPC synchronize them and prevent - # throwing an exception. - if ending_block - starting_block < EXCLUDED_BLOCKS_THRESHOLD: - ending_block -= NUM_EXCLUDED_BLOCKS - time.sleep(10) - - for from_block in tqdm( - range(starting_block, ending_block, BLOCKS_CHUNK_SIZE), - desc=" Progress: ", + + subgraph_event_set_name = f"{event_cls.subgraph_event_name}s" + for subgraph_event in tqdm( + subgraph_data[subgraph_event_set_name], + miniters=1, + desc=" Processing", ): - to_block = min(from_block + BLOCKS_CHUNK_SIZE, ending_block) - event_filter = contract_instance.events[event_name].create_filter( - fromBlock=from_block, toBlock=to_block - ) - chunk = event_filter.get_all_entries() - w3.eth.uninstall_filter(event_filter.filter_id) - filtered_events = [ - event for event in chunk if event["args"]["sender"] == sender - ] - - # Update the Mech events data with the latest examined chunk - sender_data = mech_events_data.setdefault(sender, {}) - contract_data = sender_data.setdefault(mech_contract_address, {}) - event_data = contract_data.setdefault(event_name, {}) - event_data["last_processed_block"] = to_block - - mech_events = event_data.setdefault("mech_events", {}) - for event in filtered_events: - block_number = event["blockNumber"] - utc_timestamp = w3.eth.get_block(block_number).timestamp - - if event_name == MechRequest.event_name: - mech_event = MechRequest(event, utc_timestamp) - mech_events[mech_event.event_id] = mech_event.__dict__ - - # Store the (updated) Mech events database - _write_mech_events_data(mech_events_data) + if subgraph_event[ + "requestId" + ] not in stored_events or not stored_events.get( + subgraph_event["requestId"], {} + ).get( + "ipfs_contents" + ): + mech_event = event_cls(subgraph_event) # type: ignore + stored_events[mech_event.event_id] = mech_event.__dict__ + + _write_mech_events_data_to_file(mech_events_data=mech_events_data) + + _write_mech_events_data_to_file( + mech_events_data=mech_events_data, force_write=True + ) except KeyboardInterrupt: print( "\n" - f"WARNING: The update of the local Mech events database was cancelled (contract {mech_contract_address}). " + "WARNING: The update of the local Mech events database was cancelled. " "Therefore, the Mech calls and costs might not be reflected accurately. " "You may attempt to rerun this script to retry synchronizing the database." ) input("Press Enter to continue...") except Exception: # pylint: disable=broad-except print( - f"WARNING: An error occurred while updating the local Mech events database (contract {mech_contract_address}). " + "WARNING: An error occurred while updating the local Mech events database. " "Therefore, the Mech calls and costs might not be reflected accurately. " "You may attempt to rerun this script to retry synchronizing the database." ) input("Press Enter to continue...") - _write_mech_events_data(mech_events_data, True) print("") -def _get_mech_events(rpc: str, sender: str, event_name: str) -> Dict[str, Any]: +def _get_mech_events(sender: str, event_cls: type[MechBaseEvent]) -> Dict[str, Any]: """Updates the local database of Mech events and returns the Mech events.""" - for ( - mech_contract_address, - mech_contract_deployed_block, - ) in MECH_CONTRACT_ADDRESSES: - _update_mech_events_db( - rpc, mech_contract_address, event_name, mech_contract_deployed_block, sender - ) - + _update_mech_events_db(sender, event_cls) mech_events_data = _read_mech_events_data_from_file() sender_data = mech_events_data.get(sender, {}) - - all_mech_events = {} - for mech_contract_data in sender_data.values(): - event_data = mech_contract_data.get(event_name, {}) - mech_events = event_data.get("mech_events", {}) - all_mech_events.update(mech_events) - - return all_mech_events + return sender_data.get(event_cls.event_name, {}) -def get_mech_requests(rpc: str, sender: str) -> Dict[str, Any]: +def get_mech_requests(sender: str) -> Dict[str, Any]: """Returns the Mech requests.""" - return _get_mech_events(rpc, sender, MechRequest.event_name) + return _get_mech_events(sender, MechRequest) diff --git a/trades.py b/trades.py index c36baf1..011f540 100644 --- a/trades.py +++ b/trades.py @@ -59,6 +59,9 @@ STORE_PATH = Path(SCRIPT_PATH, ".trader_runner") RPC_PATH = Path(STORE_PATH, "rpc.txt") WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d" +SCRIPT_PATH = Path(__file__).resolve().parent +STORE_PATH = Path(SCRIPT_PATH, ".trader_runner") +SAFE_ADDRESS_PATH = Path(STORE_PATH, "service_safe_address.txt") headers = { @@ -255,7 +258,7 @@ def _parse_args() -> Any: """Parse the script arguments.""" parser = ArgumentParser(description="Get trades on Omen for a Safe address.") parser.add_argument( - "creator", + "--creator", action=EthereumAddressAction, help="Ethereum address of the service Safe", ) @@ -285,6 +288,10 @@ def _parse_args() -> Any: ) args = parser.parse_args() + if args.creator is None: + with open(SAFE_ADDRESS_PATH, "r", encoding="utf-8") as file: + args.creator = file.read().strip() + args.from_date = args.from_date.replace(tzinfo=datetime.timezone.utc) args.to_date = args.to_date.replace(tzinfo=datetime.timezone.utc) args.fpmm_created_from_date = args.fpmm_created_from_date.replace( @@ -786,7 +793,7 @@ def get_mech_statistics(mech_requests: Dict[str, Any]) -> Dict[str, Dict[str, in with open(RPC_PATH, "r", encoding="utf-8") as rpc_file: rpc = rpc_file.read() - mech_requests = get_mech_requests(rpc, user_args.creator) + mech_requests = get_mech_requests(user_args.creator) mech_statistics = get_mech_statistics(mech_requests) trades_json = _query_omen_xdai_subgraph(