Skip to content

Commit

Permalink
workaround for broken logs, allow fetching balance from node
Browse files Browse the repository at this point in the history
  • Loading branch information
soad003 committed Dec 18, 2023
1 parent dfdf563 commit 22a7588
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 17 deletions.
54 changes: 42 additions & 12 deletions gsrest/db/cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from gsrest.util.address import (address_to_user_format)
from gsrest.util.evm import (bytes_to_hex, strip_0x)
from gsrest.util.tron import partial_tron_to_partial_evm
from gsrest.util.node_balances import get_balances

# import hashlib

Expand Down Expand Up @@ -364,6 +365,14 @@ def get_token_configuration(self, currency):
eth_config = self.parameters.get(currency, None)
return eth_config["token_config"] if eth_config is not None else {}

def get_balance_provider(self, currency):
    """Return a callable that fetches balances from an external node.

    Looks up ``balance_provider`` (a node URL) in the currency's config
    section. If configured, returns ``get_balances`` partially applied to
    that URL; otherwise returns ``None`` so callers fall back to the
    transformed keyspace.
    """
    currency_config = self.config['currencies'][currency]
    provider_url = currency_config.get("balance_provider")
    if provider_url is None:
        return None
    return partial(get_balances, provider_url)

def get_keyspace_mapping(self, currency, keyspace_type):
if currency is None:
raise BadUserInputException('Missing currency')
Expand Down Expand Up @@ -1761,26 +1770,47 @@ async def add_balance_eth(self, currency, row):
token_currencies = list(token_config.keys())
balance_currencies = [currency.upper()] + token_currencies

if 'address_id_group' not in row:
row['address_id_group'] = \
self.get_id_group(currency, row['address_id'])
query = 'SELECT balance from balance where address_id=%s '\
'and address_id_group=%s ' \
"and currency=%s"
balance_provider = self.get_balance_provider(currency)

results = {
c: one(await self.execute_async(
currency, 'transformed', query,
[row['address_id'], row['address_id_group'], c]))
for c in balance_currencies
}
results = None
if balance_provider is not None:
# load balance from alternative provider.
try:
results = await balance_provider(currency, row["address"],
token_config)
except Exception as e:
self.logger.warning(
f"Could not fetch balances over alternative provider: {e}")

if results is None:
# load results from gs database
if 'address_id_group' not in row:
row['address_id_group'] = \
self.get_id_group(currency, row['address_id'])
query = 'SELECT balance from balance where address_id=%s '\
'and address_id_group=%s ' \
"and currency=%s"

results = {
c: one(await self.execute_async(
currency, 'transformed', query,
[row['address_id'], row['address_id_group'], c]))
for c in balance_currencies
}

if results[currency.upper()] is None:
results[currency.upper()] = {
'balance':
row['total_received'].value - row['total_spent'].value
}
row['balance'] = results[currency.upper()]["balance"]

if currency == "trx":
# our self compute balances can get negative for now
# since we have not implemented all special tx types
# in our logic yet
row['balance'] = max(row['balance'], 0)

# TODO: Some accounts have negative balances, this does not make sense.
# for now we cap with 0 in case of negative
# Exp. for now is that we either lost some token txs somewhere or
Expand Down
24 changes: 19 additions & 5 deletions gsrest/util/eth_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self, mapping, version):

def __getitem__(self, key):
    """Resolve ``key`` in the underlying mapping, then pick the entry
    for this instance's version index (``self.v``)."""
    versions = super().__getitem__(key)
    return versions[self.v]


Expand All @@ -30,12 +31,12 @@ def __getitem__(self, key):
{
"name": "from",
"type": "address",
"indexed": False
"indexed": True
},
{
"name": "to",
"type": "address",
"indexed": False
"indexed": True
},
{
"name": "value",
Expand All @@ -51,12 +52,12 @@ def __getitem__(self, key):
{
"name": "from",
"type": "address",
"indexed": True
"indexed": False
},
{
"name": "to",
"type": "address",
"indexed": True
"indexed": False
},
{
"name": "value",
Expand Down Expand Up @@ -175,7 +176,20 @@ def decode_log(log):
logdef = log_signatures[log["topics"][0]]
for i in range(0, len(logdef)):
try:
result = eth_event.decode_log(log,
# Truncate topics to expected length. Importer bug
# lead to duplicate topics in some instances.
ll = log.copy()
nr_topics = sum(
[1
for x in logdef[i]["inputs"] if x["indexed"] is True]) + 1
if len(ll["topics"]) != nr_topics:
newTopics = ll['topics'][-nr_topics:]
logger.warning(
f"Truncated log topics, {ll['topics']} "
f"to {newTopics}, might an error in your raw data.")
ll["topics"] = newTopics

result = eth_event.decode_log(ll,
VersionedDict(log_signatures, i))
if "data" in result:
result["data"] = {d["name"]: d for d in result["data"]}
Expand Down
81 changes: 81 additions & 0 deletions gsrest/util/node_balances.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import asyncio
import aiohttp
from gsrest.util.evm import strip_0x, eth_address_to_hex
from async_lru import alru_cache


def create_token_request(contract_address: str,
                         account: str,
                         block: str = "latest"):
    """Build a JSON-RPC ``eth_call`` payload querying ERC-20 balanceOf.

    The call data is the ``balanceOf(address)`` selector (0x70a08231)
    followed by the account address left-padded to 32 bytes.

    Args:
        contract_address: token contract to call.
        account: address whose balance is queried (0x prefix is stripped).
        block: block tag or number to evaluate the call at.
    """
    return {
        "jsonrpc": "2.0",
        "method": "eth_call",
        "params": [{
            "to": contract_address,
            "data": f"0x70a08231000000000000000000000000{strip_0x(account)}"
        }, block],  # bug fix: was hard-coded "latest", ignoring the block arg
        "id": 1,
    }


def create_base_request(account: str, block: str = "latest"):
    """Build a JSON-RPC ``eth_getBalance`` payload for ``account``
    at the given block tag/number."""
    return {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_getBalance",
        "params": [account, block],
    }


async def send_node_request(node_url: str, payload):
    """POST ``payload`` as JSON to the node's RPC endpoint and return
    the parsed JSON response body."""
    async with aiohttp.ClientSession() as session, \
            session.post(node_url, json=payload) as response:
        return await response.json()


@alru_cache(maxsize=1000)
async def get_token_balance(node_url: str,
                            contract_address: str,
                            account: str,
                            block: str = "latest"):
    """Fetch an ERC-20 token balance via ``eth_call`` and decode the
    hex result to an int.

    Results are LRU-cached per argument tuple.
    NOTE(review): with ``block="latest"`` cached values can go stale
    until evicted — confirm this is acceptable for the caller.
    """
    request = create_token_request(contract_address, account, block)
    response = await send_node_request(node_url, payload=request)
    return int(response["result"], 16)


@alru_cache(maxsize=1000)
async def get_base_balance(node_url: str, account: str, block: str = "latest"):
    """Fetch the native-coin balance via ``eth_getBalance`` and decode
    the hex result to an int.

    Results are LRU-cached per argument tuple.
    NOTE(review): with ``block="latest"`` cached values can go stale
    until evicted — confirm this is acceptable for the caller.
    """
    request = create_base_request(account, block)
    response = await send_node_request(node_url, payload=request)
    return int(response["result"], 16)


async def get_balances(node_url: str,
                       network: str,
                       account: str,
                       tokenConfigs,
                       block: str = "latest"):
    """Concurrently fetch the native-coin balance and all configured
    token balances for ``account`` from the node.

    Returns a mapping of upper-cased currency symbol to
    ``{"balance": <int>}``, with the network's own symbol first,
    followed by one entry per token in ``tokenConfigs``.
    """
    account_hex = eth_address_to_hex(account)

    coroutines = [get_base_balance(node_url, account_hex, block)]
    for token in tokenConfigs.values():
        contract_hex = eth_address_to_hex(token["token_address"])
        coroutines.append(
            get_token_balance(node_url, contract_hex, account_hex, block))

    values = await asyncio.gather(*coroutines)

    symbols = [network, *tokenConfigs.keys()]
    return {
        symbol.upper(): {"balance": value}
        for symbol, value in zip(symbols, values)
    }

0 comments on commit 22a7588

Please sign in to comment.