diff --git a/app/jobs/revert_block_job.rb b/app/jobs/revert_block_job.rb index a52657353..0eb649ea9 100644 --- a/app/jobs/revert_block_job.rb +++ b/app/jobs/revert_block_job.rb @@ -22,7 +22,6 @@ def perform(local_tip_block = nil) end benchmark :recalculate_udt_transactions_count, local_tip_block benchmark :decrease_records_count, local_tip_block - benchmark :update_address_balance_and_ckb_transactions_count, local_tip_block ApplicationRecord.benchmark "invalid! block" do local_tip_block.invalid! @@ -39,79 +38,6 @@ def perform(local_tip_block = nil) end end - def update_address_balance_and_ckb_transactions_count(local_tip_block) - select_fields = [ - "address_id", - "COUNT(*) AS cells_count", - "SUM(capacity) AS total_capacity", - "SUM(CASE WHEN type_hash IS NOT NULL OR data_hash IS NOT NULL THEN capacity ELSE 0 END) AS balance_occupied", - ] - output_addrs = - local_tip_block.cell_outputs.live.select(select_fields.join(", ")).group(:address_id) - input_addrs = - CellOutput.where(consumed_block_timestamp: local_tip_block.timestamp).select(select_fields.join(", ")).group(:address_id) - out_hash = output_addrs.each_with_object({}) do |row, h| - h[row.address_id] = { - cells_count: row.cells_count.to_i, - total_capacity: row.total_capacity.to_i, - balance_occupied: row.balance_occupied.to_i, - } - end - - in_hash = input_addrs.each_with_object({}) do |row, h| - h[row.address_id] = { - cells_count: row.cells_count.to_i, - total_capacity: row.total_capacity.to_i, - balance_occupied: row.balance_occupied.to_i, - } - end - - # Merge keys - all_ids = in_hash.keys | out_hash.keys - - # 计算差值 - address_balance_diff = all_ids.each_with_object({}) do |addr_id, h| - input = in_hash[addr_id] || { cells_count: 0, total_capacity: 0, balance_occupied: 0 } - output = out_hash[addr_id] || { cells_count: 0, total_capacity: 0, balance_occupied: 0 } - - h[addr_id] = { - cells_count: input[:cells_count] - output[:cells_count], - total_capacity: input[:total_capacity] - output[:total_capacity], - balance_occupied: input[:balance_occupied] - output[:balance_occupied], - } - end - # Preload dao transaction counts for all addresses in one query - dao_tx_counts = DaoEvent.processed. - where(block_id: local_tip_block.id, address_id: all_ids). - group(:address_id). - distinct. - count(:ckb_transaction_id) - tx_count_diffs = AccountBook.tx_committed. - where(block_number: local_tip_block.number, address_id: all_ids). - group(:address_id). - count - - changes = - address_balance_diff.map do |address_id, changes| - tx_count_diff = tx_count_diffs[address_id] || 0 - dao_tx_count_diff = dao_tx_counts[address_id] || 0 - { address_id: address_id }.merge(changes).merge({ ckb_transactions_count: tx_count_diff, dao_transactions_count: dao_tx_count_diff }) - end - - changes.each do |change| - addr = Address.find_by_id(change[:address_id]) - if addr - addr.update!( - live_cells_count: addr.live_cells_count + change[:cells_count], - ckb_transactions_count: addr.ckb_transactions_count - change[:ckb_transactions_count], - balance: addr.balance + change[:total_capacity], - balance_occupied: addr.balance_occupied + change[:balance_occupied], - dao_transactions_count: addr.dao_transactions_count - change[:dao_transactions_count] - ) - end - end - end - def recalculate_udt_transactions_count(local_tip_block) udt_ids = local_tip_block.ckb_transactions.map(&:contained_udt_ids).flatten udt_counts = udt_ids.each_with_object(Hash.new(0)) { |udt_id, counts| counts[udt_id] += 1 } @@ -159,7 +85,6 @@ def revert_mining_info(local_tip_block) def revert_dao_contract_related_operations(local_tip_block) dao_events = DaoEvent.where(block: local_tip_block).processed - dao_transactions_count = local_tip_block.ckb_transactions.where("tags @> array[?]::varchar[]", ["dao"]).count dao_contract = DaoContract.default_contract withdraw_total_deposit = revert_withdraw_from_dao(dao_events) diff --git a/app/models/ckb_sync/new_node_data_processor.rb b/app/models/ckb_sync/new_node_data_processor.rb index 051162fc2..01218dbe1 100644 --- a/app/models/ckb_sync/new_node_data_processor.rb +++ b/app/models/ckb_sync/new_node_data_processor.rb @@ -91,7 +91,7 @@ def process_block(node_block, refresh_balance: true) benchmark :update_or_create_udt_accounts!, local_block # maybe can be changed to asynchronous update benchmark :process_dao_events!, local_block - benchmark :update_addresses_info, addrs_changes, local_block, refresh_balance + # benchmark :update_addresses_info, addrs_changes, local_block, refresh_balance end async_update_udt_infos(local_block) @@ -522,66 +522,6 @@ def update_mining_info(local_block) CkbUtils.update_current_block_mining_info(local_block) end - def update_addresses_info(addrs_change, local_block, refresh_balance) - return unless refresh_balance - - addrs_change.to_a.each_slice(100) do |batch| - - updates = batch.map do |key, value| - [key, { last_updated_block_number: local_block.number, - balance: value[:balance_diff], - balance_occupied: value[:balance_occupied_diff].presence || 0, - ckb_transactions_count: value[:ckb_txs].present? ? value[:ckb_txs].size : 0, - live_cells_count: value[:cells_diff], - dao_transactions_count: value[:dao_txs].present? ? value[:dao_txs].size : 0 - }] - end.to_h - - case_clauses = { last_updated_block_number: [], balance: [], balance_occupied: [], ckb_transactions_count: [], live_cells_count: [], dao_transactions_count: [] } - ids = [] - - updates.each do |id, attrs| - ids << id - attrs.each do |column, value| - if column == :last_updated_block_number - case_clauses[column] << "WHEN #{id} THEN '#{ActiveRecord::Base.connection.quote(value)}'" - else - case_clauses[column] << "WHEN #{id} THEN #{column} + #{value}" - end - end - end - - set_clauses = case_clauses.map do |column, clauses| - if clauses.any? - " #{column} = CASE id\n #{clauses.join("\n ")}\n ELSE #{column}\n END" - else - nil - end - end.compact.join(",\n") - - id_list = ids.join(', ') - - sql = <<-SQL - UPDATE addresses - SET - #{set_clauses} - WHERE id IN (#{id_list}) - SQL - - # puts sql - - ActiveRecord::Base.connection.execute(sql) - - updated_records = Address.where(id: ids).select(:lock_hash) - updated_records.each do |record| - $redis.pipelined do - Rails.cache.delete_multi(%W(#{record.class.name}/#{record.lock_hash})) - end - end - - end - end - def update_block_info!(local_block) local_block.update!( total_transaction_fee: local_block.ckb_transactions.sum(:transaction_fee), diff --git a/app/workers/calculate_address_info_worker.rb b/app/workers/calculate_address_info_worker.rb new file mode 100644 index 000000000..3a2a93f52 --- /dev/null +++ b/app/workers/calculate_address_info_worker.rb @@ -0,0 +1,44 @@ +class CalculateAddressInfoWorker + include Sidekiq::Worker + sidekiq_options queue: "low" + + def perform() + key = :last_block_for_update_addresses_info_worker + id = Rails.cache.read(key) + local_tip_block = Block.recent.first + + blocks = Block.where("id > ? and id <= ?", id, local_tip_block.id).order(id: :desc).select(:id, :address_ids) + return if blocks.blank? + + contained_address_ids = Set.new + blocks.each do |block| + contained_address_ids.merge block.address_ids.map(&:to_i) + end + Rails.cache.write(key, local_tip_block.id) + + Address.where(id: contained_address_ids).find_in_batches do |group| + sleep(50) # Make sure it doesn't get too crowded in there! + address_attributes = [] + + group.each do |addr| + balance, balance_occupied = addr.cal_balance + address_attributes << { + id: addr.id, + balance: balance, + balance_occupied: balance_occupied, + ckb_transactions_count: AccountBook.where(address_id: addr.id).count, + live_cells_count: addr.cell_outputs.live.count, + dao_transactions_count: addr.ckb_dao_transactions.count, + created_at: addr.created_at, + updated_at: Time.current + } + + Rails.cache.delete_multi(%W(#{addr.class.name}/#{addr.lock_hash})) + end + + if address_attributes.present? + Address.upsert_all(address_attributes) + end + end + end +end diff --git a/test/jobs/revert_block_job_test.rb b/test/jobs/revert_block_job_test.rb index d6b5d7dc3..960b46acf 100644 --- a/test/jobs/revert_block_job_test.rb +++ b/test/jobs/revert_block_job_test.rb @@ -34,13 +34,4 @@ class RevertBlockJobTest < ActiveJob::TestCase @address.update(balance: 5000 * (10**8), balance_occupied: 3000 * (10**8), live_cells_count: 2, ckb_transactions_count: 2, last_updated_block_number: @parent_block.number, dao_transactions_count: 2) end - test "rollback address final_state with parent block" do - RevertBlockJob.new(@parent_block).update_address_balance_and_ckb_transactions_count(@parent_block) - - assert_equal @address.reload.live_cells_count, 4 - assert_equal @address.reload.ckb_transactions_count, 1 - assert_equal @address.reload.dao_transactions_count, 1 - assert_equal @address.reload.balance, 6000 * (10**8) - assert_equal @address.reload.balance_occupied, 3000 * (10**8) - end end diff --git a/test/models/ckb_sync/node_data_processor_test.rb b/test/models/ckb_sync/node_data_processor_test.rb index 0e6509dc3..63a9e2cf0 100644 --- a/test/models/ckb_sync/node_data_processor_test.rb +++ b/test/models/ckb_sync/node_data_processor_test.rb @@ -933,9 +933,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase VCR.use_cassette("blocks/12", record: :new_episodes) do new_local_block = node_data_processor.call - - assert_equal origin_balance - balance_diff, - new_local_block.contained_addresses.sum(:balance) end end @@ -963,10 +960,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase end node_block.transactions.first.outputs_data << "0x" end - new_local_block = node_data_processor.process_block(node_block) - - assert_equal origin_balance + new_local_block.cell_outputs.sum(:capacity), - new_local_block.contained_addresses.sum(:balance) end end @@ -1061,13 +1054,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase node_block = CKB::Types::Block.new(uncles: [], proposals: [], transactions:, header:) node_data_processor.process_block(node_block) - address1 = Address.find_by(lock_hash: lock1.compute_hash) - address2 = Address.find_by(lock_hash: lock2.compute_hash) - address3 = Address.find_by(lock_hash: lock3.compute_hash) - - assert_equal 2, address1.dao_transactions_count - assert_equal 2, address2.dao_transactions_count - assert_equal 0, address3.dao_transactions_count end test "#process_block should update abandoned block's contained address's live cells count" do @@ -1077,9 +1063,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx") VCR.use_cassette("blocks/12", record: :new_episodes) do new_local_block = node_data_processor.call - - assert_equal origin_live_cells_count - 1, - new_local_block.contained_addresses.sum(:live_cells_count) end end @@ -1517,11 +1500,7 @@ class NodeDataProcessorTest < ActiveSupport::TestCase local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx") VCR.use_cassette("blocks/#{DEFAULT_NODE_BLOCK_NUMBER}", record: :new_episodes) do - assert_difference -> { - local_block.contained_addresses.map(&:ckb_transactions_count).flatten.sum - }, -1 do - node_data_processor.call - end + node_data_processor.call end end @@ -1529,16 +1508,10 @@ class NodeDataProcessorTest < ActiveSupport::TestCase prepare_node_data(19) local_block = Block.find_by(number: 19) local_block.update(block_hash: "0x419c632366c8eb9635acbb39ea085f7552ae62e1fdd480893375334a0f37d1bx") - ckb_transaction_ids = local_block.ckb_transactions.pluck(:id) - balance_diff = CellOutput.where(ckb_transaction_id: ckb_transaction_ids).sum(:capacity) VCR.use_cassette("blocks/#{DEFAULT_NODE_BLOCK_NUMBER}", record: :new_episodes) do - assert_difference -> { - local_block.contained_addresses.sum(:balance) - }, -balance_diff do - node_data_processor.call - end + node_data_processor.call end end @@ -2106,8 +2079,6 @@ class NodeDataProcessorTest < ActiveSupport::TestCase assert_equal 0, address.udt_accounts.find_by(type_hash: udt_type_script.compute_hash).amount - assert_equal 0, address.reload.balance_occupied - assert_equal 150 * (10**8), output_address.reload.balance_occupied end end