Skip to content

Live Table UDTF's #449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions .github/workflows/dbt_integration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,32 @@ run-name: ${{ github.event.inputs.branch }}

on:
workflow_dispatch:
inputs:
environment:
required: true
type: string

concurrency: ${{ github.workflow }}

jobs:
prepare_vars:
runs-on: ubuntu-latest
environment:
name: ${{ inputs.environment }}
outputs:
warehouse: ${{ steps.set_outputs.outputs.warehouse }}
steps:
- name: Set warehouse output
id: set_outputs
run: |
echo "warehouse=${{ vars.WAREHOUSE }}" >> $GITHUB_OUTPUT

called_workflow_template:
needs: prepare_vars
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt.yml@main
with:
command: >
dbt test --selector 'integration_tests'
environment: ${{ github.ref == 'refs/heads/main' && 'workflow_prod' || 'workflow_dev' }}
warehouse: ${{ vars.WAREHOUSE }}
environment: ${{ inputs.environment }}
warehouse: ${{ needs.prepare_vars.outputs.warehouse }}
secrets: inherit
18 changes: 17 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,20 @@ decoder_poc:
-m 1+models/streamline/poc/decoder/streamline__decoded_input_events.sql \
--profile near \
--target dev \
--profiles-dir ~/.dbt
--profiles-dir ~/.dbt

rm_logs:
@if [ -d logs ]; then \
rm -r logs 2>/dev/null || echo "Warning: Could not remove logs directory"; \
else \
echo "Logs directory does not exist"; \
fi

# deploy live table udtfs
deploy_live_table_udtfs: rm_logs
dbt run \
-s near_models.deploy.livetable \
--vars '{UPDATE_UDFS_AND_SPS: true, ENABLE_LIVE_TABLE: true, LIVE_TABLE_MATERIALIZATION: ephemeral}' \
--profiles-dir ~/.dbt \
--profile near \
--target dev
6 changes: 6 additions & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ models:
+on_schema_change: "append_new_columns"
near_models:
+pre-hook: '{{ fsc_utils.set_query_tag() }}'
deploy:
+materialized: ephemeral
livequery_models:
deploy:
core:
Expand Down Expand Up @@ -81,6 +83,10 @@ vars:
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] }}'
API_AWS_ROLE_ARN: '{{ var("config")[target.name]["API_AWS_ROLE_ARN"] if var("config")[target.name] else var("config")["dev"]["API_AWS_ROLE_ARN"] }}'
ROLES: '{{ var("config")[target.name]["ROLES"] }}'

# Livetable config
DROP_UDFS_AND_SPS: false

config:
# The keys correspond to dbt profiles and are case sensitive
dev:
Expand Down
119 changes: 119 additions & 0 deletions macros/livetable/near.yaml.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
{% macro config_near_high_level_abstractions(blockchain, network) -%}
{#
This macro is used to generate the high level abstractions for the Near
blockchain.
#}
{% set schema = blockchain %}

- name: {{ schema }}.udf_get_latest_block_height
signature: []
return_type: INTEGER
sql: |
{{ near_live_table_latest_block_height() | indent(4) -}}

- name: {{ schema }}.lt_tx_udf_api
signature:
- [method, STRING]
- [url, STRING]
- [headers, OBJECT]
- [DATA, VARIANT]
- [user_id, STRING]
- [SECRET, STRING]
return_type: VARIANT
func_type: EXTERNAL
api_integration: '{{ var("API_INTEGRATION") }}'
options: |
NOT NULL
MAX_BATCH_ROWS = 25
sql: udf_api

- name: {{ schema }}.lt_blocks_udf_api
signature:
- [method, STRING]
- [url, STRING]
- [headers, OBJECT]
- [DATA, VARIANT]
- [user_id, STRING]
- [SECRET, STRING]
return_type: VARIANT
func_type: EXTERNAL
api_integration: '{{ var("API_INTEGRATION") }}'
options: |
NOT NULL
MAX_BATCH_ROWS = 25
sql: udf_api

- name: {{ schema }}.lt_chunks_udf_api
signature:
- [method, STRING]
- [url, STRING]
- [headers, OBJECT]
- [DATA, VARIANT]
- [user_id, STRING]
- [SECRET, STRING]
return_type: VARIANT
func_type: EXTERNAL
api_integration: '{{ var("API_INTEGRATION") }}'
options: |
NOT NULL
MAX_BATCH_ROWS = 25
sql: udf_api

- name: {{ schema -}}.tf_fact_blocks
signature:
- [_block_height, INTEGER, The start block height to get the blocks from]
- [row_count, INTEGER, The number of rows to fetch]
return_type:
- "TABLE(block_id NUMBER, block_timestamp TIMESTAMP_NTZ, block_hash STRING, block_author STRING, header OBJECT, block_challenges_result ARRAY, block_challenges_root STRING, chunk_headers_root STRING, chunk_tx_root STRING, chunk_mask ARRAY, chunk_receipts_root STRING, chunks ARRAY, chunks_included NUMBER, epoch_id STRING, epoch_sync_data_hash STRING, gas_price FLOAT, last_ds_final_block STRING, last_final_block STRING, latest_protocol_version INT, next_bp_hash STRING, next_epoch_id STRING, outcome_root STRING, prev_hash STRING, prev_height NUMBER, prev_state_root STRING, random_value STRING, rent_paid FLOAT, signature STRING, total_supply FLOAT, validator_proposals ARRAY, validator_reward FLOAT, fact_blocks_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT
VOLATILE
COMMENT = $$Returns the block data for a given block height.Fetches blocks for the specified number of blocks $$
sql: |
{{ near_live_table_fact_blocks(schema, blockchain, network) | indent(4) -}}

- name: {{ schema -}}.tf_fact_transactions
signature:
- [_block_height, INTEGER, The start block height to get the transactions from]
- [row_count, INTEGER, The number of rows to fetch]
return_type:
- "TABLE(tx_hash STRING, block_id NUMBER, block_timestamp TIMESTAMP_NTZ, nonce INT, signature STRING, tx_receiver STRING, tx_signer STRING, tx VARIANT, gas_used NUMBER, transaction_fee NUMBER, attached_gas NUMBER, tx_succeeded BOOLEAN, fact_transactions_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT
VOLATILE
COMMENT = $$Returns transaction details for blocks starting from a given height.Fetches txs for the specified number of blocks.$$
sql: |
{{ near_live_table_fact_transactions(schema, blockchain, network) | indent(4) -}}

- name: {{ schema -}}.tf_fact_receipts
signature:
- [_block_height, INTEGER, The start block height to get the receipts from]
- [row_count, INTEGER, The number of rows to fetch]
return_type:
- "TABLE(block_timestamp TIMESTAMP_NTZ, block_id NUMBER, tx_hash STRING, receipt_id STRING, receipt_outcome_id ARRAY, receiver_id STRING, predecessor_id STRING, actions VARIANT, outcome VARIANT, gas_burnt NUMBER, status_value VARIANT, logs ARRAY, proof ARRAY, metadata VARIANT, receipt_succeeded BOOLEAN, fact_receipts_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ)"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT
VOLATILE
COMMENT = $$Returns receipt details for blocks starting from a given height. Fetches receipts for the specified number of blocks.$$
sql: |
{{ near_live_table_fact_receipts(schema, blockchain, network) | indent(4) -}}

- name: {{ schema -}}.tf_ez_actions
signature:
- [_block_height, INTEGER, The start block height to get the actions from]
- [row_count, INTEGER, The number of rows to fetch]
return_type:
- "TABLE(block_id NUMBER, block_timestamp TIMESTAMP_NTZ, tx_hash STRING, tx_signer BOOLEAN, tx_receiver STRING, tx_gas_used STRING, tx_succeeded NUMBER, tx_fee NUMBER, receipt_id STRING, receipt_receiver_id STRING, receipt_signer_id STRING, receipt_predecessor_id STRING, receipt_succeeded BOOLEAN, receipt_gas_burnt NUMBER, receipt_status_value OBJECT, is_delegated NUMBER, action_index BOOLEAN, action_name STRING, action_data OBJECT, action_gas_price NUMBER, _partition_by_block_number NUMBER, actions_id STRING, inserted_timestamp TIMESTAMP_NTZ, modified_timestamp TIMESTAMP_NTZ, _invocation_id STRING)"
options: |
NOT NULL
RETURNS NULL ON NULL INPUT
VOLATILE
COMMENT = $$Returns decoded action details for blocks starting from a given height. Fetches actions for the specified number of blocks.$$
sql: |
{{ near_live_table_ez_actions(schema, blockchain, network) | indent(4) -}}

{%- endmacro -%}

167 changes: 167 additions & 0 deletions macros/livetable/near_live_table_abstractions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
-- Get Near Chain Head

{% macro near_live_table_latest_block_height() %}
WITH rpc_call AS (
SELECT
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp,
_live.udf_api(
'POST',
'{Service}',
{'Content-Type' : 'application/json', 'fsc-compression-mode' : 'auto'},
{
'jsonrpc' : '2.0',
'method' : 'block',
'id' : 'Flipside/block/' || request_timestamp,
'params' : {'finality' : 'final'}
},
_utils.UDF_WHOAMI(),
'Vault/prod/near/quicknode/livetable/mainnet'
):data::object AS rpc_result
)
SELECT
rpc_result:result:header:height::INTEGER AS latest_block_height
FROM
rpc_call
{% endmacro %}

{% macro near_live_table_min_max_block_height(start_block, block_count) %}
SELECT
{{ start_block }} AS min_height,
min_height + {{ block_count }} AS max_height,
FROM
dual
{% endmacro %}

-- Get Near Block Data
{% macro near_live_table_target_blocks(start_block, block_count) %}

WITH heights AS (
SELECT
min_height,
max_height,
FROM (
{{- near_live_table_min_max_block_height(start_block=start_block, block_count=block_count) | indent(13) -}}
)
),
block_spine AS (
SELECT
ROW_NUMBER() OVER (
ORDER BY
NULL
) - 1 + h.min_height::integer AS block_number,
FROM
heights h,
TABLE(generator(ROWCOUNT => {{ block_count }} ))
qualify block_number BETWEEN h.min_height AND h.max_height
)
SELECT
block_number as block_height
FROM block_spine
{% endmacro %}

{% macro near_live_table_get_spine(table_name) %}
SELECT
block_height,
ROW_NUMBER() OVER (ORDER BY block_height) - 1 as partition_num
FROM
(
SELECT
row_number() over (order by seq4()) - 1 + COALESCE(block_id, 0)::integer as block_height,
min_height,
max_height

FROM
TABLE(generator(ROWCOUNT => IFF(
COALESCE(to_latest, false),
latest_block_height - min_height + 1,
1
))),
{{ table_name }}
qualify block_height BETWEEN min_height AND max_height
)
{% endmacro %}

{% macro near_live_table_get_raw_block_data(spine) %}
SELECT
block_height,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS request_timestamp,
livetable.lt_blocks_udf_api(
'POST',
'{Service}',
{'Content-Type' : 'application/json'},
{
'jsonrpc' : '2.0',
'method' : 'block',
'id' : 'Flipside/getBlock/' || request_timestamp || '/' || block_height :: STRING,
'params':{'block_id': block_height}
},
_utils.UDF_WHOAMI(),
'Vault/prod/near/quicknode/livetable/mainnet'
):data.result AS rpc_data_result
from
{{spine}}

{% endmacro %}

{% macro near_live_table_extract_raw_block_data(raw_blocks) %}
SELECT
block_data:header:height::string as block_id,
TO_TIMESTAMP_NTZ(
block_data :header :timestamp :: STRING
) AS block_timestamp,
block_data:header:hash::STRING as block_hash,
ARRAY_SIZE(block_data:chunks)::NUMBER as tx_count,
block_data:header as header,
block_data:header:challenges_result::ARRAY as block_challenges_result,
block_data:header:challenges_root::STRING as block_challenges_root,
block_data:header:chunk_headers_root::STRING as chunk_headers_root,
block_data:header:chunk_tx_root::STRING as chunk_tx_root,
block_data:header:chunk_mask::ARRAY as chunk_mask,
block_data:header:chunk_receipts_root::STRING as chunk_receipts_root,
block_data:chunks as chunks,
block_data:header:chunks_included::NUMBER as chunks_included,
block_data:header:epoch_id::STRING as epoch_id,
block_data:header:epoch_sync_data_hash::STRING as epoch_sync_data_hash,
block_data:events as events,
block_data:header:gas_price::NUMBER as gas_price,
block_data:header:last_ds_final_block::STRING as last_ds_final_block,
block_data:header:last_final_block::STRING as last_final_block,
block_data:header:latest_protocol_version::NUMBER as latest_protocol_version,
block_data:header:next_bp_hash::STRING as next_bp_hash,
block_data:header:next_epoch_id::STRING as next_epoch_id,
block_data:header:outcome_root::STRING as outcome_root,
block_data:header:prev_hash::STRING as prev_hash,
block_data:header:prev_height::NUMBER as prev_height,
block_data:header:prev_state_root::STRING as prev_state_root,
block_data:header:random_value::STRING as random_value,
block_data:header:rent_paid::FLOAT as rent_paid,
block_data:header:signature::STRING as signature,
block_data:header:total_supply::NUMBER as total_supply,
block_data:header:validator_proposals as validator_proposals,
block_data:header:validator_reward::NUMBER as validator_reward,
MD5(block_data:header:height::STRING) as fact_blocks_id,
SYSDATE() as inserted_timestamp,
SYSDATE() as modified_timestamp
FROM {{raw_blocks}}

{% endmacro %}

{% macro near_live_table_fact_blocks(schema, blockchain, network) %}
{%- set near_live_table_fact_blocks = livequery_models.get_rendered_model('near_models', 'livetable_fact_blocks', schema, blockchain, network) -%}
{{ near_live_table_fact_blocks }}
{% endmacro %}

{% macro near_live_table_fact_transactions(schema, blockchain, network) %}
{%- set near_live_table_fact_transactions = livequery_models.get_rendered_model('near_models', 'livetable_fact_transactions', schema, blockchain, network) -%}
{{ near_live_table_fact_transactions }}
{% endmacro %}

{% macro near_live_table_fact_receipts(schema, blockchain, network) %}
{%- set near_live_table_fact_receipts = livequery_models.get_rendered_model('near_models', 'livetable_fact_receipts', schema, blockchain, network) -%}
{{ near_live_table_fact_receipts }}
{% endmacro %}

{% macro near_live_table_ez_actions(schema, blockchain, network) %}
{%- set near_live_table_ez_actions = livequery_models.get_rendered_model('near_models', 'livetable_ez_actions', schema, blockchain, network) -%}
{{ near_live_table_ez_actions }}
{% endmacro %}
Loading