Skip to content

Commit b1d3cf5

Browse files
authored
Merge pull request #60 from skalenetwork/update-credit-distributor
Update credit distributor - multiple source chains and custom value
2 parents 383802c + 90a038a commit b1d3cf5

6 files changed

Lines changed: 120 additions & 95 deletions

File tree

credit-distributor/README.md

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# credit-distributor
22

3-
`credit-distributor` is a service that listens for `PaymentReceived` events on the mainnet `CreditStation` contract and fulfills the corresponding payments on the schain `Ledger` contract.
3+
`credit-distributor` watches `PaymentReceived` events on one or more `CreditStation` deployments (the **sources**) and fulfills each payment on a single **destination** schain `Ledger`.
4+
5+
Each source is an independent `CreditStation` deployment on its own chain, initialized with a unique on-chain `sourceId` (set via `setPaymentIdOffset`) so payment IDs never collide. The distributor reads from every source sequentially each cycle and forwards `event.value` (wei) as the native amount to the purchaser on the destination schain.
46

57
### Usage
68

@@ -10,17 +12,21 @@ Configuration is loaded from a local TOML file.
1012

1113
1. Create your config:
1214

13-
```bash
14-
cp config.toml.example config.toml
15-
```
15+
```bash
16+
cp config.toml.example config.toml
17+
```
1618

17-
2. Edit `config.toml` for your environment.
19+
2. Edit `config.toml`:
1820

19-
See `config.toml.example` for the full list of required/optional settings and their meaning.
21+
* `[general]` — destination `schain_name` and the distributor's `eth_private_key` (used to call `Ledger.fulfill` on the destination schain).
22+
* `[destination]` — destination schain `endpoint` and `Ledger` `contract` address.
23+
* `[[sources]]` — one entry per `CreditStation` deployment. Repeat the block for each source chain. Each needs a unique `name` (used as a state-file key), the source-chain `endpoint`, the `CreditStation` `contract` address, and an initial `from_block` to scan from.
2024

21-
#### Running the service
25+
See `config.toml.example` for a complete annotated sample with two sources.
26+
27+
The distributor maintains per-source progress in `state.json` (path configurable via `[general].state_file`). Adding a new source later just appends a `[[sources]]` block and starts at its configured `from_block`; existing sources keep their stored progress.
2228

23-
Run the service:
29+
#### Running the service
2430

2531
```bash
2632
docker compose up -d --build
Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
11
[general]
22
schain_name = 'test' # required
33
eth_private_key = '0x0' # required
4-
from_block = 1234 # required
4+
state_file = 'state.json' # optional
55

6-
[contracts]
7-
mainnet = '0x0' # required
8-
schain = '0x0' # required
6+
[destination]
7+
endpoint = 'https://example.com' # required - destination schain RPC
8+
contract = '0x0' # required - Ledger contract address on destination schain
99

10-
[endpoints]
11-
mainnet = 'https://example.com' # required
12-
schain = 'https://example.com' # required
10+
# One [[sources]] entry per CreditStation deployment to monitor.
11+
# Each deployment must have been initialized with a unique on-chain `sourceId`
12+
# (see CreditStation.setPaymentIdOffset). The `name` is an off-chain label
13+
# used as a key in the state file and in logs.
14+
[[sources]]
15+
name = 'ethereum' # required - unique label
16+
endpoint = 'https://example.com' # required - source chain RPC
17+
contract = '0x0' # required - CreditStation contract address
18+
from_block = 1234 # required - initial block to start scanning from
19+
20+
[[sources]]
21+
name = 'base'
22+
endpoint = 'https://example.com'
23+
contract = '0x0'
24+
from_block = 5678
1325

1426
[agent] # optional
1527
loop_sleep = 120 # optional
16-
exception_sleep = 10 # optional
17-
18-
[payment] # optional
19-
value_eth = 1 # optional
20-
value_wei = 10000 # optional
28+
exception_sleep = 10 # optional

credit-distributor/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "credit-distributor"
33
version = "0.0.1"
44
requires-python = ">=3.13"
5-
dependencies = ["skale.py==7.9dev0", "pydantic-settings==2.12.0"]
5+
dependencies = ["skale.py==7.14dev0", "pydantic-settings==2.12.0"]
66

77
[project.optional-dependencies]
88
dev = ["ruff==0.14.5", "mypy==1.7.1"]

credit-distributor/src/configs.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,33 +29,31 @@
2929
SettingsConfigDict,
3030
TomlConfigSettingsSource,
3131
)
32-
from skale.types.schain import SchainName
32+
from skale_core.types import SchainName
3333

3434
CONFIG_FILEPATH = os.path.join(os.path.dirname(__file__), os.pardir, 'config.toml')
3535

3636

37-
class Endpoints(BaseModel):
38-
mainnet: str
39-
schain: str
37+
class Source(BaseModel):
38+
name: str
39+
endpoint: str
40+
contract: str
41+
from_block: int
42+
source_id: int | None = None
4043

4144

42-
class Contracts(BaseModel):
43-
mainnet: str
44-
schain: str
45+
class Destination(BaseModel):
46+
endpoint: str
47+
contract: str
4548

4649

4750
class Agent(BaseModel):
4851
loop_sleep: int = 120
4952
exception_sleep: int = 10
5053

5154

52-
class Payment(BaseModel):
53-
value_eth: int = 1
54-
55-
5655
class General(BaseModel):
5756
schain_name: SchainName
58-
from_block: int
5957
eth_private_key: HexStr
6058
state_file: str = 'state.json'
6159

@@ -64,10 +62,9 @@ class Config(BaseSettings):
6462
model_config = SettingsConfigDict(toml_file=CONFIG_FILEPATH)
6563

6664
general: General
67-
endpoints: Endpoints
68-
contracts: Contracts
65+
destination: Destination
66+
sources: list[Source]
6967
agent: Agent = Agent()
70-
payment: Payment = Payment()
7168

7269
@classmethod
7370
def settings_customise_sources(

credit-distributor/src/main.py

Lines changed: 60 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,73 +26,94 @@
2626
from skale.utils.web3_utils import init_web3
2727
from skale.wallets import Web3Wallet
2828

29-
from src.configs import Config, get_config
29+
from src.configs import Config, Source, get_config
3030
from src.state import State, StateManager
3131

3232
logger = logging.getLogger(__name__)
3333

34+
SOURCE_ID_SHIFT = 248
35+
36+
37+
def extract_source_id(payment_id: int) -> int:
38+
return payment_id >> SOURCE_ID_SHIFT
39+
40+
41+
def resolve_source_id(mainnet_cs: MainnetCreditStation) -> int:
42+
last_payment_id = mainnet_cs.credit_station.contract.functions.getLastPaymentId().call()
43+
return extract_source_id(last_payment_id)
44+
3445

3546
def run_distributor() -> None:
3647
config = get_config()
3748
state_manager = StateManager(state_file=config.general.state_file)
38-
state = state_manager.load(config.general.from_block)
49+
state = state_manager.load({s.name: s.from_block for s in config.sources})
3950

40-
schain_web3 = init_web3(config.endpoints.schain)
51+
schain_web3 = init_web3(config.destination.endpoint)
4152
schain_wallet = Web3Wallet(config.general.eth_private_key, schain_web3)
42-
mainnet_cs = MainnetCreditStation(config.endpoints.mainnet, config.contracts.mainnet)
43-
schain_cs = SchainCreditStation(config.endpoints.schain, config.contracts.schain, schain_wallet)
53+
schain_cs = SchainCreditStation(
54+
config.destination.endpoint, config.destination.contract, schain_wallet
55+
)
56+
source_clients = {
57+
source.name: MainnetCreditStation(source.endpoint, source.contract)
58+
for source in config.sources
59+
}
60+
61+
for source in config.sources:
62+
source.source_id = resolve_source_id(source_clients[source.name])
63+
logger.info(f'[{source.name}] Resolved on-chain source_id={source.source_id}')
64+
4465
while True:
45-
try:
46-
logging.info('Starting credit distribution cycle')
47-
state = distribute_credits(mainnet_cs, schain_cs, config, state)
48-
state_manager.save(state)
49-
logger.info(f'Sleeping for {config.agent.loop_sleep} seconds before next cycle')
50-
sleep(config.agent.loop_sleep)
51-
except Exception as e:
52-
logging.exception(f'Error during credit distribution cycle: {e}')
53-
logger.info(f'Sleeping for {config.agent.exception_sleep} seconds before retrying')
54-
sleep(config.agent.exception_sleep)
55-
56-
57-
def distribute_credits(
66+
logger.info('Starting credit distribution cycle')
67+
for source in config.sources:
68+
try:
69+
state = distribute_credits_for_source(
70+
source, source_clients[source.name], schain_cs, config, state
71+
)
72+
state_manager.save(state)
73+
except Exception as e:
74+
logger.exception(
75+
f'Error processing source {source.name!r}: {e}; continuing with next'
76+
)
77+
logger.info(f'Sleeping for {config.agent.loop_sleep} seconds before next cycle')
78+
sleep(config.agent.loop_sleep)
79+
80+
81+
def distribute_credits_for_source(
82+
source: Source,
5883
mainnet_cs: MainnetCreditStation,
5984
schain_cs: SchainCreditStation,
6085
config: Config,
6186
state: State,
6287
) -> State:
88+
from_block = state.from_blocks[source.name]
89+
logger.info(f'[{source.name}] Fetching events from block {from_block}')
6390
all_events = mainnet_cs.credit_station.get_payment_received_events(
64-
from_block=state.from_block, schain_name=config.general.schain_name
91+
from_block=from_block, schain_name=config.general.schain_name
6592
)
66-
last_block = state.from_block
6793
for event in all_events:
68-
fulfill_payment(event, schain_cs, config)
94+
fulfill_payment(source, event, schain_cs)
6995

70-
last_block_with_event = 0
71-
if len(all_events) != 0:
72-
last_block_with_event = all_events[-1]['block_number']
96+
if all_events:
97+
state.from_blocks[source.name] = all_events[-1]['block_number'] + 1
7398
else:
74-
logger.info('No new PaymentReceived events found.')
75-
state.from_block = max(last_block, last_block_with_event + 1)
99+
logger.info(f'[{source.name}] No new PaymentReceived events found.')
76100
return state
77101

78102

79-
def get_payment_wei_value(config: Config) -> int:
80-
return config.payment.value_eth * 10**18
81-
82-
83103
def fulfill_payment(
84-
event: PaymentReceivedEvent, schain_cs: SchainCreditStation, config: Config
104+
source: Source,
105+
event: PaymentReceivedEvent,
106+
schain_cs: SchainCreditStation,
85107
) -> None:
86-
logger.info(f'Checking payment: {event["payment_id"]}')
87-
is_fulfilled = schain_cs.ledger.is_fulfilled(event['payment_id'])
108+
payment_id = event['payment_id']
109+
logger.info(f'[{source.name}] Checking payment: {payment_id}')
110+
is_fulfilled = schain_cs.ledger.is_fulfilled(payment_id)
88111
if not is_fulfilled:
89-
logger.info(f'Fulfilling payment: {event["payment_id"]}')
90-
schain_cs.ledger.fulfill(
91-
event['payment_id'], event['to_address'], value=get_payment_wei_value(config)
92-
)
93-
logger.info(f'Payment {event["payment_id"]} fulfilled successfully.')
112+
logger.info(f'[{source.name}] Fulfilling payment: {payment_id}')
113+
schain_cs.ledger.fulfill(payment_id, event['to_address'], value=event['value'])
114+
logger.info(f'[{source.name}] Payment {payment_id} fulfilled successfully.')
94115
else:
95-
logger.debug(f'Payment {event["payment_id"]} is already fulfilled.')
116+
logger.debug(f'[{source.name}] Payment {payment_id} is already fulfilled.')
96117

97118

98119
if __name__ == '__main__':

credit-distributor/src/state.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
# You should have received a copy of the GNU Affero General Public License
1818
# along with this program. If not, see <https://www.gnu.org/licenses/>.
1919

20-
import json
2120
import logging
2221
from pathlib import Path
2322

@@ -27,30 +26,24 @@
2726

2827

2928
class State(BaseModel):
30-
from_block: int
29+
from_blocks: dict[str, int]
3130

3231

3332
class StateManager:
3433
def __init__(self, state_file: str | Path):
35-
self.state_file = Path(state_file) if isinstance(state_file, str) else state_file
34+
self.state_file = Path(state_file)
3635

37-
def load(self, initial_from_block: int) -> State:
36+
def load(self, initial_from_blocks: dict[str, int]) -> State:
37+
from_blocks = dict(initial_from_blocks)
3838
if self.state_file.exists():
39-
try:
40-
data = json.loads(self.state_file.read_text())
41-
logger.info(f'Loaded state from {self.state_file}')
42-
return State(**data)
43-
except Exception as e:
44-
logger.warning(
45-
f'Failed to load state from {self.state_file}: {e}. Using initial value.'
46-
)
47-
logger.info(f'Creating new state with from_block={initial_from_block}')
48-
return State(from_block=initial_from_block)
39+
stored = State.model_validate_json(self.state_file.read_text()).from_blocks
40+
logger.info(f'Loaded state from {self.state_file}')
41+
for name in from_blocks:
42+
if name in stored:
43+
from_blocks[name] = stored[name]
44+
else:
45+
logger.info(f'Creating new state with from_blocks={from_blocks}')
46+
return State(from_blocks=from_blocks)
4947

5048
def save(self, state: State) -> None:
51-
try:
52-
self.state_file.write_text(state.model_dump_json(indent=2))
53-
logger.debug(f'Saved state to {self.state_file}')
54-
except Exception as e:
55-
logger.error(f'Failed to save state to {self.state_file}: {e}')
56-
raise
49+
self.state_file.write_text(state.model_dump_json(indent=2))

0 commit comments

Comments
 (0)