Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.9.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.12-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
10 changes: 6 additions & 4 deletions warp10/src/main/java/io/warp10/continuum/ingress/Ingress.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2018-2024 SenX S.A.S.
// Copyright 2018-2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -286,7 +286,7 @@ protected boolean removeEldestEntry(java.util.Map.Entry<BigInteger, Long> eldest
final boolean updateActivity;
private final boolean metaActivity;
final long activityWindow;
public final boolean parseAttributes;
public final boolean PARSE_ATTRIBUTES;
final Long maxpastDefault;
final Long maxfutureDefault;
final Long maxpastOverride;
Expand Down Expand Up @@ -376,7 +376,7 @@ public Ingress(KeyStore keystore, Properties props) {
this.sendMetadataOnDelete = Boolean.parseBoolean(props.getProperty(Configuration.INGRESS_DELETE_METADATA_INCLUDE, "false"));
this.sendMetadataOnStore = Boolean.parseBoolean(props.getProperty(Configuration.INGRESS_STORE_METADATA_INCLUDE, "false"));

this.parseAttributes = "true".equals(props.getProperty(Configuration.INGRESS_PARSE_ATTRIBUTES));
this.PARSE_ATTRIBUTES = "true".equals(props.getProperty(Configuration.INGRESS_PARSE_ATTRIBUTES));

if (null != WarpConfig.getProperty(Configuration.INGRESS_MAXPAST_DEFAULT)) {
maxpastDefault = Long.parseLong(WarpConfig.getProperty(Configuration.INGRESS_MAXPAST_DEFAULT));
Expand Down Expand Up @@ -1009,7 +1009,9 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
}
}

boolean deltaAttributes = "delta".equals(request.getHeader(Constants.getHeader(Configuration.HTTP_HEADER_ATTRIBUTES)));
boolean skipAttributes = "skip".equals(request.getHeader(Constants.getHeader(Configuration.HTTP_HEADER_ATTRIBUTES)));
boolean deltaAttributes = skipAttributes ? false : "delta".equals(request.getHeader(Constants.getHeader(Configuration.HTTP_HEADER_ATTRIBUTES)));
boolean parseAttributes = PARSE_ATTRIBUTES && !skipAttributes;

if (deltaAttributes && !this.allowDeltaAttributes) {
httpStatusCode = HttpServletResponse.SC_BAD_REQUEST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public static class StandaloneStreamUpdateWebSocket {

private IngressStreamUpdateHandler handler;

private boolean parseAttributes = false;
private boolean deltaAttributes = false;
private boolean skipAttributes = false;

private boolean errormsg = false;

Expand Down Expand Up @@ -148,6 +150,12 @@ public void onWebSocketMessage(Session session, String message) throws Exception
this.deltaAttributes = true;
} else if (null != tokens && "DELTAOFF".equals(tokens[0])) {
this.deltaAttributes = false;
} else if (null != tokens && "ATTRSKIPON".equals(tokens[0])) {
this.skipAttributes = true;
this.parseAttributes = false;
} else if (null != tokens && "ATTRSKIPOFF".equals(tokens[0])) {
this.skipAttributes = false;
this.parseAttributes = this.handler.ingress.PARSE_ATTRIBUTES;
} else {
//
// Anything else is considered a measurement
Expand Down Expand Up @@ -220,7 +228,7 @@ public void onWebSocketMessage(Session session, String message) throws Exception
}

// Atomic boolean to track if attributes were parsed
AtomicBoolean hadAttributes = this.handler.ingress.parseAttributes ? new AtomicBoolean(false) : null;
AtomicBoolean hadAttributes = this.parseAttributes ? new AtomicBoolean(false) : null;

try {
WarpConfig.setThreadProperty(WarpConfig.THREAD_PROPERTY_SESSION, UUID.randomUUID().toString());
Expand All @@ -240,7 +248,7 @@ public void onWebSocketMessage(Session session, String message) throws Exception

do {

if (this.handler.ingress.parseAttributes) {
if (this.parseAttributes) {
lastHadAttributes = lastHadAttributes || hadAttributes.get();
hadAttributes.set(false);
}
Expand Down Expand Up @@ -341,7 +349,7 @@ public void onWebSocketMessage(Session session, String message) throws Exception
this.handler.ingress.pushDataMessage(lastencoder, kafkaDataMessageAttributes);
count += lastencoder.getCount();

if (this.handler.ingress.parseAttributes && lastHadAttributes) {
if (this.parseAttributes && lastHadAttributes) {
// We need to push lastencoder's metadata update as they were updated since the last
// metadata update message sent
Metadata meta = new Metadata(lastencoder.getMetadata());
Expand Down Expand Up @@ -396,7 +404,7 @@ public void onWebSocketMessage(Session session, String message) throws Exception
this.handler.ingress.pushDataMessage(lastencoder, kafkaDataMessageAttributes);
count += lastencoder.getCount();

if (this.handler.ingress.parseAttributes && lastHadAttributes) {
if (this.parseAttributes && lastHadAttributes) {
// Push a metadata UPDATE message so attributes are stored
// Build metadata object to push
Metadata meta = new Metadata(lastencoder.getMetadata());
Expand Down Expand Up @@ -442,6 +450,7 @@ public void onWebSocketClose(Session session, int statusCode, String reason) {

public void setHandler(IngressStreamUpdateHandler handler) {
this.handler = handler;
this.parseAttributes = handler.ingress.PARSE_ATTRIBUTES && !this.skipAttributes;
}

private void setToken(String token) throws IOException {
Expand Down
8 changes: 6 additions & 2 deletions warp10/src/main/java/io/warp10/script/WarpScriptLib.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@
import io.warp10.script.functions.ATBUCKET;
import io.warp10.script.functions.ATINDEX;
import io.warp10.script.functions.ATTICK;
import io.warp10.script.functions.ATTRDELTA;
import io.warp10.script.functions.ATTRIBUTES;
import io.warp10.script.functions.ATTRSKIP;
import io.warp10.script.functions.B58TO;
import io.warp10.script.functions.B64TO;
import io.warp10.script.functions.B64TOHEX;
Expand Down Expand Up @@ -693,7 +695,6 @@
import io.warp10.script.functions.STDOUT;
import io.warp10.script.functions.TDESCRIBE;
import io.warp10.script.functions.SLEEP;

import io.warp10.script.functions.math.GETEXPONENT;
import io.warp10.script.functions.math.RANDOM;
import io.warp10.script.functions.math.SCALB;
Expand All @@ -702,7 +703,6 @@
import io.warp10.script.functions.shape.PERMUTE;
import io.warp10.script.functions.shape.RESHAPE;
import io.warp10.script.functions.shape.SHAPE;
import io.warp10.script.functions.SLEEP;
import io.warp10.script.interpolation.INTERPOLATOR_1D;
import io.warp10.script.interpolation.INTERPOLATOR_2D;
import io.warp10.script.interpolation.INTERPOLATOR_3D;
Expand Down Expand Up @@ -1536,6 +1536,8 @@ public class WarpScriptLib {
public static final String ACCEL_NOPERSIST = "ACCEL.NOPERSIST";
public static final String ACCEL_PERSIST = "ACCEL.PERSIST";
public static final String ACCEL_REPORT = "ACCEL.REPORT";
public static final String ATTRDELTA = "ATTRDELTA";
public static final String ATTRSKIP = "ATTRSKIP";
public static final String UPDATE = "UPDATE";
public static final String META = "META";
public static final String METAMATCH = "METAMATCH";
Expand Down Expand Up @@ -2551,6 +2553,8 @@ public class WarpScriptLib {
addNamedWarpScriptFunction(new ACCELPERSIST(ACCEL_PERSIST, false));
addNamedWarpScriptFunction(new ACCELPERSIST(ACCEL_NOPERSIST, true));
addNamedWarpScriptFunction(new ACCELREPORT(ACCEL_REPORT));
addNamedWarpScriptFunction(new ATTRDELTA(ATTRDELTA));
addNamedWarpScriptFunction(new ATTRSKIP(ATTRSKIP));
addNamedWarpScriptFunction(new UPDATE(UPDATE));
addNamedWarpScriptFunction(new META(META));
addNamedWarpScriptFunction(new METAMATCH(METAMATCH));
Expand Down
10 changes: 10 additions & 0 deletions warp10/src/main/java/io/warp10/script/WarpScriptStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ public static enum Signal {

public static final String TOP_LEVEL_SECTION = "[TOP]";

/**
* Name of stack attribute for storing the delta attributes selection for UPDATE
*/
public static final String ATTRIBUTE_ATTRIBUTES_DELTA = "attributes.delta";

/**
* Name of stack attribute for storing the skip attributes selection for UPDATE
*/
public static final String ATTRIBUTE_ATTRIBUTES_SKIP = "attributes.skip";

/**
* Name of attribute for storing macro import rules
*/
Expand Down
45 changes: 45 additions & 0 deletions warp10/src/main/java/io/warp10/script/functions/ATTRDELTA.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//
// Copyright 2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package io.warp10.script.functions;

import io.warp10.script.NamedWarpScriptFunction;
import io.warp10.script.WarpScriptException;
import io.warp10.script.WarpScriptStack;
import io.warp10.script.WarpScriptStackFunction;

public class ATTRDELTA extends NamedWarpScriptFunction implements WarpScriptStackFunction {

public ATTRDELTA(String name) {
super(name);
}

@Override
public Object apply(WarpScriptStack stack) throws WarpScriptException {
Object top = stack.pop();

if (top instanceof Boolean) {
stack.setAttribute(WarpScriptStack.ATTRIBUTE_ATTRIBUTES_DELTA, Boolean.TRUE.equals(top));
} else if (null == top) {
stack.push(Boolean.TRUE.equals(stack.getAttribute(WarpScriptStack.ATTRIBUTE_ATTRIBUTES_DELTA)));
} else {
throw new WarpScriptException(getName() + " invalid parameter, expected a BOOLEAN or NULL.");
}

return stack;
}

}
45 changes: 45 additions & 0 deletions warp10/src/main/java/io/warp10/script/functions/ATTRSKIP.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//
// Copyright 2025 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package io.warp10.script.functions;

import io.warp10.script.NamedWarpScriptFunction;
import io.warp10.script.WarpScriptException;
import io.warp10.script.WarpScriptStack;
import io.warp10.script.WarpScriptStackFunction;

public class ATTRSKIP extends NamedWarpScriptFunction implements WarpScriptStackFunction {

public ATTRSKIP(String name) {
super(name);
}

@Override
public Object apply(WarpScriptStack stack) throws WarpScriptException {
Object top = stack.pop();

if (top instanceof Boolean) {
stack.setAttribute(WarpScriptStack.ATTRIBUTE_ATTRIBUTES_SKIP, Boolean.TRUE.equals(top));
} else if (null == top) {
stack.push(Boolean.TRUE.equals(stack.getAttribute(WarpScriptStack.ATTRIBUTE_ATTRIBUTES_SKIP)));
} else {
throw new WarpScriptException(getName() + " invalid parameter, expected a BOOLEAN or NULL.");
}

return stack;
}

}
Loading