168 changes: 147 additions & 21 deletions importer_client/python/timesketch_import_client/importer.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Timesketch data importer."""

from __future__ import unicode_literals

import codecs
@@ -20,17 +21,16 @@
import logging
import math
import os
import time
import uuid

import numpy
import pandas

from timesketch_api_client import timeline
from timesketch_api_client import definitions
from timesketch_api_client import timeline
from timesketch_api_client.error import UnableToRunAnalyzer
from timesketch_import_client import utils


logger = logging.getLogger("timesketch_importer.importer")


@@ -66,6 +66,12 @@ def run_analyzers(analyzer_names=None, timeline_obj=None):
class ImportStreamer(object):
"""Upload object used to stream results to Timesketch."""

# Timesketch default max form size is 200MB (209715200 bytes).
DEFAULT_MAX_PAYLOAD_SIZE = 209715200  # 200 MB

# Reserve 1MB for HTTP headers, multipart boundaries, and JSON overhead.
PAYLOAD_SAFETY_BUFFER = 1 * 1024 * 1024

# The number of entries before automatically flushing
# the streamer.
DEFAULT_ENTRY_THRESHOLD = 50000
Expand Down Expand Up @@ -108,6 +114,8 @@ def __init__(self):
self._timestamp_desc = self.DEFAULT_TIMESTAMP_DESC
self._threshold_entry = self.DEFAULT_ENTRY_THRESHOLD
self._threshold_filesize = self.DEFAULT_FILESIZE_THRESHOLD
self._max_payload_size = self.DEFAULT_MAX_PAYLOAD_SIZE
self._safe_payload_limit = self._max_payload_size - self.PAYLOAD_SAFETY_BUFFER

def _fix_dict(self, my_dict):
"""Adjusts a dict with so that it can be uploaded to Timesketch.
@@ -289,42 +297,93 @@ def _reset(self):
self._count = 0
self._data_lines = []

def _upload_data_buffer(self, end_stream, retry_count=0):
"""Upload data to Timesketch.
def _upload_data_buffer(self, end_stream, data_lines=None, retry_count=0):
"""Upload data buffer to Timesketch using multipart/form-data.

Args:
end_stream (bool): boolean indicating whether this is the last chunk of
the stream.
end_stream (bool): boolean indicating whether this is the last chunk
of the stream.
data_lines (list): optional list of data lines to upload. If None,
uses self._data_lines.
retry_count (int): optional int that is only set if this is a retry
of the upload.

Returns:
None: if the upload is successful or skipped.

Raises:
RuntimeError: If the data buffer is not successfully uploaded.
"""
if not self._data_lines:
# 1. Determine which data to use (instance buffer or recursive argument)
if data_lines is None:
data_lines = self._data_lines

if not data_lines:
return None

# 2. Serialize to check size
# We construct the JSON string from the local data_lines variable
events_string = "\n".join([json.dumps(x) for x in data_lines])
payload_size = len(events_string.encode("utf-8"))

# 3. Dynamic Splitting Logic
if payload_size > self._safe_payload_limit:
num_rows = len(data_lines)

if num_rows <= 1:
logger.warning(
"Single dictionary entry (%d bytes) exceeds safe limit "
"(%d bytes). Attempting upload anyway.",
payload_size,
self._safe_payload_limit,
)
else:
logger.info(
"Buffer size (%d bytes) exceeds safe limit (%d bytes). "
"Splitting %d entries.",
payload_size,
self._safe_payload_limit,
num_rows,
)

mid_point = num_rows // 2
first_half = data_lines[:mid_point]
second_half = data_lines[mid_point:]

# Recursive Call 1: First half
self._upload_data_buffer(
end_stream=False, data_lines=first_half, retry_count=0
)

# Recursive Call 2: Second half
self._upload_data_buffer(
end_stream=end_stream, data_lines=second_half, retry_count=0
)
return

# 4. Prepare Metadata fields
data = {
"name": self._timeline_name,
"sketch_id": self._sketch.id,
"enable_stream": not end_stream,
"data_label": self._data_label,
"provider": self._provider,
"events": "\n".join([json.dumps(x) for x in self._data_lines]),
}
if self._index:
data["index_name"] = self._index

if self._upload_context:
data["context"] = self._upload_context

response = self._sketch.api.session.post(self._resource_url, data=data)
# 5. Prepare "events" as a multipart/form-data field.
# (None, data, content_type) -> treated as a form field, not a file upload
files = {"events": (None, events_string, "application/json")}

# 6. Send Request
response = self._sketch.api.session.post(
self._resource_url, data=data, files=files
)

# TODO: Investigate why the sleep is needed, fix the underlying issue
# and get rid of it here.
# To prevent unexpected errors with connection refusal adding a quick
# sleep.
time.sleep(2)
if response.status_code not in definitions.HTTP_STATUS_CODE_20X:
if retry_count >= self.DEFAULT_RETRY_LIMIT:
raise RuntimeError(
@@ -345,7 +404,9 @@ def _upload_data_buffer(self, end_stream, retry_count=0):
)

return self._upload_data_buffer(
end_stream=end_stream, retry_count=retry_count + 1
end_stream=end_stream,
data_lines=data_lines,
retry_count=retry_count + 1,
)

self._chunk += 1
@@ -361,33 +422,85 @@ def _upload_data_buffer(self, end_stream, retry_count=0):
return None

def _upload_data_frame(self, data_frame, end_stream, retry_count=0):
"""Upload data to Timesketch.
"""Upload data to Timesketch using multipart/form-data.

Args:
data_frame (DataFrame): a pandas DataFrame with the content to upload.
end_stream (bool): boolean indicating whether this is the last chunk of
the stream.
end_stream (bool): boolean indicating whether this is the last chunk
of the stream.
retry_count (int): optional int that is only set if this is a retry
of the upload.

Returns:
None: if the upload is successful.

Raises:
RuntimeError: If the dataframe is not successfully uploaded.
"""

# 1. Serialize
events_json = data_frame.to_json(orient="records", lines=True)

# 2. Check Size (Dynamic logic)
payload_size = len(events_json.encode("utf-8"))

if payload_size > self._safe_payload_limit:
num_rows = len(data_frame)

if num_rows <= 1:
logger.warning(
"Single row (%d bytes) exceeds safe limit (%d bytes). "
"Attempting upload anyway.",
payload_size,
self._safe_payload_limit,
)
else:
logger.info(
"Payload size (%d bytes) exceeds safe limit (%d bytes). "
"Splitting %d rows.",
payload_size,
self._safe_payload_limit,
num_rows,
)

mid_point = num_rows // 2
first_half = data_frame.iloc[:mid_point]
second_half = data_frame.iloc[mid_point:]

# Recursive split
self._upload_data_frame(first_half, end_stream=False, retry_count=0)
self._upload_data_frame(
second_half, end_stream=end_stream, retry_count=0
)
return

# 3. Prepare Metadata fields
data = {
"name": self._timeline_name,
"sketch_id": self._sketch.id,
"enable_stream": not end_stream,
"data_label": self._data_label,
"provider": self._provider,
"events": data_frame.to_json(orient="records", lines=True),
}
if self._index:
data["index_name"] = self._index

if self._upload_context:
data["context"] = self._upload_context

response = self._sketch.api.session.post(self._resource_url, data=data)
# 4. Prepare "events" as a multipart form field.
# The (None, data, content_type) tuple tells requests to send this as a
# form field rather than a file upload, ensuring it ends up in
# request.form on the Flask backend.
files = {"events": (None, events_json, "application/json")}

# 5. Send Request
# 'data' contains metadata fields, 'files' contains the events payload.
# Requests automatically sets Content-Type to multipart/form-data.
response = self._sketch.api.session.post(
self._resource_url, data=data, files=files
)

if response.status_code not in definitions.HTTP_STATUS_CODE_20X:
if retry_count >= self.DEFAULT_RETRY_LIMIT:
raise RuntimeError(
@@ -906,6 +1019,19 @@ def set_timestamp_description(self, description):
"""Set the timestamp description field."""
self._timestamp_desc = description

def set_max_payload_size(self, size_in_bytes):
"""Set the maximum payload size allowed by the server.

Args:
size_in_bytes (int): The server limit (e.g. MAX_FORM_MEMORY_SIZE).
"""
self._max_payload_size = size_in_bytes
# Recalculate safe limit
self._safe_payload_limit = self._max_payload_size - self.PAYLOAD_SAFETY_BUFFER
if self._safe_payload_limit <= 0:
# Fallback if someone sets a ridiculously low limit
self._safe_payload_limit = 1024 * 1024 # 1MB minimum

@property
def state(self):
"""Returns a state string for the indexing process."""
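A minimal, self-contained sketch of the two mechanisms this diff combines: recursively halving an oversized buffer, and posting the events as a multipart form field. The endpoint URL, metadata values, upload helper, and 1 MB limit below are illustrative assumptions, not part of the PR.

import json

import requests

SAFE_PAYLOAD_LIMIT = 1 * 1024 * 1024  # illustrative; the PR's default works out to ~199 MB

def upload(events_string):
    # Stand-in for the importer's POST. The (None, data, content_type) tuple
    # makes requests encode "events" as a plain multipart form field (no
    # filename), so Flask exposes it via request.form, not request.files.
    files = {"events": (None, events_string, "application/json")}
    data = {"name": "my-timeline", "sketch_id": 1}  # hypothetical metadata
    return requests.post("http://localhost:5000/upload", data=data, files=files)

def upload_buffer(data_lines):
    events_string = "\n".join(json.dumps(x) for x in data_lines)
    payload_size = len(events_string.encode("utf-8"))
    if payload_size > SAFE_PAYLOAD_LIMIT and len(data_lines) > 1:
        mid = len(data_lines) // 2
        upload_buffer(data_lines[:mid])  # first half
        upload_buffer(data_lines[mid:])  # then the second half
        return
    upload(events_string)  # fits, or is a single entry that exceeds the limit

upload_buffer([{"message": "x" * 600_000}, {"message": "y" * 600_000}])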
19 changes: 19 additions & 0 deletions importer_client/python/tools/timesketch_importer.py
@@ -136,6 +136,10 @@ def upload_file(
if context:
streamer.set_upload_context(context)

max_payload = config_dict.get("max_payload_size")
if max_payload:
streamer.set_max_payload_size(max_payload)

streamer.add_file(file_path)

timeline = streamer.timeline
@@ -476,6 +480,20 @@ def main(args=None):
),
)

config_group.add_argument(
"--max-payload-size",
"--max_payload_size",
action="store",
type=int,
default=importer.ImportStreamer.DEFAULT_MAX_PAYLOAD_SIZE,
dest="max_payload_size",
help=(
"The maximum size in bytes for a single HTTP upload request. "
"This should match the server's MAX_FORM_MEMORY_SIZE. "
"Defaults to 200MB."
),
)

options = argument_parser.parse_args(args)

if options.show_version:
Expand Down Expand Up @@ -649,6 +667,7 @@ def main(args=None):
"data_label": options.data_label,
"context": context,
"analyzer_names": options.analyzer_names,
"max_payload_size": options.max_payload_size,
}

logger.info("Uploading file.")
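For callers that drive ImportStreamer directly rather than through the CLI, a hedged usage sketch of the new setter; the sketch id, timeline name, file path, and 50 MB figure are illustrative assumptions.

from timesketch_api_client import config
from timesketch_import_client import importer

# Assumes a client already configured via timesketch_api_client.
ts_client = config.get_client()
my_sketch = ts_client.get_sketch(1)

with importer.ImportStreamer() as streamer:
    streamer.set_sketch(my_sketch)
    streamer.set_timeline_name("my-timeline")
    # Match the server's MAX_FORM_MEMORY_SIZE; 50 MB is assumed here.
    streamer.set_max_payload_size(50 * 1024 * 1024)
    streamer.add_file("/tmp/events.jsonl")

The CLI path added in this PR is equivalent: pass --max-payload-size 52428800 (50 MB) to timesketch_importer alongside the usual connection flags.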