Inputs and Preprocessing

SOF-ELK ingests a variety of file formats, which are detailed below. In general, simply placing the files into the appropriate directory is all that's needed. However, if any specific instructions are required for exporting or generating the files, or any preprocessing is needed, the details are provided below.

Filesystem Ingest Locations

SOF-ELK ingests files from the following filesystem locations:

  • /logstash/aws/: JSON-formatted Amazon Web Services CloudTrail log files. Use the included aws-cloudtrail2sof-elk.py loader script; a usage sketch follows this list.
  • /logstash/azure/: JSON-formatted Microsoft Azure logs. At this time, the following log types are supported: Event Logs, Sign In Logs, Audit Logs, Admin Activity Logs, and Storage Logs.
  • /logstash/gcp/: JSON-formatted Google Cloud Platform (GCP) logs. See the Cloud Evidence Acquisition -> Google Cloud Platform (GCP) page for more specific details.
  • /logstash/gws/: JSON-formatted Google Workspace logs extracted using the Google Workspace API.
  • /logstash/hayabusa/: Output from Yamato Security's Hayabusa Windows event log fast forensics timeline generator and threat hunting tool. JSON- or JSONL-formatted output is supported, as is CSV output created with the standard profile. Files must be named *.json, *.jsonl, or *.csv, respectively. A sample invocation follows this list.
  • /logstash/httpd/: Apache logs in common, combined, or vhost-combined formats.
  • /logstash/kape/: JSON-format files generated by the KAPE triage collection tool. (See this document for details on which specific output files are currently supported and their required file naming structure.)
  • /logstash/kubernetes/: Kubernetes log files.
  • /logstash/microsoft365/: Microsoft 365 logs, in JSON format only.
  • /logstash/nfarch/: Archived NetFlow output, formatted as described below.
  • /logstash/passivedns/: Logs from the passivedns utility.
  • /logstash/plaso/: CSV bodyfile-format files generated by the Plaso tool from the log2timeline framework. (See this document for details on creating CSV files in a supported format.)
  • /logstash/syslog/: Syslog-formatted data.
    • NOTICE: Remember that syslog DOES NOT reflect the year of a log entry! Therefore, Logstash has been configured to look for a year value in the path to a file. For example: /logstash/syslog/2015/var/log/messages will assign all entries from that file to the year 2015. If no year is present, the current year will be assumed. This is enabled only for the /logstash/syslog/ directory.
  • /logstash/zeek/: JSON-formatted logs from the Zeek Network Security Monitoring platform. These must be in decompressed form. The following Zeek logs are supported:
    • conn.log: Treated like NetFlow and stored in the netflow-* indices.
    • dns.log: Treated like other DNS logs and stored in the logstash-* indices.
    • http.log: Treated like other HTTP logs and stored in the httpdlog-* indices.
    • The following logs are stored in the zeek-* indices:
      • files.log
      • ftp.log
      • notice.log
      • ssl.log
      • weird.log
      • x509.log
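
As referenced in the /logstash/aws/ entry above, here is a minimal usage sketch for the CloudTrail loader script. The -r (read) and -w (write) options shown are assumptions based on the project's other loader scripts; confirm against the script's built-in help, as options may vary by SOF-ELK version.

    # parse a directory of CloudTrail JSON files and write the result
    # to the AWS ingest location (paths are hypothetical)
    aws-cloudtrail2sof-elk.py -r /path/to/cloudtrail/ -w /logstash/aws/cloudtrail.json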
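
For Hayabusa, output in the supported formats can be generated with the tool's timeline commands. The following is a sketch assuming Hayabusa's csv-timeline and json-timeline commands and the standard profile named above; verify the options against your Hayabusa version.

    # CSV timeline using the standard profile; name the output *.csv
    hayabusa csv-timeline -d /path/to/evtx/ -p standard -o timeline.csv
    # JSONL timeline; name the output *.jsonl
    hayabusa json-timeline -d /path/to/evtx/ -L -o timeline.jsonl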

Destination Indices and Relevant Dashboards

Files ingested from the above locations will be available in the corresponding index, as detailed below, and can be explored by accessing the desired index in Kibana's Discover application. Some of these log types also have dashboards and visualizations; where available, these are indicated below and can be accessed using Kibana's Dashboard application.

| Ingest Directory within /logstash/ | type field value for remote Filebeat shipper | Elasticsearch Index | Kibana Dashboard |
|---|---|---|---|
| appleul/ | appleul | appleul | |
| aws/ | aws | aws | |
| azure/ | azure | azure | |
| gcp/ | gcp | gcp | |
| gws/ | gws | gws | |
| hayabusa/ | hayabusa | evtxlogs | Eventlog Dashboard |
| httpd/ | httpdlog | httpdlog | HTTPD Log Dashboard |
| kape/**/*_MFTECmd*_Output.json | kape_filesystem | filesystem | |
| kape/**/*_LECmd_Output.json | kape_lnkfiles | lnkfiles | LNK File Dashboard |
| kape/**/*_EvtxECmd_Output.json | kape_evtxlogs | evtxlogs | Eventlog Dashboard |
| kubernetes/ | kubernetes | kubernetes | |
| microsoft365/ | microsoft365 | microsoft365 | |
| nfarch/ | archive-netflow | netflow | NetFlow Dashboard |
| passivedns/ | archive-passivedns | logstash | Syslog Dashboard |
| plaso/ | plaso | evtxlogs | Eventlog Dashboard |
| syslog/ | syslog | logstash | Syslog Dashboard |
| zeek/**/ssl* | zeek_ssl | zeek | |
| zeek/**/x509* | zeek_x509 | zeek | |
| zeek/**/ftp* | zeek_ftp | zeek | |
| zeek/**/notice* | zeek_notice | zeek | |
| zeek/**/weird* | zeek_weird | zeek | |
| zeek/**/http.* | zeek_http | httpdlog | HTTPD Log Dashboard |
| zeek/**/conn.* | zeek_conn | netflow | NetFlow Dashboard |
| zeek/**/files.* | zeek_files | zeek | |

Native flow exports can also be sent to a SOF-ELK instance via the network. The appropriate firewall port must be opened first; a sketch follows the table below.

| Input Method | Elasticsearch Index | Kibana Dashboard |
|---|---|---|
| NetFlow v5, NetFlow v9, IPFIX via UDP/9995 | netflow | NetFlow Dashboard |
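
As a sketch of opening the listener port, assuming the underlying distribution uses firewalld (adjust for your firewall tooling and SOF-ELK version):

    # open the NetFlow/IPFIX listener port for the current session,
    # then persist the change across reboots
    sudo firewall-cmd --add-port=9995/udp
    sudo firewall-cmd --runtime-to-permanent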

Filesystem Ingest Caveats

In Filebeat version 9, the default method for tracking "new" files changed to the fingerprint method. For full details about this method, please see the Filebeat documentation. The significant implications for SOF-ELK's ingest process are listed below:

  • Files will not be read until they are at least 1,024 bytes in size.
    • To ingest files smaller than this, concatenate the content of the smaller file(s) into a larger file of the same type in the appropriate ingest directory. (For example: concatenate multiple small syslog files into a single file larger than 1,024 bytes before placing the combined file in the /logstash/syslog/ directory; a concatenation sketch follows this list.)
  • Files whose initial 1,024 bytes are identical will be treated as duplicates of one another, and all but the first will be skipped.
    • To ingest multiple files that happen to share the exact same initial 1,024 bytes, add blank lines to the beginning of one of the files, or use the shuf shell utility to randomize its lines, creating a "new" file with a different initial 1,024 bytes. An example of using shuf is below. NOTE that the shuf method will ONLY work for log files that contain one entry per line. If you attempt to re-load multiline JSON this way, the results will be unpredictable and unusable.
      cd /logstash/nfarch/
      # write a line-shuffled copy of file1.txt to file2.txt, giving it a different initial 1,024 bytes
      shuf file1.txt -o file2.txt
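
As an illustration of the concatenation workaround mentioned above (filenames are hypothetical):

    # combine several small syslog files into one file over 1,024 bytes,
    # placing the result directly in the ingest directory
    cat small1.log small2.log small3.log > /logstash/syslog/combined.log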
      

Ingesting Archived NetFlow

  • To ingest existing nfcapd-created NetFlow evidence, it must first be converted to a supported text format. The included nfdump2sof-elk.sh script takes care of this.
    • Read from single file: nfdump2sof-elk.sh -r /path/to/netflow/nfcapd.201703190000 -w /logstash/nfarch/inputfile_1.txt
    • Read recursively from directory: nfdump2sof-elk.sh -r /path/to/netflow/ -w /logstash/nfarch/inputfile_2.txt
    • Optionally, you can specify the IP address of the exporter that created the flow data: nfdump2sof-elk.sh -e 10.3.58.1 -r /path/to/netflow/ -w /logstash/nfarch/inputfile_3.txt
  • To ingest existing AWS VPC Flow data files in JSON format, use the included aws-vpcflow2sof-elk.sh script.
    • Read recursively from directory: aws-vpcflow2sof-elk.sh -r /path/to/aws-vpcflow/ -w /logstash/nfarch/aws-vpcflow_1.txt
  • To ingest existing Azure flow data files in JSON format, use the included azure-flow2sof-elk.py script. This transparently handles both the newer Virtual Network flow and the legacy NSG flow formats.
    • Read from single file: azure-flow2sof-elk.py -r /path/to/azure-flow/file1.json -w /logstash/nfarch/azure-flow_1.txt
    • Read recursively from directory: azure-flow2sof-elk.py -r /path/to/azure-flow/ -w /logstash/nfarch/azure-flow_2.txt
