diff --git a/Makefile b/Makefile index 72966123..9c8787f5 100644 --- a/Makefile +++ b/Makefile @@ -21,10 +21,41 @@ rm_logs: fi # deploy live table udtfs -deploy_live_table_udtfs: rm_logs +deploy_livetable_udtfs: rm_logs dbt run \ - -s near_models.deploy.near.live__table \ - --vars '{UPDATE_UDFS_AND_SPS: true, ENABLE_LIVE_TABLE: true, LIVE_TABLE_MATERIALIZATION: ephemeral}' \ + -s near_models.deploy.livetable \ + --vars '{UPDATE_UDFS_AND_SPS: true, ENABLE_LIVE_TABLE: true, LIVE_TABLE_MATERIALIZATION: ephemeral, UDTF_TARGET: gold_core}' \ --profiles-dir ~/.dbt \ --profile near \ --target dev + +deploy_tx_sproc: rm_logs + dbt run-operation create_sp_refresh_fact_transactions_live \ + --profiles-dir ~/.dbt \ + --profile near \ + --target dev + +deploy_tx_task: rm_logs + dbt run-operation create_fact_tx_sproc_task \ + --profiles-dir ~/.dbt \ + --profile near \ + --target dev + +compile_sp_macro: rm_logs + dbt compile --select _compile_sp_macro \ + --profiles-dir ~/.dbt \ + --profile near \ + --target dev + +compile_task: rm_logs + dbt compile --select _compile_task \ + --profiles-dir ~/.dbt \ + --profile near \ + --target dev + +test_deploy_sf_tasks: rm_logs + dbt run --select _dummy_model \ + --profiles-dir ~/.dbt \ + --profile near \ + --target dev + diff --git a/dbt_project.yml b/dbt_project.yml index b027d1f9..8342f8b5 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -28,7 +28,8 @@ clean-targets: # directories to be removed by `dbt clean` on-run-start: - "{{ create_udfs() }}" - "{{ create_sps() }}" - + - "{{ deploy_sf_tasks() }}" + on-run-end: - "{{ apply_meta_as_tags(results) }}" diff --git a/macros/create_sps.sql b/macros/create_sps.sql index 645917ef..ef66bb8c 100644 --- a/macros/create_sps.sql +++ b/macros/create_sps.sql @@ -1,6 +1,7 @@ {% macro create_sps() %} {% if target.database == 'NEAR' %} CREATE schema IF NOT EXISTS _internal; -{{ sp_create_prod_clone('_internal') }}; + {{ sp_create_prod_clone('_internal') }}; {% endif %} + {{ create_sp_refresh_fact_transactions_live() }} {% endmacro %} diff --git a/macros/livetable/assets/overview.md b/macros/livetable/assets/overview.md new file mode 100644 index 00000000..89cc5cd4 --- /dev/null +++ b/macros/livetable/assets/overview.md @@ -0,0 +1,53 @@ +## Near Livetable SPROC + +### Overview +``` ++-----------------+ +---------------------------------------------+ +---------------------------+ +| Snowflake Task | | Stored Procedure (SP) | | _live.udf_api | +| (Sched: 5 min) |----->| SP_REFRESH_FACT_TRANSACTIONS_LIVE() | |---------------------------| +| QUERY_TAG: {...}| |---------------------------------------------| | | ++-----------------+ | 1. Check Schema/Table Exists (Idempotent) | | [RPC call to Near Node] | + | - CREATE SCHEMA IF NOT EXISTS ... | | Quicknode Endpoint | + | - CREATE HYBRID TABLE IF NOT EXISTS ... | | | + | (Incl. PRIMARY KEY) | +---------------------------+ + | | ^ + | 2. Get Chain Head | | + | CALL _live.udf_api()---------------------+------------------>+ + | | | | + | | | | + | v | | + | [Near RPC API] <-------------------------|-------------------+ + | ^ | + | | Returns Height | + | height <-- _live.udf_api() | + | | + | 3. Calculate Start Block | .-------------------------. + | start_block = height - buffer | | Hybrid Table | + | | | CORE_LIVE. | + | 4. Call UDTF to Fetch Transactions | | FACT_TRANSACTIONS | + | rows <-- | |-------------------------| + | TABLE(livetable.tf_fact_tx(...)) ----+| | - tx_hash (PK) | + | | || | - block_id | + | v UDTF Execution || | - block_timestamp (*) | + | (Requires Owner Permissions) || | - tx_signer | + | (Calls other UDFs for. || | - tx_receiver | + | block/chunk/tx details || | - ... | + | internally) || | - _hybrid_updated_at | + | | mimic's gold fact_tx || `-------------------------' + | +------------------------------|| ^ | + | | | | Step 6: Prune + | 5. MERGE Data into Hybrid Table |--------------->| | DELETE Rows + | rows_merged = SQLROWCOUNT | | | Older than 60m + | | | | (based on block_ts) + | 7. Return Status String | | v + | RETURN 'Fetched...Merged...Deleted...' | | ............ + +--------------------|------------------------+ | : Old Data : + | | :..........: + | Status String | + v | ++------------------------+ +------------------------+ | +| Snowflake Task History |<----| Task Record |<------------------------------' +| (Logs Status, Return, | | (Captures SP Return) | +| Duration) | +------------------------+ ++------------------------+ +``` \ No newline at end of file diff --git a/macros/livetable/near.yaml.sql b/macros/livetable/near.yaml.sql index 30b15a7e..5a1ef363 100644 --- a/macros/livetable/near.yaml.sql +++ b/macros/livetable/near.yaml.sql @@ -3,13 +3,13 @@ This macro is used to generate the high level abstractions for the Near blockchain. #} -{% set schema = blockchain ~ "_" ~ network %} +{% set schema = blockchain %} - name: {{ schema }}.udf_get_latest_block_height signature: [] return_type: INTEGER sql: | - {{ near_live_table_latest_block_height() | indent(4) -}} + {{ near_livetable_latest_block_height() | indent(4) -}} - name: {{ schema }}.lt_tx_udf_api signature: @@ -24,7 +24,7 @@ api_integration: '{{ var("API_INTEGRATION") }}' options: | NOT NULL - MAX_BATCH_ROWS = 25 + MAX_BATCH_ROWS = 150 sql: udf_api - name: {{ schema }}.lt_blocks_udf_api @@ -40,7 +40,7 @@ api_integration: '{{ var("API_INTEGRATION") }}' options: | NOT NULL - MAX_BATCH_ROWS = 25 + MAX_BATCH_ROWS = 120 sql: udf_api - name: {{ schema }}.lt_chunks_udf_api @@ -71,21 +71,35 @@ VOLATILE COMMENT = $$Returns the block data for a given block height.Fetches blocks for the specified number of blocks $$ sql: | - {{ near_live_table_fact_blocks(schema, blockchain, network) | indent(4) -}} + {{ near_livetable_fact_blocks(schema, blockchain, network) | indent(4) -}} - name: {{ schema -}}.tf_fact_transactions signature: - [_block_height, INTEGER, The start block height to get the transactions from] - [row_count, INTEGER, The number of rows to fetch] return_type: - - "TABLE(tx_hash STRING, block_id NUMBER, block_timestamp TIMESTAMP_NTZ, nonce INT, signature STRING, tx_receiver STRING, tx_signer STRING, tx VARIANT, gas_used NUMBER, transaction_fee NUMBER, attached_gas NUMBER, tx_succeeded BOOLEAN, fact_transactions_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)" + - "TABLE(tx_hash STRING, block_id NUMBER, block_timestamp TIMESTAMP_NTZ, nonce INT, signature STRING, tx_receiver STRING, tx_signer STRING, tx VARIANT, gas_used NUMBER, transaction_fee NUMBER, attached_gas NUMBER, tx_succeeded BOOLEAN, fact_transactions_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ, data VARIANT, value OBJECT, partition_key NUMBER)" options: | NOT NULL RETURNS NULL ON NULL INPUT VOLATILE COMMENT = $$Returns transaction details for blocks starting from a given height.Fetches txs for the specified number of blocks.$$ sql: | - {{ near_live_table_fact_transactions(schema, blockchain, network) | indent(4) -}} + {{ near_livetable_fact_transactions(schema, blockchain, network) | indent(4) -}} + + - name: {{ schema -}}.tf_bronze_transactions + signature: + - [_block_height, INTEGER, The start block height to get the transactions from] + - [row_count, INTEGER, The number of rows to fetch] + return_type: + - "TABLE(data VARIANT, value OBJECT, partition_key NUMBER(38,0), _inserted_timestamp TIMESTAMP_LTZ(9))" + options: | + NOT NULL + RETURNS NULL ON NULL INPUT + VOLATILE + COMMENT = $$Returns transaction details for blocks starting from a given height.Fetches txs for the specified number of blocks.$$ + sql: | + {{ near_livetable_bronze_transactions(schema, blockchain, network) | indent(4) -}} - name: {{ schema -}}.tf_fact_receipts signature: @@ -99,7 +113,7 @@ VOLATILE COMMENT = $$Returns receipt details for blocks starting from a given height. Fetches receipts for the specified number of blocks.$$ sql: | - {{ near_live_table_fact_receipts(schema, blockchain, network) | indent(4) -}} + {{ near_livetable_fact_receipts(schema, blockchain, network) | indent(4) -}} - name: {{ schema -}}.tf_ez_actions signature: @@ -113,7 +127,27 @@ VOLATILE COMMENT = $$Returns decoded action details for blocks starting from a given height. Fetches actions for the specified number of blocks.$$ sql: | - {{ near_live_table_ez_actions(schema, blockchain, network) | indent(4) -}} + {{ near_livetable_ez_actions(schema, blockchain, network) | indent(4) -}} {%- endmacro -%} +{% macro lt_ephemeral_deploy(configs) %} +{# + This macro is used to deploy livetable udf/udtfs using ephemeral models. + #} + + {% if execute and (var("UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %} + {% set sql %} + {% for config in configs %} + {{- livequery_models.crud_udfs_by_chain(config, this.schema, network, var("DROP_UDFS_AND_SPS")) -}} + {%- endfor -%} + {%- endset -%} + {%- if var("DROP_UDFS_AND_SPS") -%} + {%- do log("Drop partner udfs: " ~ this.database ~ "." ~ schema, true) -%} + {%- else -%} + {%- do log("Deploy partner udfs: " ~ this.database ~ "." ~ schema, true) -%} + {%- endif -%} + {%- do run_query(sql ~ livequery_models.apply_grants_by_schema(schema)) -%} + {%- endif -%} + SELECT '{{ model.schema }}' as schema_ +{%- endmacro -%} \ No newline at end of file diff --git a/macros/livetable/near_live_table_abstractions.sql b/macros/livetable/near_live_table_abstractions.sql index f5f57e38..0e266703 100644 --- a/macros/livetable/near_live_table_abstractions.sql +++ b/macros/livetable/near_live_table_abstractions.sql @@ -1,6 +1,6 @@ -- Get Near Chain Head -{% macro near_live_table_latest_block_height() %} +{% macro near_livetable_latest_block_height() %} WITH rpc_call AS ( SELECT DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp, @@ -15,7 +15,7 @@ WITH rpc_call AS ( 'params' : {'finality' : 'final'} }, _utils.UDF_WHOAMI(), - 'Vault/prod/near/quicknode/mainnet' + 'Vault/prod/near/quicknode/livetable/mainnet' ):data::object AS rpc_result ) SELECT @@ -24,7 +24,7 @@ FROM rpc_call {% endmacro %} -{% macro near_live_table_min_max_block_height(start_block, block_count) %} +{% macro near_livetable_min_max_block_height(start_block, block_count) %} SELECT {{ start_block }} AS min_height, min_height + {{ block_count }} AS max_height, @@ -33,14 +33,14 @@ FROM {% endmacro %} -- Get Near Block Data -{% macro near_live_table_target_blocks(start_block, block_count) %} +{% macro near_livetable_target_blocks(start_block, block_count) %} WITH heights AS ( SELECT min_height, max_height, FROM ( - {{- near_live_table_min_max_block_height(start_block=start_block, block_count=block_count) | indent(13) -}} + {{- near_livetable_min_max_block_height(start_block=start_block, block_count=block_count) | indent(13) -}} ) ), block_spine AS ( @@ -59,7 +59,7 @@ FROM FROM block_spine {% endmacro %} -{% macro near_live_table_get_spine(table_name) %} +{% macro near_livetable_get_spine(table_name) %} SELECT block_height, ROW_NUMBER() OVER (ORDER BY block_height) - 1 as partition_num @@ -81,11 +81,11 @@ FROM ) {% endmacro %} -{% macro near_live_table_get_raw_block_data(spine) %} +{% macro near_livetable_get_raw_block_data(spine) %} SELECT block_height, DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp, - live_table.lt_blocks_udf_api( + livetable.lt_blocks_udf_api( 'POST', '{Service}', {'Content-Type' : 'application/json'}, @@ -96,14 +96,14 @@ SELECT 'params':{'block_id': block_height} }, _utils.UDF_WHOAMI(), - 'Vault/prod/near/quicknode/mainnet' + 'Vault/prod/near/quicknode/livetable/mainnet' ):data.result AS rpc_data_result from {{spine}} {% endmacro %} -{% macro near_live_table_extract_raw_block_data(raw_blocks) %} +{% macro near_livetable_extract_raw_block_data(raw_blocks) %} SELECT block_data:header:height::string as block_id, TO_TIMESTAMP_NTZ( @@ -146,22 +146,27 @@ FROM {{raw_blocks}} {% endmacro %} -{% macro near_live_table_fact_blocks(schema, blockchain, network) %} - {%- set near_live_table_fact_blocks = livequery_models.get_rendered_model('near_models', 'livetable_fact_blocks', schema, blockchain, network) -%} - {{ near_live_table_fact_blocks }} +{% macro near_livetable_fact_blocks(schema, blockchain, network) %} + {%- set near_livetable_fact_blocks = livequery_models.get_rendered_model('near_models', 'livetable_fact_blocks', schema, blockchain, network) -%} + {{ near_livetable_fact_blocks }} {% endmacro %} -{% macro near_live_table_fact_transactions(schema, blockchain, network) %} - {%- set near_live_table_fact_transactions = livequery_models.get_rendered_model('near_models', 'livetable_fact_transactions', schema, blockchain, network) -%} - {{ near_live_table_fact_transactions }} +{% macro near_livetable_fact_transactions(schema, blockchain, network) %} + {%- set near_livetable_fact_transactions = livequery_models.get_rendered_model('near_models', 'livetable_fact_transactions', schema, blockchain, network) -%} + {{ near_livetable_fact_transactions }} {% endmacro %} -{% macro near_live_table_fact_receipts(schema, blockchain, network) %} - {%- set near_live_table_fact_receipts = livequery_models.get_rendered_model('near_models', 'livetable_fact_receipts', schema, blockchain, network) -%} - {{ near_live_table_fact_receipts }} +{% macro near_livetable_bronze_transactions(schema, blockchain, network) %} + {%- set near_livetable_bronze_transactions = livequery_models.get_rendered_model('near_models', 'livetable_bronze_transactions', schema, blockchain, network) -%} + {{ near_livetable_bronze_transactions }} {% endmacro %} -{% macro near_live_table_ez_actions(schema, blockchain, network) %} - {%- set near_live_table_ez_actions = livequery_models.get_rendered_model('near_models', 'livetable_ez_actions', schema, blockchain, network) -%} - {{ near_live_table_ez_actions }} +{% macro near_livetable_fact_receipts(schema, blockchain, network) %} + {%- set near_livetable_fact_receipts = livequery_models.get_rendered_model('near_models', 'livetable_fact_receipts', schema, blockchain, network) -%} + {{ near_livetable_fact_receipts }} +{% endmacro %} + +{% macro near_livetable_ez_actions(schema, blockchain, network) %} + {%- set near_livetable_ez_actions = livequery_models.get_rendered_model('near_models', 'livetable_ez_actions', schema, blockchain, network) -%} + {{ near_livetable_ez_actions }} {% endmacro %} diff --git a/macros/livetable/permissions/udtf_permissions.sql b/macros/livetable/permissions/udtf_permissions.sql new file mode 100644 index 00000000..104369fa --- /dev/null +++ b/macros/livetable/permissions/udtf_permissions.sql @@ -0,0 +1,14 @@ +{% macro lt_udtf_permissions() %} + + + {% set udtfs = [ + '{{ target.database }}.livetable.lt_blocks_udf_api', + '{{ target.database }}.livetable.lt_tx_udf_api', + '{{ target.database }}.livetable.lt_chunks_udf_api', + ] %} + + {% for udtf in udtfs %} + GRANT USAGE ON FUNCTION {{ udtf }} TO ROLE {{ target.role }}; + {% endfor %} + +{% endmacro %} \ No newline at end of file diff --git a/macros/livetable/sproc/sp_bronze_transactions.sql b/macros/livetable/sproc/sp_bronze_transactions.sql new file mode 100644 index 00000000..44f07d3e --- /dev/null +++ b/macros/livetable/sproc/sp_bronze_transactions.sql @@ -0,0 +1,158 @@ +{% macro create_sp_refresh_bronze_transactions_live() %} + +{% set procedure_sql %} + + CREATE SCHEMA IF NOT EXISTS {{ target.database }}.livetable; + CREATE SCHEMA IF NOT EXISTS {{ target.database }}.bronze_live; + + CREATE OR REPLACE PROCEDURE {{ target.database }}.livetable.sp_refresh_bronze_transactions_live() + RETURNS STRING + LANGUAGE SQL + AS + $$ + DECLARE + -- Configuration + bronze_transactions_table_name STRING DEFAULT '{{ target.database }}.BRONZE_LIVE.TRANSACTIONS'; + chain_head_udf STRING DEFAULT '_live.udf_api'; + secret_path STRING DEFAULT 'Vault/prod/near/quicknode/livetable/mainnet'; + pk_column STRING DEFAULT 'tx_hash'; + blocks_to_fetch_buffer INTEGER DEFAULT 1; + block_timestamp_column STRING DEFAULT 'block_timestamp'; + pruning_threshold_minutes INTEGER DEFAULT 60; + + -- State Variables + chain_head_block INTEGER; + start_block_for_udtf INTEGER; + rows_merged_bronze_stage INTEGER := 0; + final_return_message STRING; + error_message STRING; + + BEGIN + + CREATE HYBRID TABLE IF NOT EXISTS IDENTIFIER(:bronze_transactions_table_name) ( + TX_HASH STRING PRIMARY KEY, + PARTITION_KEY NUMBER, + DATA VARIANT, + VALUE VARIANT, + _INSERTED_TIMESTAMP TIMESTAMP_LTZ(9) DEFAULT CURRENT_TIMESTAMP(9), + METADATA VARIANT DEFAULT NULL + ); + + SELECT IDENTIFIER(:chain_head_udf)( + 'POST', + '{Service}', + {'Content-Type': 'application/json', 'fsc-compression-mode': 'auto'}, + {'jsonrpc': '2.0', 'method': 'block', 'id': 'Flipside/block/' || DATE_PART('EPOCH', SYSDATE()) :: STRING, 'params': {'finality': 'final'}}, + _utils.UDF_WHOAMI(), + :secret_path + ):data:result:header:height::INTEGER + INTO :chain_head_block; + + IF (:chain_head_block IS NULL) THEN + RETURN 'ERROR: Failed to fetch chain head block height.'; + END IF; + + start_block_for_udtf := :chain_head_block - :blocks_to_fetch_buffer + 1; + + MERGE INTO IDENTIFIER(:bronze_transactions_table_name) AS target + USING ( + SELECT + DATA :transaction :hash :: STRING AS TX_HASH, + PARTITION_KEY, + DATA, + VALUE, + _INSERTED_TIMESTAMP + FROM TABLE({{ target.database }}.LIVETABLE.TF_BRONZE_TRANSACTIONS(:start_block_for_udtf, :blocks_to_fetch_buffer)) + ) AS source + ON target.tx_hash = source.tx_hash + WHEN MATCHED THEN UPDATE SET + target.PARTITION_KEY = source.PARTITION_KEY, + target.DATA = source.DATA, + target.VALUE = source.value, + target._INSERTED_TIMESTAMP = CURRENT_TIMESTAMP(9), + target.METADATA = { + 'app_name': 'livetable_sproc', + 'batch_id': NULL, + 'request_id': NULL, + 'request': { + 'data': { + 'id': 'Flipside/EXPERIMENTAL_tx_status/' || target._INSERTED_TIMESTAMP::STRING || '/' || source.TX_HASH, + 'jsonrpc': '2.0', + 'method': 'EXPERIMENTAL_tx_status', + 'params': { + 'tx_hash': source.TX_HASH, + 'wait_until': 'FINAL' + } + }, + 'headers': { + 'Content-Type': 'application/json', + 'fsc-compression-mode': 'auto' + }, + 'method': 'POST', + 'secret_name': :secret_path, + 'url': '{Service}' + } + } + WHEN NOT MATCHED THEN INSERT ( + TX_HASH, + PARTITION_KEY, + DATA, + VALUE, + _INSERTED_TIMESTAMP, + METADATA + ) VALUES ( + source.TX_HASH, + source.PARTITION_KEY, + source.DATA, + source.value, + source._INSERTED_TIMESTAMP, + { + 'app_name': 'livetable_sproc', + 'batch_id': NULL, + 'request_id': NULL, + 'request': { + 'data': { + 'id': 'Flipside/EXPERIMENTAL_tx_status/' || (DATE_PART('EPOCH_SECOND', SYSDATE())::NUMBER)::STRING || '/' || source.TX_HASH, + 'jsonrpc': '2.0', + 'method': 'EXPERIMENTAL_tx_status', + 'params': { + 'tx_hash': source.TX_HASH, + 'wait_until': 'FINAL' + } + }, + 'headers': { + 'Content-Type': 'application/json', + 'fsc-compression-mode': 'auto' + }, + 'method': 'POST', + 'secret_name': :secret_path, + 'url': '{Service}' + } + } + ); + + rows_merged_bronze_stage := SQLROWCOUNT; + + + final_return_message := 'Bronze Stage: Upserted ' || :rows_merged_bronze_stage || ' RPC responses.'; + + RETURN final_return_message; + + EXCEPTION + WHEN OTHER THEN + error_message := 'ERROR in sp_refresh_bronze_transactions_live: ' || SQLERRM; + RETURN error_message; + END + $$; +{% endset %} + + +{% if execute %} + {% do run_query(procedure_sql) %} + {% do log(procedure_sql, info=True) %} + {% do log("Deployed stored procedure: livetable.sp_refresh_bronze_transactions_live", info=True) %} +{% endif %} + +{% endmacro %} + + diff --git a/macros/livetable/sproc/sp_fact_blocks.sql b/macros/livetable/sproc/sp_fact_blocks.sql new file mode 100644 index 00000000..fb78729d --- /dev/null +++ b/macros/livetable/sproc/sp_fact_blocks.sql @@ -0,0 +1,159 @@ +{% macro create_sp_refresh_fact_blocks_live() %} + +{% set procedure_sql %} +CREATE OR REPLACE PROCEDURE {{ target.database }}.livetable.sp_refresh_fact_blocks_live() +RETURNS STRING +LANGUAGE SQL +AS +$$ +DECLARE + -- Configuration + hybrid_table_name STRING DEFAULT '{{ target.database }}.CORE_LIVE.HYBRID_FACT_BLOCKS'; + udtf_name STRING DEFAULT '{{ target.database }}.LIVETABLE.TF_FACT_BLOCKS'; + chain_head_udf STRING DEFAULT '_live.udf_api'; + secret_path STRING DEFAULT 'Vault/prod/near/quicknode/livetable/mainnet'; + pk_column STRING DEFAULT 'block_id'; + blocks_to_fetch_buffer INTEGER DEFAULT 385; + block_timestamp_column STRING DEFAULT 'block_timestamp'; + pruning_threshold_minutes INTEGER DEFAULT 60; + + -- State Variables + chain_head_block INTEGER; + start_block_for_udtf INTEGER; + rows_merged INTEGER := 0; + rows_deleted INTEGER := 0; + +BEGIN + + CREATE SCHEMA IF NOT EXISTS {{ target.database }}.core_live; + + CREATE HYBRID TABLE IF NOT EXISTS IDENTIFIER(:hybrid_table_name) ( + block_id NUMBER PRIMARY KEY, + block_timestamp TIMESTAMP_NTZ, + block_hash STRING, + block_author STRING, + header OBJECT, + block_challenges_result ARRAY, + block_challenges_root STRING, + chunk_headers_root STRING, + chunk_tx_root STRING, + chunk_mask ARRAY, + chunk_receipts_root STRING, + chunks ARRAY, + chunks_included NUMBER, + epoch_id STRING, + epoch_sync_data_hash STRING, + gas_price FLOAT, + last_ds_final_block STRING, + last_final_block STRING, + latest_protocol_version INT, + next_bp_hash STRING, + next_epoch_id STRING, + outcome_root STRING, + prev_hash STRING, + prev_height NUMBER, + prev_state_root STRING, + random_value STRING, + rent_paid FLOAT, + signature STRING, + total_supply FLOAT, + validator_proposals ARRAY, + validator_reward FLOAT, + fact_blocks_id STRING, + inserted_timestamp TIMESTAMP_NTZ, + modified_timestamp TIMESTAMP_NTZ, + _hybrid_updated_at TIMESTAMP_LTZ DEFAULT SYSDATE() + ); + + SELECT IDENTIFIER(:chain_head_udf)( + 'POST', + '{Service}', + { + 'Content-Type': 'application/json', + 'fsc-compression-mode': 'auto' + }, + { + 'jsonrpc': '2.0', + 'id': 'Flipside/block/' || DATE_PART('EPOCH', SYSDATE()) :: STRING, + 'method': 'block', + 'params': {'finality': 'final'} + }, + _utils.UDF_WHOAMI(), :secret_path + ):data:result:header:height::INTEGER + INTO :chain_head_block; + + IF (:chain_head_block IS NULL) THEN + RETURN 'ERROR: Failed to fetch chain head block height.'; + END IF; + + start_block_for_udtf := :chain_head_block - :blocks_to_fetch_buffer + 1; + + MERGE INTO IDENTIFIER(:hybrid_table_name) AS target + USING ( + SELECT * FROM TABLE(IDENTIFIER(:udtf_name)(:start_block_for_udtf, :blocks_to_fetch_buffer)) + ) AS source + ON target.block_id = source.block_id + WHEN MATCHED THEN UPDATE SET + target.block_timestamp = source.block_timestamp, target.block_hash = source.block_hash, + target.block_author = source.block_author, target.header = source.header, + target.block_challenges_result = source.block_challenges_result, target.block_challenges_root = source.block_challenges_root, + target.chunk_headers_root = source.chunk_headers_root, target.chunk_tx_root = source.chunk_tx_root, + target.chunk_mask = source.chunk_mask, target.chunk_receipts_root = source.chunk_receipts_root, + target.chunks = source.chunks, target.chunks_included = source.chunks_included, + target.epoch_id = source.epoch_id, target.epoch_sync_data_hash = source.epoch_sync_data_hash, + target.gas_price = source.gas_price, target.last_ds_final_block = source.last_ds_final_block, + target.last_final_block = source.last_final_block, target.latest_protocol_version = source.latest_protocol_version, + target.next_bp_hash = source.next_bp_hash, target.next_epoch_id = source.next_epoch_id, + target.outcome_root = source.outcome_root, target.prev_hash = source.prev_hash, + target.prev_height = source.prev_height, target.prev_state_root = source.prev_state_root, + target.random_value = source.random_value, target.rent_paid = source.rent_paid, + target.signature = source.signature, target.total_supply = source.total_supply, + target.validator_proposals = source.validator_proposals, target.validator_reward = source.validator_reward, + target.fact_blocks_id = source.fact_blocks_id, target.inserted_timestamp = source.inserted_timestamp, + target.modified_timestamp = source.modified_timestamp, + target._hybrid_updated_at = SYSDATE() + WHEN NOT MATCHED THEN INSERT ( + + block_id, block_timestamp, block_hash, block_author, header, block_challenges_result, + block_challenges_root, chunk_headers_root, chunk_tx_root, chunk_mask, + chunk_receipts_root, chunks, chunks_included, epoch_id, epoch_sync_data_hash, + gas_price, last_ds_final_block, last_final_block, latest_protocol_version, + next_bp_hash, next_epoch_id, outcome_root, prev_hash, prev_height, + prev_state_root, random_value, rent_paid, signature, total_supply, + validator_proposals, validator_reward, fact_blocks_id, inserted_timestamp, + modified_timestamp, _hybrid_updated_at + ) VALUES ( + source.block_id, source.block_timestamp, source.block_hash, source.block_author, source.header, source.block_challenges_result, + source.block_challenges_root, source.chunk_headers_root, source.chunk_tx_root, source.chunk_mask, + source.chunk_receipts_root, source.chunks, source.chunks_included, source.epoch_id, source.epoch_sync_data_hash, + source.gas_price, source.last_ds_final_block, source.last_final_block, source.latest_protocol_version, + source.next_bp_hash, source.next_epoch_id, source.outcome_root, source.prev_hash, source.prev_height, + source.prev_state_root, source.random_value, source.rent_paid, source.signature, source.total_supply, + source.validator_proposals, source.validator_reward, source.fact_blocks_id, source.inserted_timestamp, + source.modified_timestamp, SYSDATE() + ); + rows_merged := SQLROWCOUNT; + + -- 4. Prune Hybrid Table (Same logic, different table) + DELETE FROM IDENTIFIER(:hybrid_table_name) + WHERE IDENTIFIER(:block_timestamp_column) < (DATEADD('minute', - :pruning_threshold_minutes, CURRENT_TIMESTAMP()))::TIMESTAMP_NTZ(9); + rows_deleted := SQLROWCOUNT; + + RETURN 'Refreshed Blocks: Fetched starting ' || :start_block_for_udtf || '. Merged ' || :rows_merged || ' rows. Pruned ' || :rows_deleted || ' rows.'; + +EXCEPTION + WHEN OTHER THEN + RETURN 'ERROR in sp_refresh_fact_blocks_live: ' || SQLERRM; +END; +$$; +{% endset %} + + +{% if execute %} + {% do run_query(procedure_sql) %} + {% do log("Created sp_refresh_fact_blocks_live", info=True) %} +{% else %} + {% do log("Skipping sp_refresh_fact_blocks_live creation during compile phase.", info=True) %} +{% endif %} + +{% endmacro %} \ No newline at end of file diff --git a/macros/livetable/sproc/sp_fact_transactions.sql b/macros/livetable/sproc/sp_fact_transactions.sql new file mode 100644 index 00000000..688b6317 --- /dev/null +++ b/macros/livetable/sproc/sp_fact_transactions.sql @@ -0,0 +1,148 @@ +{% macro create_sp_refresh_fact_transactions_live() %} + +{% set procedure_sql %} + + CREATE SCHEMA IF NOT EXISTS {{ target.database }}.livetable; + CREATE SCHEMA IF NOT EXISTS {{ target.database }}.core_live; + CREATE SCHEMA IF NOT EXISTS {{ target.database }}.bronze_live; + + CREATE OR REPLACE PROCEDURE {{ target.database }}.livetable.sp_refresh_fact_transactions_live() + RETURNS STRING + LANGUAGE SQL + AS + $$ + DECLARE + -- Configuration + gold_hybrid_table_name STRING DEFAULT '{{ target.database }}.CORE_LIVE.FACT_TRANSACTIONS'; + chain_head_udf STRING DEFAULT '_live.udf_api'; + secret_path STRING DEFAULT 'Vault/prod/near/quicknode/livetable/mainnet'; + pk_column STRING DEFAULT 'tx_hash'; + blocks_to_fetch_buffer INTEGER DEFAULT 295; + block_timestamp_column STRING DEFAULT 'block_timestamp'; + pruning_threshold_minutes INTEGER DEFAULT 60; + + -- State Variables + chain_head_block INTEGER; + start_block_for_udtf INTEGER; + rows_merged_gold INTEGER := 0; + rows_deleted_gold INTEGER := 0; + final_return_message STRING; + error_message STRING; + + BEGIN + + CREATE HYBRID TABLE IF NOT EXISTS IDENTIFIER(:gold_hybrid_table_name) ( + tx_hash STRING PRIMARY KEY, + block_id NUMBER, + block_timestamp TIMESTAMP_NTZ, + nonce INT, + signature STRING, + tx_receiver STRING, + tx_signer STRING, + tx VARIANT, + gas_used NUMBER, + transaction_fee NUMBER, + attached_gas NUMBER, + tx_succeeded BOOLEAN, + fact_transactions_id STRING, + inserted_timestamp TIMESTAMP_NTZ, + modified_timestamp TIMESTAMP_NTZ, + + INDEX idx_tx_signer (tx_signer), + INDEX idx_tx_receiver (tx_receiver) + ); + + SELECT IDENTIFIER(:chain_head_udf)( + 'POST', + '{Service}', + {'Content-Type': 'application/json', 'fsc-compression-mode': 'auto'}, + {'jsonrpc': '2.0', 'method': 'block', 'id': 'Flipside/block/' || DATE_PART('EPOCH', SYSDATE()) :: STRING, 'params': {'finality': 'final'}}, + _utils.UDF_WHOAMI(), + :secret_path + ):data:result:header:height::INTEGER + INTO :chain_head_block; + + IF (:chain_head_block IS NULL) THEN + RETURN 'ERROR: Failed to fetch chain head block height.'; + END IF; + + start_block_for_udtf := :chain_head_block - :blocks_to_fetch_buffer + 1; + + + MERGE INTO IDENTIFIER(:gold_hybrid_table_name) AS target + USING ( + SELECT + TX_HASH, BLOCK_ID, BLOCK_TIMESTAMP, NONCE, SIGNATURE, TX_RECEIVER, TX_SIGNER, TX, + GAS_USED, TRANSACTION_FEE, ATTACHED_GAS, TX_SUCCEEDED, FACT_TRANSACTIONS_ID, + INSERTED_TIMESTAMP, MODIFIED_TIMESTAMP + + FROM TABLE({{ target.database }}.LIVETABLE.TF_FACT_TRANSACTIONS(:start_block_for_udtf, :blocks_to_fetch_buffer)) + ) AS source + ON target.tx_hash = source.tx_hash + WHEN MATCHED THEN UPDATE SET + target.block_id = source.block_id, + target.block_timestamp = source.block_timestamp, + target.nonce = source.nonce, + target.signature = source.signature, + target.tx_receiver = source.tx_receiver, + target.tx_signer = source.tx_signer, + target.tx = source.tx, + target.gas_used = source.gas_used, + target.transaction_fee = source.transaction_fee, + target.attached_gas = source.attached_gas, + target.tx_succeeded = source.tx_succeeded, + target.fact_transactions_id = source.fact_transactions_id, + target.inserted_timestamp = source.inserted_timestamp, + target.modified_timestamp = source.modified_timestamp + WHEN NOT MATCHED THEN INSERT ( + tx_hash, block_id, block_timestamp, nonce, signature, tx_receiver, tx_signer, tx, + gas_used, transaction_fee, attached_gas, tx_succeeded, fact_transactions_id, + inserted_timestamp, modified_timestamp + ) VALUES ( + source.tx_hash, + source.block_id, + source.block_timestamp, + source.nonce, + source.signature, + source.tx_receiver, + source.tx_signer, + source.tx, + source.gas_used, + source.transaction_fee, + source.attached_gas, + source.tx_succeeded, + source.fact_transactions_id, + source.inserted_timestamp, + source.modified_timestamp + ); + + rows_merged_gold := SQLROWCOUNT; + + DELETE FROM IDENTIFIER(:gold_hybrid_table_name) + WHERE IDENTIFIER(:block_timestamp_column) < (DATEADD('minute', - :pruning_threshold_minutes, CURRENT_TIMESTAMP()))::TIMESTAMP_NTZ(9); + + rows_deleted_gold := SQLROWCOUNT; + + + final_return_message := 'Gold: Merged ' || :rows_merged_gold || ', Pruned ' || :rows_deleted_gold || '. '; + + RETURN final_return_message; + + EXCEPTION + WHEN OTHER THEN + error_message := 'ERROR in sp_refresh_fact_transactions_live: ' || SQLERRM; + RETURN error_message; + END + $$; +{% endset %} + + +{% if execute %} + {% do run_query(procedure_sql) %} + {% do log(procedure_sql, info=True) %} + {% do log("Deployed stored procedure: livetable.sp_refresh_fact_transactions_live", info=True) %} +{% endif %} + +{% endmacro %} + + diff --git a/macros/livetable/tasks/bronze/schedule_sp_bronze_transactions.sql b/macros/livetable/tasks/bronze/schedule_sp_bronze_transactions.sql new file mode 100644 index 00000000..2cda3aa1 --- /dev/null +++ b/macros/livetable/tasks/bronze/schedule_sp_bronze_transactions.sql @@ -0,0 +1,18 @@ +{% macro create_bronze_tx_sproc_task()%} + +{% set task_query %} + CREATE TASK IF NOT EXISTS {{ target.database }}.LIVETABLE.TASK_REFRESH_BRONZE_TRANSACTION_LIVE_TEST + WAREHOUSE=DATA_PLATFORM_DEV + SCHEDULE='15 MINUTE' + QUERY_TAG='{"project": "near_models", "model": "sp_refresh_bronze_transactions_live_test", "model_type": "core"}' + AS CALL {{ target.database }}.LIVETABLE.SP_REFRESH_BRONZE_TRANSACTIONS_LIVE(); +{% endset %} + +{% if execute %} + {% do run_query(task_query) %} + {% do log("Deployed task TASK_UPDATE_BRONZE_HYBRID_TX", info=True) %} + ALTER TASK {{ target.database }}.LIVETABLE.TASK_REFRESH_BRONZE_TRANSACTION_LIVE_TEST RESUME; + {% do log("Updated task state to resumed for TASK_UPDATE_RECENT_HYBRID_TX", info=True) %} +{% endif %} +{% endmacro %} + diff --git a/macros/livetable/tasks/deploy_sf_tasks.sql b/macros/livetable/tasks/deploy_sf_tasks.sql new file mode 100644 index 00000000..01ac2637 --- /dev/null +++ b/macros/livetable/tasks/deploy_sf_tasks.sql @@ -0,0 +1,4 @@ +{% macro deploy_sf_tasks() %} + {{ create_fact_tx_sproc_task() }} + {{ create_bronze_tx_sproc_task() }} +{% endmacro %} diff --git a/macros/livetable/tasks/schedule_sp_fact_transactions.sql b/macros/livetable/tasks/schedule_sp_fact_transactions.sql new file mode 100644 index 00000000..8522a144 --- /dev/null +++ b/macros/livetable/tasks/schedule_sp_fact_transactions.sql @@ -0,0 +1,18 @@ +{% macro create_fact_tx_sproc_task()%} + +{% set task_query %} + CREATE TASK IF NOT EXISTS {{ target.database }}.LIVETABLE.TASK_REFRESH_FACT_TRANSACTION_LIVE_TEST + WAREHOUSE=DATA_PLATFORM_DEV + SCHEDULE='4 MINUTE' + QUERY_TAG='{"project": "near_models", "model": "sp_refresh_fact_transactions_live_test", "model_type": "core"}' + AS CALL {{ target.database }}.LIVETABLE.SP_REFRESH_FACT_TRANSACTIONS_LIVE(); +{% endset %} + +{% if execute %} + {% do run_query(task_query) %} + {% do log("Deployed task TASK_UPDATE_RECENT_HYBRID_TX", info=True) %} + ALTER TASK {{ target.database }}.LIVETABLE.TASK_REFRESH_FACT_TRANSACTION_LIVE_TEST RESUME; + {% do log("Updated task state to resumed for TASK_UPDATE_RECENT_HYBRID_TX", info=True) %} +{% endif %} +{% endmacro %} + diff --git a/models/bronze/bronze__FR_blocks.sql b/models/bronze/bronze__FR_blocks.sql index 755ea77c..a099b8d2 100644 --- a/models/bronze/bronze__FR_blocks.sql +++ b/models/bronze/bronze__FR_blocks.sql @@ -10,10 +10,10 @@ {%- set schema = blockchain ~ "_" ~ network -%} WITH spine AS ( - {{ near_live_table_target_blocks(start_block='_block_height', block_count='row_count') | indent(4) -}} + {{ near_livetable_target_blocks(start_block='_block_height', block_count='row_count') | indent(4) -}} ), raw_blocks AS ( - {{ near_live_table_get_raw_block_data('spine') | indent(4) -}} + {{ near_livetable_get_raw_block_data('spine') | indent(4) -}} ) SELECT diff --git a/models/bronze/bronze__FR_transactions.sql b/models/bronze/bronze__FR_transactions.sql index b6fcda35..ddf4f8aa 100644 --- a/models/bronze/bronze__FR_transactions.sql +++ b/models/bronze/bronze__FR_transactions.sql @@ -3,120 +3,130 @@ tags = ['streamline_helper'] ) }} +{%- set blockchain = this.schema -%} +{%- set network = this.identifier -%} +{%- set schema = blockchain ~ "_" ~ network -%} + {% if var('ENABLE_LIVE_TABLE', false) %} - - {%- set blockchain = this.schema -%} - {%- set network = this.identifier -%} - {%- set schema = blockchain ~ "_" ~ network -%} + {% if var('UDTF_TARGET') == 'gold_core' %} + WITH spine AS ( + {{ near_livetable_target_blocks(start_block='_block_height', block_count='row_count') | indent(4) -}} + ) + SELECT * + FROM {{ source('livetable_bronze', 'transactions') }} btx + JOIN spine sp on sp.block_number = btx.value:block_id + WHERE sp.block_number = 147484199; + + {% else %} + {# bronze_transactions RPC Response flush #} + WITH spine AS ( + {{ near_livetable_target_blocks(start_block='_block_height', block_count='row_count') | indent(4) -}} + ), + raw_blocks AS ( + {{ near_livetable_get_raw_block_data('spine') | indent(4) -}} + ), + block_chunk_hashes AS ( + -- Extract block info and the chunk_hash from each chunk header + SELECT + rb.block_height, + rb.rpc_data_result:header:timestamp::INTEGER AS block_timestamp_int, + ch.value:chunk_hash::STRING AS chunk_hash, + ch.value:shard_id::INTEGER AS shard_id, + ch.value:height_created::INTEGER AS chunk_height_created, + ch.value:height_included::INTEGER AS chunk_height_included + FROM raw_blocks rb, + LATERAL FLATTEN(input => rb.rpc_data_result:chunks) ch + WHERE ch.value:tx_root::STRING <> '11111111111111111111111111111111' + ), + raw_chunk_details AS ( + -- Fetch full chunk details using the chunk_hash + SELECT + bch.block_height, + bch.block_timestamp_int, + bch.shard_id, + bch.chunk_hash, + bch.chunk_height_created, + bch.chunk_height_included, + livetable.lt_chunks_udf_api( + 'POST', + '{Service}', + {'Content-Type' : 'application/json'}, + { + 'jsonrpc' : '2.0', + 'method' : 'chunk', + 'id' : 'Flipside/chunk/' || bch.block_height || '/' || bch.chunk_hash, + 'params': {'chunk_id': bch.chunk_hash} + }, + _utils.UDF_WHOAMI(), + 'Vault/prod/near/quicknode/livetable/mainnet' + ):data:result AS chunk_data + FROM block_chunk_hashes bch + ), + chunk_txs AS ( + -- Flatten the transactions array from the actual chunk_data result + SELECT + rcd.block_height, + rcd.block_timestamp_int, + rcd.shard_id, + rcd.chunk_hash, + rcd.chunk_height_created, + rcd.chunk_height_included, + tx.value:hash::STRING AS tx_hash, + tx.value:signer_id::STRING AS tx_signer + FROM raw_chunk_details rcd, + LATERAL FLATTEN(input => rcd.chunk_data:transactions) tx + ), + transactions AS ( + SELECT + DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp, + tx.block_height, + tx.block_timestamp_int, + tx.tx_hash, + tx.tx_signer, + tx.shard_id, + tx.chunk_hash, + tx.chunk_height_created, + tx.chunk_height_included, + livetable.lt_tx_udf_api( + 'POST', + '{Service}', + {'Content-Type' : 'application/json', 'fsc-compression-mode' : 'auto'}, + { + 'jsonrpc' : '2.0', + 'method' : 'EXPERIMENTAL_tx_status', + 'id' : 'Flipside/EXPERIMENTAL_tx_status/' || request_timestamp || '/' || tx.block_height :: STRING, + 'params' : { + 'tx_hash': tx.tx_hash, + 'sender_account_id': tx.tx_signer, + 'wait_until': 'FINAL' + } + }, + _utils.UDF_WHOAMI(), + 'Vault/prod/near/quicknode/livetable/mainnet' + ):data:result AS tx_result + FROM chunk_txs tx + ) - WITH spine AS ( - {{ near_live_table_target_blocks(start_block='_block_height', block_count='row_count') | indent(4) -}} - ), - raw_blocks AS ( - {{ near_live_table_get_raw_block_data('spine') | indent(4) -}} - ), - block_chunk_hashes AS ( - -- Extract block info and the chunk_hash from each chunk header - SELECT - rb.block_height, - rb.rpc_data_result:header:timestamp::STRING AS block_timestamp_str, - ch.value:chunk_hash::STRING AS chunk_hash, - ch.value:shard_id::INTEGER AS shard_id, - ch.value:height_created::INTEGER AS chunk_height_created, - ch.value:height_included::INTEGER AS chunk_height_included - FROM raw_blocks rb, - LATERAL FLATTEN(input => rb.rpc_data_result:chunks) ch - WHERE ch.value:tx_root::STRING <> '11111111111111111111111111111111' - ), - raw_chunk_details AS ( - -- Fetch full chunk details using the chunk_hash - SELECT - bch.block_height, - bch.block_timestamp_str, - bch.shard_id, - bch.chunk_hash, - bch.chunk_height_created, - bch.chunk_height_included, - live_table.lt_chunks_udf_api( - 'POST', - '{Service}', - {'Content-Type' : 'application/json'}, - { - 'jsonrpc' : '2.0', - 'method' : 'chunk', - 'id' : 'Flipside/chunk/' || bch.block_height || '/' || bch.chunk_hash, - 'params': {'chunk_id': bch.chunk_hash} - }, - _utils.UDF_WHOAMI(), - 'Vault/prod/near/quicknode/mainnet' - ):data:result AS chunk_data - FROM block_chunk_hashes bch - ), - chunk_txs AS ( - -- Flatten the transactions array from the actual chunk_data result SELECT - rcd.block_height, - rcd.block_timestamp_str, - rcd.shard_id, - rcd.chunk_hash, - rcd.chunk_height_created, - rcd.chunk_height_included, - tx.value:hash::STRING AS tx_hash, - tx.value:signer_id::STRING AS tx_signer - FROM raw_chunk_details rcd, - LATERAL FLATTEN(input => rcd.chunk_data:transactions) tx - ), - transactions AS ( - SELECT - DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp, - tx.block_height, - tx.block_timestamp_str, - tx.tx_hash, - tx.tx_signer, - tx.shard_id, - tx.chunk_hash, - tx.chunk_height_created, - tx.chunk_height_included, - live_table.lt_tx_udf_api( - 'POST', - '{Service}', - {'Content-Type' : 'application/json', 'fsc-compression-mode' : 'auto'}, - { - 'jsonrpc' : '2.0', - 'method' : 'EXPERIMENTAL_tx_status', - 'id' : 'Flipside/EXPERIMENTAL_tx_status/' || request_timestamp || '/' || tx.block_height :: STRING, - 'params' : { - 'tx_hash': tx.tx_hash, - 'sender_account_id': tx.tx_signer, - 'wait_until': 'FINAL' - } - }, - _utils.UDF_WHOAMI(), - 'Vault/prod/near/quicknode/mainnet' - ):data:result AS tx_result - FROM chunk_txs tx - ) - - SELECT - tx.tx_result as data, - { - 'FINAL_EXECUTION_STATUS': tx.tx_result:final_execution_status, - 'RECEIPTS': tx.tx_result:receipts, - 'RECEIPTS_OUTCOME': tx.tx_result:receipts_outcome, - 'STATUS': tx.tx_result:status, - 'TRANSACTION': tx.tx_result:transaction, - 'TRANSACTION_OUTCOME': tx.tx_result:transaction_outcome, - 'BLOCK_ID': tx.block_height, - 'BLOCK_TIMESTAMP_EPOCH': DATE_PART('EPOCH_SECOND', TO_TIMESTAMP_NTZ(tx.block_timestamp_str))::INTEGER, - 'SHARD_ID': tx.shard_id, - 'CHUNK_HASH': tx.chunk_hash, - 'HEIGHT_CREATED': tx.chunk_height_created, - 'HEIGHT_INCLUDED': tx.chunk_height_included - } as value, - round(tx.block_height, -3) AS partition_key, - CURRENT_TIMESTAMP() AS _inserted_timestamp - FROM transactions tx - + tx.tx_result as data, + { + 'FINAL_EXECUTION_STATUS': tx.tx_result:final_execution_status, + 'RECEIPTS': tx.tx_result:receipts, + 'RECEIPTS_OUTCOME': tx.tx_result:receipts_outcome, + 'STATUS': tx.tx_result:status, + 'TRANSACTION': tx.tx_result:transaction, + 'TRANSACTION_OUTCOME': tx.tx_result:transaction_outcome, + 'BLOCK_ID': tx.block_height, + 'BLOCK_TIMESTAMP_EPOCH': DATE_PART('EPOCH_SECOND', TO_TIMESTAMP_NTZ(tx.block_timestamp_int))::INTEGER, + 'SHARD_ID': tx.shard_id, + 'CHUNK_HASH': tx.chunk_hash, + 'HEIGHT_CREATED': tx.chunk_height_created, + 'HEIGHT_INCLUDED': tx.chunk_height_included + } as value, + round(tx.block_height, -3) AS partition_key, + CURRENT_TIMESTAMP() AS _inserted_timestamp + FROM transactions tx + {% endif %} {% else %} -- BATCH LOGIC: Default {{ streamline_external_table_FR_query_v2( diff --git a/models/deploy/_compile_sp_macro.sql b/models/deploy/_compile_sp_macro.sql new file mode 100644 index 00000000..c745c23d --- /dev/null +++ b/models/deploy/_compile_sp_macro.sql @@ -0,0 +1 @@ + {{ create_sp_refresh_bronze_transactions_live() }} \ No newline at end of file diff --git a/models/deploy/_compile_task.sql b/models/deploy/_compile_task.sql new file mode 100644 index 00000000..a4ec2273 --- /dev/null +++ b/models/deploy/_compile_task.sql @@ -0,0 +1 @@ +{{ create_fact_tx_sproc_task() }} \ No newline at end of file diff --git a/models/deploy/_dummy_model.sql b/models/deploy/_dummy_model.sql new file mode 100644 index 00000000..ce899d1c --- /dev/null +++ b/models/deploy/_dummy_model.sql @@ -0,0 +1 @@ +SELECT 1 AS id \ No newline at end of file diff --git a/models/deploy/near/live__table.sql b/models/deploy/livetable.sql similarity index 88% rename from models/deploy/near/live__table.sql rename to models/deploy/livetable.sql index 07acce81..04bf935e 100644 --- a/models/deploy/near/live__table.sql +++ b/models/deploy/livetable.sql @@ -11,8 +11,9 @@ -- depends_on: {{ ref('near_models','core__fact_receipts') }} -- depends_on: {{ ref('near_models','silver__receipts_final') }} -- depends_on: {{ ref('near_models', 'core__ez_actions') }} +-- depends_on: {{ ref('near_models', 'core__fact_transactions_live') }} {%- set configs = [ config_near_high_level_abstractions ] -%} -{{- livequery_models.ephemeral_deploy(configs) -}} +{{- near_models.lt_ephemeral_deploy(configs) -}} \ No newline at end of file diff --git a/models/gold/core/live/core__fact_transactions_live.sql b/models/gold/core/live/core__fact_transactions_live.sql new file mode 100644 index 00000000..37c5341a --- /dev/null +++ b/models/gold/core/live/core__fact_transactions_live.sql @@ -0,0 +1,57 @@ +{% docs core_fact_transactions_live %} +Combines final Gold layer transaction data (`core__fact_transactions`) with specific Bronze layer components (`bronze__FR_transactions`) required for downstream processes. + +This model serves as the direct source logic that gets rendered **inside** the `TF_FACT_TRANSACTIONS` User-Defined Table Function (UDTF) via the `livequery_models.get_rendered_model` macro. + +**Output Columns:** +- Includes all standard columns from `core__fact_transactions`. +- Adds the following columns from `bronze__FR_transactions` needed for bronze data reconstruction: + - `data`: The raw VARIANT response from the `EXPERIMENTAL_tx_status` API call. + - `value`: The structured VARIANT containing transaction metadata used in earlier bronze/silver steps. + - `partition_key`: The calculated partition key from the bronze layer. + +**Purpose & Downstream Use:** +The combined output allows the `SP_REFRESH_FACT_TRANSACTIONS_LIVE` stored procedure (which calls the UDTF generated from this model) to: +1. Populate the `CORE_LIVE.FACT_TRANSACTIONS` hybrid table using the Gold columns. +2. Reconstruct and unload the raw bronze transaction RPC data (using `data`, `value`, `partition_key`, etc.) via a `COPY INTO` to the designated S3 stage, matching the structure of the batch-populated bronze external tables. +{% enddocs %} + +{{ config( + materialized = var('LIVE_TABLE_MATERIALIZATION', 'view'), + secure = false, + tags = ['livetable','fact_transactions'] +) }} + +WITH core_tx AS ( + SELECT * FROM {{ ref('core__fact_transactions') }} +), +bronze_tx AS ( + SELECT + btx.* + FROM {{ ref('bronze__FR_transactions') }} btx + QUALIFY ROW_NUMBER() OVER (PARTITION BY btx.data:transaction:hash ORDER BY btx.value:block_id DESC) = 1 +) +SELECT + tx_hash, + block_id, + block_timestamp, + nonce, + signature, + tx_receiver, + tx_signer, + tx, + gas_used, + transaction_fee, + attached_gas, + tx_succeeded, + fact_transactions_id, + inserted_timestamp, + modified_timestamp, + btx.data, + btx.value, + btx.partition_key + +FROM + core_tx ctx +JOIN bronze_tx btx +ON ctx.tx_hash = btx.data:transaction:hash diff --git a/models/overrides/bronze/livetable_bronze_transactions.sql b/models/overrides/bronze/livetable_bronze_transactions.sql new file mode 100644 index 00000000..1b3a9283 --- /dev/null +++ b/models/overrides/bronze/livetable_bronze_transactions.sql @@ -0,0 +1,4 @@ +-- depends_on: {{ ref('near_models','bronze__transactions') }} +-- depends_on: {{ ref('near_models','bronze__FR_transactions') }} + +SELECT * FROM {{ ref('near_models','bronze__FR_transactions')}} diff --git a/models/overrides/livetable_fact_transactions.sql b/models/overrides/livetable_fact_transactions.sql index 61b04319..95dbdfb8 100644 --- a/models/overrides/livetable_fact_transactions.sql +++ b/models/overrides/livetable_fact_transactions.sql @@ -2,5 +2,6 @@ -- depends_on: {{ ref('near_models','bronze__FR_transactions') }} -- depends_on: {{ ref('near_models', 'silver__transactions_v2') }} -- depends_on: {{ ref('near_models', 'silver__transactions_final') }} +-- depends_on: {{ ref('near_models', 'core__fact_transactions') }} -SELECT * FROM {{ ref('near_models','core__fact_transactions')}} +SELECT * FROM {{ ref('near_models','core__fact_transactions_live')}} diff --git a/models/sources.yml b/models/sources.yml index e9e4cd7b..2696d8a9 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -43,3 +43,11 @@ sources: - name: chunks_v2 - name: transactions_v2 - name: nearblocks_ft_metadata + + - name: livetable_bronze + database: | + {{ "NEAR_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "NEAR" }} + schema: bronze_live + tables: + - name: transactions + \ No newline at end of file