|
1 | 1 | import os |
2 | 2 | import re |
3 | 3 | from pathlib import Path |
| 4 | + |
4 | 5 | import pandas as pd |
| 6 | +import pandas_gbq |
| 7 | +from google.cloud import bigquery |
| 8 | +from google.oauth2 import service_account |
| 9 | + |
# GCP project and the on-disk service-account key used for all BigQuery access.
PROJECT_ID = r"flare-network-sandbox"
CREDENTIALS_PATH = r"./flare-network-sandbox-e95a1db92f64.json"


# Module-level BigQuery client shared by every function below.
# NOTE(review): this reads CREDENTIALS_PATH at import time — importing this
# module fails if the key file is missing; confirm that is acceptable.
credentials = service_account.Credentials.from_service_account_file(CREDENTIALS_PATH)
client = bigquery.Client(credentials=credentials, project=PROJECT_ID)
7 | 16 |
|
8 | 17 |
|
def parse_markdown(content: str) -> list[tuple[str, str]]:
    """Split a markdown document into (front-matter, body) pairs.

    Front matter is the ``---``-delimited block at the very start of the
    document. Returns a single-element list so callers can iterate uniformly.

    Args:
        content: Raw markdown text.

    Returns:
        ``[(metadata, body)]`` with both parts stripped; ``[("", content)]``
        (stripped) when no front matter is present.
    """
    pattern = re.compile(r"^---\n(.*?)\n---", re.DOTALL)
    metadata_match = pattern.match(content)

    if metadata_match:
        metadata_str = metadata_match.group(1).strip()
        # Slice at the end of the regex match instead of re-splitting on
        # "---": splitting breaks when the metadata itself contains "---"
        # (e.g. `title: a---b`).
        content_extract = content[metadata_match.end():].strip()
        return [(metadata_str, content_extract)]

    # Strip here too, for consistency with the front-matter branch.
    return [("", content.strip())]
| 29 | + |
| 30 | + |
def create_table(table_id: str) -> None:
    """Create the docs table in BigQuery with the fixed four-column schema.

    Best-effort: any failure (including "already exists") is printed and
    swallowed rather than raised.
    """
    columns = [
        bigquery.SchemaField("file_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("meta_data", "STRING"),
        bigquery.SchemaField("content", "STRING"),
        bigquery.SchemaField("last_updated", "DATETIME"),
    ]
    try:
        client.create_table(bigquery.Table(table_id, schema=columns))
    except Exception as e:
        print(f"Table creation error: {e}")
    else:
        print(f"Table {table_id} created successfully.")
| 45 | + |
| 46 | + |
def check_table_exist(table_id: str) -> bool:
    """Return True if *table_id* exists in BigQuery, False otherwise.

    Any lookup failure (typically NotFound) is treated as "does not exist".
    """
    try:
        # get_table raises on a missing table; a successful call means the
        # table exists. Returning True unconditionally here fixes the
        # original implicit-None path (the function is annotated -> bool).
        client.get_table(table_id)
        return True
    except Exception:
        return False
17 | 54 |
|
18 | 55 |
|
19 | | -def process_markdown_files(directory: Path, output_csv: Path) -> None: |
20 | | - if not directory.exists(): |
def load_data(data: pd.DataFrame, table_id: str) -> None:
    """Append *data* to the BigQuery table, coercing last_updated to datetime.

    Args:
        data: Rows to load; must contain a ``last_updated`` column.
        table_id: Fully qualified ``project.dataset.table`` target.

    Best-effort: errors are printed and swallowed so one failed load does
    not abort the surrounding run. The caller's DataFrame is not modified.
    """
    print(f"Loading data into {table_id}")
    try:
        # Work on a copy so the in-place dtype coercion below does not
        # mutate the caller's DataFrame.
        data = data.copy()
        # Convert timestamp to datetime format for BigQuery compatibility
        data["last_updated"] = pd.to_datetime(data["last_updated"])
        pandas_gbq.to_gbq(
            data,
            table_id,
            project_id=PROJECT_ID,
            credentials=credentials,
            if_exists="append",
        )
        print(f"Successfully loaded {len(data)} rows into {table_id}")
    except Exception as e:
        print(f"Error loading data: {e}")
| 72 | + |
| 73 | + |
def process_markdown_files(directory: str, table_id: str) -> None:
    """Parse every .md/.mdx file under *directory* and load it into BigQuery.

    Args:
        directory: Root directory to scan recursively.
        table_id: Fully qualified BigQuery table to create (if needed) and
            append to.

    Unreadable files are reported and skipped; if no markdown files are
    found, nothing is written to BigQuery.
    """
    directory_path = Path(directory)
    if not directory_path.exists():
        print(f"Directory not found: {directory}")
        return

    markdown_suffixes = {".md", ".mdx"}
    file_content: list[list[str]] = []
    # Pure-pathlib recursive walk instead of mixing os.walk with Path.
    for file_path in directory_path.rglob("*"):
        if not file_path.is_file() or file_path.suffix.lower() not in markdown_suffixes:
            continue
        try:
            # Keep the try body to just the call that can raise OSError.
            raw_text = file_path.read_text(encoding="utf-8")
        except OSError as e:
            print(f"Error processing file {file_path}: {e}")
            continue
        # `body` (not `content`) avoids shadowing the file text variable.
        for metadata, body in parse_markdown(raw_text):
            file_content.append([file_path.name, metadata, body])

    if not file_content:
        print("No markdown files found to process.")
        return

    pd_data = pd.DataFrame(
        file_content, columns=["file_name", "meta_data", "content"]
    )
    # Floor to whole seconds so the value matches BigQuery DATETIME precision.
    pd_data["last_updated"] = pd.Timestamp.now().floor("s")

    if not check_table_exist(table_id):
        create_table(table_id)

    load_data(pd_data, table_id)
49 | 110 |
|
50 | | - # Process the files |
51 | | - process_markdown_files(directory, output_csv) |
52 | | - print(f"CSV file created at: {output_csv}") |
| 111 | + |
if __name__ == "__main__":
    # Entry point: ingest the docs tree one directory up into BigQuery.
    docs_directory = "../docs"
    target_table = f"{PROJECT_ID}.flare_network_docs_data.docs_data001"
    process_markdown_files(docs_directory, target_table)
0 commit comments