Skip to content

Commit 31c0362

Browse files
authored
move address info calculate to scheduler (#2888)
1 parent 0c75a2b commit 31c0362

File tree

5 files changed

+47
-176
lines changed

5 files changed

+47
-176
lines changed

app/jobs/revert_block_job.rb

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ def perform(local_tip_block = nil)
2222
end
2323
benchmark :recalculate_udt_transactions_count, local_tip_block
2424
benchmark :decrease_records_count, local_tip_block
25-
benchmark :update_address_balance_and_ckb_transactions_count, local_tip_block
2625

2726
ApplicationRecord.benchmark "invalid! block" do
2827
local_tip_block.invalid!
@@ -39,79 +38,6 @@ def perform(local_tip_block = nil)
3938
end
4039
end
4140

42-
def update_address_balance_and_ckb_transactions_count(local_tip_block)
43-
select_fields = [
44-
"address_id",
45-
"COUNT(*) AS cells_count",
46-
"SUM(capacity) AS total_capacity",
47-
"SUM(CASE WHEN type_hash IS NOT NULL OR data_hash IS NOT NULL THEN capacity ELSE 0 END) AS balance_occupied",
48-
]
49-
output_addrs =
50-
local_tip_block.cell_outputs.live.select(select_fields.join(", ")).group(:address_id)
51-
input_addrs =
52-
CellOutput.where(consumed_block_timestamp: local_tip_block.timestamp).select(select_fields.join(", ")).group(:address_id)
53-
out_hash = output_addrs.each_with_object({}) do |row, h|
54-
h[row.address_id] = {
55-
cells_count: row.cells_count.to_i,
56-
total_capacity: row.total_capacity.to_i,
57-
balance_occupied: row.balance_occupied.to_i,
58-
}
59-
end
60-
61-
in_hash = input_addrs.each_with_object({}) do |row, h|
62-
h[row.address_id] = {
63-
cells_count: row.cells_count.to_i,
64-
total_capacity: row.total_capacity.to_i,
65-
balance_occupied: row.balance_occupied.to_i,
66-
}
67-
end
68-
69-
# Merge keys
70-
all_ids = in_hash.keys | out_hash.keys
71-
72-
# 计算差值
73-
address_balance_diff = all_ids.each_with_object({}) do |addr_id, h|
74-
input = in_hash[addr_id] || { cells_count: 0, total_capacity: 0, balance_occupied: 0 }
75-
output = out_hash[addr_id] || { cells_count: 0, total_capacity: 0, balance_occupied: 0 }
76-
77-
h[addr_id] = {
78-
cells_count: input[:cells_count] - output[:cells_count],
79-
total_capacity: input[:total_capacity] - output[:total_capacity],
80-
balance_occupied: input[:balance_occupied] - output[:balance_occupied],
81-
}
82-
end
83-
# Preload dao transaction counts for all addresses in one query
84-
dao_tx_counts = DaoEvent.processed.
85-
where(block_id: local_tip_block.id, address_id: all_ids).
86-
group(:address_id).
87-
distinct.
88-
count(:ckb_transaction_id)
89-
tx_count_diffs = AccountBook.tx_committed.
90-
where(block_number: local_tip_block.number, address_id: all_ids).
91-
group(:address_id).
92-
count
93-
94-
changes =
95-
address_balance_diff.map do |address_id, changes|
96-
tx_count_diff = tx_count_diffs[address_id] || 0
97-
dao_tx_count_diff = dao_tx_counts[address_id] || 0
98-
{ address_id: address_id }.merge(changes).merge({ ckb_transactions_count: tx_count_diff, dao_transactions_count: dao_tx_count_diff })
99-
end
100-
101-
changes.each do |change|
102-
addr = Address.find_by_id(change[:address_id])
103-
if addr
104-
addr.update!(
105-
live_cells_count: addr.live_cells_count + change[:cells_count],
106-
ckb_transactions_count: addr.ckb_transactions_count - change[:ckb_transactions_count],
107-
balance: addr.balance + change[:total_capacity],
108-
balance_occupied: addr.balance_occupied + change[:balance_occupied],
109-
dao_transactions_count: addr.dao_transactions_count - change[:dao_transactions_count]
110-
)
111-
end
112-
end
113-
end
114-
11541
def recalculate_udt_transactions_count(local_tip_block)
11642
udt_ids = local_tip_block.ckb_transactions.map(&:contained_udt_ids).flatten
11743
udt_counts = udt_ids.each_with_object(Hash.new(0)) { |udt_id, counts| counts[udt_id] += 1 }
@@ -159,7 +85,6 @@ def revert_mining_info(local_tip_block)
15985

16086
def revert_dao_contract_related_operations(local_tip_block)
16187
dao_events = DaoEvent.where(block: local_tip_block).processed
162-
dao_transactions_count = local_tip_block.ckb_transactions.where("tags @> array[?]::varchar[]", ["dao"]).count
16388
dao_contract = DaoContract.default_contract
16489

16590
withdraw_total_deposit = revert_withdraw_from_dao(dao_events)

app/models/ckb_sync/new_node_data_processor.rb

Lines changed: 1 addition & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def process_block(node_block, refresh_balance: true)
9191
benchmark :update_or_create_udt_accounts!, local_block
9292
# maybe can be changed to asynchronous update
9393
benchmark :process_dao_events!, local_block
94-
benchmark :update_addresses_info, addrs_changes, local_block, refresh_balance
94+
# benchmark :update_addresses_info, addrs_changes, local_block, refresh_balance
9595
end
9696

9797
async_update_udt_infos(local_block)
@@ -522,66 +522,6 @@ def update_mining_info(local_block)
522522
CkbUtils.update_current_block_mining_info(local_block)
523523
end
524524

525-
def update_addresses_info(addrs_change, local_block, refresh_balance)
526-
return unless refresh_balance
527-
528-
addrs_change.to_a.each_slice(100) do |batch|
529-
530-
updates = batch.map do |key, value|
531-
[key, { last_updated_block_number: local_block.number,
532-
balance: value[:balance_diff],
533-
balance_occupied: value[:balance_occupied_diff].presence || 0,
534-
ckb_transactions_count: value[:ckb_txs].present? ? value[:ckb_txs].size : 0,
535-
live_cells_count: value[:cells_diff],
536-
dao_transactions_count: value[:dao_txs].present? ? value[:dao_txs].size : 0
537-
}]
538-
end.to_h
539-
540-
case_clauses = { last_updated_block_number: [], balance: [], balance_occupied: [], ckb_transactions_count: [], live_cells_count: [], dao_transactions_count: [] }
541-
ids = []
542-
543-
updates.each do |id, attrs|
544-
ids << id
545-
attrs.each do |column, value|
546-
if column == :last_updated_block_number
547-
case_clauses[column] << "WHEN #{id} THEN '#{ActiveRecord::Base.connection.quote(value)}'"
548-
else
549-
case_clauses[column] << "WHEN #{id} THEN #{column} + #{value}"
550-
end
551-
end
552-
end
553-
554-
set_clauses = case_clauses.map do |column, clauses|
555-
if clauses.any?
556-
" #{column} = CASE id\n #{clauses.join("\n ")}\n ELSE #{column}\n END"
557-
else
558-
nil
559-
end
560-
end.compact.join(",\n")
561-
562-
id_list = ids.join(', ')
563-
564-
sql = <<-SQL
565-
UPDATE addresses
566-
SET
567-
#{set_clauses}
568-
WHERE id IN (#{id_list})
569-
SQL
570-
571-
# puts sql
572-
573-
ActiveRecord::Base.connection.execute(sql)
574-
575-
updated_records = Address.where(id: ids).select(:lock_hash)
576-
updated_records.each do |record|
577-
$redis.pipelined do
578-
Rails.cache.delete_multi(%W(#{record.class.name}/#{record.lock_hash}))
579-
end
580-
end
581-
582-
end
583-
end
584-
585525
def update_block_info!(local_block)
586526
local_block.update!(
587527
total_transaction_fee: local_block.ckb_transactions.sum(:transaction_fee),
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
class CalculateAddressInfoWorker
2+
include Sidekiq::Worker
3+
sidekiq_options queue: "low"
4+
5+
def perform()
6+
key = :last_block_for_update_addresses_info_worker
7+
id = Rails.cache.read(key)
8+
local_tip_block = Block.recent.first
9+
10+
blocks = Block.where("id > ? and id <= ?", id, local_tip_block.id).order(id: :desc).select(:id, :address_ids)
11+
return if blocks.blank?
12+
13+
contained_address_ids = Set.new
14+
blocks.each do |block|
15+
contained_address_ids.merge block.address_ids.map(&:to_i)
16+
end
17+
Rails.cache.write(key, local_tip_block.id)
18+
19+
Address.where(id: contained_address_ids).find_in_batches do |group|
20+
sleep(50) # Make sure it doesn't get too crowded in there!
21+
address_attributes = []
22+
23+
group.each do |addr|
24+
balance, balance_occupied = addr.cal_balance
25+
address_attributes << {
26+
id: addr.id,
27+
balance: balance,
28+
balance_occupied: balance_occupied,
29+
ckb_transactions_count: AccountBook.where(address_id: addr.id).count,
30+
live_cells_count: addr.cell_outputs.live.count,
31+
dao_transactions_count: addr.ckb_dao_transactions.count,
32+
created_at: addr.created_at,
33+
updated_at: Time.current
34+
}
35+
36+
Rails.cache.delete_multi(%W(#{addr.class.name}/#{addr.lock_hash}))
37+
end
38+
39+
if address_attributes.present?
40+
Address.upsert_all(address_attributes)
41+
end
42+
end
43+
end
44+
end

test/jobs/revert_block_job_test.rb

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,4 @@ class RevertBlockJobTest < ActiveJob::TestCase
3434
@address.update(balance: 5000 * (10**8), balance_occupied: 3000 * (10**8), live_cells_count: 2, ckb_transactions_count: 2, last_updated_block_number: @parent_block.number,
3535
dao_transactions_count: 2)
3636
end
37-
test "rollback address final_state with parent block" do
38-
RevertBlockJob.new(@parent_block).update_address_balance_and_ckb_transactions_count(@parent_block)
39-
40-
assert_equal @address.reload.live_cells_count, 4
41-
assert_equal @address.reload.ckb_transactions_count, 1
42-
assert_equal @address.reload.dao_transactions_count, 1
43-
assert_equal @address.reload.balance, 6000 * (10**8)
44-
assert_equal @address.reload.balance_occupied, 3000 * (10**8)
45-
end
4637
end

test/models/ckb_sync/node_data_processor_test.rb

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -933,9 +933,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
933933

934934
VCR.use_cassette("blocks/12", record: :new_episodes) do
935935
new_local_block = node_data_processor.call
936-
937-
assert_equal origin_balance - balance_diff,
938-
new_local_block.contained_addresses.sum(:balance)
939936
end
940937
end
941938

@@ -963,10 +960,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
963960
end
964961
node_block.transactions.first.outputs_data << "0x"
965962
end
966-
new_local_block = node_data_processor.process_block(node_block)
967-
968-
assert_equal origin_balance + new_local_block.cell_outputs.sum(:capacity),
969-
new_local_block.contained_addresses.sum(:balance)
970963
end
971964
end
972965

@@ -1061,13 +1054,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
10611054
node_block = CKB::Types::Block.new(uncles: [], proposals: [],
10621055
transactions:, header:)
10631056
node_data_processor.process_block(node_block)
1064-
address1 = Address.find_by(lock_hash: lock1.compute_hash)
1065-
address2 = Address.find_by(lock_hash: lock2.compute_hash)
1066-
address3 = Address.find_by(lock_hash: lock3.compute_hash)
1067-
1068-
assert_equal 2, address1.dao_transactions_count
1069-
assert_equal 2, address2.dao_transactions_count
1070-
assert_equal 0, address3.dao_transactions_count
10711057
end
10721058

10731059
test "#process_block should update abandoned block's contained address's live cells count" do
@@ -1077,9 +1063,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
10771063
local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx")
10781064
VCR.use_cassette("blocks/12", record: :new_episodes) do
10791065
new_local_block = node_data_processor.call
1080-
1081-
assert_equal origin_live_cells_count - 1,
1082-
new_local_block.contained_addresses.sum(:live_cells_count)
10831066
end
10841067
end
10851068

@@ -1517,28 +1500,18 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
15171500
local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx")
15181501
VCR.use_cassette("blocks/#{DEFAULT_NODE_BLOCK_NUMBER}",
15191502
record: :new_episodes) do
1520-
assert_difference -> {
1521-
local_block.contained_addresses.map(&:ckb_transactions_count).flatten.sum
1522-
}, -1 do
1523-
node_data_processor.call
1524-
end
1503+
node_data_processor.call
15251504
end
15261505
end
15271506

15281507
test "should update abandoned block's contained address's balance" do
15291508
prepare_node_data(19)
15301509
local_block = Block.find_by(number: 19)
15311510
local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx")
1532-
ckb_transaction_ids = local_block.ckb_transactions.pluck(:id)
1533-
balance_diff = CellOutput.where(ckb_transaction_id: ckb_transaction_ids).sum(:capacity)
15341511

15351512
VCR.use_cassette("blocks/#{DEFAULT_NODE_BLOCK_NUMBER}",
15361513
record: :new_episodes) do
1537-
assert_difference -> {
1538-
local_block.contained_addresses.sum(:balance)
1539-
}, -balance_diff do
1540-
node_data_processor.call
1541-
end
1514+
node_data_processor.call
15421515
end
15431516
end
15441517

@@ -2106,8 +2079,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
21062079

21072080
assert_equal 0,
21082081
address.udt_accounts.find_by(type_hash: udt_type_script.compute_hash).amount
2109-
assert_equal 0, address.reload.balance_occupied
2110-
assert_equal 150 * (10**8), output_address.reload.balance_occupied
21112082
end
21122083
end
21132084

0 commit comments

Comments
 (0)