-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate_article_observable.py
More file actions
1268 lines (1034 loc) · 48 KB
/
generate_article_observable.py
File metadata and controls
1268 lines (1034 loc) · 48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
The D-AI-LY Observable Markdown Article Generator
Generates Observable Framework-compatible markdown articles from Statistics Canada data.
Outputs to docs/en/ or docs/fr/ directories for the Observable site.
"""
import argparse
import json
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional
import logging
import sys
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(levelname)s: %(message)s'
)
logger = logging.getLogger(__name__)
# Global translations dictionary (loaded at runtime)
TRANSLATIONS: Dict[str, Any] = {}
LANG: str = "en"
# =============================================================================
# PRE-GENERATION VALIDATION
# =============================================================================
class ValidationError(Exception):
"""Raised when data validation fails critically."""
pass
class ReviewError(Exception):
"""Raised when article review finds unfixable issues."""
pass
class ValidationResult:
"""Result of data validation with errors and warnings."""
def __init__(self):
self.errors: List[str] = []
self.warnings: List[str] = []
def add_error(self, msg: str):
self.errors.append(msg)
logger.error(msg)
def add_warning(self, msg: str):
self.warnings.append(msg)
logger.warning(msg)
@property
def is_valid(self) -> bool:
return len(self.errors) == 0
def summary(self) -> str:
if self.is_valid and not self.warnings:
return "Validation passed with no issues"
elif self.is_valid:
return f"Validation passed with {len(self.warnings)} warning(s)"
else:
return f"Validation FAILED: {len(self.errors)} error(s), {len(self.warnings)} warning(s)"
def validate_data(data: Dict[str, Any], strict: bool = False) -> ValidationResult:
"""
Validate JSON data before article generation.
Args:
data: The loaded JSON data from R script
strict: If True, treat warnings as errors
Returns:
ValidationResult with errors and warnings
"""
result = ValidationResult()
# 1. Check required top-level fields
required_fields = ["metadata", "latest", "time_series"]
for field in required_fields:
if field not in data:
result.add_error(f"Missing required field: {field}")
elif data[field] is None:
result.add_error(f"Field '{field}' is null")
if not result.is_valid:
return result # Can't continue without these fields
# 2. Validate metadata
metadata = data.get("metadata", {})
required_metadata = ["table_number", "table_title", "reference_period"]
for field in required_metadata:
if not metadata.get(field):
result.add_error(f"Missing metadata field: {field}")
# 3. Validate latest data point
latest = data.get("latest", {})
if latest.get("value") is None:
result.add_error("Latest value is missing or null")
if latest.get("ref_date") is None:
result.add_error("Latest ref_date is missing")
if latest.get("mom_pct_change") is None:
result.add_warning("Month-over-month change is missing")
if latest.get("yoy_pct_change") is None:
result.add_warning("Year-over-year change is missing")
# 4. Validate date freshness
if latest.get("ref_date"):
try:
ref_date = datetime.strptime(latest["ref_date"], "%Y-%m")
age_days = (datetime.now() - ref_date).days
age_months = age_days / 30
if age_months > 6:
result.add_error(f"Data is too old: {age_months:.1f} months (max 6)")
elif age_months > 3:
result.add_warning(f"Data is {age_months:.1f} months old")
except ValueError:
result.add_warning(f"Could not parse ref_date: {latest['ref_date']}")
# 5. Validate time series length
time_series = data.get("time_series", [])
if isinstance(time_series, list):
ts_length = len(time_series)
if ts_length < 6:
result.add_error(f"Time series too short: {ts_length} points (min 6)")
elif ts_length < 12:
result.add_warning(f"Time series has only {ts_length} points (recommend 12+)")
else:
result.add_error("Time series is not a list")
# 6. Check for subseries and provincial data (optional but informative)
if not data.get("subseries"):
result.add_warning("No subseries breakdown data available")
if not data.get("provincial"):
result.add_warning("No provincial breakdown data available")
# 7. Check R script validation results if present
r_validation = data.get("validation", {})
if r_validation:
if not r_validation.get("passed", True):
r_errors = r_validation.get("errors", {})
for key, msg in r_errors.items() if isinstance(r_errors, dict) else []:
result.add_error(f"R validation error ({key}): {msg}")
r_warnings = r_validation.get("warnings", {})
if isinstance(r_warnings, dict):
for key, msg in r_warnings.items():
result.add_warning(f"R validation warning ({key}): {msg}")
elif isinstance(r_warnings, list):
for msg in r_warnings:
result.add_warning(f"R validation: {msg}")
# 8. Sanity check values
if latest.get("value") is not None:
value = latest["value"]
series_name = metadata.get("series_name", "")
if series_name == "Consumer Price Index":
if value < 50 or value > 300:
result.add_error(f"CPI value {value:.1f} outside expected range (50-300)")
if latest.get("mom_pct_change") is not None:
mom = latest["mom_pct_change"]
if abs(mom) > 15:
result.add_warning(f"Large month-over-month change: {mom:.1f}%")
if latest.get("yoy_pct_change") is not None:
yoy = latest["yoy_pct_change"]
if abs(yoy) > 50:
result.add_warning(f"Large year-over-year change: {yoy:.1f}%")
# Convert warnings to errors if strict mode
if strict and result.warnings:
for warning in result.warnings:
result.errors.append(f"[strict] {warning}")
result.warnings.clear()
return result
# =============================================================================
# DATA REBASING FOR HISTORICAL PERIODS
# =============================================================================
def rebase_data_to_period(data: Dict[str, Any], target_ref_date: str) -> Dict[str, Any]:
"""
Rebase data to a historical reference period.
Finds the target period in time_series and updates the 'latest' object
to point to that period's values.
Args:
data: Original data dictionary
target_ref_date: Target reference date (e.g., "2025-10")
Returns:
Modified data dictionary with 'latest' pointing to target period
Raises:
ValueError: If target period not found in time_series
"""
import copy
data = copy.deepcopy(data) # Don't mutate original
# Save original reference period before any modifications
original_ref_period = data.get("metadata", {}).get("reference_period", "")
time_series = data.get("time_series", [])
# Find the target period in time series
target_entry = None
for entry in time_series:
if entry.get("ref_date") == target_ref_date:
target_entry = entry
break
if not target_entry:
available = [e.get("ref_date") for e in time_series[-12:]]
raise ValueError(
f"Reference date '{target_ref_date}' not found in time series. "
f"Recent available periods: {available}"
)
# Build new 'latest' object from the target entry
new_latest = {
"date": target_entry.get("date"),
"ref_date": target_ref_date,
"value": target_entry.get("value"),
"mom_change": target_entry.get("mom_change", target_entry.get("value", 0) * target_entry.get("mom_pct_change", 0) / 100),
"mom_pct_change": target_entry.get("mom_pct_change", 0),
"yoy_change": target_entry.get("yoy_change", target_entry.get("value", 0) * target_entry.get("yoy_pct_change", 0) / 100),
"yoy_pct_change": target_entry.get("yoy_pct_change", 0),
}
data["latest"] = new_latest
# Update metadata reference_period
if "metadata" in data:
data["metadata"]["reference_period"] = target_ref_date
# Trim time_series to end at target period
trimmed_series = [e for e in time_series if e.get("ref_date", "") <= target_ref_date]
data["time_series"] = trimmed_series
# Strip subseries and provincial data when rebasing to historical period
# These sections contain only latest-period breakdowns and cannot be rebased
if target_ref_date != original_ref_period:
if "subseries" in data:
del data["subseries"]
logger.warning(f"Removed subseries (only had {original_ref_period} data, not {target_ref_date})")
if "provincial" in data:
del data["provincial"]
logger.warning(f"Removed provincial (only had {original_ref_period} data, not {target_ref_date})")
logger.info(f"Rebased data to reference period: {target_ref_date}")
logger.info(f" Value: {new_latest['value']}, YoY: {new_latest['yoy_pct_change']}%")
return data
# =============================================================================
# POST-GENERATION SELF-REVIEW
# =============================================================================
class ReviewResult:
"""Result of article self-review with issues found/fixed."""
def __init__(self):
self.issues_found: List[str] = []
self.issues_fixed: List[str] = []
self.issues_unfixed: List[str] = []
def add_found(self, issue: str):
self.issues_found.append(issue)
logger.info(f"Review found: {issue}")
def add_fixed(self, issue: str, fix_description: str):
self.issues_fixed.append(f"{issue} -> {fix_description}")
logger.info(f"Review fixed: {issue} -> {fix_description}")
def add_unfixed(self, issue: str, reason: str):
self.issues_unfixed.append(f"{issue}: {reason}")
logger.warning(f"Review could not fix: {issue} ({reason})")
@property
def has_unfixed_issues(self) -> bool:
return len(self.issues_unfixed) > 0
def summary(self) -> str:
total = len(self.issues_found)
fixed = len(self.issues_fixed)
unfixed = len(self.issues_unfixed)
if total == 0:
return "Self-review passed: no issues found"
elif unfixed == 0:
return f"Self-review: found {total} issue(s), all fixed"
else:
return f"Self-review: found {total} issue(s), fixed {fixed}, {unfixed} unfixed"
def review_and_fix_article(markdown: str, data: Dict[str, Any], lang: str = "en") -> tuple:
"""
Review generated article markdown and fix common issues.
This function examines the generated content for:
- Placeholder text (—, TBD, TODO, [placeholder], etc.)
- Empty sections (## heading followed by another ## or end)
- Tables with missing values
- Incomplete data that could be filled from source
Args:
markdown: The generated article markdown
data: The source data JSON (for filling missing values)
lang: Language code for formatting
Returns:
Tuple of (fixed_markdown, ReviewResult)
"""
result = ReviewResult()
fixed_md = markdown
# Pattern definitions for common issues
placeholder_patterns = [
(r'\| [^|]*— [^|]*\|', 'em-dash placeholder in table'),
(r'\| [^|]*—\|', 'em-dash placeholder in table'),
(r'\bTBD\b', 'TBD placeholder'),
(r'\bTODO\b', 'TODO placeholder'),
(r'\[placeholder\]', 'placeholder marker'),
(r'\[TBD\]', 'TBD marker'),
(r'\$0\.0 billion', 'zero dollar value'),
(r'\+0\.0%\s*\|', 'zero percent in table (may be intentional)'),
]
# Check for placeholder patterns
for pattern, description in placeholder_patterns:
matches = re.findall(pattern, fixed_md, re.IGNORECASE)
if matches:
result.add_found(f"{description} ({len(matches)} occurrence(s))")
# Attempt to fix based on pattern type
if 'em-dash' in description:
fixed_md, fixed_count = _fix_table_placeholders(fixed_md, data, lang, result)
# Check for empty sections
empty_section_pattern = r'## ([^\n]+)\n\n(?=## |\n*$|\n*<div)'
empty_matches = re.findall(empty_section_pattern, fixed_md)
for section_title in empty_matches:
result.add_found(f"Empty section: '{section_title}'")
# Can't auto-fix empty sections - need content generation
result.add_unfixed(f"Empty section '{section_title}'", "requires content generation")
# Check for very short content sections (less than 50 chars between headers)
short_section_pattern = r'## ([^\n]+)\n\n(.{1,50})\n\n(?=## |<div)'
short_matches = re.findall(short_section_pattern, fixed_md)
for section_title, content in short_matches:
if not content.strip().startswith('```'): # Ignore if it's just a code block
result.add_found(f"Very short section: '{section_title}' ({len(content)} chars)")
# Check for tables with all placeholder values
table_pattern = r'\|[^\n]+\|\n\|[-|]+\|\n((?:\|[^\n]+\|\n)+)'
for match in re.finditer(table_pattern, fixed_md):
table_content = match.group(1)
placeholder_count = table_content.count('—')
row_count = table_content.count('\n')
if placeholder_count > 0 and placeholder_count >= row_count:
result.add_found(f"Table with many placeholders ({placeholder_count} in {row_count} rows)")
# Check for missing highlights
if '**Highlights**' in fixed_md or '**Faits saillants**' in fixed_md:
highlights_pattern = r'\*\*(Highlights|Faits saillants)\*\*\n\n((?:- [^\n]+\n)*)'
match = re.search(highlights_pattern, fixed_md)
if match:
highlights_content = match.group(2)
highlight_count = highlights_content.count('- ')
if highlight_count < 2:
result.add_found(f"Too few highlights ({highlight_count})")
# Check for duplicate content
paragraphs = re.findall(r'\n\n([^#<\n][^\n]{50,})', fixed_md)
seen_paragraphs = {}
for para in paragraphs:
para_normalized = para.strip().lower()[:100]
if para_normalized in seen_paragraphs:
result.add_found(f"Duplicate paragraph starting with: '{para[:50]}...'")
seen_paragraphs[para_normalized] = True
return fixed_md, result
def _fix_table_placeholders(markdown: str, data: Dict[str, Any], lang: str, result: ReviewResult) -> tuple:
"""
Attempt to fix table placeholder values using source data.
Returns:
Tuple of (fixed_markdown, count_of_fixes)
"""
fixed_md = markdown
fix_count = 0
# Try to identify what kind of table has placeholders and fill from data
# Pattern 1: Sector breakdown tables (GDP-style)
# Look for rows like "| Services-producing industries | — | — |"
sector_patterns = {
'Services-producing industries': ('subseries', 'Services-producing industries'),
'Goods-producing industries': ('subseries', 'Goods-producing industries'),
'Industries productrices de services': ('subseries', 'Services-producing industries'),
'Industries productrices de biens': ('subseries', 'Goods-producing industries'),
}
for display_name, (data_key, lookup_name) in sector_patterns.items():
pattern = rf'\| {re.escape(display_name)} \| — \| — \|'
if re.search(pattern, fixed_md):
# Try to find the value in subseries data
subseries = data.get('subseries', {})
if subseries and 'category' in subseries:
categories = subseries.get('category', [])
values = subseries.get('value', [])
mom_changes = subseries.get('mom_pct_change', [])
for i, cat in enumerate(categories):
if lookup_name.lower() in cat.lower():
value = values[i] if i < len(values) else None
mom = mom_changes[i] if i < len(mom_changes) else None
if value is not None and mom is not None:
if lang == 'fr':
value_str = f"{value:,.1f}".replace(",", " ").replace(".", ",")
mom_str = f"+{mom:.1f}".replace(".", ",") if mom >= 0 else f"{mom:.1f}".replace(".", ",")
replacement = f"| {display_name} | {value_str} | {mom_str} % |"
else:
value_str = f"{value:,.1f}"
mom_str = f"+{mom:.1f}" if mom >= 0 else f"{mom:.1f}"
replacement = f"| {display_name} | {value_str} | {mom_str}% |"
fixed_md = re.sub(pattern, replacement, fixed_md)
result.add_fixed(
f"Placeholder in '{display_name}' row",
f"filled with value={value:.1f}, change={mom:.1f}%"
)
fix_count += 1
break
# Pattern 2: Provincial comparison tables with "vs nationale" column
# Look for rows like "| Province | +2.2% | — |"
provincial_vs_pattern = r'\| ([^|]+) \| \+?(\d+[.,]\d+) %? \| — \|'
matches = list(re.finditer(provincial_vs_pattern, fixed_md))
if matches and data.get('latest', {}).get('yoy_pct_change') is not None:
national_rate = data['latest']['yoy_pct_change']
for match in matches:
province = match.group(1).strip()
prov_rate_str = match.group(2).replace(',', '.')
try:
prov_rate = float(prov_rate_str)
diff = prov_rate - national_rate
if lang == 'fr':
if abs(diff) < 0.05:
diff_str = "0,0 pp"
else:
diff_str = f"+{diff:.1f}".replace(".", ",") if diff >= 0 else f"{diff:.1f}".replace(".", ",")
diff_str += " pp"
else:
if abs(diff) < 0.05:
diff_str = "0.0 pp"
else:
diff_str = f"+{diff:.1f} pp" if diff >= 0 else f"{diff:.1f} pp"
old_str = match.group(0)
new_str = old_str.replace('— |', f'{diff_str} |')
fixed_md = fixed_md.replace(old_str, new_str)
result.add_fixed(
f"Placeholder in '{province}' vs national column",
f"calculated diff={diff:.1f}pp"
)
fix_count += 1
except ValueError:
result.add_unfixed(
f"Placeholder in '{province}' row",
f"could not parse rate '{prov_rate_str}'"
)
# If we found placeholders but couldn't fix them, note that
remaining_placeholders = len(re.findall(r'\| [^|]*—[^|]* \|', fixed_md))
if remaining_placeholders > 0:
result.add_unfixed(
f"{remaining_placeholders} remaining placeholder(s)",
"no matching data in source JSON"
)
return fixed_md, fix_count
def load_translations(lang: str = "en") -> Dict[str, Any]:
"""Load translations for the specified language."""
global TRANSLATIONS, LANG
LANG = lang
translations_path = Path(__file__).parent / "templates" / "translations.json"
with open(translations_path, "r", encoding="utf-8") as f:
all_translations = json.load(f)
TRANSLATIONS = all_translations.get(lang, all_translations["en"])
return TRANSLATIONS
def t(key_path: str, default: str = "") -> str:
"""Get a translation by dot-notation path."""
keys = key_path.split(".")
value = TRANSLATIONS
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
return default
return value if isinstance(value, str) else default
def format_month_year(ref_date: str, lang: str = None) -> str:
"""Convert '2025-11' to 'November 2025' (or French equivalent)."""
if lang is None:
lang = LANG
try:
date = datetime.strptime(ref_date, "%Y-%m")
month_en = date.strftime("%B")
year = date.strftime("%Y")
month_translated = t(f"months.{month_en}", month_en)
return f"{month_translated} {year}"
except ValueError:
return ref_date
def generate_headline(data: Dict[str, Any]) -> str:
"""Generate a headline with key number first."""
latest = data["latest"]
metadata = data["metadata"]
period = format_month_year(latest["ref_date"])
series_name = metadata.get("series_name", "")
mom_change = latest.get("mom_pct_change", 0)
yoy_change = latest.get("yoy_pct_change", 0)
if series_name == "Consumer Price Index":
if LANG == "fr":
if yoy_change > 0:
return f"Les prix à la consommation en hausse de {yoy_change:.1f} % d'une année à l'autre en {period}"
elif yoy_change < 0:
return f"Les prix à la consommation en baisse de {abs(yoy_change):.1f} % d'une année à l'autre en {period}"
else:
return f"Les prix à la consommation inchangés en {period}"
else:
if yoy_change > 0:
return f"Consumer prices up {yoy_change:.1f}% year over year in {period}"
elif yoy_change < 0:
return f"Consumer prices down {abs(yoy_change):.1f}% year over year in {period}"
else:
return f"Consumer prices unchanged in {period}"
elif series_name == "Retail Sales":
if LANG == "fr":
if mom_change > 0:
return f"Les ventes au détail en hausse de {mom_change:.1f} % en {period}"
elif mom_change < 0:
return f"Les ventes au détail en baisse de {abs(mom_change):.1f} % en {period}"
else:
return f"Les ventes au détail inchangées en {period}"
else:
if mom_change > 0:
return f"Retail sales up {mom_change:.1f}% in {period}"
elif mom_change < 0:
return f"Retail sales down {abs(mom_change):.1f}% in {period}"
else:
return f"Retail sales unchanged in {period}"
elif series_name == "Manufacturing Sales":
if LANG == "fr":
if mom_change > 0:
return f"Les ventes du secteur de la fabrication en hausse de {mom_change:.1f} % en {period}"
elif mom_change < 0:
return f"Les ventes du secteur de la fabrication en baisse de {abs(mom_change):.1f} % en {period}"
else:
return f"Les ventes du secteur de la fabrication inchangées en {period}"
else:
if mom_change > 0:
return f"Manufacturing sales up {mom_change:.1f}% in {period}"
elif mom_change < 0:
return f"Manufacturing sales down {abs(mom_change):.1f}% in {period}"
else:
return f"Manufacturing sales unchanged in {period}"
else:
title_short = metadata["table_title"].split(",")[0]
if mom_change != 0:
direction = "up" if mom_change > 0 else "down"
if LANG == "fr":
direction = "en hausse de" if mom_change > 0 else "en baisse de"
return f"{title_short} {direction} {abs(mom_change):.1f} % en {period}"
return f"{title_short} {direction} {abs(mom_change):.1f}% in {period}"
return f"{title_short}: {period}"
def generate_slug(data: Dict[str, Any]) -> str:
"""Generate a URL-friendly slug from the data."""
series_name = data["metadata"].get("series_name", "")
ref_date = data["latest"]["ref_date"]
try:
date = datetime.strptime(ref_date, "%Y-%m")
month = date.strftime("%B").lower()
year = date.strftime("%Y")
except ValueError:
month = "unknown"
year = ref_date
if series_name == "Consumer Price Index":
if LANG == "fr":
return f"ipc-{month}-{year}"
return f"cpi-{month}-{year}"
elif series_name == "Retail Sales":
if LANG == "fr":
return f"ventes-detail-{month}-{year}"
return f"retail-sales-{month}-{year}"
elif series_name == "Manufacturing Sales":
if LANG == "fr":
return f"fabrication-{month}-{year}"
return f"manufacturing-{month}-{year}"
else:
slug = series_name.lower().replace(" ", "-")
return f"{slug}-{month}-{year}"
def generate_metric_value(data: Dict[str, Any]) -> str:
"""Generate the big metric value for the metric box."""
latest = data["latest"]
series_name = data["metadata"].get("series_name", "")
yoy_change = latest.get("yoy_pct_change", 0)
mom_change = latest.get("mom_pct_change", 0)
if series_name == "Consumer Price Index":
sign = "+" if yoy_change >= 0 else ""
return f"{sign}{yoy_change:.1f}%"
elif series_name == "Retail Sales":
sign = "+" if mom_change >= 0 else ""
return f"{sign}{mom_change:.1f}%"
else:
return f"{latest['value']:.1f}"
def generate_metric_label(data: Dict[str, Any]) -> str:
"""Generate the label for the metric box."""
period = format_month_year(data["latest"]["ref_date"])
series_name = data["metadata"].get("series_name", "")
if series_name == "Consumer Price Index":
if LANG == "fr":
return f"Variation d'une année à l'autre de l'Indice des prix à la consommation, {period}"
return f"Year-over-year change in Consumer Price Index, {period}"
elif series_name == "Retail Sales":
if LANG == "fr":
return f"Variation mensuelle des ventes au détail, {period}"
return f"Month-over-month change in retail sales, {period}"
else:
return f"{series_name}, {period}"
def generate_lede(data: Dict[str, Any]) -> str:
"""Generate the opening paragraph."""
latest = data["latest"]
comparison = data["comparison"]
series_name = data["metadata"].get("series_name", "")
period = format_month_year(latest["ref_date"])
value = latest["value"]
yoy = latest.get("yoy_pct_change", 0)
mom = latest.get("mom_pct_change", 0)
if series_name == "Consumer Price Index":
if LANG == "fr":
lede = f"L'Indice des prix à la consommation (IPC) a augmenté de {yoy:.1f} % en {period} par rapport au même mois un an plus tôt."
lede += f" L'indice s'établissait à {value:.1f}"
if comparison.get("year_ago"):
year_ago_value = comparison["year_ago"]["value"]
year_ago_period = format_month_year(comparison["year_ago"]["ref_date"])
lede += f", en hausse par rapport à {year_ago_value:.1f} en {year_ago_period}"
lede += "."
if mom is not None and mom != 0:
prev_period = format_month_year(comparison["previous_period"]["ref_date"])
direction = "augmenté" if mom > 0 else "diminué"
lede += f" Sur une base mensuelle, les prix ont {direction} de {abs(mom):.1f} % par rapport à {prev_period}."
else:
lede = f"The Consumer Price Index (CPI) rose {yoy:.1f}% in {period} compared with the same month a year earlier."
lede += f" The index stood at {value:.1f}"
if comparison.get("year_ago"):
year_ago_value = comparison["year_ago"]["value"]
year_ago_period = format_month_year(comparison["year_ago"]["ref_date"])
lede += f", up from {year_ago_value:.1f} in {year_ago_period}"
lede += "."
if mom is not None and mom != 0:
prev_period = format_month_year(comparison["previous_period"]["ref_date"])
direction = "increased" if mom > 0 else "decreased"
lede += f" On a monthly basis, prices {direction} {abs(mom):.1f}% from {prev_period}."
return lede
elif series_name == "Retail Sales":
value_str = f"${value/1000:.1f} billion"
if LANG == "fr":
return f"Les ventes au détail ont totalisé {value_str} en {period}."
return f"Retail sales totalled {value_str} in {period}."
elif series_name == "Manufacturing Sales":
value_str = f"${value/1000:.1f} billion"
if LANG == "fr":
direction = "augmenté" if mom > 0 else "diminué"
lede = f"Les ventes du secteur de la fabrication ont {direction} de {abs(mom):.1f} % en {period}"
lede += f", totalisant {value_str}."
if yoy is not None and yoy != 0:
lede += f" Par rapport au même mois un an plus tôt, les ventes ont augmenté de {yoy:.1f} %."
return lede
else:
direction = "increased" if mom > 0 else "decreased"
lede = f"Manufacturing sales {direction} {abs(mom):.1f}% in {period}"
lede += f", totalling {value_str}."
if yoy is not None and yoy != 0:
lede += f" Compared with the same month a year earlier, sales were up {yoy:.1f}%."
return lede
return ""
def generate_highlights(data: Dict[str, Any]) -> List[str]:
"""Generate 3-5 highlight bullets."""
latest = data["latest"]
series_name = data["metadata"].get("series_name", "")
period = format_month_year(latest["ref_date"])
yoy = latest.get("yoy_pct_change", 0)
subseries = data.get("subseries")
provincial = data.get("provincial")
highlights = []
if series_name == "Consumer Price Index":
# Main YoY finding
if LANG == "fr":
highlights.append(f"L'Indice des prix à la consommation a augmenté de {yoy:.1f} % d'une année à l'autre en {period}")
else:
highlights.append(f"The Consumer Price Index rose {yoy:.1f}% year over year in {period}")
# Leading contributors from subseries
if subseries and "category" in subseries:
categories = subseries["category"]
yoy_changes = subseries.get("yoy_pct_change", [])
if categories and yoy_changes:
# Sort by yoy change descending
sorted_items = sorted(zip(categories, yoy_changes), key=lambda x: x[1] or 0, reverse=True)
if sorted_items:
top_cat, top_yoy = sorted_items[0]
if LANG == "fr":
highlights.append(f"Les coûts du {top_cat.lower()} ont augmenté de {top_yoy:.1f} %, la plus forte hausse")
else:
highlights.append(f"{top_cat} costs increased {top_yoy:.1f}%, the largest contributor to inflation")
if len(sorted_items) > 1:
second_cat, second_yoy = sorted_items[1]
if LANG == "fr":
highlights.append(f"Les prix des {second_cat.lower()} ont augmenté de {second_yoy:.1f} % par rapport à {period.split()[0]} l'an dernier")
else:
highlights.append(f"{second_cat} prices rose {second_yoy:.1f}% compared to {period.split()[0]} last year")
# Provincial highlight
if provincial and "category" in provincial:
categories = provincial["category"]
yoy_changes = provincial.get("yoy_pct_change", [])
if categories and yoy_changes:
sorted_items = sorted(zip(categories, yoy_changes), key=lambda x: x[1] or 0, reverse=True)
if sorted_items:
top_prov, top_yoy = sorted_items[0]
if LANG == "fr":
highlights.append(f"{top_prov} a enregistré la hausse la plus élevée à {top_yoy:.1f} %")
else:
highlights.append(f"{top_prov} recorded the highest increase at {top_yoy:.1f}%")
return highlights[:5]
def generate_trend_chart_js(data: Dict[str, Any]) -> str:
"""Generate Observable Plot code for the trend chart."""
time_series = data["time_series"]
series_name = data["metadata"].get("series_name", "")
# Get last 6 months for recent trend
recent = time_series[-6:] if len(time_series) >= 6 else time_series
if series_name == "Consumer Price Index":
data_points = []
for item in recent:
ref_date = item["ref_date"]
yoy = item.get("yoy_pct_change")
if yoy is not None:
data_points.append(f' {{date: new Date("{ref_date}"), rate: {yoy:.1f}}}')
data_js = ",\n".join(data_points)
if LANG == "fr":
title = "Taux d'inflation d'une année à l'autre (%)"
y_label = "Pourcentage"
else:
title = "Year-over-year inflation rate (%)"
y_label = "Percent"
return f'''```js
import * as Plot from "npm:@observablehq/plot";
const inflationData = [
{data_js}
];
display(Plot.plot({{
title: "{title}",
width: 640,
height: 280,
y: {{domain: [0, 4], grid: true, label: "{y_label}"}},
x: {{type: "utc", label: null}},
marks: [
Plot.ruleY([0]),
Plot.ruleY([1, 3], {{stroke: "#ddd", strokeDasharray: "4,4"}}),
Plot.lineY(inflationData, {{x: "date", y: "rate", stroke: "#AF3C43", strokeWidth: 2}}),
Plot.dot(inflationData, {{x: "date", y: "rate", fill: "#AF3C43", r: 4}})
]
}}));
```'''
return ""
def generate_component_chart_js(data: Dict[str, Any]) -> str:
"""Generate Observable Plot code for the component breakdown chart."""
subseries = data.get("subseries")
if not subseries or "category" not in subseries:
return ""
categories = subseries["category"]
yoy_changes = subseries.get("yoy_pct_change", [])
if not categories or not yoy_changes:
return ""
# Build data array
data_points = []
for cat, yoy in zip(categories, yoy_changes):
if yoy is not None:
data_points.append(f' {{name: "{cat}", change: {yoy:.1f}}}')
data_js = ",\n".join(data_points)
if LANG == "fr":
title = "Variation annuelle selon la composante (%)"
x_label = "Variation en pourcentage"
else:
title = "Year-over-year change by component (%)"
x_label = "Percent change"
return f'''```js
const components = [
{data_js}
];
display(Plot.plot({{
title: "{title}",
width: 640,
height: 320,
marginLeft: 140,
x: {{domain: [-1, 5], grid: true, label: "{x_label}"}},
y: {{label: null}},
marks: [
Plot.ruleX([0]),
Plot.barX(components, {{
y: "name",
x: "change",
fill: d => d.change >= 0 ? "#AF3C43" : "#2e7d32",
sort: {{y: "-x"}}
}}),
Plot.text(components, {{
y: "name",
x: "change",
text: d => d.change.toFixed(1) + "%",
dx: 20,
fill: "currentColor"
}})
]
}}));
```'''
def generate_provincial_table(data: Dict[str, Any]) -> str:
"""Generate markdown table for provincial data."""
provincial = data.get("provincial")
if not provincial or "category" not in provincial:
return ""
categories = provincial["category"]
yoy_changes = provincial.get("yoy_pct_change", [])
if not categories or not yoy_changes:
return ""
# Sort by yoy descending
sorted_items = sorted(zip(categories, yoy_changes), key=lambda x: x[1] or 0, reverse=True)
if LANG == "fr":
header = "| Province | Variation annuelle |"
else:
header = "| Province | Year-over-year change |"
separator = "|----------|----------------------|"
rows = []
for prov, yoy in sorted_items:
sign = "+" if yoy >= 0 else ""
rows.append(f"| {prov} | {sign}{yoy:.1f}% |")
return "\n".join([header, separator] + rows)
def generate_note_to_readers(data: Dict[str, Any]) -> str:
"""Generate the note to readers section."""
series_name = data["metadata"].get("series_name", "")
if series_name == "Consumer Price Index":
if LANG == "fr":
p1 = "L'Indice des prix à la consommation mesure le taux de variation des prix que subissent les consommateurs canadiens. Il est calculé en comparant le coût d'un panier fixe de biens et de services achetés par les consommateurs au fil du temps."
p2 = "L'IPC n'est pas désaisonnalisé. Les variations d'un mois à l'autre peuvent refléter des tendances saisonnières en plus des tendances de prix sous-jacentes."
else:
p1 = "The Consumer Price Index measures the rate of price change experienced by Canadian consumers. It is calculated by comparing the cost of a fixed basket of goods and services purchased by consumers over time."
p2 = "The CPI is not seasonally adjusted. Month-to-month movements can reflect seasonal patterns in addition to underlying price trends."
return f"{p1}\n\n{p2}"
elif series_name == "Retail Sales":
if LANG == "fr":
return "Les ventes au détail représentent la valeur de toutes les ventes effectuées par l'intermédiaire des canaux de vente au détail. Les données sont désaisonnalisées."
return "Retail sales represent the value of all sales made through retail channels. Data are seasonally adjusted."
elif series_name == "Manufacturing Sales":
if LANG == "fr":
p1 = "L'enquête mensuelle sur les industries manufacturières mesure les ventes de biens fabriqués, les stocks et les commandes dans le secteur de la fabrication."
p2 = "Les données sont désaisonnalisées pour tenir compte des variations saisonnières régulières."
return f"{p1}\n\n{p2}"
else:
p1 = "The Monthly Survey of Manufacturing measures sales of goods manufactured, inventories and orders in the manufacturing sector."
p2 = "Data are seasonally adjusted to account for regular seasonal variations."
return f"{p1}\n\n{p2}"
return ""
def generate_component_narrative(data: Dict[str, Any]) -> str:
"""Generate narrative for the component breakdown section."""
subseries = data.get("subseries")
series_name = data["metadata"].get("series_name", "")
if not subseries or "category" not in subseries:
return ""
categories = subseries["category"]
yoy_changes = subseries.get("yoy_pct_change", [])
if not categories or not yoy_changes or series_name != "Consumer Price Index":
return ""
# Find top contributors
sorted_items = sorted(zip(categories, yoy_changes), key=lambda x: x[1] or 0, reverse=True)
if len(sorted_items) < 2:
return ""
top_cat, top_yoy = sorted_items[0]
if LANG == "fr":
narrative = f"Parmi les huit principales composantes de l'IPC, les prix du {top_cat.lower()} ont affiché la plus forte hausse annuelle, soit {top_yoy:.1f} %."
narrative += " Les coûts hypothécaires et les loyers ont continué d'exercer une pression à la hausse sur cette catégorie."
else:
narrative = f"Among the eight major components of the CPI, {top_cat.lower()} prices showed the largest year-over-year increase at {top_yoy:.1f}%."
narrative += " Mortgage interest costs and rent continued to put upward pressure on this category."
# Add food narrative if available
food_items = [(cat, yoy) for cat, yoy in sorted_items if "food" in cat.lower() or "aliment" in cat.lower()]
if food_items:
food_cat, food_yoy = food_items[0]
if LANG == "fr":
narrative += f"\n\nLes prix des {food_cat.lower()} ont augmenté de {food_yoy:.1f} %."
else:
narrative += f"\n\n{food_cat} prices rose {food_yoy:.1f}%."
return narrative