Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,21 @@ temporal:
TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai') parses in Shanghai time zone.
- sql: TO_TIMESTAMP(string1[, string2])
table: toTimestamp(STRING1[, STRING2])
description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone."
description: |
Converts a datetime string to a TIMESTAMP without time zone.

- string1: the datetime string to parse
- string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's DateTimeFormatter syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSS' for microseconds, 'SSSSSSSSS' for nanoseconds).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like we are going to heavily introduce java classes here....

Would be great if we have it consistent at least.

I'm asking since here we are talking about DateTimeFormatter, in case of DATE_FORMAT there is SimpleDateFormatter...

I think we should have at least a link to their doc explaining the formats, it will simplify the search for non java people (sql, python)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the docs. Let me know if something is still missing.


The output precision depends on the variant used:
- 1-arg variant: always returns TIMESTAMP(3).
- 2-arg variant: precision is inferred from the number of trailing 'S' characters in the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP(6).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens when the number of S's is not 3, 6 or 9.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added some tests and updated the docs


Returns NULL if the input string cannot be parsed.

E.g., TO_TIMESTAMP('2023-01-01 00:00:00') parses using default format and returns TIMESTAMP(3),
TO_TIMESTAMP('2023-01-01 12:30:00.123456', 'yyyy-MM-dd HH:mm:ss.SSSSSS') parses with microsecond precision and returns TIMESTAMP(6),
TO_TIMESTAMP('2023-01-01 12:30:00.123', 'yyyy-MM-dd HH:mm:ss.SSS') returns TIMESTAMP(3).
- sql: CURRENT_WATERMARK(rowtime)
description: |
Returns the current watermark for the given rowtime attribute, or `NULL` if no common watermark of all upstream operations is available at the current operation in the pipeline.
Expand Down
16 changes: 15 additions & 1 deletion docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,21 @@ temporal:
TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai') parses in Shanghai time zone.
- sql: TO_TIMESTAMP(string1[, string2])
table: toTimestamp(STRING1[, STRING2])
description: 将格式为 string2(默认为:'yyyy-MM-dd HH:mm:ss')的字符串 string1 转换为 timestamp,不带时区。
description: |
Converts a datetime string to a TIMESTAMP without time zone.
- string1: the datetime string to parse
- string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's DateTimeFormatter syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSS' for microseconds, 'SSSSSSSSS' for nanoseconds).
The output precision depends on the variant used:
- 1-arg variant: always returns TIMESTAMP(3).
- 2-arg variant: precision is inferred from the number of trailing 'S' characters in the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP(6).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP(3)

why do we have such behavior?

what is the behavior for other vendors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kept for backward compatibility. Same behavior we have for TO_TIMESTAMP_LTZ https://issues.apache.org/jira/browse/FLINK-39244

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't release notes telling it a bit different?

TO_TIMESTAMP_LTZ() function now supports up to precision 9 for both numeric and string conversions. While we kept backwards compatibility for numeric precision 0-3 which always returns TIMESTAMP_LTZ(3), precisions p=4-9 now return TIMESTAMP_LTZ(p). Function calls taking string formats such as ".SSSS" now return precision 4 instead of silently loosing sub second information.

Returns NULL if the input string cannot be parsed.
E.g., TO_TIMESTAMP('2023-01-01 00:00:00') parses using default format and returns TIMESTAMP(3),
TO_TIMESTAMP('2023-01-01 12:30:00.123456', 'yyyy-MM-dd HH:mm:ss.SSSSSS') parses with microsecond precision and returns TIMESTAMP(6),
TO_TIMESTAMP('2023-01-01 12:30:00.123', 'yyyy-MM-dd HH:mm:ss.SSS') returns TIMESTAMP(3).
- sql: CURRENT_WATERMARK(rowtime)
description: |
返回给定时间列属性 rowtime 的当前水印,如果管道中的当前操作没有可用的上游操作的公共水印时则为 `NULL`。
Expand Down
5 changes: 4 additions & 1 deletion flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,10 @@ def to_time(self) -> 'Expression':
def to_timestamp(self) -> 'Expression':
"""
Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp.
It's equivalent to `col.cast(DataTypes.TIMESTAMP(3))`.
Returns TIMESTAMP(3).

For precision-aware parsing, use :func:`~pyflink.table.expressions.to_timestamp`
with a format pattern instead.

Example:
::
Expand Down
17 changes: 12 additions & 5 deletions flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,19 @@ def to_date(date_str: Union[str, Expression[str]],
def to_timestamp(timestamp_str: Union[str, Expression[str]],
format: Union[str, Expression[str]] = None) -> Expression:
"""
Converts the date time string with the given format (by default: 'yyyy-MM-dd HH:mm:ss')
under the 'UTC+0' time zone to a timestamp.
Converts a datetime string to a TIMESTAMP without time zone.
:param timestamp_str: The date time string
:param format: The format of the string
:return: The date value with TIMESTAMP type.
The output precision depends on the variant used:
- 1-arg variant: always returns TIMESTAMP(3).
- 2-arg variant: precision is inferred from the number of trailing 'S' characters
in the format pattern, with a minimum of 3. E.g., format
'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP(6).
:param timestamp_str: The datetime string to parse.
:param format: The format pattern (default 'yyyy-MM-dd HH:mm:ss'). 'S' represents
fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSS' for microseconds).
:return: The timestamp value with TIMESTAMP type.
"""
if format is None:
return _unary_op("toTimestamp", timestamp_str)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2425,16 +2425,12 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)

public static final BuiltInFunctionDefinition TO_TIMESTAMP =
BuiltInFunctionDefinition.newBuilder()
.name("toTimestamp")
.sqlName("TO_TIMESTAMP")
.name("TO_TIMESTAMP")
.kind(SCALAR)
.inputTypeStrategy(
or(
sequence(logical(LogicalTypeFamily.CHARACTER_STRING)),
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING))))
.outputTypeStrategy(nullableIfArgs(explicit(TIMESTAMP(3))))
.inputTypeStrategy(SpecificInputTypeStrategies.TO_TIMESTAMP)
.outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP)
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ToTimestampFunction")
.build();

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public static ArgumentTypeStrategy percentageArray(boolean expectedNullability)
*/
public static final InputTypeStrategy LEAD_LAG = new LeadLagInputTypeStrategy();

/** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP}. */
public static final InputTypeStrategy TO_TIMESTAMP = new ToTimestampInputTypeStrategy();

/** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. */
public static final InputTypeStrategy TO_TIMESTAMP_LTZ = new ToTimestampLtzInputTypeStrategy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public final class SpecificTypeStrategies {
public static final TypeStrategy INTERNAL_REPLICATE_ROWS =
new InternalReplicateRowsTypeStrategy();

/** See {@link ToTimestampTypeStrategy}. */
public static final TypeStrategy TO_TIMESTAMP = new ToTimestampTypeStrategy();

/** See {@link ToTimestampLtzTypeStrategy}. */
public static final TypeStrategy TO_TIMESTAMP_LTZ = new ToTimestampLtzTypeStrategy();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.inference.Signature.Argument;
import org.apache.flink.table.types.logical.LogicalTypeFamily;

import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;

/**
* Input type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP} that validates the format
* pattern at compile time when provided as a literal.
*/
@Internal
public class ToTimestampInputTypeStrategy implements InputTypeStrategy {

private static final ArgumentTypeStrategy CHARACTER_STRING_FAMILY_ARG =
logical(LogicalTypeFamily.CHARACTER_STRING);

@Override
public ArgumentCount getArgumentCount() {
return ConstantArgumentCount.between(1, 2);
}

@Override
public Optional<List<DataType>> inferInputTypes(
CallContext callContext, boolean throwOnFailure) {
final List<DataType> result = new ArrayList<>();
final int numberOfArguments = callContext.getArgumentDataTypes().size();

final Optional<DataType> timestampArg =
CHARACTER_STRING_FAMILY_ARG.inferArgumentType(callContext, 0, throwOnFailure);
if (timestampArg.isEmpty()) {
return Optional.empty();
}
result.add(timestampArg.get());

if (numberOfArguments > 1) {
final Optional<DataType> patternArg =
validatePatternArgument(callContext, throwOnFailure);
if (patternArg.isEmpty()) {
return Optional.empty();
}
result.add(patternArg.get());
}

return Optional.of(result);
}

private Optional<DataType> validatePatternArgument(
final CallContext callContext, final boolean throwOnFailure) {
final Optional<DataType> patternArg =
CHARACTER_STRING_FAMILY_ARG.inferArgumentType(callContext, 1, throwOnFailure);
if (patternArg.isEmpty()) {
return Optional.empty();
}

if (callContext.isArgumentLiteral(1)) {
final Optional<String> patternOpt = callContext.getArgumentValue(1, String.class);
if (patternOpt.isEmpty()) {
return callContext.fail(throwOnFailure, "Pattern can not be a null literal");
}
try {
DateTimeFormatter.ofPattern(patternOpt.get());
} catch (IllegalArgumentException e) {
return callContext.fail(
throwOnFailure,
"Invalid pattern for parsing TIMESTAMP: %s",
e.getMessage());
}
}

return patternArg;
}

@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
return List.of(
Signature.of(Argument.ofGroup(LogicalTypeFamily.CHARACTER_STRING)),
Signature.of(
Argument.ofGroup(LogicalTypeFamily.CHARACTER_STRING),
Argument.ofGroup("pattern", LogicalTypeFamily.CHARACTER_STRING)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.table.utils.DateTimeUtils;

import java.util.Optional;

/**
* Type strategy of {@code TO_TIMESTAMP}. Returns {@code TIMESTAMP(3)} for the 1-arg variant and
* infers precision from the format pattern's trailing 'S' count for the 2-arg variant.
*/
@Internal
public class ToTimestampTypeStrategy implements TypeStrategy {

private static final int DEFAULT_PRECISION = 3;

@Override
public Optional<DataType> inferType(CallContext callContext) {
int outputPrecision = DEFAULT_PRECISION;

if (callContext.getArgumentDataTypes().size() == 2) {
outputPrecision = inferPrecisionFromFormat(callContext);
}

return Optional.of(DataTypes.TIMESTAMP(outputPrecision).nullable());
}

/**
* Infers the output precision from a format string literal. Returns at least {@link
* #DEFAULT_PRECISION}.
*/
private static int inferPrecisionFromFormat(CallContext callContext) {
if (!callContext.isArgumentLiteral(1)) {
return DEFAULT_PRECISION;
}
return callContext
.getArgumentValue(1, String.class)
.map(DateTimeUtils::precisionFromFormat)
.orElse(DEFAULT_PRECISION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,9 @@ public static TimestampData parseTimestampData(String dateStr, int precision, Ti

/**
* Parses a timestamp string with the given format, truncating to millisecond precision.
* Precision is hardcoded to match signature of TO_TIMESTAMP.
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-14925">FLINK-14925</a>
* <p>Note: For precision-aware parsing, use {@link #parseTimestampData(String, String, int)}
* with {@link #precisionFromFormat(String)} instead.
*/
public static TimestampData parseTimestampData(String dateStr, String format) {
return parseTimestampData(dateStr, format, 3);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.inference.TypeStrategiesTestBase;

import java.util.stream.Stream;

/** Tests for {@link ToTimestampTypeStrategy}. */
class ToTimestampTypeStrategyTest extends TypeStrategiesTestBase {

@Override
protected Stream<TestSpec> testData() {
return Stream.of(
// 1-arg: always TIMESTAMP(3)
TestSpec.forStrategy(
"TO_TIMESTAMP(<STRING>) returns TIMESTAMP(3)",
SpecificTypeStrategies.TO_TIMESTAMP)
.inputTypes(DataTypes.STRING())
.expectDataType(DataTypes.TIMESTAMP(3).nullable()),
// 2-arg: non-literal format defaults to TIMESTAMP(3)
TestSpec.forStrategy(
"TO_TIMESTAMP(<STRING>, <STRING>) defaults to TIMESTAMP(3)",
SpecificTypeStrategies.TO_TIMESTAMP)
.inputTypes(DataTypes.STRING(), DataTypes.STRING())
.expectDataType(DataTypes.TIMESTAMP(3).nullable()),
// Format-based precision: SSS → TIMESTAMP(3)
TestSpec.forStrategy(
"Format with SSS returns TIMESTAMP(3)",
SpecificTypeStrategies.TO_TIMESTAMP)
.inputTypes(DataTypes.STRING(), DataTypes.STRING())
.calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSS")
.expectDataType(DataTypes.TIMESTAMP(3).nullable()),
// Format-based precision: SSSSSS → TIMESTAMP(6)
TestSpec.forStrategy(
"Format with SSSSSS returns TIMESTAMP(6)",
SpecificTypeStrategies.TO_TIMESTAMP)
.inputTypes(DataTypes.STRING(), DataTypes.STRING())
.calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSSSSS")
.expectDataType(DataTypes.TIMESTAMP(6).nullable()),
// Format-based precision: SSSSSSSSS → TIMESTAMP(9)
TestSpec.forStrategy(
"Format with SSSSSSSSS returns TIMESTAMP(9)",
SpecificTypeStrategies.TO_TIMESTAMP)
.inputTypes(DataTypes.STRING(), DataTypes.STRING())
.calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
.expectDataType(DataTypes.TIMESTAMP(9).nullable()),
// Format without S → TIMESTAMP(3)
TestSpec.forStrategy(
"Format without S returns TIMESTAMP(3)",
SpecificTypeStrategies.TO_TIMESTAMP)
.inputTypes(DataTypes.STRING(), DataTypes.STRING())
.calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss")
.expectDataType(DataTypes.TIMESTAMP(3).nullable()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,6 @@ void initNonDynamicFunctions() {
BuiltInFunctionDefinitions.UNIX_TIMESTAMP, FlinkSqlOperatorTable.UNIX_TIMESTAMP);
definitionSqlOperatorHashMap.put(
BuiltInFunctionDefinitions.TO_DATE, FlinkSqlOperatorTable.TO_DATE);
definitionSqlOperatorHashMap.put(
BuiltInFunctionDefinitions.TO_TIMESTAMP, FlinkSqlOperatorTable.TO_TIMESTAMP);

// catalog functions
definitionSqlOperatorHashMap.put(
Expand Down
Loading