Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 0 additions & 75 deletions app/jobs/revert_block_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def perform(local_tip_block = nil)
end
benchmark :recalculate_udt_transactions_count, local_tip_block
benchmark :decrease_records_count, local_tip_block
benchmark :update_address_balance_and_ckb_transactions_count, local_tip_block

ApplicationRecord.benchmark "invalid! block" do
local_tip_block.invalid!
Expand All @@ -39,79 +38,6 @@ def perform(local_tip_block = nil)
end
end

def update_address_balance_and_ckb_transactions_count(local_tip_block)
select_fields = [
"address_id",
"COUNT(*) AS cells_count",
"SUM(capacity) AS total_capacity",
"SUM(CASE WHEN type_hash IS NOT NULL OR data_hash IS NOT NULL THEN capacity ELSE 0 END) AS balance_occupied",
]
output_addrs =
local_tip_block.cell_outputs.live.select(select_fields.join(", ")).group(:address_id)
input_addrs =
CellOutput.where(consumed_block_timestamp: local_tip_block.timestamp).select(select_fields.join(", ")).group(:address_id)
out_hash = output_addrs.each_with_object({}) do |row, h|
h[row.address_id] = {
cells_count: row.cells_count.to_i,
total_capacity: row.total_capacity.to_i,
balance_occupied: row.balance_occupied.to_i,
}
end

in_hash = input_addrs.each_with_object({}) do |row, h|
h[row.address_id] = {
cells_count: row.cells_count.to_i,
total_capacity: row.total_capacity.to_i,
balance_occupied: row.balance_occupied.to_i,
}
end

# Merge keys
all_ids = in_hash.keys | out_hash.keys

# 计算差值
address_balance_diff = all_ids.each_with_object({}) do |addr_id, h|
input = in_hash[addr_id] || { cells_count: 0, total_capacity: 0, balance_occupied: 0 }
output = out_hash[addr_id] || { cells_count: 0, total_capacity: 0, balance_occupied: 0 }

h[addr_id] = {
cells_count: input[:cells_count] - output[:cells_count],
total_capacity: input[:total_capacity] - output[:total_capacity],
balance_occupied: input[:balance_occupied] - output[:balance_occupied],
}
end
# Preload dao transaction counts for all addresses in one query
dao_tx_counts = DaoEvent.processed.
where(block_id: local_tip_block.id, address_id: all_ids).
group(:address_id).
distinct.
count(:ckb_transaction_id)
tx_count_diffs = AccountBook.tx_committed.
where(block_number: local_tip_block.number, address_id: all_ids).
group(:address_id).
count

changes =
address_balance_diff.map do |address_id, changes|
tx_count_diff = tx_count_diffs[address_id] || 0
dao_tx_count_diff = dao_tx_counts[address_id] || 0
{ address_id: address_id }.merge(changes).merge({ ckb_transactions_count: tx_count_diff, dao_transactions_count: dao_tx_count_diff })
end

changes.each do |change|
addr = Address.find_by_id(change[:address_id])
if addr
addr.update!(
live_cells_count: addr.live_cells_count + change[:cells_count],
ckb_transactions_count: addr.ckb_transactions_count - change[:ckb_transactions_count],
balance: addr.balance + change[:total_capacity],
balance_occupied: addr.balance_occupied + change[:balance_occupied],
dao_transactions_count: addr.dao_transactions_count - change[:dao_transactions_count]
)
end
end
end

def recalculate_udt_transactions_count(local_tip_block)
udt_ids = local_tip_block.ckb_transactions.map(&:contained_udt_ids).flatten
udt_counts = udt_ids.each_with_object(Hash.new(0)) { |udt_id, counts| counts[udt_id] += 1 }
Expand Down Expand Up @@ -159,7 +85,6 @@ def revert_mining_info(local_tip_block)

def revert_dao_contract_related_operations(local_tip_block)
dao_events = DaoEvent.where(block: local_tip_block).processed
dao_transactions_count = local_tip_block.ckb_transactions.where("tags @> array[?]::varchar[]", ["dao"]).count
dao_contract = DaoContract.default_contract

withdraw_total_deposit = revert_withdraw_from_dao(dao_events)
Expand Down
62 changes: 1 addition & 61 deletions app/models/ckb_sync/new_node_data_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def process_block(node_block, refresh_balance: true)
benchmark :update_or_create_udt_accounts!, local_block
# maybe can be changed to asynchronous update
benchmark :process_dao_events!, local_block
benchmark :update_addresses_info, addrs_changes, local_block, refresh_balance
# benchmark :update_addresses_info, addrs_changes, local_block, refresh_balance
end

async_update_udt_infos(local_block)
Expand Down Expand Up @@ -522,66 +522,6 @@ def update_mining_info(local_block)
CkbUtils.update_current_block_mining_info(local_block)
end

def update_addresses_info(addrs_change, local_block, refresh_balance)
return unless refresh_balance

addrs_change.to_a.each_slice(100) do |batch|

updates = batch.map do |key, value|
[key, { last_updated_block_number: local_block.number,
balance: value[:balance_diff],
balance_occupied: value[:balance_occupied_diff].presence || 0,
ckb_transactions_count: value[:ckb_txs].present? ? value[:ckb_txs].size : 0,
live_cells_count: value[:cells_diff],
dao_transactions_count: value[:dao_txs].present? ? value[:dao_txs].size : 0
}]
end.to_h

case_clauses = { last_updated_block_number: [], balance: [], balance_occupied: [], ckb_transactions_count: [], live_cells_count: [], dao_transactions_count: [] }
ids = []

updates.each do |id, attrs|
ids << id
attrs.each do |column, value|
if column == :last_updated_block_number
case_clauses[column] << "WHEN #{id} THEN '#{ActiveRecord::Base.connection.quote(value)}'"
else
case_clauses[column] << "WHEN #{id} THEN #{column} + #{value}"
end
end
end

set_clauses = case_clauses.map do |column, clauses|
if clauses.any?
" #{column} = CASE id\n #{clauses.join("\n ")}\n ELSE #{column}\n END"
else
nil
end
end.compact.join(",\n")

id_list = ids.join(', ')

sql = <<-SQL
UPDATE addresses
SET
#{set_clauses}
WHERE id IN (#{id_list})
SQL

# puts sql

ActiveRecord::Base.connection.execute(sql)

updated_records = Address.where(id: ids).select(:lock_hash)
updated_records.each do |record|
$redis.pipelined do
Rails.cache.delete_multi(%W(#{record.class.name}/#{record.lock_hash}))
end
end

end
end

def update_block_info!(local_block)
local_block.update!(
total_transaction_fee: local_block.ckb_transactions.sum(:transaction_fee),
Expand Down
44 changes: 44 additions & 0 deletions app/workers/calculate_address_info_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
class CalculateAddressInfoWorker
include Sidekiq::Worker
sidekiq_options queue: "low"

def perform()
key = :last_block_for_update_addresses_info_worker
id = Rails.cache.read(key)
local_tip_block = Block.recent.first

blocks = Block.where("id > ? and id <= ?", id, local_tip_block.id).order(id: :desc).select(:id, :address_ids)
return if blocks.blank?

contained_address_ids = Set.new
blocks.each do |block|
contained_address_ids.merge block.address_ids.map(&:to_i)
end
Rails.cache.write(key, local_tip_block.id)

Address.where(id: contained_address_ids).find_in_batches do |group|
sleep(50) # Make sure it doesn't get too crowded in there!
address_attributes = []

group.each do |addr|
balance, balance_occupied = addr.cal_balance
address_attributes << {
id: addr.id,
balance: balance,
balance_occupied: balance_occupied,
ckb_transactions_count: AccountBook.where(address_id: addr.id).count,
live_cells_count: addr.cell_outputs.live.count,
dao_transactions_count: addr.ckb_dao_transactions.count,
created_at: addr.created_at,
updated_at: Time.current
}

Rails.cache.delete_multi(%W(#{addr.class.name}/#{addr.lock_hash}))
end

if address_attributes.present?
Address.upsert_all(address_attributes)
end
end
end
end
9 changes: 0 additions & 9 deletions test/jobs/revert_block_job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,4 @@ class RevertBlockJobTest < ActiveJob::TestCase
@address.update(balance: 5000 * (10**8), balance_occupied: 3000 * (10**8), live_cells_count: 2, ckb_transactions_count: 2, last_updated_block_number: @parent_block.number,
dao_transactions_count: 2)
end
test "rollback address final_state with parent block" do
RevertBlockJob.new(@parent_block).update_address_balance_and_ckb_transactions_count(@parent_block)

assert_equal @address.reload.live_cells_count, 4
assert_equal @address.reload.ckb_transactions_count, 1
assert_equal @address.reload.dao_transactions_count, 1
assert_equal @address.reload.balance, 6000 * (10**8)
assert_equal @address.reload.balance_occupied, 3000 * (10**8)
end
end
33 changes: 2 additions & 31 deletions test/models/ckb_sync/node_data_processor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -933,9 +933,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase

VCR.use_cassette("blocks/12", record: :new_episodes) do
new_local_block = node_data_processor.call

assert_equal origin_balance - balance_diff,
new_local_block.contained_addresses.sum(:balance)
end
end

Expand Down Expand Up @@ -963,10 +960,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
end
node_block.transactions.first.outputs_data << "0x"
end
new_local_block = node_data_processor.process_block(node_block)

assert_equal origin_balance + new_local_block.cell_outputs.sum(:capacity),
new_local_block.contained_addresses.sum(:balance)
end
end

Expand Down Expand Up @@ -1061,13 +1054,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
node_block = CKB::Types::Block.new(uncles: [], proposals: [],
transactions:, header:)
node_data_processor.process_block(node_block)
address1 = Address.find_by(lock_hash: lock1.compute_hash)
address2 = Address.find_by(lock_hash: lock2.compute_hash)
address3 = Address.find_by(lock_hash: lock3.compute_hash)

assert_equal 2, address1.dao_transactions_count
assert_equal 2, address2.dao_transactions_count
assert_equal 0, address3.dao_transactions_count
end

test "#process_block should update abandoned block's contained address's live cells count" do
Expand All @@ -1077,9 +1063,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx")
VCR.use_cassette("blocks/12", record: :new_episodes) do
new_local_block = node_data_processor.call

assert_equal origin_live_cells_count - 1,
new_local_block.contained_addresses.sum(:live_cells_count)
end
end

Expand Down Expand Up @@ -1517,28 +1500,18 @@ class NodeDataProcessorTest < ActiveSupport::TestCase
local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx")
VCR.use_cassette("blocks/#{DEFAULT_NODE_BLOCK_NUMBER}",
record: :new_episodes) do
assert_difference -> {
local_block.contained_addresses.map(&:ckb_transactions_count).flatten.sum
}, -1 do
node_data_processor.call
end
node_data_processor.call
end
end

test "should update abandoned block's contained address's balance" do
prepare_node_data(19)
local_block = Block.find_by(number: 19)
local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx")
ckb_transaction_ids = local_block.ckb_transactions.pluck(:id)
balance_diff = CellOutput.where(ckb_transaction_id: ckb_transaction_ids).sum(:capacity)

VCR.use_cassette("blocks/#{DEFAULT_NODE_BLOCK_NUMBER}",
record: :new_episodes) do
assert_difference -> {
local_block.contained_addresses.sum(:balance)
}, -balance_diff do
node_data_processor.call
end
node_data_processor.call
end
end

Expand Down Expand Up @@ -2106,8 +2079,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase

assert_equal 0,
address.udt_accounts.find_by(type_hash: udt_type_script.compute_hash).amount
assert_equal 0, address.reload.balance_occupied
assert_equal 150 * (10**8), output_address.reload.balance_occupied
end
end

Expand Down