Skip to content

Commit a9a125e

Browse files
Christophe Di Primacpcloud
Christophe Di Prima
authored andcommitted
feat(risingwave): add support for includes in create_source
1 parent 5366d57 commit a9a125e

File tree

1 file changed

+19
-4
lines changed

1 file changed

+19
-4
lines changed

ibis/backends/risingwave/__init__.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,7 @@ def create_source(
744744
data_format: str,
745745
encode_format: str,
746746
encode_properties: dict | None = None,
747+
includes: dict[str, str] | None = None,
747748
) -> ir.Table:
748749
"""Creating a source.
749750
@@ -764,21 +765,35 @@ def create_source(
764765
The encode format for the new source, e.g., "JSON". data_format and encode_format must be specified at the same time.
765766
encode_properties
766767
The properties of encode format, providing information like schema registry url. Refer https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
768+
includes
769+
A dict of `INCLUDE` clauses of the form `{field: alias, ...}`.
770+
Set value(s) to `None` if no alias is needed. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details.
767771
768772
Returns
769773
-------
770774
Table
771775
Table expression
772776
"""
773-
table = sg.table(name, db=database, quoted=self.compiler.quoted)
777+
quoted = self.compiler.quoted
778+
table = sg.table(name, db=database, quoted=quoted)
774779
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
775780

781+
expressions = [
782+
sge.IncludeProperty(
783+
this=sg.to_identifier(include_type),
784+
alias=sg.to_identifier(column_name, quoted=quoted)
785+
if column_name
786+
else None,
787+
)
788+
for include_type, column_name in (includes or {}).items()
789+
]
790+
791+
expressions.extend(sge.Properties.from_dict(connector_properties))
792+
776793
create_stmt = sge.Create(
777794
kind="SOURCE",
778795
this=target,
779-
properties=sge.Properties(
780-
expressions=sge.Properties.from_dict(connector_properties)
781-
),
796+
properties=sge.Properties(expressions=expressions),
782797
)
783798

784799
create_stmt = create_stmt.sql(self.dialect) + data_and_encode_format(

0 commit comments

Comments
 (0)