-
Notifications
You must be signed in to change notification settings - Fork 114
Expand file tree
/
Copy pathzircolite.py
More file actions
executable file
·1349 lines (1179 loc) · 58.2 KB
/
zircolite.py
File metadata and controls
executable file
·1349 lines (1179 loc) · 58.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!python3
"""
Zircolite - Standalone SIGMA-Based Detection Tool for EVTX, Auditd, Sysmon Linux, and more.
This is the main entry point for Zircolite. The core functionality has been modularized into
the zircolite/ package for better maintainability and code organization.
Package structure:
- zircolite/core.py: ZircoliteCore class for database and rule execution
- zircolite/streaming.py: StreamingEventProcessor for single-pass processing
- zircolite/extractor.py: EvtxExtractor for log format conversion
- zircolite/rules.py: RulesetHandler and RulesUpdater for rule management
- zircolite/templates.py: TemplateEngine and ZircoliteGuiGenerator for output
- zircolite/utils.py: Utility functions and MemoryTracker
"""
# Standard libs
import argparse
import logging
import os
import random
import re
import string
import sys
import time
from pathlib import Path
from typing import Any, List, Optional, Tuple
# Force UTF-8 on Windows so argparse help and banner (Unicode/emojis) don't raise
# UnicodeEncodeError when the console uses cp1252 (see PYI-1448 / PYI-4560).
# This runs at module import time, ahead of the Rich and zircolite imports that follow.
if sys.platform == "win32":
    try:
        if hasattr(sys.stdout, "reconfigure"):
            # Preferred path: switch the existing text streams to UTF-8 in place.
            # errors="replace" substitutes unencodable characters instead of raising.
            sys.stdout.reconfigure(encoding="utf-8", errors="replace")  # type: ignore[attr-defined]
            sys.stderr.reconfigure(encoding="utf-8", errors="replace")  # type: ignore[attr-defined]
        elif hasattr(sys.stdout, "buffer"):
            # Fallback for stream objects without reconfigure(): re-wrap the
            # underlying binary buffers in new UTF-8 text wrappers.
            import io
            sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
            sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
    except (AttributeError, OSError):
        # Best effort only: if the streams cannot be reconfigured or wrapped,
        # keep them as they are rather than failing at import time.
        pass
# External libs - Rich for styled terminal output
from rich.logging import RichHandler
from rich.panel import Panel
from rich.table import Table
# Rich argparse for colored --help output
try:
from rich_argparse import RichHelpFormatter
_HAS_RICH_ARGPARSE = True
except ImportError:
RichHelpFormatter = None # type: ignore[assignment,misc]
_HAS_RICH_ARGPARSE = False
# Import from package
from zircolite import (
RulesetHandler,
RulesUpdater,
TemplateEngine,
ZircoliteGuiGenerator,
MemoryTracker,
init_logger,
quit_on_error,
check_if_exists,
select_files,
avoid_files,
analyze_files_and_recommend_mode,
print_mode_recommendation,
# Config dataclasses
RulesetConfig,
TemplateConfig,
GuiConfig,
# Log type detection
LogTypeDetector,
DetectionResult,
# YAML configuration
ConfigLoader,
create_default_config_file,
# Rich console
console,
DetectionStats,
LEVEL_PRIORITY,
# UI/UX helpers
set_quiet_mode,
is_quiet,
print_banner,
print_section,
print_error_panel,
build_attack_summary,
make_file_link,
)
# Processing modes and context (from the dedicated processing module)
from zircolite.processing import (
ProcessingContext,
create_extractor,
process_unified_streaming,
process_perfile_streaming,
process_db_input,
process_parallel_streaming,
)
from zircolite.shutdown import (
install_signal_handler,
is_shutdown_requested,
request_shutdown,
)
################################################################
# NOTE: ProcessingContext and all process_* functions live in
# zircolite/processing.py – imported above.
################################################################
################################################################
# ARGUMENT PARSING
################################################################
def parse_arguments() -> argparse.Namespace:
    """Parse command line arguments.

    Builds the full Zircolite CLI (input selection, event filtering, input
    formats, rulesets, output, advanced configuration, transforms, YAML
    configuration, parallel processing and templating) and parses the
    process arguments with ``parser.parse_args()``.

    Options declared with ``action='append'`` together with ``nargs`` yield a
    list of lists (one inner list per occurrence of the option).

    Returns:
        The populated ``argparse.Namespace``.
    """
    kwargs = {}
    # Use the colored help formatter only when rich_argparse could be imported.
    if _HAS_RICH_ARGPARSE:
        kwargs["formatter_class"] = RichHelpFormatter
    parser = argparse.ArgumentParser(**kwargs)
    # Input files and filtering/selection options
    logs_input_args = parser.add_argument_group('📁 INPUT FILES AND FILTERING')
    logs_input_args.add_argument("-e", "--evtx", "--events", help="Path to log file or directory containing log files in supported format", type=str)
    logs_input_args.add_argument("-s", "--select", help="Process only files with filenames containing the specified string (applied before exclusions)", action='append', nargs='+')
    logs_input_args.add_argument("-a", "--avoid", help="Skip files with filenames containing the specified string", action='append', nargs='+')
    logs_input_args.add_argument("-f", "--fileext", help="File extension of the log files to process", type=str)
    logs_input_args.add_argument("-fp", "--file-pattern", help="Python Glob pattern to select files (only works with directories)", type=str)
    logs_input_args.add_argument("--no-recursion", help="Search for log files only in the specified directory (disable recursive search)", action="store_true")
    logs_input_args.add_argument("--archive-password", help="Password for encrypted ZIP or 7-Zip archives", type=str, metavar="PASSWORD")
    # Events filtering options
    event_args = parser.add_argument_group('🔍 EVENTS FILTERING')
    event_args.add_argument("-A", "--after", help="Process only events after this timestamp (UTC format: 1970-01-01T00:00:00)", type=str, default="1970-01-01T00:00:00")
    event_args.add_argument("-B", "--before", help="Process only events before this timestamp (UTC format: 1970-01-01T00:00:00)", type=str, default="9999-12-12T23:59:59")
    event_args.add_argument("--no-event-filter", help="Disable early event filtering based on channel/eventID (process all events)", action='store_true')
    # Event and log formats options
    # These flags live in a mutually exclusive group: argparse rejects a
    # command line that sets more than one of them.
    event_formats_args = parser.add_mutually_exclusive_group()
    event_formats_args.add_argument("-j", "--json-input", "--jsononly", "--jsonline", "--jsonl", help="Input logs are in JSON lines format", action='store_true')
    event_formats_args.add_argument("--json-array-input", "--jsonarray", "--json-array", help="Input logs are in JSON array format", action='store_true')
    event_formats_args.add_argument("--db-input", "-D", "--dbonly", help="Use a previously saved database file (time range filters will not work)", action='store_true')
    event_formats_args.add_argument("-S", "--sysmon-linux-input", "--sysmon4linux", "--sysmon-linux", help="Process Sysmon for Linux log files (default extension: '.log')", action='store_true')
    event_formats_args.add_argument("-AU", "--auditd-input", "--auditd", help="Process Auditd log files (default extension: '.log')", action='store_true')
    event_formats_args.add_argument("-x", "--xml-input", "--xml", help="Process EVTX files converted to XML format (default extension: '.xml')", action='store_true')
    event_formats_args.add_argument("--evtxtract-input", "--evtxtract", help="Process log files extracted with EVTXtract (default extension: '.log')", action='store_true')
    event_formats_args.add_argument("--csv-input", "--csvonly", help="Process log files in CSV format (extension: '.csv')", action='store_true')
    # Ruleset options
    rulesets_formats_args = parser.add_argument_group('📋 RULES AND RULESETS')
    rulesets_formats_args.add_argument("-r", "--ruleset", help="Sigma ruleset in JSON (Zircolite format) or YAML/directory of YAML files (Native Sigma format)", action='append', nargs='+')
    rulesets_formats_args.add_argument("-sr", "--save-ruleset", help="Save converted ruleset (from Sigma to Zircolite format) to disk", action='store_true')
    rulesets_formats_args.add_argument("-p", "--pipeline", help="Use specified pipeline for native Sigma rulesets (YAML). Examples: 'sysmon', 'windows-logsources', 'windows-audit'. Use '--pipeline-list' to see available pipelines.", action='append', nargs='+')
    rulesets_formats_args.add_argument("-pl", "--pipeline-list", help="List all installed pysigma pipelines", action='store_true')
    rulesets_formats_args.add_argument("-R", "--rulefilter", help="Remove rules from ruleset by matching rule title (case sensitive)", action='append', nargs='*')
    rulesets_formats_args.add_argument("--test-rules", help="JSON file with rule test cases (true-positive / true-negative events per rule)", type=str, metavar="TEST_FILE")
    # Output formats and output files options
    output_formats_args = parser.add_argument_group('💾 OUTPUT FORMATS AND FILES')
    output_formats_args.add_argument("-o", "--outfile", help="Output file for detected events", type=str, default="detected_events.json")
    output_formats_args.add_argument(
        "--csv",
        "--csv-output",
        help=(
            "Output results in CSV format (empty fields included). "
            "Column headers are fixed from the first detection row; match fields that only "
            "appear in later rules are omitted—use default JSON output for a full field set."
        ),
        action="store_true",
    )
    output_formats_args.add_argument("--csv-delimiter", help="Delimiter for CSV output", type=str, default=";")
    output_formats_args.add_argument("--keepflat", "--keep-flat", help="Save flattened events as JSON", action='store_true')
    output_formats_args.add_argument("--profile-rules", help="Time each rule execution and print a performance report at the end", action='store_true')
    output_formats_args.add_argument("-d", "--dbfile", "--db-file", help="Save all logs to a SQLite database file", type=str)
    output_formats_args.add_argument("-l", "--logfile", "--log-file", help="Log file name", default="zircolite.log", type=str)
    output_formats_args.add_argument("--hashes", help="Add xxhash64 of the original log event to each event", action='store_true')
    output_formats_args.add_argument("-L", "--limit", "--limit-results", help="Discard results exceeding this limit from output file", type=int, default=-1)
    # Advanced configuration options
    config_formats_args = parser.add_argument_group('⚙️ ADVANCED CONFIGURATION')
    config_formats_args.add_argument("-c", "--config", help="JSON or YAML file containing field mappings and exclusions", type=str, default="config/config.yaml")
    config_formats_args.add_argument("-LE", "--logs-encoding", help="Specify encoding for Sysmon for Linux or Auditd files", type=str)
    config_formats_args.add_argument("-q", "--quiet", help="Quiet mode: suppress banner, progress, and info messages. Only the summary panel and errors are shown.", action='store_true')
    config_formats_args.add_argument("--debug", help="Enable debug logging", action='store_true')
    config_formats_args.add_argument("-n", "--nolog", "--no-log", help="Don't create log or result files", action='store_true')
    config_formats_args.add_argument("-RE", "--remove-events", help="Remove processed log files after successful analysis (use with caution)", action='store_true')
    config_formats_args.add_argument("-U", "--update-rules", help="Update rulesets in the 'rules' directory", action='store_true')
    config_formats_args.add_argument("-v", "--version", help="Display Zircolite version", action='store_true')
    config_formats_args.add_argument("--timefield", "--time-field", help="Specify time field name for time filtering (default: 'SystemTime', auto-detects if not found)", type=str, default="SystemTime")
    config_formats_args.add_argument("--unified-db", "--all-in-one", help="Force unified database mode (all files in one DB, enables cross-file correlation)", action='store_true')
    config_formats_args.add_argument("--no-auto-mode", help="Disable automatic processing mode selection based on file analysis", action='store_true')
    config_formats_args.add_argument("--no-auto-detect", help="Disable automatic log type and timestamp detection (use explicit format flags instead)", action='store_true')
    config_formats_args.add_argument("--strict", help="Strict EVTX parsing: stop on corrupted or malformed chunks instead of skipping them (default: lenient, recovers as many events as possible)", action='store_true')
    config_formats_args.add_argument("--add-index", help="Create an index on the given column(s). Can be repeated or list multiple columns (e.g. --add-index Channel EventID).", action='append', nargs='+', metavar="COL", default=[])
    config_formats_args.add_argument("--remove-index", help="Drop the given index name(s) after creation. Can be repeated or list multiple (e.g. --remove-index idx_channel idx_eventid).", action='append', nargs='+', metavar="IDX", default=[])
    # nargs='?' with const=5: a bare "--auto-index" yields 5, an omitted option yields 0.
    config_formats_args.add_argument("--auto-index", help="Inspect the loaded ruleset and auto-create indices on the top-N most-referenced columns (default N=5 when used without an explicit number). Combine with --add-index for additional manually chosen columns.", type=int, nargs='?', const=5, default=0, metavar="N")
    # Transform options
    transform_args = parser.add_argument_group('🔄 TRANSFORMS')
    transform_args.add_argument("--all-transforms", help="Enable all defined transforms (overrides enabled_transforms list)", action='store_true')
    transform_args.add_argument("--transform-category", help="Enable transforms by category name (can be repeated). Use '--transform-list' to see available categories.", action='append', dest='transform_categories')
    transform_args.add_argument("--transform-list", help="List available transform categories and their transforms, then exit", action='store_true')
    # YAML configuration file options
    yaml_config_args = parser.add_argument_group('📄 YAML CONFIGURATION FILE')
    yaml_config_args.add_argument("--yaml-config", "-Y", help="YAML configuration file (CLI arguments override file settings)", type=str)
    yaml_config_args.add_argument("--generate-config", help="Generate a default YAML configuration file and exit", type=str, metavar="OUTPUT_FILE")
    # Parallel processing options
    parallel_args = parser.add_argument_group('⚡ PARALLEL PROCESSING')
    parallel_args.add_argument("-P", "--no-parallel", help="Disable automatic parallel processing (parallel is enabled by default when beneficial)", action='store_true')
    parallel_args.add_argument("-w", "--parallel-workers", help="Maximum number of parallel workers (default: auto-detect based on CPU/memory)", type=int)
    parallel_args.add_argument("--parallel-memory-limit", help="Memory usage threshold percentage before throttling (default: 85)", type=float, default=85.0)
    # Templating and Mini GUI options
    templating_formats_args = parser.add_argument_group('🎨 TEMPLATING AND MINI GUI')
    templating_formats_args.add_argument("-t", "--template", help="Jinja2 template to use for output generation", type=str, action='append', nargs='+')
    templating_formats_args.add_argument("-T", "--templateOutput", "--template-output", help="Output file for Jinja2 template results", type=str, action='append', nargs='+')
    templating_formats_args.add_argument("--template-append", help="Append to template output files instead of overwriting them. Useful for accumulating results across multiple runs (e.g. cumulative NDJSON exports). Note: not all templates produce append-safe output (single-document JSON layers will become invalid).", action='store_true', dest='template_append')
    templating_formats_args.add_argument("--timesketch", help="Shortcut: use Timesketch template and write to timesketch-<RAND>.json", action='store_true')
    # nargs='?' with const="": a bare "--navigator-output" yields the empty string,
    # an omitted option yields None.
    templating_formats_args.add_argument("--navigator-output", help="Shortcut: generate ATT&CK Navigator layer JSON and write to navigator-<RAND>.json (or specify a custom filename)", type=str, metavar="OUTPUT_FILE", nargs='?', const="")
    templating_formats_args.add_argument("-G", "--package", help="Create a ZircoGui/Mini GUI package", action='store_true')
    templating_formats_args.add_argument("--package-dir", help="Directory to save the ZircoGui/Mini GUI package", type=str, default="")
    return parser.parse_args()
################################################################
# FILE DISCOVERY AND INPUT TYPE DETECTION
################################################################
def get_file_extension(args: argparse.Namespace) -> str:
    """Return the file extension (without leading dot) used to discover logs.

    An explicit ``args.fileext`` always wins. Otherwise the extension is
    derived from the input-format flags, and ``"evtx"`` is the fallback when
    no format flag is set.

    Args:
        args: Parsed CLI arguments carrying ``fileext`` and the
            ``*_input`` format flags.

    Returns:
        The extension string, e.g. ``"json"``, ``"log"``, ``"xml"``,
        ``"csv"`` or ``"evtx"``.
    """
    if args.fileext:
        return args.fileext
    if args.json_input or args.json_array_input:
        return "json"
    # The --evtxtract-input help text documents '.log' as its default
    # extension, just like Sysmon for Linux and Auditd. getattr keeps
    # namespaces that predate the evtxtract flag working.
    if (
        args.sysmon_linux_input
        or args.auditd_input
        or getattr(args, "evtxtract_input", False)
    ):
        return "log"
    if args.xml_input:
        return "xml"
    if args.csv_input:
        return "csv"
    return "evtx"
def _has_explicit_format_flag(args: argparse.Namespace) -> bool:
    """Tell whether any input-format switch is set on *args*.

    Returns True as soon as one of the format flags is truthy, False when
    none of them is.
    """
    # All flags are read up front, so a namespace missing one of them fails
    # loudly regardless of the other values.
    format_switches = (
        args.json_input,
        args.json_array_input,
        args.xml_input,
        args.sysmon_linux_input,
        args.auditd_input,
        args.csv_input,
        args.evtxtract_input,
        args.db_input,
    )
    for switch in format_switches:
        if switch:
            return True
    return False
def discover_files(
    args: argparse.Namespace, logger: logging.Logger
) -> List[Path]:
    """Collect the log files to process from ``args.evtx``.

    ``args.fileext`` is overwritten with the resolved extension. A directory
    is globbed (recursively unless ``args.no_recursion`` is set) with
    ``args.file_pattern`` or ``*.<fileext>``; a single file is taken as is.
    The candidates then go through ``select_files`` and ``avoid_files``.
    ``quit_on_error`` is called when the path does not exist or when no file
    is left after filtering.
    """
    args.fileext = get_file_extension(args)
    source = Path(args.evtx)

    candidates: List[Path] = []
    if source.is_dir():
        glob_pattern = args.file_pattern or f"*.{args.fileext}"
        if args.no_recursion:
            matches = source.glob(glob_pattern)
        else:
            matches = source.rglob(glob_pattern)
        candidates = list(matches)
    elif source.is_file():
        candidates = [source]
    else:
        quit_on_error("[red] [-] Unable to find events from submitted path[/]", logger)

    # Inclusion filters run first, exclusions are applied to what remains.
    selected = select_files(candidates, args.select)
    remaining = avoid_files(selected, args.avoid)
    if not remaining:
        quit_on_error("[red] [-] No file found. Please verify filters, directory or the extension with '--fileext' or '--file-pattern'[/]", logger)
    return [Path(entry) for entry in remaining]
def get_input_type(args: argparse.Namespace) -> str:
    """Map the explicit CLI format flags to a streaming input type name.

    Flags are checked in a fixed order and the first one that is set
    decides the result; ``'evtx'`` is returned when none is set.
    """
    # Order matters: the first truthy flag wins and later ones are not read.
    flag_to_type = (
        ("db_input", 'sqlite'),
        ("json_input", 'json'),
        ("json_array_input", 'json_array'),
        ("xml_input", 'xml'),
        ("sysmon_linux_input", 'sysmon_linux'),
        ("auditd_input", 'auditd'),
        ("csv_input", 'csv'),
        ("evtxtract_input", 'evtxtract'),
    )
    for flag_name, type_name in flag_to_type:
        if getattr(args, flag_name):
            return type_name
    return 'evtx'
# Matches every character that is not an ASCII letter or digit.
_TIMEFIELD_SANITIZE_RE = re.compile(r"[^a-zA-Z0-9]")


def _apply_detection_result(
    args: argparse.Namespace,
    detection: "DetectionResult",
    logger: logging.Logger,
) -> str:
    """
    Transfer a DetectionResult onto the args namespace and return the input_type.

    The matching format flag is switched on in *args* so that code relying on
    the CLI flags (extractor creation, file extension logic, etc.) sees the
    detected format.

    A failed detection (log_source "unknown") still keeps detection.input_type
    when it names a known format (e.g. json from extension fallback);
    otherwise "evtx" is returned and *args* is left untouched.
    """
    # input_type -> name of the args flag that represents it
    type_to_flag = {
        'json': 'json_input',
        'json_array': 'json_array_input',
        'xml': 'xml_input',
        'sysmon_linux': 'sysmon_linux_input',
        'auditd': 'auditd_input',
        'csv': 'csv_input',
        'evtxtract': 'evtxtract_input',
        'sqlite': 'db_input',
    }
    detected_type = detection.input_type

    if detection.log_source == "unknown" and detected_type not in type_to_flag:
        return "evtx"

    target_flag = type_to_flag.get(detected_type)
    if target_flag is not None:
        setattr(args, target_flag, True)

    # Adopt the detected timestamp field only while the time field still has
    # its default value. Field names lose their non-alphanumeric characters
    # when events are stored in SQLite (e.g. "@timestamp" becomes
    # "timestamp"), so the same stripping is applied here to match the
    # column name.
    if detection.timestamp_field:
        if args.timefield == "SystemTime":
            args.timefield = _TIMEFIELD_SANITIZE_RE.sub("", detection.timestamp_field)
    return detected_type
def auto_detect_log_type(
    file_list: List[Path], args, logger,
    field_mappings_config: Optional[dict] = None,
) -> str:
    """
    Work out the input type for the given files.

    When the user set an explicit format flag, or disabled auto-detection,
    the type is taken from the CLI flags and no file is inspected.
    Otherwise a LogTypeDetector runs a batch detection over *file_list*,
    the outcome is logged, and the result is applied to *args*.
    This function does not re-run file discovery.

    Args:
        file_list: List of discovered log files
        args: Parsed CLI arguments
        logger: Logger instance
        field_mappings_config: Optional field mappings config; its
            "timestamp_detection" / "detection_fields" entry is handed to
            the detector

    Returns:
        The input_type string
    """
    # Both shortcuts resolve the type from the flags; only the debug message differs.
    if _has_explicit_format_flag(args):
        reason = "Using explicit format flag"
    elif getattr(args, 'no_auto_detect', False):
        reason = "Auto-detect disabled, using default"
    else:
        reason = None
    if reason is not None:
        chosen = get_input_type(args)
        logger.debug(f"{reason}: {chosen}")
        return chosen

    # Optional list of candidate timestamp fields coming from the config
    ts_fields = None
    if field_mappings_config:
        ts_fields = field_mappings_config.get("timestamp_detection", {}).get("detection_fields")

    detector = LogTypeDetector(
        logger=logger,
        timestamp_detection_fields=ts_fields,
        archive_password=getattr(args, 'archive_password', None),
    )
    detection = detector.detect_batch(file_list)

    # Rich markup color reflecting the confidence level
    if detection.confidence == 'high':
        color = 'green'
    elif detection.confidence == 'medium':
        color = 'yellow'
    else:
        color = 'red'
    logger.info(
        f"[+] Auto-detected log type: [cyan]{detection.log_source}[/] "
        f"([yellow]{detection.input_type}[/]) - confidence: [{color}]{detection.confidence}[/]"
    )

    if detection.details:
        logger.debug(f" Detection details: {detection.details}")
    if detection.timestamp_field:
        logger.info(f"[+] Auto-detected timestamp field: [cyan]{detection.timestamp_field}[/]")
    if detection.suggested_pipeline:
        logger.debug(f" Suggested pipeline: {detection.suggested_pipeline}")
    if detection.confidence == "low":
        logger.warning(
            "[yellow] [!] Low confidence detection. Consider using explicit format flags (-j, -x, -S, -AU, etc.)[/]"
        )

    # Reflect the detection on args and hand back the resulting type
    return _apply_detection_result(args, detection, logger)
################################################################
# YAML CONFIGURATION – split into per-section helpers
################################################################
def _apply_yaml_input_config(
    yaml_config: Any, args: argparse.Namespace
) -> None:
    """Apply YAML input section to CLI args.

    Values already provided on the command line are kept: the YAML path,
    file pattern, file extension and encoding only fill in missing values,
    and the YAML ``format`` is applied only when no format flag was set on
    the CLI.

    Args:
        yaml_config: Loaded configuration object exposing an ``input``
            section (``path``, ``format``, ``recursive``, ``file_pattern``,
            ``file_extension``, ``encoding``).
        args: Parsed CLI arguments, modified in place.
    """
    input_cfg = yaml_config.input

    if input_cfg.path and not args.evtx:
        args.evtx = input_cfg.path

    # Same flag set as _has_explicit_format_flag, including --db-input, so a
    # YAML "format" can never switch on a second format flag next to one the
    # user chose on the command line. getattr keeps namespaces without a
    # db_input attribute working.
    cli_format_chosen = any([
        args.json_input, args.json_array_input, args.xml_input,
        args.csv_input, args.sysmon_linux_input, args.auditd_input,
        args.evtxtract_input, getattr(args, "db_input", False),
    ])
    if not cli_format_chosen:
        format_map = {
            'json': 'json_input', 'json_array': 'json_array_input',
            'xml': 'xml_input', 'csv': 'csv_input',
            'sysmon_linux': 'sysmon_linux_input', 'auditd': 'auditd_input',
            'evtxtract': 'evtxtract_input',
        }
        if input_cfg.format in format_map:
            setattr(args, format_map[input_cfg.format], True)

    # Only an explicit "recursive: false" disables recursion; a missing or
    # None value leaves the CLI setting alone.
    if input_cfg.recursive is False:
        args.no_recursion = True
    if input_cfg.file_pattern:
        args.file_pattern = args.file_pattern or input_cfg.file_pattern
    if input_cfg.file_extension:
        args.fileext = args.fileext or input_cfg.file_extension
    if input_cfg.encoding:
        args.logs_encoding = args.logs_encoding or input_cfg.encoding
def _apply_yaml_rules_config(
    yaml_config: Any, args: argparse.Namespace
) -> None:
    """Copy the YAML ``rules`` section onto *args*.

    Rulesets replace the CLI value when it is empty or equal to the stock
    Windows ruleset. Pipelines and rule filters are only taken from YAML
    when the CLI gave none; each entry is wrapped in its own one-item list.
    """
    rules_cfg = yaml_config.rules
    stock_ruleset = [["rules/rules_windows_generic.json"]]

    if not args.ruleset or args.ruleset == stock_ruleset:
        args.ruleset = rules_cfg.rulesets

    if rules_cfg.pipelines and not args.pipeline:
        args.pipeline = [[pipeline_name] for pipeline_name in rules_cfg.pipelines]

    if rules_cfg.filters and not args.rulefilter:
        args.rulefilter = [[title_filter] for title_filter in rules_cfg.filters]

    # The YAML switch can only turn saving on, never off.
    if rules_cfg.save_ruleset:
        args.save_ruleset = True
def _apply_yaml_output_config(
    yaml_config: Any, args: argparse.Namespace
) -> None:
    """Copy the YAML ``output`` section onto *args* (modified in place).

    Settings compared against a default ('detected_events.json', ';',
    'zircolite.log') are only transferred when they differ from it; boolean
    switches can only be turned on.
    """
    out_cfg = yaml_config.output

    # The YAML file name is used only while the CLI still holds the default.
    if args.outfile == "detected_events.json":
        args.outfile = out_cfg.file

    if out_cfg.format == 'csv':
        args.csv = True
        # Records that CSV output was requested through the YAML file.
        args._csv_from_yaml = True
    if out_cfg.csv_delimiter != ';':
        args.csv_delimiter = out_cfg.csv_delimiter

    # Templates from YAML apply only when none was given on the CLI. Each
    # entry is a mapping with 'template' and 'output' keys.
    if out_cfg.templates and not args.template:
        entries = out_cfg.templates
        args.template = [[entry['template']] for entry in entries]
        args.templateOutput = [[entry['output']] for entry in entries]

    yaml_wants_append = getattr(out_cfg, 'template_append', False)
    if yaml_wants_append and not getattr(args, 'template_append', False):
        args.template_append = True

    if out_cfg.package:
        args.package = True
    if out_cfg.package_dir:
        args.package_dir = out_cfg.package_dir
    if out_cfg.keep_flat:
        args.keepflat = True
    if out_cfg.db_file:
        args.dbfile = out_cfg.db_file
    if out_cfg.log_file != 'zircolite.log':
        args.logfile = out_cfg.log_file
    if out_cfg.no_output:
        args.nolog = True
def _apply_yaml_processing_config(
    yaml_config: Any, args: argparse.Namespace
) -> None:
    """Copy the YAML processing, time-filter and parallel sections onto *args*.

    Boolean switches can only be turned on. Values compared against a
    default (-1, 'SystemTime', the two boundary timestamps, 85.0) are only
    transferred when they differ from it. Transform categories and index
    lists from YAML are appended to whatever the CLI already provided.
    """
    proc = yaml_config.processing
    time_cfg = yaml_config.time_filter
    par = yaml_config.parallel

    # --- processing section ---
    if proc.unified_db:
        args.unified_db = True
    if not proc.auto_mode:
        args.no_auto_mode = True
    if proc.hashes:
        args.hashes = True
    if proc.limit != -1:
        args.limit = proc.limit
    if proc.time_field != 'SystemTime':
        args.timefield = proc.time_field
    if proc.debug:
        args.debug = True
    if proc.remove_events:
        args.remove_events = True
    if proc.all_transforms:
        args.all_transforms = True

    if proc.transform_categories:
        # CLI categories first, YAML categories after them
        from_cli = getattr(args, 'transform_categories', None) or []
        args.transform_categories = from_cli + proc.transform_categories

    # Index options keep the list-of-lists layout: one merged group.
    if proc.add_index:
        cli_columns = _flatten_add_remove_index(getattr(args, 'add_index', None))
        args.add_index = [cli_columns + list(proc.add_index)]
    if proc.remove_index:
        cli_indexes = _flatten_add_remove_index(getattr(args, 'remove_index', None))
        args.remove_index = [cli_indexes + list(proc.remove_index)]

    if proc.strict_evtx:
        args.strict = True

    # --- time filter section ---
    if time_cfg.after != '1970-01-01T00:00:00':
        args.after = time_cfg.after
    if time_cfg.before != '9999-12-12T23:59:59':
        args.before = time_cfg.before

    # --- parallel section ---
    # Only an explicit "enabled: false" disables parallel processing.
    if par.enabled is False:
        args.no_parallel = True
    if par.max_workers:
        args.parallel_workers = par.max_workers
    if par.memory_limit_percent != 85.0:
        args.parallel_memory_limit = par.memory_limit_percent
def _print_transform_categories(config_path: str, logger):
    """Render the transform categories declared in the config as a table.

    The field-mappings file at *config_path* is loaded and its
    "transform_categories" mapping is printed to the console, one row per
    category in sorted order. A load failure is logged as an error and a
    missing or empty mapping as an info message; in both cases the function
    simply returns. Exiting the program is left to the caller.
    """
    from zircolite.utils import load_field_mappings

    try:
        mappings = load_field_mappings(config_path, logger=logger)
    except (FileNotFoundError, ValueError) as exc:
        logger.error(f"[red] [-] {exc}[/]")
        return

    categories = mappings.get("transform_categories", {})
    if not categories:
        logger.info("[yellow] [!] No transform categories defined in config.[/]")
        return

    listing = Table(title="Transform Categories", show_lines=True)
    listing.add_column("Category", style="cyan", min_width=15)
    listing.add_column("Transforms", style="white")
    listing.add_column("Count", style="green", justify="right")

    for name, members in sorted(categories.items()):
        joined = ", ".join(members)
        listing.add_row(name, joined, str(len(members)))

    console.print(listing)
def load_yaml_config_and_merge(args, logger) -> argparse.Namespace:
    """Merge the YAML configuration file, if any, into the CLI arguments.

    Without ``args.yaml_config`` the namespace is returned untouched.
    Otherwise the file is loaded, validation issues are logged as warnings,
    and each YAML section is applied to *args*. A missing file or any other
    loading error is logged and terminates the process with exit code 1.

    Returns:
        The same *args* object, updated in place.
    """
    if not args.yaml_config:
        return args

    try:
        loader = ConfigLoader(logger=logger)
        yaml_config = loader.load(args.yaml_config)

        # Validation problems are reported but do not stop the merge.
        for issue in loader.validate_config(yaml_config) or ():
            logger.warning(f"[yellow] [!] Config warning: {issue}[/]")

        yaml_config = loader.merge_with_args(yaml_config, args)

        section_appliers = (
            _apply_yaml_input_config,
            _apply_yaml_rules_config,
            _apply_yaml_output_config,
            _apply_yaml_processing_config,
        )
        for apply_section in section_appliers:
            apply_section(yaml_config, args)

        logger.info(f"[+] Configuration loaded and merged from: {make_file_link(args.yaml_config)}")
    except FileNotFoundError as exc:
        logger.error(f"[red] [-] {exc}[/]")
        sys.exit(1)
    except Exception as exc:
        logger.error(f"[red] [-] Error loading YAML config: {exc}[/]")
        # The traceback is only shown when debug logging is active.
        if logger.isEnabledFor(logging.DEBUG):
            console.print_exception(show_locals=False)
        sys.exit(1)

    return args
################################################################
# POST-PROCESSING
################################################################
def handle_templating(
    ctx: ProcessingContext,
    results: List[Any],
    args: argparse.Namespace,
) -> None:
    """Render the requested templates and, if asked, build the GUI package.

    Two independent steps:

    1. When ``ctx.ready_for_templating`` is set, a ``TemplateEngine`` is run
       over *results* using the template options taken from *args*.
    2. When ``ctx.package`` is set and *results* is non-empty, a ZircoGui
       package is generated into ``args.package_dir``. This needs both
       ``templates/exportForZircoGui.tmpl`` and ``gui/zircogui.zip``; if
       either is absent a warning naming the missing file(s) is logged
       instead.
    """
    if ctx.ready_for_templating:
        engine = TemplateEngine(
            TemplateConfig(
                template=args.template,
                template_output=args.templateOutput,
                time_field=ctx.time_field,
                append=getattr(args, 'template_append', False),
            ),
            logger=ctx.logger,
        )
        engine.run(results)

    if not (ctx.package and results):
        return

    template_path = Path("templates/exportForZircoGui.tmpl")
    gui_zip_path = Path("gui/zircogui.zip")

    # Collect whichever prerequisite files are absent, in a fixed order.
    missing = [
        str(required)
        for required in (template_path, gui_zip_path)
        if not required.is_file()
    ]
    if missing:
        ctx.logger.warning(
            f"[yellow] [!] Cannot create GUI package: missing file(s): {', '.join(missing)}[/]"
        )
        return

    gui_config = GuiConfig(
        package_dir=str(gui_zip_path),
        template_file=str(template_path),
        time_field=ctx.time_field
    )
    ZircoliteGuiGenerator(gui_config, logger=ctx.logger).generate(results, args.package_dir)
def _flatten_add_remove_index(value: Any) -> List[str]:
    """Flatten an argparse ``append`` + ``nargs='+'`` value into one flat list.

    argparse produces a list of lists (one inner list per occurrence of the
    option). Strings are treated as single items rather than as iterables,
    so an already-flat list of names, or a lone name, is passed through
    instead of being split into individual characters.

    Args:
        value: ``None``/empty, a single string, or an iterable whose entries
            are either strings or iterables of strings.

    Returns:
        A new flat list of the contained items, in order; ``[]`` for falsy
        input.
    """
    if not value:
        return []
    if isinstance(value, str):
        return [value]

    flat: List[str] = []
    for group in value:
        if isinstance(group, str):
            # Iterating a str would yield its characters; keep it whole.
            flat.append(group)
        else:
            flat.extend(group)
    return flat
def cleanup(
    args: argparse.Namespace,
    logger: logging.Logger,
    log_list: Optional[List[Path]] = None,
) -> None:
    """Delete the processed event files when ``args.remove_events`` is set.

    Nothing happens unless ``args.remove_events`` is truthy and *log_list* is
    non-empty. Each path in *log_list* is removed with ``os.remove``; an
    ``OSError`` on one file is logged and does not stop the remaining
    deletions.
    """
    if not (args.remove_events and log_list):
        return

    logger.info("[+] Cleaning")
    for event_file in log_list:
        try:
            os.remove(event_file)
        except OSError as err:
            logger.error(f"[red] [-] Cannot remove file {err}[/]")
def _stats_format_elapsed(seconds: float) -> str:
    """Format a duration as ``"<m>m <s>s"`` from 60 s upward, else ``"<s.s>s"``."""
    if seconds >= 60:
        return f"{int(seconds // 60)}m {int(seconds % 60)}s"
    return f"{seconds:.1f}s"


def _stats_block_bar(filled: int, width: int) -> str:
    """Return a *width*-cell bar with *filled* solid cells, clamped to ``[0, width]``."""
    # Clamp so a ratio above 100% cannot make the bar wider than its column.
    filled = max(0, min(width, filled))
    return "\u2588" * filled + "\u2591" * (width - filled)


def _stats_top_hit_lines(all_results: List[Any], limit: int = 5) -> List[str]:
    """Return markup lines for the *limit* top detections (by level priority, then count)."""
    # Local import: escape() keeps brackets in rule titles from being parsed
    # as console markup.
    from rich.markup import escape

    level_abbrev = {
        "critical": "CRIT", "high": "HIGH", "medium": " MED",
        "low": " LOW", "informational": "INFO",
    }
    level_style = {
        "critical": "bold white on red", "high": "bold white on magenta",
        "medium": "bold black on yellow", "low": "bold white on green",
        "informational": "white on bright_black",
    }

    def level_of(result) -> str:
        # Tolerate a key that is present but None/empty.
        return str(result.get("rule_level") or "unknown")

    def count_of(result):
        return result.get("count") or 0

    ranked = sorted(
        all_results,
        key=lambda r: (LEVEL_PRIORITY.get(level_of(r).lower(), 5), -count_of(r)),
    )

    lines = []
    for result in ranked[:limit]:
        level_key = level_of(result).lower()
        style = level_style.get(level_key, "cyan")
        abbrev = level_abbrev.get(level_key, level_key.upper()[:4])
        title = str(result.get("title") or "Unknown")
        if len(title) > 50:
            title = title[:47] + "..."
        # Escape after truncating so the cut cannot split an escape sequence.
        lines.append(f"[{style}]{abbrev}[/] {escape(title)} [dim]({count_of(result):,})[/]")
    return lines


def _stats_add_detection_rows(summary_table: Any, all_results: List[Any], total_rules: int) -> None:
    """Append the detection, coverage, matched-events and top-hits rows to *summary_table*."""
    det_stats = DetectionStats()
    for result in all_results:
        det_stats.add_detection(result.get("rule_level", "unknown"), result.get("count", 0))

    # Per-severity counters, most severe first; zero counts are omitted.
    severity_cells = (
        (det_stats.critical, "bold red", "CRIT"),
        (det_stats.high, "bold magenta", "HIGH"),
        (det_stats.medium, "bold yellow", "MED"),
        (det_stats.low, "green", "LOW"),
        (det_stats.informational, "dim", "INFO"),
    )
    detection_parts = [
        f"[{style}]{amount} {label}[/]"
        for amount, style, label in severity_cells
        if amount > 0
    ]
    if detection_parts:
        summary_table.add_row("🎯 Detections", " │ ".join(detection_parts))
    else:
        summary_table.add_row("🎯 Detections", "[dim]None[/]")

    # Rule coverage bar
    if total_rules > 0:
        matched_rules = det_stats.total_rules_matched
        coverage_pct = matched_rules / total_rules * 100
        bar_w = 16
        cov_bar = _stats_block_bar(int(bar_w * matched_rules / total_rules), bar_w)
        summary_table.add_row(
            "\U0001f4cf Coverage",
            f"[cyan]{matched_rules}[/]/[cyan]{total_rules}[/] rules matched ({coverage_pct:.1f}%) [dim]{cov_bar}[/]"
        )

    # Total matched events
    if det_stats.total_events > 0:
        summary_table.add_row(
            "🔍 Matched",
            f"[magenta]{det_stats.total_events:,}[/] events across [cyan]{det_stats.total_rules_matched}[/] rules"
        )

    # Top-N detections by severity (most critical first)
    top_lines = _stats_top_hit_lines(all_results)
    if top_lines:
        summary_table.add_row("\U0001f4cb Top Hits", top_lines[0])
        for line in top_lines[1:]:
            summary_table.add_row("", line)


def print_stats(
    memory_tracker: MemoryTracker,
    start_time: float,
    logger: logging.Logger,
    all_results: Optional[List[Any]] = None,
    files_processed: int = 0,
    total_events: int = 0,
    workers_used: int = 1,
    filtered_events: int = 0,
    total_rules: int = 0,
    phase_times: Optional[dict] = None,
    has_template: bool = False,
    has_package: bool = False,
    outfile: Optional[str] = None,
) -> None:
    """Print final execution statistics with a Rich summary dashboard.

    Args:
        memory_tracker: Sampled once more here, then queried for peak memory.
        start_time: ``time.time()`` value taken at start; elapsed time is
            measured against it.
        logger: Not used by this function; kept for interface compatibility.
        all_results: Detection dicts read via the keys ``rule_level``,
            ``count`` and ``title``. Falsy means "no detections".
        files_processed: Shown when greater than zero.
        total_events: Shown when greater than zero; also drives throughput.
        workers_used: Shown only when greater than one.
        filtered_events: When positive, added to the events row together with
            ``total_events / (total_events + filtered_events)`` as a
            percentage.
        total_rules: When positive, a matched/total coverage row is shown.
        phase_times: Mapping of phase name to seconds; each positive entry
            gets a bar scaled against the total elapsed time.
        has_template: Not used by this function; kept for interface
            compatibility.
        has_package: Not used by this function; kept for interface
            compatibility.
        outfile: When set, its location is printed last.
    """
    memory_tracker.sample()
    peak_memory, _ = memory_tracker.get_stats()  # average is not displayed
    processing_time = time.time() - start_time

    # Build summary table
    summary_table = Table(show_header=False, box=None, padding=(0, 2), expand=True)
    summary_table.add_column("Metric", style="dim", width=16)
    summary_table.add_column("Value", style="bold", ratio=1)

    # ── Duration with phase breakdown ──
    summary_table.add_row("⏱ Duration", f"[yellow]{_stats_format_elapsed(processing_time)}[/]")

    if phase_times and processing_time > 0:
        bar_width = 16
        for phase_name, phase_secs in phase_times.items():
            if phase_secs <= 0:
                continue
            pct = phase_secs / processing_time
            # At least one solid cell so very short phases remain visible.
            bar = _stats_block_bar(max(1, int(bar_width * pct)), bar_width)
            t_str = _stats_format_elapsed(phase_secs)
            summary_table.add_row("", f" [dim]\u251c\u2500 {phase_name} {bar} {t_str} ({pct:.0%})[/]")

    # ── Files ──
    if files_processed > 0:
        summary_table.add_row("📁 Files", f"[cyan]{files_processed:,}[/]")

    # ── Events with filter efficiency ──
    if total_events > 0:
        events_text = f"[magenta]{total_events:,}[/]"
        if filtered_events > 0:
            total_scanned = total_events + filtered_events
            match_rate = (total_events / total_scanned * 100) if total_scanned > 0 else 0
            events_text += f" [dim]({filtered_events:,} filtered out — {match_rate:.1f}% match rate)[/]"
        summary_table.add_row("📊 Events", events_text)

    # ── Throughput ──
    if processing_time > 0 and total_events > 0:
        throughput = total_events / processing_time
        summary_table.add_row("⚡ Throughput", f"[green]{throughput:,.0f}[/] events/s")

    # Workers (if parallel)
    if workers_used > 1:
        summary_table.add_row("👥 Workers", f"[yellow]{workers_used}[/]")

    # Memory
    if peak_memory > 0:
        mem_str = memory_tracker.format_memory(peak_memory)
        summary_table.add_row("💾 Peak Memory", f"[cyan]{mem_str}[/]")

    # ── Detection summary ──
    if all_results:
        _stats_add_detection_rows(summary_table, all_results, total_rules)
    else:
        summary_table.add_row("\U0001f3af Detections", "[dim]None[/]")

    # Section separator before summary
    print_section("Results")

    # Print summary panel
    console.print()
    panel = Panel(
        summary_table,
        title="[bold]\u2728 Summary[/]",
        border_style="cyan",
        padding=(1, 2),
        expand=True,
    )
    console.print(panel)

    # ATT&CK Coverage panel - always full width, stacked below summary
    if all_results:
        attack_panel = build_attack_summary(all_results)
        if attack_panel:
            console.print(attack_panel)

    # Output file location - prominent and always visible
    if outfile:
        console.print()
        console.print(f" [bold green]\u2192[/] Output: {make_file_link(outfile)}")
################################################################
# PROCESSING DISPATCH
################################################################
def _warn_ignored_db_flags(
    args: argparse.Namespace, logger: logging.Logger
) -> None:
    """Log one warning listing the supplied CLI flags that DB input mode ignores.

    ``args.unified_db`` is read directly, so it must exist on *args*; every
    other flag is looked up with ``getattr`` and counts as "not supplied"
    when absent or falsy. Nothing is logged when no such flag was given.
    """
    # (attribute on args, flag name as shown to the user), in report order.
    optional_flags = (
        ("no_auto_mode", "--no-auto-mode"),
        ("no_parallel", "--no-parallel"),
        ("add_index", "--add-index"),
        ("remove_index", "--remove-index"),
        ("auto_index", "--auto-index"),
        ("hashes", "--hashes"),
    )

    ignored = ["--unified-db"] if args.unified_db else []
    ignored.extend(
        flag_name
        for attr_name, flag_name in optional_flags
        if getattr(args, attr_name, None)
    )

    if not ignored:
        return

    logger.warning(
        f"[yellow]DB input mode: the following flags have no effect and will be "
        f"ignored: {', '.join(ignored)}[/]"
    )
def _run_processing(
ctx: ProcessingContext,
args: argparse.Namespace,
logger: logging.Logger,
memory_tracker: MemoryTracker,
) -> Tuple[Any, Any, Any, Optional[List[Path]], float]:
"""Run the main processing pipeline and return all state needed by main().
Returns:
(zircolite_core, all_results, extractor, use_streaming, log_list, phase_setup_end)
"""
zircolite_core = None
extractor = None
log_list = None
all_results = []
# Load field mappings config early (needed for auto-detection)
field_mappings_config = None
if not args.db_input:
from zircolite.utils import load_field_mappings
try:
field_mappings_config = load_field_mappings(args.config, logger=logger)
except Exception:
field_mappings_config = None
phase_setup_end = time.time()
# ----- DB input mode (explicit -D) -----
if args.db_input:
_warn_ignored_db_flags(args, logger)
zircolite_core, all_results = process_db_input(ctx, args)
return zircolite_core, all_results, extractor, log_list, phase_setup_end
# ----- File input mode -----
check_if_exists(
args.config,
"[red] [-] Cannot find mapping file, you can get the default one here : "
"https://github.com/wagga40/Zircolite/blob/master/config/config.yaml [/]",
logger,
)
original_ext = args.fileext or "evtx"
file_list = discover_files(args, logger)
log_list = file_list
# Auto-detect log type
if not is_quiet() and not _has_explicit_format_flag(args) and not getattr(args, 'no_auto_detect', False):
with console.status("[bold cyan]Auto-detecting log type...", spinner="dots"):
input_type = auto_detect_log_type(file_list, args, logger, field_mappings_config)
else:
input_type = auto_detect_log_type(file_list, args, logger, field_mappings_config)
# Re-discover files if auto-detection changed the expected extension
if Path(args.evtx).is_dir() and not args.file_pattern:
new_ext = get_file_extension(args)
if new_ext != original_ext:
args.fileext = new_ext
old_count = len(file_list)
file_list = discover_files(args, logger)
log_list = file_list
if len(file_list) != old_count:
logger.info(
f"[+] Re-discovered [yellow]{len(file_list)}[/] file(s) "
f"with extension '.{new_ext}'"
)
ctx.time_field = args.timefield
# DB input mode (auto-detected SQLite file)
if args.db_input:
_warn_ignored_db_flags(args, logger)
zircolite_core, all_results = process_db_input(ctx, args, file_list=file_list)
return zircolite_core, all_results, extractor, log_list, phase_setup_end
# Auto-select processing mode
use_parallel = False
parallel_workers = 1
if not args.no_auto_mode and not args.unified_db:
recommended_mode, reason, stats = analyze_files_and_recommend_mode(file_list, logger)
forced_workers = getattr(args, 'parallel_workers', None)
print_mode_recommendation(
recommended_mode, reason, stats, logger,
show_parallel=True, forced_workers=forced_workers,
)
if recommended_mode == 'unified':
args.unified_db = True
if not args.unified_db and not getattr(args, 'no_parallel', False) and not getattr(args, 'profile_rules', False):
if stats.get('parallel_recommended', False):
use_parallel = True
parallel_workers = stats.get('parallel_workers', 1)
elif forced_workers and forced_workers > 1 and len(file_list) > 1:
use_parallel = True
parallel_workers = forced_workers
elif args.unified_db:
logger.info("[+] [cyan]Database mode:[/] [green]UNIFIED[/] (forced)")
logger.info("")
else:
if not getattr(args, 'no_parallel', False) and not getattr(args, 'profile_rules', False) and len(file_list) > 1:
_, _, stats = analyze_files_and_recommend_mode(file_list, logger)
if stats.get('parallel_recommended', False):
use_parallel = True
parallel_workers = stats.get('parallel_workers', 1)
if getattr(args, 'profile_rules', False):
logger.info(
"[+] [cyan]Profile mode[/] (--profile-rules): rule execution will be timed; "
"files will be processed sequentially (parallel disabled)."
)
if args.unified_db:
logger.info(
"[+] [cyan]Note:[/] --profile-rules with --unified-db reports per-rule "
"timings against the combined dataset, not per-file breakdowns."
)
logger.info("")
# Streaming processing (single-pass pipeline)
extractor = create_extractor(args, logger, input_type)
if use_parallel and len(file_list) > 1 and getattr(args, "dbfile", None):
logger.error(