Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ temporal:
Converts a timestamp string to a TIMESTAMP_LTZ.

- string1: the timestamp string to parse
- string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's DateTimeFormatter syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds).
- string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's [DateTimeFormatter](https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/time/format/DateTimeFormatter.java) syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds).
- string3: the time zone of the input string (default 'UTC'). Supports zone IDs such as 'UTC', 'Asia/Shanghai', or 'America/Los_Angeles'.

The output precision is inferred from the number of 'S' characters in the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP_LTZ(6).
Expand All @@ -722,7 +722,19 @@ temporal:
TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai') parses in Shanghai time zone.
- sql: TO_TIMESTAMP(string1[, string2])
table: toTimestamp(STRING1[, STRING2])
description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone."
description: |
Converts a datetime string to a TIMESTAMP without time zone.

- string1: the datetime string to parse. Returns NULL if the string cannot be parsed.
- string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's [DateTimeFormatter](https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/time/format/DateTimeFormatter.java) syntax, where 'S' represents fractional seconds (e.g., 'SSSSS' for 5-digit fractional seconds, 'SSSSSSS' for 7-digit, 'SSSSSSSSS' for nanoseconds).

The output type is precision-aware (TIMESTAMP(3) through TIMESTAMP(9)):
- 1-arg variant: always returns TIMESTAMP(3).
- 2-arg variant: precision is inferred from the number of trailing 'S' characters in the format pattern (0-9), with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP(3), format 'yyyy-MM-dd HH:mm:ss.SSSSS' returns TIMESTAMP(5), format 'yyyy-MM-dd HH:mm:ss.SSSSSSS' returns TIMESTAMP(7).

E.g., TO_TIMESTAMP('2023-01-01 00:00:00') parses using default format and returns TIMESTAMP(3),
TO_TIMESTAMP('2023-01-01 12:30:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS') returns TIMESTAMP(5),
TO_TIMESTAMP('2023-01-01 12:30:00.1234567', 'yyyy-MM-dd HH:mm:ss.SSSSSSS') returns TIMESTAMP(7).
- sql: CURRENT_WATERMARK(rowtime)
description: |
Returns the current watermark for the given rowtime attribute, or `NULL` if no common watermark of all upstream operations is available at the current operation in the pipeline.
Expand Down
16 changes: 14 additions & 2 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ temporal:
Converts a timestamp string to a TIMESTAMP_LTZ.

- string1: the timestamp string to parse
- string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's DateTimeFormatter syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds).
- string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's [DateTimeFormatter](https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/time/format/DateTimeFormatter.java) syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds).
- string3: the time zone of the input string (default 'UTC'). Supports zone IDs such as 'UTC', 'Asia/Shanghai', or 'America/Los_Angeles'.

The output precision is inferred from the number of 'S' characters in the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP_LTZ(6).
Expand All @@ -848,7 +848,19 @@ temporal:
TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai') parses in Shanghai time zone.
- sql: TO_TIMESTAMP(string1[, string2])
table: toTimestamp(STRING1[, STRING2])
description: 将格式为 string2(默认为:'yyyy-MM-dd HH:mm:ss')的字符串 string1 转换为 timestamp,不带时区。
description: |
Converts a datetime string to a TIMESTAMP without time zone.

- string1: the datetime string to parse. Returns NULL if the string cannot be parsed.
- string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's [DateTimeFormatter](https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/time/format/DateTimeFormatter.java) syntax, where 'S' represents fractional seconds (e.g., 'SSSSS' for 5-digit fractional seconds, 'SSSSSSS' for 7-digit, 'SSSSSSSSS' for nanoseconds).

The output type is precision-aware (TIMESTAMP(3) through TIMESTAMP(9)):
- 1-arg variant: always returns TIMESTAMP(3).
- 2-arg variant: precision is inferred from the number of trailing 'S' characters in the format pattern (0-9), with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP(3), format 'yyyy-MM-dd HH:mm:ss.SSSSS' returns TIMESTAMP(5), format 'yyyy-MM-dd HH:mm:ss.SSSSSSS' returns TIMESTAMP(7).

E.g., TO_TIMESTAMP('2023-01-01 00:00:00') parses using default format and returns TIMESTAMP(3),
TO_TIMESTAMP('2023-01-01 12:30:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS') returns TIMESTAMP(5),
TO_TIMESTAMP('2023-01-01 12:30:00.1234567', 'yyyy-MM-dd HH:mm:ss.SSSSSSS') returns TIMESTAMP(7).
- sql: CURRENT_WATERMARK(rowtime)
description: |
返回给定时间列属性 rowtime 的当前水印,如果管道中的当前操作没有可用的上游操作的公共水印时则为 `NULL`。
Expand Down
5 changes: 4 additions & 1 deletion flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,10 @@ def to_time(self) -> 'Expression':
def to_timestamp(self) -> 'Expression':
"""
Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp.
It's equivalent to `col.cast(DataTypes.TIMESTAMP(3))`.
Returns TIMESTAMP(3).

For precision-aware parsing, use :func:`~pyflink.table.expressions.to_timestamp`
with a format pattern instead.

Example:
::
Expand Down
22 changes: 16 additions & 6 deletions flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,22 @@ def to_date(date_str: Union[str, Expression[str]],
def to_timestamp(timestamp_str: Union[str, Expression[str]],
format: Union[str, Expression[str]] = None) -> Expression:
"""
Converts the date time string with the given format (by default: 'yyyy-MM-dd HH:mm:ss')
under the 'UTC+0' time zone to a timestamp.

:param timestamp_str: The date time string
:param format: The format of the string
:return: The date value with TIMESTAMP type.
Converts a datetime string to a TIMESTAMP without time zone.

The output type is precision-aware (TIMESTAMP(3) through TIMESTAMP(9)):

- 1-arg variant: always returns TIMESTAMP(3).
- 2-arg variant: precision is inferred from the number of trailing 'S' characters
in the format pattern (0-9), with a minimum of 3. E.g., format
'yyyy-MM-dd HH:mm:ss.SSSSS' returns TIMESTAMP(5), format
'yyyy-MM-dd HH:mm:ss.SSSSSSS' returns TIMESTAMP(7).

:param timestamp_str: The datetime string to parse. Returns NULL if the string cannot be parsed.
:param format: The format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows
Java's `DateTimeFormatter
<https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html>`_
syntax, where 'S' represents fractional seconds.
:return: The timestamp value with TIMESTAMP type.
"""
if format is None:
return _unary_op("toTimestamp", timestamp_str)
Expand Down
28 changes: 18 additions & 10 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@
import unittest

from pyflink.table import DataTypes
from pyflink.table.expression import TimeIntervalUnit, TimePointUnit, JsonExistsOnError, \
from pyflink.table.expression import TimeIntervalUnit, TimePointUnit, \
JsonExistsOnError, \
JsonValueOnEmptyOrError, JsonType, JsonQueryWrapper, JsonQueryOnEmptyOrError
from pyflink.table.expressions import (col, lit, range_, and_, or_, current_date,
current_time, current_timestamp, current_database,
local_timestamp, local_time, temporal_overlaps, date_format,
timestamp_diff, array, row, map_, row_interval, pi, e,
rand, rand_integer, atan2, negative, concat, concat_ws, uuid,
null_of, log, if_then_else, with_columns, call,
to_timestamp_ltz, from_unixtime, to_date, to_timestamp,
from pyflink.table.expressions import (col, lit, range_, and_, or_,
current_date,
current_time, current_timestamp,
current_database,
local_timestamp, local_time,
temporal_overlaps, date_format,
timestamp_diff, array, row, map_,
row_interval, pi, e,
rand, rand_integer, atan2, negative,
concat, concat_ws, uuid,
null_of, log, if_then_else, with_columns,
call,
to_timestamp_ltz, from_unixtime, to_date,
to_timestamp,
convert_tz, unix_timestamp)
from pyflink.testing.test_case_utils import PyFlinkTestCase

Expand Down Expand Up @@ -301,9 +309,9 @@ def test_expressions(self):
str(to_timestamp_ltz(expr1, "MM/dd/yyyy HH:mm:ss")))
self.assertEqual("TO_TIMESTAMP_LTZ(a, 'MM/dd/yyyy HH:mm:ss', 'UTC')",
str(to_timestamp_ltz(expr1, "MM/dd/yyyy HH:mm:ss", "UTC")))
self.assertEqual("toTimestamp('1970-01-01 08:01:40')",
self.assertEqual("TO_TIMESTAMP('1970-01-01 08:01:40')",
str(to_timestamp('1970-01-01 08:01:40')))
self.assertEqual("toTimestamp('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')",
self.assertEqual("TO_TIMESTAMP('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')",
str(to_timestamp('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')))
self.assertEqual("temporalOverlaps(cast('2:55:00', TIME(0)), 3600000, "
"cast('3:30:00', TIME(0)), 7200000)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2425,16 +2425,12 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)

public static final BuiltInFunctionDefinition TO_TIMESTAMP =
BuiltInFunctionDefinition.newBuilder()
.name("toTimestamp")
.sqlName("TO_TIMESTAMP")
.name("TO_TIMESTAMP")
.kind(SCALAR)
.inputTypeStrategy(
or(
sequence(logical(LogicalTypeFamily.CHARACTER_STRING)),
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING))))
.outputTypeStrategy(nullableIfArgs(explicit(TIMESTAMP(3))))
.inputTypeStrategy(SpecificInputTypeStrategies.TO_TIMESTAMP)
.outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP)
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ToTimestampFunction")
.build();

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public static ArgumentTypeStrategy percentageArray(boolean expectedNullability)
*/
public static final InputTypeStrategy LEAD_LAG = new LeadLagInputTypeStrategy();

/** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP}. */
public static final InputTypeStrategy TO_TIMESTAMP = new ToTimestampInputTypeStrategy();

/** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. */
public static final InputTypeStrategy TO_TIMESTAMP_LTZ = new ToTimestampLtzInputTypeStrategy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public final class SpecificTypeStrategies {
public static final TypeStrategy INTERNAL_REPLICATE_ROWS =
new InternalReplicateRowsTypeStrategy();

/** See {@link ToTimestampTypeStrategy}. */
public static final TypeStrategy TO_TIMESTAMP = new ToTimestampTypeStrategy();

/** See {@link ToTimestampLtzTypeStrategy}. */
public static final TypeStrategy TO_TIMESTAMP_LTZ = new ToTimestampLtzTypeStrategy();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.inference.Signature.Argument;
import org.apache.flink.table.types.logical.LogicalTypeFamily;

import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;

/**
* Input type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP} that validates the format
* pattern at compile time when provided as a literal.
*/
@Internal
public class ToTimestampInputTypeStrategy implements InputTypeStrategy {

private static final ArgumentTypeStrategy CHARACTER_STRING_FAMILY_ARG =
logical(LogicalTypeFamily.CHARACTER_STRING);

@Override
public ArgumentCount getArgumentCount() {
return ConstantArgumentCount.between(1, 2);
}

@Override
public Optional<List<DataType>> inferInputTypes(
CallContext callContext, boolean throwOnFailure) {
final List<DataType> result = new ArrayList<>();
final int numberOfArguments = callContext.getArgumentDataTypes().size();

final Optional<DataType> timestampArg =
CHARACTER_STRING_FAMILY_ARG.inferArgumentType(callContext, 0, throwOnFailure);
if (timestampArg.isEmpty()) {
return Optional.empty();
}
result.add(timestampArg.get());

if (numberOfArguments > 1) {
final Optional<DataType> patternArg =
validatePatternArgument(callContext, throwOnFailure);
if (patternArg.isEmpty()) {
return Optional.empty();
}
result.add(patternArg.get());
}

return Optional.of(result);
}

private Optional<DataType> validatePatternArgument(
final CallContext callContext, final boolean throwOnFailure) {
final Optional<DataType> patternArg =
CHARACTER_STRING_FAMILY_ARG.inferArgumentType(callContext, 1, throwOnFailure);
if (patternArg.isEmpty()) {
return Optional.empty();
}

if (callContext.isArgumentLiteral(1)) {
final Optional<String> patternOpt = callContext.getArgumentValue(1, String.class);
if (patternOpt.isEmpty()) {
return callContext.fail(throwOnFailure, "Pattern can not be a null literal");
}
try {
DateTimeFormatter.ofPattern(patternOpt.get());
} catch (IllegalArgumentException e) {
return callContext.fail(
throwOnFailure,
"Invalid pattern for parsing TIMESTAMP: %s",
e.getMessage());
}
}

return patternArg;
}

@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
return List.of(
Signature.of(Argument.ofGroup(LogicalTypeFamily.CHARACTER_STRING)),
Signature.of(
Argument.ofGroup(LogicalTypeFamily.CHARACTER_STRING),
Argument.ofGroup("pattern", LogicalTypeFamily.CHARACTER_STRING)));
}
}
Loading