diff --git a/docs/querying/math-expr.md b/docs/querying/math-expr.md index 06ac395c7ad4..46ed0870bcf4 100644 --- a/docs/querying/math-expr.md +++ b/docs/querying/math-expr.md @@ -110,6 +110,7 @@ The following built-in functions are available. |name|description| |----|-----------| +|now|now() returns the current system timestamp in milliseconds since epoch (1970-01-01 00:00:00 UTC). This function is evaluated for each row at processing time. It's recommended to use this only for troubleshooting issues - see [Using now() in ingestion](#using-now-in-ingestion).| |timestamp|timestamp(expr[,format-string]) parses string expr into date then returns milliseconds from java epoch. without 'format-string' it's regarded as ISO datetime format | |unix_timestamp|same with 'timestamp' function but returns seconds instead | |timestamp_ceil|timestamp_ceil(expr, period, \[origin, \[timezone\]\]) rounds up a timestamp, returning it as a new timestamp. Period can be any ISO8601 period, like P3M (quarters) or PT12H (half-days). The time zone, if provided, should be a time zone name like "America/Los_Angeles" or offset like "-08:00".| @@ -119,6 +120,26 @@ The following built-in functions are available. |timestamp_parse|timestamp_parse(string expr, \[pattern, [timezone\]\]) parses a string into a timestamp using a given [Joda DateTimeFormat pattern](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat). If the pattern is not provided, this parses time strings in either ISO8601 or SQL format. The time zone, if provided, should be a time zone name like "America/Los_Angeles" or offset like "-08:00", and will be used as the time zone for strings that do not include a time zone offset. Pattern and time zone must be literals. Strings that cannot be parsed as timestamps will be returned as nulls.| |timestamp_format|timestamp_format(expr, \[pattern, \[timezone\]\]) formats a timestamp as a string with a given [Joda DateTimeFormat pattern](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat), or ISO8601 if the pattern is not provided. The time zone, if provided, should be a time zone name like "America/Los_Angeles" or offset like "-08:00". Pattern and time zone must be literals.| +### Using `now()` in ingestion + +:::warning +`now()` is non-deterministic: replicated streaming tasks and task replays evaluate it to different +values, producing inconsistent results across replicas. Do not use `now()` to overwrite `__time`. +For Kafka, prefer [`kafka.timestamp`](../ingestion/data-formats.md#kafka) as the +`__time` source. +::: + +To troubleshoot end-to-end pipeline delays, store `now() - __time` as a separate dimension via a +[`transformSpec`](../ingestion/ingestion-spec.md#transformspec): + +```json +"transformSpec": { + "transforms": [ + { "type": "expression", "name": "ingestion_lag_ms", "expression": "now() - __time" } + ] +} +``` + ## Math functions See javadoc of java.lang.Math for detailed explanation for each function. diff --git a/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java b/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java index 189ae918c3d3..747da37e58aa 100644 --- a/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java +++ b/processing/src/main/java/org/apache/druid/math/expr/BuiltInExprMacros.java @@ -214,4 +214,95 @@ public Object getLiteralValue() } } + /** + * Expression macro for now() function that returns current system timestamp. + * Implemented as a macro to prevent constant folding optimization. + */ + public static class NowExprMacro implements ExprMacroTable.ExprMacro + { + public static final String NAME = "now"; + + @Override + public String name() + { + return NAME; + } + + @Override + public Expr apply(List args) + { + validationHelperCheckArgumentCount(args, 0); + return new NowExpression(); + } + + static final class NowExpression implements Expr + { + @Override + public ExprEval eval(ObjectBinding bindings) + { + return ExprEval.ofLong(System.currentTimeMillis()); + } + + @Override + public String stringify() + { + return "now()"; + } + + @Override + public Expr visit(Shuttle shuttle) + { + return shuttle.visit(this); + } + + @Override + public BindingAnalysis analyzeInputs() + { + // Return analysis indicating this is NOT constant + // by pretending we have a free variable + return new BindingAnalysis(); + } + + @Nullable + @Override + public ExpressionType getOutputType(InputBindingInspector inspector) + { + return ExpressionType.LONG; + } + + @Override + public boolean isLiteral() + { + // NOT a literal - prevents constant folding + return false; + } + + @Override + public boolean isNullLiteral() + { + return false; + } + + @Nullable + @Override + public Object getLiteralValue() + { + // Not a literal, so no constant value + return null; + } + + @Override + public int hashCode() + { + return NowExpression.class.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof NowExpression; + } + } + } + } diff --git a/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java b/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java index 0f8b9e1f07fe..f2a7556e802f 100644 --- a/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java +++ b/processing/src/main/java/org/apache/druid/math/expr/ExprMacroTable.java @@ -51,7 +51,8 @@ public class ExprMacroTable COMPLEX_DECODE_BASE_64_EXPR_MACRO, BuiltInExprMacros.ComplexDecodeBase64ExprMacro.ALIAS ), - new BuiltInExprMacros.StringDecodeBase64UTFExprMacro() + new BuiltInExprMacros.StringDecodeBase64UTFExprMacro(), + new BuiltInExprMacros.NowExprMacro() ); private static final ExprMacroTable NIL = new ExprMacroTable(Collections.emptyList()); diff --git a/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java b/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java index 51068feefd44..66f0bc0907e4 100644 --- a/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java +++ b/processing/src/test/java/org/apache/druid/math/expr/FunctionTest.java @@ -1558,4 +1558,70 @@ public static void assertArrayExpr( Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey()); Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey()); } + + @Test + public void testNow() + { + long beforeCall = System.currentTimeMillis(); + + Expr expr = Parser.parse("now()", ExprMacroTable.nil()); + ExprEval result = expr.eval(InputBindings.nilBindings()); + + long afterCall = System.currentTimeMillis(); + + Assert.assertNotNull(result.value()); + Assert.assertEquals(ExpressionType.LONG, result.type()); + + long timestamp = result.asLong(); + Assert.assertTrue( + "Timestamp should be between before and after: " + beforeCall + " <= " + timestamp + " <= " + afterCall, + timestamp >= beforeCall && timestamp <= afterCall + ); + } + + @Test + public void testNowEvaluatedPerRow() + { + Expr expr = Parser.parse("now()", ExprMacroTable.nil()); + + // now() must not be treated as a literal, otherwise it would be constant-folded + // and not re-evaluated per row. + Assert.assertFalse(expr.isLiteral()); + Assert.assertNull(expr.getLiteralValue()); + + long time1 = expr.eval(InputBindings.nilBindings()).asLong(); + long time2 = expr.eval(InputBindings.nilBindings()).asLong(); + Assert.assertTrue( + "Second call should return same or later timestamp: " + time1 + " <= " + time2, + time2 >= time1 + ); + } + + @Test + public void testNowRejectsArguments() + { + Throwable t = Assert.assertThrows( + ExpressionValidationException.class, + () -> Parser.parse("now(123)", ExprMacroTable.nil()).eval(InputBindings.nilBindings()) + ); + Assert.assertEquals("Function[now] does not accept arguments", t.getMessage()); + } + + @Test + public void testNowInExpression() + { + // Test using now() in a more complex expression + Expr expr = Parser.parse("now() - 1000", ExprMacroTable.nil()); + + long beforeCall = System.currentTimeMillis(); + ExprEval result = expr.eval(InputBindings.nilBindings()); + long afterCall = System.currentTimeMillis(); + + long computed = result.asLong(); + Assert.assertTrue( + "Result should be approximately 1 second before now: computed=" + computed + + ", expected range=[" + (beforeCall - 1000) + ", " + (afterCall - 1000) + "]", + computed >= (beforeCall - 1100) && computed <= (afterCall - 900) + ); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java index 2583b81deabb..546001f22f31 100644 --- a/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/transform/TransformerTest.java @@ -507,4 +507,54 @@ public int compareTo(Row o) Assert.assertArrayEquals(dimList.subList(0, 5).toArray(), (Object[]) actualTranformedRow.getRaw("dim")); Assert.assertEquals(ImmutableList.of("a"), actualTranformedRow.getDimension("dim1")); } + + @Test + public void testNowTransform() + { + TransformSpec transformSpec = new TransformSpec( + null, + ImmutableList.of( + new ExpressionTransform("ingestion_time", "now()", TestExprMacroTable.INSTANCE), + new ExpressionTransform("lag_ms", "now() - __time", TestExprMacroTable.INSTANCE) + ) + ); + + long beforeTransform = System.currentTimeMillis(); + + InputRow row = new MapBasedInputRow( + DateTimes.of("2024-01-01T00:00:00Z"), + ImmutableList.of("dim"), + ImmutableMap.of("dim", "value") + ); + + Transformer transformer = transformSpec.toTransformer(); + InputRow transformed = transformer.transform(row); + + long afterTransform = System.currentTimeMillis(); + + Assert.assertNotNull(transformed); + Assert.assertNotNull(transformed.getRaw("ingestion_time")); + + long ingestionTime = ((Number) transformed.getRaw("ingestion_time")).longValue(); + Assert.assertTrue( + "Ingestion time should be between transform start and end: " + + beforeTransform + " <= " + ingestionTime + " <= " + afterTransform, + ingestionTime >= beforeTransform && ingestionTime <= afterTransform + ); + + // Verify lag calculation (may be slightly different from ingestionTime - __time due to timing) + long lag = ((Number) transformed.getRaw("lag_ms")).longValue(); + long eventTime = DateTimes.of("2024-01-01T00:00:00Z").getMillis(); + long expectedLag = ingestionTime - eventTime; + + // Allow small difference since now() is called twice (once for ingestion_time, once for lag_ms) + long lagDiff = Math.abs(lag - expectedLag); + Assert.assertTrue( + "Lag should be approximately correct (diff=" + lagDiff + "ms): expected=" + expectedLag + ", actual=" + lag, + lagDiff < 100 // Allow up to 100ms difference + ); + + // Verify lag is positive (ingestion happened after event) + Assert.assertTrue("Lag should be positive", lag > 0); + } }