Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/querying/math-expr.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ The following built-in functions are available.

|name|description|
|----|-----------|
|now|now() returns the current system timestamp in milliseconds since epoch (1970-01-01 00:00:00 UTC). This function is evaluated for each row at processing time. It's recommended to use this only for troubleshooting issues - see [Using now() in ingestion](#using-now-in-ingestion).|
|timestamp|timestamp(expr[,format-string]) parses string expr into date then returns milliseconds from java epoch. without 'format-string' it's regarded as ISO datetime format |
|unix_timestamp|same with 'timestamp' function but returns seconds instead |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
|unix_timestamp|same with 'timestamp' function but returns seconds instead |
|unix_timestamp|unix_timestamp(expr[,format-string]) same with 'timestamp' function but returns seconds instead |

Just for consistency with other functions

|timestamp_ceil|timestamp_ceil(expr, period, \[origin, \[timezone\]\]) rounds up a timestamp, returning it as a new timestamp. Period can be any ISO8601 period, like P3M (quarters) or PT12H (half-days). The time zone, if provided, should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|
Expand All @@ -119,6 +120,26 @@ The following built-in functions are available.
|timestamp_parse|timestamp_parse(string expr, \[pattern, [timezone\]\]) parses a string into a timestamp using a given [Joda DateTimeFormat pattern](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat). If the pattern is not provided, this parses time strings in either ISO8601 or SQL format. The time zone, if provided, should be a time zone name like "America/Los_Angeles" or offset like "-08:00", and will be used as the time zone for strings that do not include a time zone offset. Pattern and time zone must be literals. Strings that cannot be parsed as timestamps will be returned as nulls.|
|timestamp_format|timestamp_format(expr, \[pattern, \[timezone\]\]) formats a timestamp as a string with a given [Joda DateTimeFormat pattern](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat), or ISO8601 if the pattern is not provided. The time zone, if provided, should be a time zone name like "America/Los_Angeles" or offset like "-08:00". Pattern and time zone must be literals.|

### Using `now()` in ingestion

:::warning
`now()` is non-deterministic: replicated streaming tasks and task replays evaluate it to different
values, producing inconsistent results across replicas. Do not use `now()` to overwrite `__time`.
For Kafka, prefer [`kafka.timestamp`](../ingestion/data-formats.md#kafka) as the
`__time` source.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it makes sense to mention Kafka here. The timestamps aren't equivalent as one is the time the event lands on Kafka and the other is the time the event is ingested in Druid

:::

To troubleshoot end-to-end pipeline delays, store `now() - __time` as a separate dimension via a
[`transformSpec`](../ingestion/ingestion-spec.md#transformspec):

```json
"transformSpec": {
"transforms": [
{ "type": "expression", "name": "ingestion_lag_ms", "expression": "now() - __time" }
]
}
```

## Math functions

See javadoc of java.lang.Math for detailed explanation for each function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,95 @@ public Object getLiteralValue()
}
}

/**
* Expression macro for now() function that returns current system timestamp.
* Implemented as a macro to prevent constant folding optimization.
*/
public static class NowExprMacro implements ExprMacroTable.ExprMacro
{
public static final String NAME = "now";

@Override
public String name()
{
return NAME;
}

@Override
public Expr apply(List<Expr> args)
{
validationHelperCheckArgumentCount(args, 0);
return new NowExpression();
}

static final class NowExpression implements Expr
{
@Override
public ExprEval eval(ObjectBinding bindings)
{
return ExprEval.ofLong(System.currentTimeMillis());
}

@Override
public String stringify()
{
return "now()";
}

@Override
public Expr visit(Shuttle shuttle)
{
return shuttle.visit(this);
}

@Override
public BindingAnalysis analyzeInputs()
{
// Return analysis indicating this is NOT constant
// by pretending we have a free variable
return new BindingAnalysis();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Prevent now() from being planned as constant

Returning an empty BindingAnalysis makes ExpressionPlanner mark now() and expressions that only depend on now() as CONSTANT. Query-time expression selectors then build a ConstantExprEvalSelector and evaluate now() once when the selector is created, and expression filters/virtual columns also get stable cache keys, so native queries can reuse stale timestamps instead of evaluating at processing time. Add explicit non-deterministic handling, or otherwise keep now() out of constant selector/index/cache optimizations.

}

@Nullable
@Override
public ExpressionType getOutputType(InputBindingInspector inspector)
{
return ExpressionType.LONG;
}

@Override
public boolean isLiteral()
{
// NOT a literal - prevents constant folding
return false;
}

@Override
public boolean isNullLiteral()
{
return false;
}

@Nullable
@Override
public Object getLiteralValue()
{
// Not a literal, so no constant value
return null;
}

@Override
public int hashCode()
{
return NowExpression.class.hashCode();
}

@Override
public boolean equals(Object obj)
{
return obj instanceof NowExpression;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class ExprMacroTable
COMPLEX_DECODE_BASE_64_EXPR_MACRO,
BuiltInExprMacros.ComplexDecodeBase64ExprMacro.ALIAS
),
new BuiltInExprMacros.StringDecodeBase64UTFExprMacro()
new BuiltInExprMacros.StringDecodeBase64UTFExprMacro(),
new BuiltInExprMacros.NowExprMacro()
);
private static final ExprMacroTable NIL = new ExprMacroTable(Collections.emptyList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1558,4 +1558,70 @@ public static void assertArrayExpr(
Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
}

@Test
public void testNow()
{
long beforeCall = System.currentTimeMillis();

Expr expr = Parser.parse("now()", ExprMacroTable.nil());
ExprEval result = expr.eval(InputBindings.nilBindings());

long afterCall = System.currentTimeMillis();

Assert.assertNotNull(result.value());
Assert.assertEquals(ExpressionType.LONG, result.type());

long timestamp = result.asLong();
Assert.assertTrue(
"Timestamp should be between before and after: " + beforeCall + " <= " + timestamp + " <= " + afterCall,
timestamp >= beforeCall && timestamp <= afterCall
);
}

@Test
public void testNowEvaluatedPerRow()
{
Expr expr = Parser.parse("now()", ExprMacroTable.nil());

// now() must not be treated as a literal, otherwise it would be constant-folded
// and not re-evaluated per row.
Assert.assertFalse(expr.isLiteral());
Assert.assertNull(expr.getLiteralValue());

long time1 = expr.eval(InputBindings.nilBindings()).asLong();
long time2 = expr.eval(InputBindings.nilBindings()).asLong();
Assert.assertTrue(
"Second call should return same or later timestamp: " + time1 + " <= " + time2,
time2 >= time1
);
}

@Test
public void testNowRejectsArguments()
{
Throwable t = Assert.assertThrows(
ExpressionValidationException.class,
() -> Parser.parse("now(123)", ExprMacroTable.nil()).eval(InputBindings.nilBindings())
);
Assert.assertEquals("Function[now] does not accept arguments", t.getMessage());
}

@Test
public void testNowInExpression()
{
// Test using now() in a more complex expression
Expr expr = Parser.parse("now() - 1000", ExprMacroTable.nil());

long beforeCall = System.currentTimeMillis();
ExprEval result = expr.eval(InputBindings.nilBindings());
long afterCall = System.currentTimeMillis();

long computed = result.asLong();
Assert.assertTrue(
"Result should be approximately 1 second before now: computed=" + computed
+ ", expected range=[" + (beforeCall - 1000) + ", " + (afterCall - 1000) + "]",
computed >= (beforeCall - 1100) && computed <= (afterCall - 900)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,4 +507,54 @@ public int compareTo(Row o)
Assert.assertArrayEquals(dimList.subList(0, 5).toArray(), (Object[]) actualTranformedRow.getRaw("dim"));
Assert.assertEquals(ImmutableList.of("a"), actualTranformedRow.getDimension("dim1"));
}

@Test
public void testNowTransform()
{
TransformSpec transformSpec = new TransformSpec(
null,
ImmutableList.of(
new ExpressionTransform("ingestion_time", "now()", TestExprMacroTable.INSTANCE),
new ExpressionTransform("lag_ms", "now() - __time", TestExprMacroTable.INSTANCE)
)
);

long beforeTransform = System.currentTimeMillis();

InputRow row = new MapBasedInputRow(
DateTimes.of("2024-01-01T00:00:00Z"),
ImmutableList.of("dim"),
ImmutableMap.of("dim", "value")
);

Transformer transformer = transformSpec.toTransformer();
InputRow transformed = transformer.transform(row);

long afterTransform = System.currentTimeMillis();

Assert.assertNotNull(transformed);
Assert.assertNotNull(transformed.getRaw("ingestion_time"));

long ingestionTime = ((Number) transformed.getRaw("ingestion_time")).longValue();
Assert.assertTrue(
"Ingestion time should be between transform start and end: "
+ beforeTransform + " <= " + ingestionTime + " <= " + afterTransform,
ingestionTime >= beforeTransform && ingestionTime <= afterTransform
);

// Verify lag calculation (may be slightly different from ingestionTime - __time due to timing)
long lag = ((Number) transformed.getRaw("lag_ms")).longValue();
long eventTime = DateTimes.of("2024-01-01T00:00:00Z").getMillis();
long expectedLag = ingestionTime - eventTime;

// Allow small difference since now() is called twice (once for ingestion_time, once for lag_ms)
long lagDiff = Math.abs(lag - expectedLag);
Assert.assertTrue(
"Lag should be approximately correct (diff=" + lagDiff + "ms): expected=" + expectedLag + ", actual=" + lag,
lagDiff < 100 // Allow up to 100ms difference
);

// Verify lag is positive (ingestion happened after event)
Assert.assertTrue("Lag should be positive", lag > 0);
}
}
Loading