|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Jinja2 template substitution with custom label extension support.""" |
| 3 | + |
| 4 | +import argparse |
| 5 | +import json |
| 6 | +import logging |
| 7 | +import sys |
| 8 | + |
| 9 | +from typing import Any, Dict, List, Optional |
| 10 | + |
| 11 | +from jinja2.sandbox import SandboxedEnvironment, Environment |
| 12 | +from jinja2 import DebugUndefined, StrictUndefined |
| 13 | +from jinja2.lexer import Lexer, Token, TokenStream |
| 14 | + |
| 15 | +LOGGER = logging.getLogger("subst_template") |
| 16 | + |
| 17 | + |
| 18 | +class CustomLexer(Lexer): |
| 19 | + """Custom Lexer that modifies tokenization to not treat '-' as a mathematical operator. |
| 20 | +
|
| 21 | + This allows strings like 'foo-bar' in templates to be treated as identifiers |
| 22 | + rather than being parsed as mathematical expressions (foo minus bar). |
| 23 | + """ |
| 24 | + |
| 25 | + def tokenize(self, source, name=None, filename=None, state=None): |
| 26 | + """Merge hyphenated identifiers into single tokens.""" |
| 27 | + # Get token stream from original tokenizer |
| 28 | + token_stream = super().tokenize(source, name, filename, state) |
| 29 | + |
| 30 | + # Convert to list to process |
| 31 | + tokens = list(token_stream) |
| 32 | + |
| 33 | + # Process tokens to merge hyphenated names |
| 34 | + result = [] |
| 35 | + i = 0 |
| 36 | + while i < len(tokens): |
| 37 | + if ( |
| 38 | + i + 2 < len(tokens) |
| 39 | + and tokens[i].test("name") |
| 40 | + and tokens[i + 1].test("sub") |
| 41 | + and tokens[i + 2].test("name") |
| 42 | + ): |
| 43 | + # Merge NAME - NAME into a single NAME token |
| 44 | + merged_value = f"{tokens[i].value}-{tokens[i + 2].value}" |
| 45 | + merged_token = Token(tokens[i].lineno, "name", merged_value) |
| 46 | + result.append(merged_token) |
| 47 | + i += 3 # Skip the three tokens we just merged |
| 48 | + else: |
| 49 | + result.append(tokens[i]) |
| 50 | + i += 1 |
| 51 | + |
| 52 | + # Return TokenStream instead of plain iterator |
| 53 | + return TokenStream(result, name, filename) |
| 54 | + |
| 55 | + |
| 56 | +class CustomEnvironment(SandboxedEnvironment): |
| 57 | + """Custom Jinja2 Environment that uses CustomLexer for tokenization.""" |
| 58 | + |
| 59 | + def _tokenize(self, source, name, filename=None, state=None): |
| 60 | + """Override _tokenize to use our custom lexer.""" |
| 61 | + # Create our custom lexer if not already created |
| 62 | + if not hasattr(self, "_custom_lexer"): |
| 63 | + self._custom_lexer = CustomLexer(self) |
| 64 | + return self._custom_lexer.tokenize(source, name, filename, state) |
| 65 | + |
| 66 | + |
| 67 | +class LabelsProvider: |
| 68 | + """A custom class to provide label access in Jinja2 templates. |
| 69 | +
|
| 70 | + Supports both simple and nested label access: |
| 71 | + - {{labels.mylabel}} - accesses label "mylabel" |
| 72 | + - {{labels.mylabel.with-dash}} - accesses label "mylabel.with-dash" |
| 73 | +
|
| 74 | + The class builds up the label path through attribute access and resolves |
| 75 | + it when the value is needed. |
| 76 | + """ |
| 77 | + |
| 78 | + def __init__(self, labels: Dict[str, Any], path: str = "", strict: bool = False): |
| 79 | + """Initialize the LabelsProvider. |
| 80 | +
|
| 81 | + Args: |
| 82 | + labels: Dictionary containing all available labels |
| 83 | + path: Current path being built (used internally for nested access) |
| 84 | + strict: Whether to raise errors on missing labels |
| 85 | +
|
| 86 | + """ |
| 87 | + self._labels = labels |
| 88 | + self._path = path |
| 89 | + self._strict = strict |
| 90 | + |
| 91 | + def __getattr__(self, name: str) -> "LabelsProvider": |
| 92 | + """Handle attribute access to build up label paths. |
| 93 | +
|
| 94 | + Args: |
| 95 | + name: The attribute name being accessed |
| 96 | +
|
| 97 | + Returns: |
| 98 | + A new LabelsProvider instance with extended path |
| 99 | +
|
| 100 | + """ |
| 101 | + # Avoid infinite recursion for special attributes |
| 102 | + if name.startswith("_"): |
| 103 | + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") |
| 104 | + |
| 105 | + # Build the new path |
| 106 | + new_path = f"{self._path}.{name}" if self._path else name |
| 107 | + return LabelsProvider(self._labels, path=new_path, strict=self._strict) |
| 108 | + |
| 109 | + def __getitem__(self, key: str) -> "LabelsProvider": |
| 110 | + """Handle item access for labels with special characters. |
| 111 | +
|
| 112 | + Args: |
| 113 | + key: The key being accessed |
| 114 | +
|
| 115 | + Returns: |
| 116 | + A new LabelsProvider instance with extended path |
| 117 | +
|
| 118 | + """ |
| 119 | + new_path = f"{self._path}.{key}" if self._path else key |
| 120 | + return LabelsProvider(self._labels, new_path, strict=self._strict) |
| 121 | + |
| 122 | + def __str__(self) -> str: |
| 123 | + """Resolve the label path and return its value. |
| 124 | +
|
| 125 | + Returns: |
| 126 | + The label value as a string, or empty string if not found |
| 127 | +
|
| 128 | + """ |
| 129 | + if not self._path: |
| 130 | + raise KeyError(f"No label specified in path '{self._path}'") |
| 131 | + |
| 132 | + # Look up the value in the labels dictionary |
| 133 | + if self._strict and self._path not in self._labels: |
| 134 | + raise KeyError(f"Label '{self._path}' not found in labels") |
| 135 | + value = self._labels.get(self._path, "") |
| 136 | + return str(value) if value is not None else "" |
| 137 | + |
| 138 | + |
| 139 | +def setup_argparser(args: List[str]) -> argparse.Namespace: # pragma: no cover |
| 140 | + """Parse command-line arguments. |
| 141 | +
|
| 142 | + Returns: |
| 143 | + Initialized argument parser |
| 144 | +
|
| 145 | + """ |
| 146 | + parser = argparse.ArgumentParser( |
| 147 | + description="Process Jinja2 template with input data and output the result. " |
| 148 | + "Custom --labels-ext extension allows accessing labels with " |
| 149 | + "hyphens in their names." |
| 150 | + ) |
| 151 | + |
| 152 | + parser.add_argument( |
| 153 | + "--template", |
| 154 | + help="Path to the template file to process. If not specified, reads from stdin.", |
| 155 | + required=False, |
| 156 | + ) |
| 157 | + parser.add_argument( |
| 158 | + "--data", |
| 159 | + help="Path to JSON file containing input data", |
| 160 | + required=True, |
| 161 | + ) |
| 162 | + parser.add_argument( |
| 163 | + "-o", |
| 164 | + "--output", |
| 165 | + help="Path to the output file. If not specified, prints to stdout.", |
| 166 | + ) |
| 167 | + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") |
| 168 | + parser.add_argument( |
| 169 | + "--labels-ext", |
| 170 | + action="store_true", |
| 171 | + help=( |
| 172 | + "Use custom Jinja2 extension to handle labels " |
| 173 | + + "in identifiers (e.g. labels.mylabel.with-dash)" |
| 174 | + ), |
| 175 | + ) |
| 176 | + parser.add_argument( |
| 177 | + "--strict", |
| 178 | + action="store_true", |
| 179 | + help="Use strict undefined behavior (raise error on undefined variables)", |
| 180 | + ) |
| 181 | + parser.add_argument( |
| 182 | + "--allow-empty-inputs", |
| 183 | + action="store_true", |
| 184 | + help='Allow empty inputs (e.g. empty string ("") or null) without raising errors', |
| 185 | + ) |
| 186 | + |
| 187 | + return parser.parse_args(args) |
| 188 | + |
| 189 | + |
| 190 | +def validate_input_data(input_data: Dict[str, Any], allow_empty_inputs: bool = False): |
| 191 | + """Validate input data before processing. |
| 192 | +
|
| 193 | + Args: |
| 194 | + input_data: The input data dictionary to validate |
| 195 | + allow_empty_inputs: Whether to allow empty inputs without raising errors |
| 196 | + Raises: |
| 197 | + ValueError: If required fields are missing or if empty inputs are not allowed |
| 198 | +
|
| 199 | + """ |
| 200 | + for key, value in input_data.items(): |
| 201 | + if (value is None or value == "") and not allow_empty_inputs: |
| 202 | + raise ValueError(f"Input '{key}' is empty but empty inputs are not allowed") |
| 203 | + |
| 204 | + |
| 205 | +def setup_jinja( |
| 206 | + input_data: Dict[str, Any], labels_ext: bool = False, strict: bool = False |
| 207 | +) -> Environment: |
| 208 | + """Set up Jinja2 environment with optional custom extensions. |
| 209 | +
|
| 210 | + Args: |
| 211 | + input_data: Input data dictionary to provide to templates |
| 212 | + labels_ext: Whether to enable custom label extension |
| 213 | + strict: Whether to use strict undefined behavior |
| 214 | +
|
| 215 | + Returns: |
| 216 | + Configured Jinja2 Environment instance |
| 217 | +
|
| 218 | + """ |
| 219 | + undefined_class = StrictUndefined if strict else DebugUndefined |
| 220 | + |
| 221 | + if labels_ext: |
| 222 | + # Create LabelsProvider instance |
| 223 | + labels_dict = input_data.get("labels", {}) |
| 224 | + labels_provider = LabelsProvider(labels_dict, strict=strict) |
| 225 | + |
| 226 | + input_data["labels"] = labels_provider |
| 227 | + |
| 228 | + # Create Jinja2 environment with custom extension |
| 229 | + env = CustomEnvironment(undefined=undefined_class) |
| 230 | + else: |
| 231 | + env = Environment(undefined=undefined_class) |
| 232 | + |
| 233 | + return env |
| 234 | + |
| 235 | + |
| 236 | +def subst_template( |
| 237 | + env: Environment, template_str: str, data: Dict[str, Any], allow_empty_inputs: bool = True |
| 238 | +) -> str: |
| 239 | + """Substitute variables in a template string with data. |
| 240 | +
|
| 241 | + Args: |
| 242 | + env: Jinja2 Environment instance |
| 243 | + template_str: Template string to process |
| 244 | + data: Data dictionary to substitute into template |
| 245 | + allow_empty_inputs: Whether to allow empty values without raising errors |
| 246 | +
|
| 247 | + Returns: |
| 248 | + Rendered template string |
| 249 | +
|
| 250 | + """ |
| 251 | + template = env.from_string(template_str) |
| 252 | + validate_input_data(data, allow_empty_inputs=allow_empty_inputs) |
| 253 | + content = template.render(data) |
| 254 | + return content |
| 255 | + |
| 256 | + |
| 257 | +def load_input_data(path: str) -> Dict[str, Any]: |
| 258 | + """Load input data from a JSON file. |
| 259 | +
|
| 260 | + Args: |
| 261 | + path: Path to JSON file |
| 262 | +
|
| 263 | + Returns: |
| 264 | + Parsed JSON data as dictionary |
| 265 | +
|
| 266 | + """ |
| 267 | + LOGGER.info(f"Loading input data from {path}") |
| 268 | + with open(path, "r") as f: |
| 269 | + return json.load(f) |
| 270 | + |
| 271 | + |
| 272 | +def load_template(path: Optional[str]) -> str: |
| 273 | + """Load template from file or stdin. |
| 274 | +
|
| 275 | + Args: |
| 276 | + path: Path to template file, or None to read from stdin |
| 277 | +
|
| 278 | + Returns: |
| 279 | + Template content as string |
| 280 | +
|
| 281 | + """ |
| 282 | + if path: |
| 283 | + LOGGER.info(f"Loading template from {path}") |
| 284 | + with open(path, "r") as f: |
| 285 | + return f.read() |
| 286 | + else: |
| 287 | + LOGGER.info("Loading template from stdin") |
| 288 | + return sys.stdin.read() |
| 289 | + |
| 290 | + |
| 291 | +def write_output(content: str, path: Optional[str] = None): |
| 292 | + """Write output to file or stdout. |
| 293 | +
|
| 294 | + Args: |
| 295 | + content: Content to write |
| 296 | + path: Path to output file, or None to print to stdout |
| 297 | +
|
| 298 | + """ |
| 299 | + if path: |
| 300 | + LOGGER.info(f"Writing output to {path}") |
| 301 | + with open(path, "w") as f: |
| 302 | + f.write(content) |
| 303 | + else: |
| 304 | + LOGGER.info("Writing output to stdout") |
| 305 | + print(content) |
| 306 | + |
| 307 | + |
| 308 | +def main(args): |
| 309 | + """Execute template substitution from command-line arguments.""" |
| 310 | + args = setup_argparser(args) |
| 311 | + log_level = logging.DEBUG if args.verbose else logging.INFO |
| 312 | + LOGGER.setLevel(log_level) |
| 313 | + |
| 314 | + # Load input data |
| 315 | + input_data = load_input_data(args.data) |
| 316 | + env = setup_jinja(input_data, labels_ext=args.labels_ext, strict=args.strict) |
| 317 | + |
| 318 | + # Load and render template |
| 319 | + template_str = load_template(args.template) |
| 320 | + |
| 321 | + LOGGER.info("Rendering template") |
| 322 | + content = subst_template( |
| 323 | + env, template_str, input_data, allow_empty_inputs=args.allow_empty_inputs |
| 324 | + ) |
| 325 | + |
| 326 | + # Output result |
| 327 | + write_output(content, args.output) |
| 328 | + |
| 329 | + |
| 330 | +if __name__ == "__main__": # pragma: no cover |
| 331 | + main(sys.argv[1:]) |
0 commit comments