
Anomaly detection not working with simple data #1449

@jais001

Hi,

I have been trying out OpenSearch and plan to integrate it with my system for anomaly detection on time-series data. To test the anomaly detection use case, I injected a simple time series and then created a detector. Unfortunately, I am not getting any anomalies. I would like to know whether this is a bug or whether I am doing something wrong. The Python code I used to inject the data is given below:

from datetime import datetime, timedelta, timezone
from opensearchpy import OpenSearch, helpers
import random

# Connect to OpenSearch
client = OpenSearch(
    hosts=[{'host': 'localhost', 'port': 9200}],
    http_auth=('admin', 'testForAnomalyEA432!'),
    use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False,
)

index_name = "bulk_sensor_data_latest"

# Create index if it doesn't exist
if not client.indices.exists(index=index_name):
    client.indices.create(index=index_name, body={
        "mappings": {
            "properties": {
                "timestamp": {"type": "date", "format": "epoch_millis"},
                "value": {"type": "float"}
            }
        }
    })

print("🚀 Generating 10 hours of bulk data...")

# Configuration
start_time = datetime.now(timezone.utc).replace(second=0, microsecond=0)
records_per_minute = 12  # One record every 5 seconds
total_hours = 10
total_minutes = total_hours * 60
total_records = total_minutes * records_per_minute

# Generate anomaly minutes with intervals
anomaly_minutes = []
current_minute = 4  # First anomaly occurs at minute 4

while current_minute < total_minutes:
    anomaly_minutes.append(current_minute)
    # Next anomaly after 10-20 minutes
    gap = random.randint(10, 20)
    current_minute += gap

# For each anomaly minute, pick 5 random positions
anomaly_positions = {}
for minute in anomaly_minutes:
    anomaly_positions[minute] = random.sample(range(records_per_minute), 5)

print(f"Anomaly minutes: {anomaly_minutes}")
print(f"Number of anomaly clusters: {len(anomaly_minutes)}")

# Generator that yields documents in batches, along with a log of the generated records
def generate_data_batch(batch_size=1000):
    data = []
    record_log = []  # To store records for printing
    
    for i in range(total_records):
        current_minute = i // records_per_minute
        current_minute_record = i % records_per_minute
        
        current_ts = start_time + timedelta(seconds=i * 5)  # 5 seconds interval
        epoch_millis = int(current_ts.timestamp() * 1000)
        
        # Check if this record should have an anomaly
        should_inject_anomaly = (
            current_minute in anomaly_positions and 
            current_minute_record in anomaly_positions[current_minute]
        )
        
        if should_inject_anomaly:
            value = round(random.uniform(5000, 10000), 2)  # Anomaly value
            is_anomaly = True
        else:
            value = round(random.uniform(20, 40), 2)  # Normal value
            is_anomaly = False
        
        # Format record for console output
        record_log.append({
            "minute": current_minute,
            "position": current_minute_record,
            "timestamp": current_ts.strftime("%H:%M:%S"),
            "value": value,
            "is_anomaly": is_anomaly
        })
        
        data.append({
            "_index": index_name,
            "_source": {
                "timestamp": epoch_millis,
                "value": value
            }
        })
        
        # If we've collected enough docs or reached the end, yield the batch along with logs
        if len(data) >= batch_size or i == total_records - 1:
            yield data, record_log
            data = []
            record_log = []

# Bulk insert logic with progress reporting
batch_size = 5000
total_batches = (total_records + batch_size - 1) // batch_size
docs_inserted = 0
anomalies_count = 0

print(f"Total records to generate: {total_records}")
print(f"Total anomalies to inject: {len(anomaly_minutes) * 5}")
print(f"Bulk inserting in batches of {batch_size}...")

# Perform the bulk insert
for batch_num, (batch, record_log) in enumerate(generate_data_batch(batch_size), 1):
    # Count anomalies in this batch
    batch_anomalies = sum(1 for doc in batch if doc["_source"]["value"] > 100)
    anomalies_count += batch_anomalies
    
    # Print some records from this batch to show anomalies
    print("\nSample data from current batch:")
    print("-" * 60)
    print(f"{'MINUTE':^8}|{'POS':^5}|{'TIMESTAMP':^10}|{'VALUE':^10}|{'ANOMALY':^8}")
    print("-" * 60)
    
    # Only print records that are anomalies or around anomalies (to avoid too much output)
    anomaly_indexes = [i for i, record in enumerate(record_log) if record["is_anomaly"]]
    records_to_print = set()
    
    for idx in anomaly_indexes:
        # Add the anomaly and some records before/after it
        for j in range(max(0, idx-2), min(len(record_log), idx+3)):
            records_to_print.add(j)
    
    # If no anomalies in this batch, just print first few records
    if not records_to_print:
        records_to_print = set(range(min(5, len(record_log))))
    
    # Print the selected records
    for i in sorted(records_to_print):
        record = record_log[i]
        anomaly_marker = "⚠️" if record["is_anomaly"] else ""
        print(f"{record['minute']:^8}|{record['position']:^5}|{record['timestamp']:^10}|{record['value']:^10.2f}|{anomaly_marker:^8}")
    
    print("-" * 60)
    
    # Insert the batch
    success, failed = helpers.bulk(client, batch, stats_only=True)
    docs_inserted += success
    
    # Report progress
    progress = int((batch_num / total_batches) * 100)
    print(f"Progress: {progress}% - Inserted {docs_inserted}/{total_records} records, {anomalies_count} anomalies")

print(f"\n✅ Completed! {docs_inserted} records inserted with {anomalies_count} anomalies")
print(f"Data spans from {start_time} to {start_time + timedelta(hours=total_hours)}")

# Print a summary of all anomaly clusters
print("\nSummary of Anomaly Clusters:")
print("-" * 40)
for minute in sorted(anomaly_positions.keys()):
    minute_start = start_time + timedelta(minutes=minute)
    positions = sorted(anomaly_positions[minute])
    print(f"Minute {minute} ({minute_start.strftime('%H:%M')}): Positions {positions}")

Then I added the following detector configuration:

{
    "name": "sensor_avg_detector_live",
    "description": "Detector for value anomalies",
    "time_field": "timestamp",
    "indices": [
        "bulk_sensor_data_latest"
    ],
    "feature_attributes": [
        {
            "feature_name": "value_avg",
            "feature_enabled": true,
            "aggregation_query": {
                "value_avg": {
                    "avg": {
                        "field": "value"
                    }
                }
            }
        }
    ],
    "detection_interval": {
        "period": {
            "interval": 1,
            "unit": "Minutes"
        }
    },
    "window_delay": {
        "period": {
            "interval": 1,
            "unit": "Minutes"
        }
    },
    "shingle_size": 5
}
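
For reference, the detector can also be created and started from Python rather than through Dashboards (a minimal sketch reusing the client above; the _plugins/_anomaly_detection endpoints are the Anomaly Detection plugin's REST API, and detector.json is a hypothetical file holding the configuration shown above):

import json

# Minimal sketch: create the detector through the AD plugin's REST API,
# then start a real-time detection job for it. "detector.json" is a
# hypothetical file containing the detector configuration shown above.
with open("detector.json") as f:
    detector_body = json.load(f)

response = client.transport.perform_request(
    "POST",
    "/_plugins/_anomaly_detection/detectors",
    body=detector_body,
)
detector_id = response["_id"]
print(f"Created detector {detector_id}")

# Start a real-time detection job for the new detector
client.transport.perform_request(
    "POST",
    f"/_plugins/_anomaly_detection/detectors/{detector_id}/_start",
)
print("Detector started")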

After starting the detector, it does not recognize the huge spikes in certain minutes as anomalies. Can you tell me whether this is a bug or whether I am doing something wrong?
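
The raw results can also be pulled through the plugin's result search API to double-check (a minimal sketch; detector_id is the ID returned when creating the detector, and anomaly_grade, anomaly_score, and data_start_time are fields of the plugin's result documents):

# Minimal sketch: search the detector's results for any interval that
# was actually flagged (anomaly_grade > 0), most recent first.
query = {
    "size": 50,
    "query": {
        "bool": {
            "filter": [
                {"term": {"detector_id": detector_id}},
                {"range": {"anomaly_grade": {"gt": 0}}},
            ]
        }
    },
    "sort": [{"data_start_time": {"order": "desc"}}],
}

results = client.transport.perform_request(
    "POST",
    "/_plugins/_anomaly_detection/detectors/results/_search",
    body=query,
)
for hit in results["hits"]["hits"]:
    src = hit["_source"]
    print(src["data_start_time"], src["anomaly_grade"], src["anomaly_score"])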

Thanks
