Skip to content

Stream 1201/livetable sproc ht #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 32 commits into
base: STREAM-1089/live-table-udtf
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
838ffd5
dynamic udft params sproc success
ShahNewazKhan Apr 23, 2025
b81b46b
add sproc | finalize max_batch_rows tuning
ShahNewazKhan Apr 24, 2025
189208c
fix blocks header timestamp handoff
ShahNewazKhan Apr 27, 2025
a2b58c9
add sprock task
ShahNewazKhan Apr 27, 2025
30a57c4
fix livetable schema deploy
ShahNewazKhan Apr 28, 2025
60c4b19
add index | fix hybrid table pruning
ShahNewazKhan Apr 30, 2025
218a88a
drop table name prefixes
ShahNewazKhan Apr 30, 2025
f603195
schema scope to livetable
ShahNewazKhan Apr 30, 2025
1cc194d
WIP permissions
ShahNewazKhan Apr 30, 2025
469d2c7
update to livetable specific quicknode ep
ShahNewazKhan May 1, 2025
71007de
add deploy_sf_tasks() to on-run-start
ShahNewazKhan May 2, 2025
8e17144
add _dummpy deploy
ShahNewazKhan May 5, 2025
44eb970
add tf_fact_tx extentions table columns
ShahNewazKhan May 5, 2025
e68ed65
update live_table -> livetable
ShahNewazKhan May 5, 2025
5ee5491
add core__fact_transaction_live with bronze flush extension | live_ta…
ShahNewazKhan May 5, 2025
4767750
clean up fact_tx sporc
ShahNewazKhan May 5, 2025
b14a360
Add docstrings for core__fact_transactions_live extention
ShahNewazKhan May 5, 2025
d9d95a1
Reduce chunks udf_api to 25 MBR | Update tx to lt qn ep
ShahNewazKhan May 5, 2025
93b04b1
RPC response persisted to hybrid table
ShahNewazKhan May 7, 2025
224ef28
add lt_ephemeral_deploy
ShahNewazKhan May 8, 2025
152d129
drop hybrid table index for RPC responses
ShahNewazKhan May 8, 2025
42cabf9
add qualify window for tx_hash upsert
ShahNewazKhan May 8, 2025
e29fc47
dedupe bronze tx_hash
ShahNewazKhan May 8, 2025
2951fd9
extend lt_ephemeral_deploy | fix livetable deployment schema
ShahNewazKhan May 12, 2025
6dd3cd9
tf_bronze_transaction success
ShahNewazKhan May 14, 2025
9ce34dc
sp_bronze_tx success
ShahNewazKhan May 15, 2025
106e000
compile bronze sproc utils
ShahNewazKhan May 15, 2025
5bdaa41
add bronze_tx sf tasks deployers
ShahNewazKhan May 15, 2025
644abdf
remove bronze flush from sp_fact_tx sproc
ShahNewazKhan May 15, 2025
a9fcdc2
add UDF_TARGET evnar | bronze layer materialization success
ShahNewazKhan May 20, 2025
24ad8c8
add bronze livetable source
ShahNewazKhan May 20, 2025
fda0a91
Add livetable SPROC dataflow diagram
ShahNewazKhan May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,41 @@ rm_logs:
fi

# deploy live table udtfs
deploy_live_table_udtfs: rm_logs
deploy_livetable_udtfs: rm_logs
dbt run \
-s near_models.deploy.near.live__table \
--vars '{UPDATE_UDFS_AND_SPS: true, ENABLE_LIVE_TABLE: true, LIVE_TABLE_MATERIALIZATION: ephemeral}' \
-s near_models.deploy.livetable \
--vars '{UPDATE_UDFS_AND_SPS: true, ENABLE_LIVE_TABLE: true, LIVE_TABLE_MATERIALIZATION: ephemeral, UDTF_TARGET: gold_core}' \
--profiles-dir ~/.dbt \
--profile near \
--target dev

deploy_tx_sproc: rm_logs
dbt run-operation create_sp_refresh_fact_transactions_live \
--profiles-dir ~/.dbt \
--profile near \
--target dev

deploy_tx_task: rm_logs
dbt run-operation create_fact_tx_sproc_task \
--profiles-dir ~/.dbt \
--profile near \
--target dev

compile_sp_macro: rm_logs
dbt compile --select _compile_sp_macro \
--profiles-dir ~/.dbt \
--profile near \
--target dev

compile_task: rm_logs
dbt compile --select _compile_task \
--profiles-dir ~/.dbt \
--profile near \
--target dev

test_deploy_sf_tasks: rm_logs
dbt run --select _dummy_model \
--profiles-dir ~/.dbt \
--profile near \
--target dev

3 changes: 2 additions & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ clean-targets: # directories to be removed by `dbt clean`
on-run-start:
- "{{ create_udfs() }}"
- "{{ create_sps() }}"

- "{{ deploy_sf_tasks() }}"

on-run-end:
- "{{ apply_meta_as_tags(results) }}"

Expand Down
3 changes: 2 additions & 1 deletion macros/create_sps.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{% macro create_sps() %}
{% if target.database == 'NEAR' %}
CREATE schema IF NOT EXISTS _internal;
{{ sp_create_prod_clone('_internal') }};
{{ sp_create_prod_clone('_internal') }};
{% endif %}
{{ create_sp_refresh_fact_transactions_live() }}
{% endmacro %}
53 changes: 53 additions & 0 deletions macros/livetable/assets/overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
## Near Livetable SPROC

### Overview
```
+-----------------+ +---------------------------------------------+ +---------------------------+
| Snowflake Task | | Stored Procedure (SP) | | _live.udf_api |
| (Sched: 5 min) |----->| SP_REFRESH_FACT_TRANSACTIONS_LIVE() | |---------------------------|
| QUERY_TAG: {...}| |---------------------------------------------| | |
+-----------------+ | 1. Check Schema/Table Exists (Idempotent) | | [RPC call to Near Node] |
| - CREATE SCHEMA IF NOT EXISTS ... | | Quicknode Endpoint |
| - CREATE HYBRID TABLE IF NOT EXISTS ... | | |
| (Incl. PRIMARY KEY) | +---------------------------+
| | ^
| 2. Get Chain Head | |
| CALL _live.udf_api()---------------------+------------------>+
| | | |
| | | |
| v | |
| [Near RPC API] <-------------------------|-------------------+
| ^ |
| | Returns Height |
| height <-- _live.udf_api() |
| |
| 3. Calculate Start Block | .-------------------------.
| start_block = height - buffer | | Hybrid Table |
| | | CORE_LIVE. |
| 4. Call UDTF to Fetch Transactions | | FACT_TRANSACTIONS |
| rows <-- | |-------------------------|
| TABLE(livetable.tf_fact_tx(...)) ----+| | - tx_hash (PK) |
| | || | - block_id |
| v UDTF Execution || | - block_timestamp (*) |
| (Requires Owner Permissions) || | - tx_signer |
| (Calls other UDFs for. || | - tx_receiver |
| block/chunk/tx details || | - ... |
| internally) || | - _hybrid_updated_at |
| | mimic's gold fact_tx || `-------------------------'
| +------------------------------|| ^ |
| | | | Step 6: Prune
| 5. MERGE Data into Hybrid Table |--------------->| | DELETE Rows
| rows_merged = SQLROWCOUNT | | | Older than 60m
| | | | (based on block_ts)
| 7. Return Status String | | v
| RETURN 'Fetched...Merged...Deleted...' | | ............
+--------------------|------------------------+ | : Old Data :
| | :..........:
| Status String |
v |
+------------------------+ +------------------------+ |
| Snowflake Task History |<----| Task Record |<------------------------------'
| (Logs Status, Return, | | (Captures SP Return) |
| Duration) | +------------------------+
+------------------------+
```
52 changes: 43 additions & 9 deletions macros/livetable/near.yaml.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
This macro is used to generate the high level abstractions for the Near
blockchain.
#}
{% set schema = blockchain ~ "_" ~ network %}
{% set schema = blockchain %}

- name: {{ schema }}.udf_get_latest_block_height
signature: []
return_type: INTEGER
sql: |
{{ near_live_table_latest_block_height() | indent(4) -}}
{{ near_livetable_latest_block_height() | indent(4) -}}

- name: {{ schema }}.lt_tx_udf_api
signature:
Expand All @@ -24,7 +24,7 @@
api_integration: '{{ var("API_INTEGRATION") }}'
options: |
NOT NULL
MAX_BATCH_ROWS = 25
MAX_BATCH_ROWS = 150
sql: udf_api

- name: {{ schema }}.lt_blocks_udf_api
Expand All @@ -40,7 +40,7 @@
api_integration: '{{ var("API_INTEGRATION") }}'
options: |
NOT NULL
MAX_BATCH_ROWS = 25
MAX_BATCH_ROWS = 120
sql: udf_api

- name: {{ schema }}.lt_chunks_udf_api
Expand Down Expand Up @@ -71,21 +71,35 @@
VOLATILE
COMMENT = $$Returns the block data for a given block height.Fetches blocks for the specified number of blocks $$
sql: |
{{ near_live_table_fact_blocks(schema, blockchain, network) | indent(4) -}}
{{ near_livetable_fact_blocks(schema, blockchain, network) | indent(4) -}}

- name: {{ schema -}}.tf_fact_transactions
signature:
- [_block_height, INTEGER, The start block height to get the transactions from]
- [row_count, INTEGER, The number of rows to fetch]
return_type:
- "TABLE(tx_hash STRING, block_id NUMBER, block_timestamp TIMESTAMP_NTZ, nonce INT, signature STRING, tx_receiver STRING, tx_signer STRING, tx VARIANT, gas_used NUMBER, transaction_fee NUMBER, attached_gas NUMBER, tx_succeeded BOOLEAN, fact_transactions_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)"
- "TABLE(tx_hash STRING, block_id NUMBER, block_timestamp TIMESTAMP_NTZ, nonce INT, signature STRING, tx_receiver STRING, tx_signer STRING, tx VARIANT, gas_used NUMBER, transaction_fee NUMBER, attached_gas NUMBER, tx_succeeded BOOLEAN, fact_transactions_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ, data VARIANT, value OBJECT, partition_key NUMBER)"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT
VOLATILE
COMMENT = $$Returns transaction details for blocks starting from a given height.Fetches txs for the specified number of blocks.$$
sql: |
{{ near_live_table_fact_transactions(schema, blockchain, network) | indent(4) -}}
{{ near_livetable_fact_transactions(schema, blockchain, network) | indent(4) -}}

- name: {{ schema -}}.tf_bronze_transactions
signature:
- [_block_height, INTEGER, The start block height to get the transactions from]
- [row_count, INTEGER, The number of rows to fetch]
return_type:
- "TABLE(data VARIANT, value OBJECT, partition_key NUMBER(38,0), _inserted_timestamp TIMESTAMP_LTZ(9))"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT
VOLATILE
COMMENT = $$Returns transaction details for blocks starting from a given height.Fetches txs for the specified number of blocks.$$
sql: |
{{ near_livetable_bronze_transactions(schema, blockchain, network) | indent(4) -}}

- name: {{ schema -}}.tf_fact_receipts
signature:
Expand All @@ -99,7 +113,7 @@
VOLATILE
COMMENT = $$Returns receipt details for blocks starting from a given height. Fetches receipts for the specified number of blocks.$$
sql: |
{{ near_live_table_fact_receipts(schema, blockchain, network) | indent(4) -}}
{{ near_livetable_fact_receipts(schema, blockchain, network) | indent(4) -}}

- name: {{ schema -}}.tf_ez_actions
signature:
Expand All @@ -113,7 +127,27 @@
VOLATILE
COMMENT = $$Returns decoded action details for blocks starting from a given height. Fetches actions for the specified number of blocks.$$
sql: |
{{ near_live_table_ez_actions(schema, blockchain, network) | indent(4) -}}
{{ near_livetable_ez_actions(schema, blockchain, network) | indent(4) -}}

{%- endmacro -%}

{% macro lt_ephemeral_deploy(configs) %}
{#
This macro is used to deploy livetable udf/udtfs using ephemeral models.
#}

{% if execute and (var("UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
{% set sql %}
{% for config in configs %}
{{- livequery_models.crud_udfs_by_chain(config, this.schema, network, var("DROP_UDFS_AND_SPS")) -}}
{%- endfor -%}
{%- endset -%}
{%- if var("DROP_UDFS_AND_SPS") -%}
{%- do log("Drop partner udfs: " ~ this.database ~ "." ~ schema, true) -%}
{%- else -%}
{%- do log("Deploy partner udfs: " ~ this.database ~ "." ~ schema, true) -%}
{%- endif -%}
{%- do run_query(sql ~ livequery_models.apply_grants_by_schema(schema)) -%}
{%- endif -%}
SELECT '{{ model.schema }}' as schema_
{%- endmacro -%}
49 changes: 27 additions & 22 deletions macros/livetable/near_live_table_abstractions.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- Get Near Chain Head

{% macro near_live_table_latest_block_height() %}
{% macro near_livetable_latest_block_height() %}
WITH rpc_call AS (
SELECT
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp,
Expand All @@ -15,7 +15,7 @@ WITH rpc_call AS (
'params' : {'finality' : 'final'}
},
_utils.UDF_WHOAMI(),
'Vault/prod/near/quicknode/mainnet'
'Vault/prod/near/quicknode/livetable/mainnet'
):data::object AS rpc_result
)
SELECT
Expand All @@ -24,7 +24,7 @@ FROM
rpc_call
{% endmacro %}

{% macro near_live_table_min_max_block_height(start_block, block_count) %}
{% macro near_livetable_min_max_block_height(start_block, block_count) %}
SELECT
{{ start_block }} AS min_height,
min_height + {{ block_count }} AS max_height,
Expand All @@ -33,14 +33,14 @@ FROM
{% endmacro %}

-- Get Near Block Data
{% macro near_live_table_target_blocks(start_block, block_count) %}
{% macro near_livetable_target_blocks(start_block, block_count) %}

WITH heights AS (
SELECT
min_height,
max_height,
FROM (
{{- near_live_table_min_max_block_height(start_block=start_block, block_count=block_count) | indent(13) -}}
{{- near_livetable_min_max_block_height(start_block=start_block, block_count=block_count) | indent(13) -}}
)
),
block_spine AS (
Expand All @@ -59,7 +59,7 @@ FROM
FROM block_spine
{% endmacro %}

{% macro near_live_table_get_spine(table_name) %}
{% macro near_livetable_get_spine(table_name) %}
SELECT
block_height,
ROW_NUMBER() OVER (ORDER BY block_height) - 1 as partition_num
Expand All @@ -81,11 +81,11 @@ FROM
)
{% endmacro %}

{% macro near_live_table_get_raw_block_data(spine) %}
{% macro near_livetable_get_raw_block_data(spine) %}
SELECT
block_height,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp,
live_table.lt_blocks_udf_api(
livetable.lt_blocks_udf_api(
'POST',
'{Service}',
{'Content-Type' : 'application/json'},
Expand All @@ -96,14 +96,14 @@ SELECT
'params':{'block_id': block_height}
},
_utils.UDF_WHOAMI(),
'Vault/prod/near/quicknode/mainnet'
'Vault/prod/near/quicknode/livetable/mainnet'
):data.result AS rpc_data_result
from
{{spine}}

{% endmacro %}

{% macro near_live_table_extract_raw_block_data(raw_blocks) %}
{% macro near_livetable_extract_raw_block_data(raw_blocks) %}
SELECT
block_data:header:height::string as block_id,
TO_TIMESTAMP_NTZ(
Expand Down Expand Up @@ -146,22 +146,27 @@ FROM {{raw_blocks}}

{% endmacro %}

{% macro near_live_table_fact_blocks(schema, blockchain, network) %}
{%- set near_live_table_fact_blocks = livequery_models.get_rendered_model('near_models', 'livetable_fact_blocks', schema, blockchain, network) -%}
{{ near_live_table_fact_blocks }}
{% macro near_livetable_fact_blocks(schema, blockchain, network) %}
{%- set near_livetable_fact_blocks = livequery_models.get_rendered_model('near_models', 'livetable_fact_blocks', schema, blockchain, network) -%}
{{ near_livetable_fact_blocks }}
{% endmacro %}

{% macro near_live_table_fact_transactions(schema, blockchain, network) %}
{%- set near_live_table_fact_transactions = livequery_models.get_rendered_model('near_models', 'livetable_fact_transactions', schema, blockchain, network) -%}
{{ near_live_table_fact_transactions }}
{% macro near_livetable_fact_transactions(schema, blockchain, network) %}
{%- set near_livetable_fact_transactions = livequery_models.get_rendered_model('near_models', 'livetable_fact_transactions', schema, blockchain, network) -%}
{{ near_livetable_fact_transactions }}
{% endmacro %}

{% macro near_live_table_fact_receipts(schema, blockchain, network) %}
{%- set near_live_table_fact_receipts = livequery_models.get_rendered_model('near_models', 'livetable_fact_receipts', schema, blockchain, network) -%}
{{ near_live_table_fact_receipts }}
{% macro near_livetable_bronze_transactions(schema, blockchain, network) %}
{%- set near_livetable_bronze_transactions = livequery_models.get_rendered_model('near_models', 'livetable_bronze_transactions', schema, blockchain, network) -%}
{{ near_livetable_bronze_transactions }}
{% endmacro %}

{% macro near_live_table_ez_actions(schema, blockchain, network) %}
{%- set near_live_table_ez_actions = livequery_models.get_rendered_model('near_models', 'livetable_ez_actions', schema, blockchain, network) -%}
{{ near_live_table_ez_actions }}
{% macro near_livetable_fact_receipts(schema, blockchain, network) %}
{%- set near_livetable_fact_receipts = livequery_models.get_rendered_model('near_models', 'livetable_fact_receipts', schema, blockchain, network) -%}
{{ near_livetable_fact_receipts }}
{% endmacro %}

{% macro near_livetable_ez_actions(schema, blockchain, network) %}
{%- set near_livetable_ez_actions = livequery_models.get_rendered_model('near_models', 'livetable_ez_actions', schema, blockchain, network) -%}
{{ near_livetable_ez_actions }}
{% endmacro %}
14 changes: 14 additions & 0 deletions macros/livetable/permissions/udtf_permissions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% macro lt_udtf_permissions() %}


{% set udtfs = [
'{{ target.database }}.livetable.lt_blocks_udf_api',
'{{ target.database }}.livetable.lt_tx_udf_api',
'{{ target.database }}.livetable.lt_chunks_udf_api',
] %}

{% for udtf in udtfs %}
GRANT USAGE ON FUNCTION {{ udtf }} TO ROLE {{ target.role }};
{% endfor %}

{% endmacro %}
Loading