Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions meta/src/meta/codegen_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class BuiltinTemplate:
"make_empty_bytes": BuiltinTemplate('b""'),
"dict_from_list": BuiltinTemplate("dict({0})"),
"dict_get": BuiltinTemplate("{0}.get({1})"),
"dict_to_pairs": BuiltinTemplate("sorted({0}.items())"),
"has_proto_field": BuiltinTemplate("{0}.HasField({1})"),
"string_to_upper": BuiltinTemplate("{0}.upper()"),
"string_in_list": BuiltinTemplate("{0} in {1}"),
Expand Down Expand Up @@ -134,6 +135,7 @@ class BuiltinTemplate:
"make_empty_bytes": BuiltinTemplate("UInt8[]"),
"dict_from_list": BuiltinTemplate("Dict({0})"),
"dict_get": BuiltinTemplate("get({0}, {1}, nothing)"),
"dict_to_pairs": BuiltinTemplate("sort([(k, v) for (k, v) in {0}])"),
"has_proto_field": BuiltinTemplate("_has_proto_field({0}, Symbol({1}))"),
"string_to_upper": BuiltinTemplate("uppercase({0})"),
"string_in_list": BuiltinTemplate("({0} in {1})"),
Expand Down Expand Up @@ -231,6 +233,7 @@ class BuiltinTemplate:
"make_empty_bytes": BuiltinTemplate("[]byte{}"),
"dict_from_list": BuiltinTemplate("dictFromList({0})"),
"dict_get": BuiltinTemplate("dictGetValue({0}, {1})"),
"dict_to_pairs": BuiltinTemplate("dictToPairs({0})"),
"has_proto_field": BuiltinTemplate("hasProtoField({0}, {1})"),
"string_to_upper": BuiltinTemplate("strings.ToUpper({0})"),
"string_in_list": BuiltinTemplate("stringInList({0}, {1})"),
Expand Down
78 changes: 78 additions & 0 deletions meta/src/meta/grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@
%nonterm csv_locator_paths Sequence[String]
%nonterm csvlocator logic.CSVLocator
%nonterm data logic.Data
%nonterm iceberg_config logic.IcebergConfig
%nonterm iceberg_config_credentials Sequence[Tuple[String, String]]
%nonterm iceberg_config_properties Sequence[Tuple[String, String]]
%nonterm iceberg_config_scope String
%nonterm iceberg_data logic.IcebergData
%nonterm iceberg_kv_pair Tuple[String, String]
%nonterm iceberg_locator logic.IcebergLocator
%nonterm iceberg_locator_namespace Sequence[String]
%nonterm iceberg_to_snapshot String
%nonterm date logic.DateValue
%nonterm date_type logic.DateType
%nonterm datetime logic.DateTimeValue
Expand Down Expand Up @@ -957,6 +966,10 @@ data
construct: $$ = logic.Data(csv_data=$1)
deconstruct if builtin.has_proto_field($$, 'csv_data'):
$1: logic.CSVData = $$.csv_data
| iceberg_data
construct: $$ = logic.Data(iceberg_data=$1)
deconstruct if builtin.has_proto_field($$, 'iceberg_data'):
$1: logic.IcebergData = $$.iceberg_data

edb_path
: "[" STRING* "]"
Expand Down Expand Up @@ -1026,6 +1039,54 @@ csv_config
construct: $$ = construct_csv_config($3)
deconstruct: $3: Sequence[Tuple[String, logic.Value]] = deconstruct_csv_config($$)

iceberg_data
: "(" "iceberg_data" iceberg_locator iceberg_config gnf_columns iceberg_to_snapshot? ")"
construct: $$ = logic.IcebergData(locator=$3, config=$4, columns=$5, to_snapshot=$6)
deconstruct:
$3: logic.IcebergLocator = $$.locator
$4: logic.IcebergConfig = $$.config
$5: Sequence[logic.GNFColumn] = $$.columns
$6: Optional[String] = $$.to_snapshot

iceberg_locator_namespace
: "(" "namespace" STRING* ")"

iceberg_locator
: "(" "iceberg_locator" STRING iceberg_locator_namespace STRING ")"
construct: $$ = logic.IcebergLocator(table_name=$3, namespace=$4, warehouse=$5)
deconstruct:
$3: String = $$.table_name
$4: Sequence[String] = $$.namespace
$5: String = $$.warehouse

iceberg_config
: "(" "iceberg_config" STRING iceberg_config_scope? iceberg_config_properties? iceberg_config_credentials? ")"
construct: $$ = construct_iceberg_config($3, $4, $5, $6)
deconstruct:
$3: String = $$.catalog_uri
$4: Optional[String] = $$.scope if $$.scope != "" else None
$5: Optional[Sequence[Tuple[String, String]]] = builtin.dict_to_pairs($$.properties) if not builtin.is_empty(builtin.dict_to_pairs($$.properties)) else None
$6: Optional[Sequence[Tuple[String, String]]] = builtin.none()

iceberg_config_scope
: "(" "scope" STRING ")"

iceberg_config_properties
: "(" "properties" iceberg_kv_pair* ")"

iceberg_config_credentials
: "(" "credentials" iceberg_kv_pair* ")"

iceberg_kv_pair
: "(" STRING STRING ")"
construct: $$ = builtin.tuple($2, $3)
deconstruct:
$2: String = $$[0]
$3: String = $$[1]

iceberg_to_snapshot
: "(" "to_snapshot" STRING ")"

gnf_column_path
: STRING
construct: $$ = [$1]
Expand Down Expand Up @@ -1439,6 +1500,23 @@ def deconstruct_csv_config(msg: logic.CSVConfig) -> List[Tuple[String, logic.Val
return builtin.list_sort(result)


# Build a logic.IcebergConfig from the values captured by the `iceberg_config` rule.
#
# catalog_uri  -- copied unchanged into the message's `catalog_uri` field.
# scope        -- when absent (None) the empty string is stored instead.
# properties   -- optional list of (key, value) string pairs; absent means empty.
# credentials  -- optional list of (key, value) string pairs; absent means empty.
#
# Returns the populated logic.IcebergConfig message.
def construct_iceberg_config(
catalog_uri: String,
scope: Optional[String],
properties: Optional[Sequence[Tuple[String, String]]],
credentials: Optional[Sequence[Tuple[String, String]]],
) -> logic.IcebergConfig:
# A missing section is replaced by an explicitly typed empty pair list before
# being handed to dict_from_list, so both map fields are always given a dict.
props: Dict[String, String] = builtin.dict_from_list(builtin.unwrap_option_or(properties, list[Tuple[String, String]]()))
creds: Dict[String, String] = builtin.dict_from_list(builtin.unwrap_option_or(credentials, list[Tuple[String, String]]()))
return logic.IcebergConfig(
catalog_uri=catalog_uri,
# The matching deconstruct action maps scope == "" back to None, so an
# omitted scope and an explicitly empty scope are treated the same way.
scope=builtin.unwrap_option_or(scope, ""),
properties=props,
credentials=creds,
)




def deconstruct_betree_info_config(msg: logic.BeTreeInfo) -> List[Tuple[String, logic.Value]]:
result: List[Tuple[String, logic.Value]] = list[Tuple[String, logic.Value]]()
Expand Down
3 changes: 3 additions & 0 deletions meta/src/meta/proto_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class ProtoField:
number: int
is_repeated: bool = False
is_optional: bool = False
is_map: bool = False
map_key_type: str = ""
map_value_type: str = ""


@dataclass
Expand Down
24 changes: 24 additions & 0 deletions meta/src/meta/proto_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
_NESTED_ENUM_PATTERN = re.compile(r"enum\s+(\w+)\s*\{([^}]+)\}")
_ONEOF_PATTERN = re.compile(r"oneof\s+(\w+)\s*\{((?:[^{}]|\{[^}]*\})*)\}")
_FIELD_PATTERN = re.compile(r"(repeated|optional)?\s*(\w+)\s+(\w+)\s*=\s*(\d+);")
_MAP_FIELD_PATTERN = re.compile(r"map<\s*(\w+)\s*,\s*(\w+)\s*>\s+(\w+)\s*=\s*(\d+);")
_ONEOF_FIELD_PATTERN = re.compile(r"(\w+)\s+(\w+)\s*=\s*(\d+);")
_ENUM_VALUE_PATTERN = re.compile(r"(\w+)\s*=\s*(\d+);")
_RESERVED_PATTERN = re.compile(r"reserved\s+([^;]+);")
Expand Down Expand Up @@ -193,6 +194,29 @@ def _parse_message(self, name: str, body: str) -> ProtoMessage:
)
message.fields.append(proto_field)

# Parse map fields
for match in _MAP_FIELD_PATTERN.finditer(body):
if any(
start <= match.start() and match.end() <= end
for start, end in excluded_spans
):
continue

key_type = match.group(1)
value_type = match.group(2)
field_name = match.group(3)
field_number = int(match.group(4))

proto_field = ProtoField(
name=field_name,
type=f"map<{key_type},{value_type}>",
number=field_number,
is_map=True,
map_key_type=key_type,
map_value_type=value_type,
)
message.fields.append(proto_field)

# Parse nested enums
for match in _NESTED_ENUM_PATTERN.finditer(body):
enum_name = match.group(1)
Expand Down
1 change: 1 addition & 0 deletions meta/src/meta/target_builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def is_builtin(name: str) -> bool:
# === Dict operations ===
register_builtin("dict_from_list", [SequenceType(TupleType([K, V]))], DictType(K, V))
register_builtin("dict_get", [DictType(K, V), K], OptionType(V))
register_builtin("dict_to_pairs", [DictType(K, V)], ListType(TupleType([K, V])))

# === Protobuf operations ===
register_builtin("has_proto_field", [T, STRING], BOOLEAN) # msg.HasField(field_name)
Expand Down
14 changes: 14 additions & 0 deletions meta/src/meta/type_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .proto_parser import ProtoParser
from .target import (
BaseType,
DictType,
FunctionType,
MessageType,
OptionType,
Expand All @@ -27,6 +28,13 @@
}


def _scalar_to_target(type_name: str) -> TargetType:
    """Return the target ``BaseType`` for a scalar protobuf type name.

    The name is looked up in ``_PRIMITIVE_TO_BASE_TYPE``; the mapped value is
    wrapped in ``BaseType``.

    Raises:
        ValueError: if ``type_name`` is not a key of ``_PRIMITIVE_TO_BASE_TYPE``.
    """
    # Guard clause: reject anything that is not a known primitive up front.
    if type_name not in _PRIMITIVE_TO_BASE_TYPE:
        raise ValueError(f"Unknown scalar proto type for map: {type_name}")

    base_name = _PRIMITIVE_TO_BASE_TYPE[type_name]
    return BaseType(base_name)


class TypeEnv:
"""Type environment for validating grammar expressions.

Expand Down Expand Up @@ -87,6 +95,12 @@ def _is_enum_type(self, type_name: str) -> bool:

def _proto_type_to_target(self, proto_field: ProtoField) -> TargetType:
"""Convert a protobuf field to its target type."""
# Handle map fields
if proto_field.is_map:
key_type = _scalar_to_target(proto_field.map_key_type)
value_type = _scalar_to_target(proto_field.map_value_type)
return DictType(key_type, value_type)

# Get base type
base_type: TargetType
if proto_field.type in _PRIMITIVE_TO_BASE_TYPE:
Expand Down
18 changes: 14 additions & 4 deletions meta/src/meta/yacc_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
)
from .target import (
BaseType,
DictType,
ListType,
MessageType,
OptionType,
Expand Down Expand Up @@ -739,10 +740,19 @@ def _make_field_type_lookup(

for (module, msg_name), proto_msg in proto_messages.items():
for field in proto_msg.fields:
field_type = _proto_type_to_target_type(
field.type, field.is_repeated, field.is_optional, name_to_module
)
field_types[(module, msg_name, field.name)] = field_type
if field.is_map:
key_type = _proto_type_to_target_type(
field.map_key_type, False, name_to_module=name_to_module
)
value_type = _proto_type_to_target_type(
field.map_value_type, False, name_to_module=name_to_module
)
field_types[(module, msg_name, field.name)] = DictType(key_type, value_type)
else:
field_type = _proto_type_to_target_type(
field.type, field.is_repeated, field.is_optional, name_to_module
)
field_types[(module, msg_name, field.name)] = field_type

# Also add oneof fields
for oneof in proto_msg.oneofs:
Expand Down
24 changes: 22 additions & 2 deletions proto/relationalai/lqp/v1/logic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ message Attribute {
}

//
// Input data (base relations, CSVs)
// Input data (base relations, CSVs, Iceberg)
//
message Data {
oneof data_type {
EDB edb = 1;
BeTreeRelation betree_relation = 2;
CSVData csv_data = 3;
// IcebergData iceberg_data = 4;
IcebergData iceberg_data = 4;
}
}

Expand Down Expand Up @@ -314,6 +314,26 @@ message CSVConfig {
int64 partition_size_mb = 12;
}

// Iceberg input data: the message carried by the `iceberg_data` member of the
// `Data.data_type` oneof.
message IcebergData {
// Locator of the table (table name, namespace, warehouse).
IcebergLocator locator = 1;
// Catalog settings (URI, scope, properties, credentials).
IcebergConfig config = 2;
// Column descriptors; uses the GNFColumn message declared in this file.
repeated GNFColumn columns = 3;
// Declared `optional`, so presence is tracked explicitly.
// NOTE(review): presumably names the snapshot to read up to - confirm semantics.
optional string to_snapshot = 4;
}

// Locator for an Iceberg table: table name, namespace components and warehouse.
message IcebergLocator {
string table_name = 1;
// Zero or more namespace components, kept in the order given.
repeated string namespace = 2;
string warehouse = 3;
}

// Configuration attached to IcebergData: catalog URI, optional scope, and two
// string-to-string maps.
message IcebergConfig {
string catalog_uri = 1;
// Declared `optional`, so an unset scope is distinguishable from "".
optional string scope = 2;
// String key/value pairs.
map<string, string> properties = 3;
// String key/value pairs.
// NOTE(review): presumably holds secrets - confirm before printing or logging.
map<string, string> credentials = 4;
}

message GNFColumn {
repeated string column_path = 1; // Column identifier path (was: string column_name)
optional RelationId target_id = 2; // Target relation (now explicit optional)
Expand Down
Loading
Loading