Skip to content

Conversation

nivm-port
Copy link
Contributor

@nivm-port nivm-port commented Oct 6, 2025

User description

Description

Reenable Memory optimizations on ocean core

What - Memory optimizations

Why - OOMs on items to parse cases especially on file kind with big payload and yaml parsing

How - offloading of data to filesystem and streaming data from the disk

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the port production environment(using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Completed a full resync from a freshly installed integration and it completed successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.


PR Type

Enhancement


Description

  • Re-enable memory optimizations for large payload processing

  • Add streaming data processing with filesystem offloading

  • Implement JQ expression input evaluation and mapping optimization

  • Add secure temporary file handling for items parsing


Diagram Walkthrough

flowchart LR
  A["Raw Data"] --> B["Memory Check"]
  B --> C["Filesystem Offload"]
  C --> D["JQ Processing"]
  D --> E["Streaming Parser"]
  E --> F["Entity Mapping"]
  F --> G["Optimized Output"]
Loading

File Walkthrough

Relevant files
Enhancement
3 files
jq_entity_processor.py
Enhanced JQ processor with memory optimizations                   
+340/-17
jq_input_evaluator.py
New JQ input evaluation utility module                                     
+69/-0   
utils.py
Added streaming processing and secure file handling           
+241/-23
Configuration changes
2 files
models.py
Updated items_to_parse_name field type                                     
+1/-1     
pyproject.toml
Version bump to 0.28.13                                                                   
+1/-1     
Bug fix
1 files
sync_raw.py
Fixed generator wrapper parameter passing                               
+1/-1     
Tests
1 files
test_jq_entity_processor.py
Comprehensive tests for mapping optimizations                       
+924/-1 
Documentation
1 files
CHANGELOG.md
Version 0.28.13 release notes                                                       
+7/-1     
Dependencies
2 files
Dockerfile.Deb
Added jq binary to Docker image                                                   
+1/-0     
Dockerfile.local
Added jq binary to local Docker image                                       
+1/-0     

@nivm-port nivm-port requested a review from a team as a code owner October 6, 2025 09:45
Copy link
Contributor

qodo-merge-pro bot commented Oct 6, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Command execution risk

Description: External jq binary is invoked via subprocess on user-controlled expressions and data
paths, which may enable command injection or arbitrary file access if validation is
bypassed or jq has vulnerabilities; although basic validation exists, reliance on system
jq and temporary files broadens attack surface.
utils.py [150-245]

Referred Code
def is_resource_supported(
    kind: str, resync_event_mapping: dict[str | None, list[RESYNC_EVENT_LISTENER]]
) -> bool:
    return bool(resync_event_mapping[kind] or resync_event_mapping[None])

def _validate_jq_expression(expression: str) -> None:
    """Validate jq expression to prevent command injection."""
    try:
        _ = cast(JQEntityProcessor, ocean.app.integration.entity_processor)._compile(expression)
    except Exception as e:
        raise ValueError(f"Invalid jq expression: {e}") from e
    # Basic validation - reject expressions that could be dangerous
    # Check for dangerous patterns (include, import, module)
    dangerous_patterns = ['include', 'import', 'module', 'env']
    for pattern in dangerous_patterns:
        # Use word boundary regex to match only complete words, not substrings
        if re.search(rf'\b{re.escape(pattern)}\b', expression):
            raise ValueError(f"Potentially dangerous pattern '{pattern}' found in jq expression")

    # Special handling for 'env' - block environment variable access
    if re.search(r'(?<!\w)\$ENV(?:\.)?', expression):


 ... (clipped 75 lines)
Sensitive temp files

Description: Temporary files with JSON payloads are written to a fixed directory (/tmp/ocean) and
processed, which may risk sensitive data exposure on shared hosts despite restrictive file
permissions and cleanup attempts.
utils.py [176-251]

Referred Code
    """Create a secure temporary file with restricted permissions."""
    # Create temp directory if it doesn't exist
    temp_dir = "/tmp/ocean"
    os.makedirs(temp_dir, exist_ok=True)

    # Create temporary file with secure permissions
    fd, temp_path = tempfile.mkstemp(suffix=suffix, dir=temp_dir)
    try:
        # Set restrictive permissions (owner read/write only)
        os.chmod(temp_path, stat.S_IRUSR | stat.S_IWUSR)
        return temp_path
    finally:
        os.close(fd)

async def get_items_to_parse_bulks(raw_data: dict[Any, Any], data_path: str, items_to_parse: str, items_to_parse_name: str, base_jq: str) -> AsyncGenerator[list[dict[str, Any]], None]:
    # Validate inputs to prevent command injection
    _validate_jq_expression(items_to_parse)
    items_to_parse = items_to_parse.replace(base_jq, ".") if data_path else items_to_parse

    temp_data_path = None
    temp_output_path = None


 ... (clipped 55 lines)
Supply chain risk

Description: Adding jq to the runtime image increases the attack surface and supply-chain risk; ensure
pinning versions and verifying integrity to avoid vulnerable binaries.
Dockerfile.Deb [44-52]

Referred Code
RUN apt-get update \
    && apt-get install -y \
        ca-certificates \
        openssl \
        curl \
        acl \
        sudo \
        jq \
    && apt-get clean
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
No custom compliance provided

Follow the guide to enable custom compliance check.

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

Copy link
Contributor

qodo-merge-pro bot commented Oct 6, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Security
Block dangerous debug keyword in jq
Suggestion Impact:The commit updated the dangerous_patterns list to include 'debug', matching the suggestion to enhance jq expression security.

code diff:

-    dangerous_patterns = ['include', 'import', 'module', 'env']
+    dangerous_patterns = ['include', 'import', 'module', 'env', 'debug']

Add the debug keyword to the dangerous_patterns list in the
_validate_jq_expression function to enhance security by preventing its use in
user-provided jq expressions.

port_ocean/core/integrations/mixins/utils.py [155-173]

 def _validate_jq_expression(expression: str) -> None:
     """Validate jq expression to prevent command injection."""
     try:
         _ = cast(JQEntityProcessor, ocean.app.integration.entity_processor)._compile(expression)
     except Exception as e:
         raise ValueError(f"Invalid jq expression: {e}") from e
     # Basic validation - reject expressions that could be dangerous
     # Check for dangerous patterns (include, import, module)
-    dangerous_patterns = ['include', 'import', 'module', 'env']
+    dangerous_patterns = ['include', 'import', 'module', 'env', 'debug']
     for pattern in dangerous_patterns:
         # Use word boundary regex to match only complete words, not substrings
         if re.search(rf'\b{re.escape(pattern)}\b', expression):
             raise ValueError(f"Potentially dangerous pattern '{pattern}' found in jq expression")
 
     # Special handling for 'env' - block environment variable access
     if re.search(r'(?<!\w)\$ENV(?:\.)?', expression):
         raise ValueError("Environment variable access '$ENV.' found in jq expression")
     if re.search(r'\benv\.', expression):
         raise ValueError("Environment variable access 'env.' found in jq expression")

[Suggestion processed]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies that the debug keyword in jq can be a security risk and is missing from the blacklist, making it a valid and important security hardening improvement.

Medium
Possible issue
Ensure items_to_parse result is iterable

In _calculate_entity, ensure the items variable is a list before iteration by
wrapping it in a list if it is not one, preventing a TypeError for single,
non-list jq results.

port_ocean/core/handlers/entity_processor/jq_entity_processor.py [332-363]

 async def _calculate_entity(
     self,
     data: dict[str, Any],
     raw_entity_mappings: dict[str, Any],
     items_to_parse: str | None,
     items_to_parse_name: str,
     selector_query: str,
     parse_all: bool = False,
 ) -> tuple[list[MappedEntity], list[Exception]]:
     raw_data: list[dict[str, Any]] | list[tuple[dict[str, Any], str]] = [
         data.copy()
     ]
     items_to_parse_key = None
     if items_to_parse:
         items_to_parse_key = items_to_parse_name
         if not ocean.config.yield_items_to_parse:
             if isinstance(data, dict) and data.get("__type") == "path":
                 file_path = data.get("file", {}).get("content", {}).get("path")
                 with open(file_path, "r") as f:
                     data["file"]["content"] = json.loads(f.read())
             items = await self._search(data, items_to_parse)
+            if items is None:
+                return [], []
             if not isinstance(items, list):
-                logger.warning(
-                    f"Failed to parse items for JQ expression {items_to_parse}, Expected list but got {type(items)}."
-                    f" Skipping..."
-                )
-                return [], []
+                items = [items]
+
             raw_all_payload_stringified = json.dumps(data)
             raw_data = [
                 ({items_to_parse_name: item}, raw_all_payload_stringified)
                 for item in items
             ]
  • Apply / Chat
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential TypeError if the jq expression in items_to_parse returns a single non-list item, and provides a robust fix by ensuring the result is always iterable.

Medium
  • Update

Copy link
Contributor

github-actions bot commented Oct 8, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18339050360/artifacts/4212864011

Code Coverage Total Percentage: 87.23%

Copy link
Contributor

github-actions bot commented Oct 8, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18339065511/artifacts/4212924478

Code Coverage Total Percentage: 87.23%

Copy link
Contributor

github-actions bot commented Oct 8, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18339326996/artifacts/4212962496

Code Coverage Total Percentage: 87.23%

Copy link
Contributor

github-actions bot commented Oct 8, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18339421633/artifacts/4213006634

Code Coverage Total Percentage: 87.23%

Copy link
Contributor

github-actions bot commented Oct 8, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18346291715/artifacts/4215413722

Code Coverage Total Percentage: 87.23%

Copy link
Contributor

github-actions bot commented Oct 8, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18347592987/artifacts/4215943856

Code Coverage Total Percentage: 87.23%

Copy link
Contributor

github-actions bot commented Oct 8, 2025

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18349125396/artifacts/4216540195

Code Coverage Total Percentage: 87.23%

Copy link
Contributor

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18464589022/artifacts/4254144981

Code Coverage Total Percentage: 87.23%

Copy link
Contributor

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18523130663/artifacts/4274587135

Code Coverage Total Percentage: 87.64%

Copy link
Contributor

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18523196873/artifacts/4274606068

Code Coverage Total Percentage: 87.64%

Copy link
Contributor

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18527314566/artifacts/4276050251

Code Coverage Total Percentage: 87.63%

Copy link
Contributor

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18527308414/artifacts/4276050736

Code Coverage Total Percentage: 87.63%

Copy link
Contributor

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18528052187/artifacts/4276327893

Code Coverage Total Percentage: 87.63%

misconfigurations_all: dict[str, str] = {}
# Map the entity with jq expressions that classified as single item expressions with the single item as input
mapped_entity_item = await self._search_as_object(
modified_data[0], raw_entity_mappings["item"], misconfigurations_item
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use get method instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we always have the item, all and none props in the raw_entity_mappings object in this case (items to parse)

if misconfigurations_item:
# The misconfigurations dict not contains the mapping expressions themselves, so we need to filter the original mapping by the misconfigured keys
filtered_item_mappings = self._filter_mappings_by_keys(
raw_entity_mappings["item"], list(misconfigurations_item.keys())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use get method instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my last comment

Comment on lines +358 to +359
with open(file_path, "r") as f:
data["file"]["content"] = json.loads(f.read())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should ve place this under a feature flag like use_disk?

Copy link
Contributor Author

@nivm-port nivm-port Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least in this place I think it is not needed.... If the integration has not offloaded the data to disk (and marked the event as __type == "path") the ocean framework would continue as usual (without any disk usage)

@ivankalinovski ivankalinovski self-requested a review October 15, 2025 14:58
Copy link
Contributor

@ivankalinovski ivankalinovski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@nivm-port nivm-port merged commit baf49ec into main Oct 15, 2025
36 checks passed
@nivm-port nivm-port deleted the PORT-16058-reenable-memory-optimizations branch October 15, 2025 14:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants