diff --git a/website/docs/quickstart/flink.md b/website/docs/quickstart/flink.md
index 1319d19f6e..ab11316cbe 100644
--- a/website/docs/quickstart/flink.md
+++ b/website/docs/quickstart/flink.md
@@ -33,8 +33,19 @@ mkdir fluss-quickstart-flink
 cd fluss-quickstart-flink
 ```
 
-2. Create a `docker-compose.yml` file with the following content:
+2. Create a `lib` directory and download the required jar files. You can adjust the Flink version as needed. Please make sure to download compatible versions of the [fluss-flink connector jar](/downloads) and [flink-connector-faker](https://github.com/knaufk/flink-faker/releases).
+```shell
+export FLINK_VERSION="1.20"
+```
+
+```shell
+mkdir lib
+wget -O lib/flink-faker-0.5.3.jar https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
+wget -O "lib/fluss-flink-${FLINK_VERSION}-$FLUSS_DOCKER_VERSION$.jar" "https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/$FLUSS_DOCKER_VERSION$/fluss-flink-${FLINK_VERSION}-$FLUSS_DOCKER_VERSION$.jar"
+```
+
+3. Create a `docker-compose.yml` file with the following content:
 
 ```yaml
 services:
@@ -69,19 +80,20 @@ services:
   #end
   #begin Flink cluster
   jobmanager:
-    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
+    image: flink:${FLINK_VERSION}
     ports:
       - "8083:8081"
-    command: jobmanager
     environment:
       - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
+    entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh jobmanager"]
+    volumes:
+      - ./lib:/tmp/lib
   taskmanager:
-    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
+    image: flink:${FLINK_VERSION}
     depends_on:
       - jobmanager
-    command: taskmanager
     environment:
       - |
        FLINK_PROPERTIES=
@@ -89,16 +101,27 @@ services:
        taskmanager.numberOfTaskSlots: 10
        taskmanager.memory.process.size: 2048m
        taskmanager.memory.framework.off-heap.size: 256m
+    entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh taskmanager"]
+    volumes:
+      - ./lib:/tmp/lib
+  sql-client:
+    image: flink:${FLINK_VERSION}
+    depends_on:
+      - jobmanager
+    environment:
+      - |
+       FLINK_PROPERTIES=
+       jobmanager.rpc.address: jobmanager
+       rest.address: jobmanager
+    entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh bin/sql-client.sh"]
+    volumes:
+      - ./lib:/tmp/lib
   #end
 ```
 
 The Docker Compose environment consists of the following containers:
 - **Fluss Cluster:** a Fluss `CoordinatorServer`, a Fluss `TabletServer` and a `ZooKeeper` server.
-- **Flink Cluster**: a Flink `JobManager` and a Flink `TaskManager` container to execute queries.
-
-**Note:** The `apache/fluss-quickstart-flink` image is based on [flink:1.20.1-java17](https://hub.docker.com/layers/library/flink/1.20-java17/images/sha256:bf1af6406c4f4ad8faa46efe2b3d0a0bf811d1034849c42c1e3484712bc83505) and
-includes the [fluss-flink](engine-flink/getting-started.md) and
-[flink-connector-faker](https://flink-packages.org/packages/flink-faker) to simplify this guide.
+- **Flink Cluster**: a Flink `JobManager`, a Flink `TaskManager`, and a Flink SQL client container to execute queries.
 
 3. To start all containers, run:
 ```shell
@@ -116,7 +139,6 @@ You can also visit http://localhost:8083/ to see if Flink is running normally.
 
 :::note
 - If you want to additionally use an observability stack, follow one of the provided quickstart guides [here](maintenance/observability/quickstart.md) and then continue with this guide.
-- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases) and then put them to `FLINK_HOME/lib/`.
 - All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
 :::
 
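A minimal sanity check for the new lib-mount setup might look like the following sketch (not part of the patch above; it assumes the service names, the `8083` port mapping, and the `/opt/flink/lib` copy performed by the `entrypoint` lines in the `docker-compose.yml`):

```shell
# Verify that the jars mounted under /tmp/lib were copied into Flink's lib directory at startup.
docker compose exec jobmanager ls /opt/flink/lib | grep -E 'fluss-flink|flink-faker'

# Verify that the JobManager REST endpoint is reachable on the mapped port.
curl -s http://localhost:8083/overview
```
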
@@ -125,25 +147,70 @@ Congratulations, you are all set!
 
 ## Enter into SQL-Client
 First, use the following command to enter the Flink SQL CLI Container:
 ```shell
-docker compose exec jobmanager ./sql-client
+docker compose run sql-client
 ```
 
-**Note**:
-To simplify this guide, three temporary tables have been pre-created with `faker` connector to generate data.
-You can view their schemas by running the following commands:
+To simplify this guide, we will create three temporary tables with the `faker` connector to generate data:
 
 ```sql title="Flink SQL"
-SHOW CREATE TABLE source_customer;
+CREATE TEMPORARY TABLE source_order (
+    `order_key` BIGINT,
+    `cust_key` INT,
+    `total_price` DECIMAL(15, 2),
+    `order_date` DATE,
+    `order_priority` STRING,
+    `clerk` STRING
+) WITH (
+    'connector' = 'faker',
+    'rows-per-second' = '10',
+    'number-of-rows' = '10000',
+    'fields.order_key.expression' = '#{number.numberBetween ''0'',''100000000''}',
+    'fields.cust_key.expression' = '#{number.numberBetween ''0'',''20''}',
+    'fields.total_price.expression' = '#{number.randomDouble ''3'',''1'',''1000''}',
+    'fields.order_date.expression' = '#{date.past ''100'' ''DAYS''}',
+    'fields.order_priority.expression' = '#{regexify ''(low|medium|high){1}''}',
+    'fields.clerk.expression' = '#{regexify ''(Clerk1|Clerk2|Clerk3|Clerk4){1}''}'
+);
 ```
 
 ```sql title="Flink SQL"
-SHOW CREATE TABLE source_order;
+CREATE TEMPORARY TABLE source_customer (
+    `cust_key` INT,
+    `name` STRING,
+    `phone` STRING,
+    `nation_key` INT NOT NULL,
+    `acctbal` DECIMAL(15, 2),
+    `mktsegment` STRING,
+    PRIMARY KEY (`cust_key`) NOT ENFORCED
+) WITH (
+    'connector' = 'faker',
+    'number-of-rows' = '200',
+    'fields.cust_key.expression' = '#{number.numberBetween ''0'',''20''}',
+    'fields.name.expression' = '#{funnyName.name}',
+    'fields.nation_key.expression' = '#{number.numberBetween ''1'',''5''}',
+    'fields.phone.expression' = '#{phoneNumber.cellPhone}',
+    'fields.acctbal.expression' = '#{number.randomDouble ''3'',''1'',''1000''}',
+    'fields.mktsegment.expression' = '#{regexify ''(AUTOMOBILE|BUILDING|FURNITURE|MACHINERY|HOUSEHOLD){1}''}'
+);
 ```
 
 ```sql title="Flink SQL"
-SHOW CREATE TABLE source_nation;
+CREATE TEMPORARY TABLE `source_nation` (
+    `nation_key` INT NOT NULL,
+    `name` STRING,
+    PRIMARY KEY (`nation_key`) NOT ENFORCED
+) WITH (
+    'connector' = 'faker',
+    'number-of-rows' = '100',
+    'fields.nation_key.expression' = '#{number.numberBetween ''1'',''5''}',
+    'fields.name.expression' = '#{regexify ''(CANADA|JORDAN|CHINA|UNITED|INDIA){1}''}'
+);
 ```
 
+```sql title="Flink SQL"
+-- drop records silently if a null value would have to be inserted into a NOT NULL column
+SET 'table.exec.sink.not-null-enforcer'='DROP';
+```
 
 ## Create Fluss Tables
 ### Create Fluss Catalog
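Since the temporary tables are now created by hand rather than pre-baked into an image, a quick way to confirm the `faker` sources emit data (a sketch based on the table definitions above, not part of the patch) is a bounded query such as:

```sql title="Flink SQL"
-- source_nation is declared above with 'number-of-rows' = '100', so this query
-- finishes on its own once the faker source has emitted all of its rows.
SELECT * FROM source_nation;
```
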
diff --git a/website/docs/quickstart/lakehouse.md b/website/docs/quickstart/lakehouse.md
index 3dcd074bf2..f2b8a7fc12 100644
--- a/website/docs/quickstart/lakehouse.md
+++ b/website/docs/quickstart/lakehouse.md
@@ -332,6 +332,10 @@ For further information how to store catalog configurations, see [Flink's Catalo
 :::
 
 ### Create Tables
+
+
+
+
 Running the following SQL to create Fluss tables to be used in this guide:
 ```sql title="Flink SQL"
 CREATE TABLE fluss_order (
     `order_key` BIGINT,
     `cust_key` INT NOT NULL,
     `total_price` DECIMAL(15, 2),
     `order_date` DATE,
     `order_priority` STRING,
     `clerk` STRING,
     `ptime` AS PROCTIME()
 );
 ```
@@ -366,6 +370,46 @@ CREATE TABLE fluss_nation (
 );
 ```
 
+
+
+
+
+
+Running the following SQL to create Fluss tables to be used in this guide:
+```sql title="Flink SQL"
+CREATE TABLE fluss_order (
+    `order_key` BIGINT,
+    `cust_key` INT NOT NULL,
+    `total_price` DECIMAL(15, 2),
+    `order_date` DATE,
+    `order_priority` STRING,
+    `clerk` STRING,
+    `ptime` AS PROCTIME()
+);
+```
+
+```sql title="Flink SQL"
+CREATE TABLE fluss_customer (
+    `cust_key` INT NOT NULL,
+    `name` STRING,
+    `phone` STRING,
+    `nation_key` INT NOT NULL,
+    `acctbal` DECIMAL(15, 2),
+    `mktsegment` STRING,
+    PRIMARY KEY (`cust_key`) NOT ENFORCED
+);
+```
+
+```sql title="Flink SQL"
+CREATE TABLE fluss_nation (
+    `nation_key` INT NOT NULL,
+    `name` STRING,
+    PRIMARY KEY (`nation_key`) NOT ENFORCED
+);
+```
+
+
 ## Streaming into Fluss
 First, run the following SQL to sync data from source tables to Fluss tables:
@@ -520,13 +564,10 @@ SELECT o.order_key,
     c.acctbal,
     c.mktsegment,
     n.name
-FROM (
-    SELECT *, PROCTIME() as ptime
-    FROM `default_catalog`.`default_database`.source_order
-) o
-LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF o.ptime AS c
+FROM fluss_order o
+LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
     ON o.cust_key = c.cust_key
-LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF o.ptime AS n
+LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
     ON c.nation_key = n.nation_key;
 ```
 
@@ -714,4 +755,4 @@ After finishing the tutorial, run `exit` to exit Flink SQL CLI Container and then run
 ```shell
 docker compose down -v
 ```
-to stop all containers.
+to stop all containers.
\ No newline at end of file
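Because the rewritten enrichment join takes its processing-time attribute directly from `fluss_order`, that table must expose the computed `ptime` column defined in the new Create Tables block. A quick check before running the join (again a sketch, not part of the patch) is:

```sql title="Flink SQL"
-- Verify that fluss_order exposes the processing-time column the temporal join
-- relies on; ptime should appear as the computed column `ptime` AS PROCTIME().
SHOW CREATE TABLE fluss_order;
DESCRIBE fluss_order;
```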