-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore.py
More file actions
315 lines (256 loc) · 11.2 KB
/
core.py
File metadata and controls
315 lines (256 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
core.py - Core classes for the OraSchemaGen modular Oracle schema generator
This module provides the foundational classes used throughout the OraSchemaGen system,
including the base class for Oracle objects and generators.
Author: John Clark Naldoza
"""
import os
import datetime
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Union, Set, Tuple
from faker import Faker
class OracleObject:
"""
Base class representing an Oracle database object
"""
def __init__(self, name: str, object_type: str):
self.name = name
self.object_type = object_type
self.dependencies: List[str] = []
self.sql: str = ""
def add_dependency(self, dependency: str):
"""Add a dependency to this object"""
if dependency not in self.dependencies:
self.dependencies.append(dependency)
def __str__(self) -> str:
return f"{self.object_type} {self.name}"
class OracleObjectGenerator(ABC):
"""
Abstract base class for Oracle object generators
"""
def __init__(self):
self.fake = Faker(['en_US', 'ja_JP'])
self.objects: List[OracleObject] = []
@abstractmethod
def generate(self, **kwargs) -> List[OracleObject]:
"""Generate Oracle objects with the given parameters"""
pass
def get_objects(self) -> List[OracleObject]:
"""Get the generated objects"""
return self.objects
class TableInfo:
"""Class to store table information"""
def __init__(self, name: str, columns: List[Dict[str, Any]]):
self.name = name
self.columns = columns
def get_primary_key_columns(self) -> List[str]:
"""Get the primary key columns of this table"""
return [col['name'] for col in self.columns
if 'constraints' in col and 'PRIMARY KEY' in col['constraints']]
def get_column_names(self) -> List[str]:
"""Get all column names of this table"""
return [col['name'] for col in self.columns]
def get_column_by_name(self, name: str) -> Optional[Dict[str, Any]]:
"""Get a column by name"""
for col in self.columns:
if col['name'] == name:
return col
return None
class EncodingHandler:
"""
Handles character encoding conversion
"""
@staticmethod
def convert_to_shift_jis(input_file: str, output_file: str) -> bool:
"""
Convert a file from UTF-8 to Shift-JIS encoding
Returns:
bool: True if conversion was successful, False otherwise
"""
try:
print(f"Converting file to Shift-JIS encoding...")
# Get file size for progress reporting
file_size = os.path.getsize(input_file)
print(f"- File size: {file_size / (1024 * 1024):.1f} MB")
with open(input_file, 'r', encoding='utf-8') as f:
content = f.read()
# Try to convert content to Shift-JIS
try:
with open(output_file, 'w', encoding='shift_jis') as f:
f.write(content)
print(f"✓ Successfully converted to Shift-JIS: {output_file}")
return True
except UnicodeEncodeError as e:
print(f"⚠ Warning: Some characters couldn't be encoded in Shift-JIS. Error: {e}")
print(" Attempting to replace problematic characters and continue...")
# Fallback with character replacement
with open(output_file, 'w', encoding='shift_jis', errors='replace') as f:
f.write(content)
print(f"✓ Converted to Shift-JIS with character replacements: {output_file}")
return True
except Exception as e:
print(f"Error converting to Shift-JIS: {str(e)}")
return False
class OracleObjectFactory:
"""
Factory for creating Oracle object generators
"""
@staticmethod
def create_generators(object_types: List[str]) -> List[OracleObjectGenerator]:
"""Create generators based on object types"""
from schema_generator import SchemaGenerator
from data_generator import DataGenerator
from trigger_generator import TriggerGenerator
from procedure_generator import ProcedureGenerator
from function_generator import FunctionGenerator
from package_generator import PackageGenerator
from lob_generator import LobGenerator
generators = []
# Always include schema generator for tables
generators.append(SchemaGenerator())
# Include data generator if specified
if 'data' in object_types or 'all' in object_types:
generators.append(DataGenerator())
# Include PL/SQL objects if specified
if 'trigger' in object_types or 'all' in object_types:
generators.append(TriggerGenerator())
if 'procedure' in object_types or 'all' in object_types:
generators.append(ProcedureGenerator())
if 'function' in object_types or 'all' in object_types:
generators.append(FunctionGenerator())
if 'package' in object_types or 'all' in object_types:
generators.append(PackageGenerator())
if 'lob' in object_types or 'all' in object_types:
generators.append(LobGenerator())
return generators
class OutputHandler:
"""
Handles output for generated Oracle objects
"""
def __init__(self, output_dir: Optional[str] = None, single_file: bool = True):
self.output_dir = output_dir
self.single_file = single_file
# Create output directory if specified and doesn't exist
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
def write_objects(self, objects: List[OracleObject], output_file: str, include_header: bool = True) -> None:
"""Write objects to output"""
if self.single_file:
self._write_single_file(objects, output_file, include_header)
else:
self._write_multiple_files(objects, include_header)
def _write_single_file(self, objects: List[OracleObject], output_file: str, include_header: bool) -> None:
"""Write objects to a single file"""
# Sort objects by dependencies
sorted_objects = self._sort_objects_by_dependencies(objects)
# Write to file
file_path = os.path.join(self.output_dir, output_file) if self.output_dir else output_file
with open(file_path, 'w', encoding='utf-8') as f:
# Write header
if include_header:
self._write_header(f)
# Write objects
for obj in sorted_objects:
f.write(f"-- {obj.object_type}: {obj.name}\n")
f.write(obj.sql)
f.write("\n\n")
# Write footer
if include_header:
self._write_footer(f)
def _write_multiple_files(self, objects: List[OracleObject], include_header: bool) -> None:
"""Write each object to a separate file"""
# Group objects by type
objects_by_type = {}
for obj in objects:
if obj.object_type not in objects_by_type:
objects_by_type[obj.object_type] = []
objects_by_type[obj.object_type].append(obj)
# Write each type to a separate file
for obj_type, objs in objects_by_type.items():
# Sort objects by dependencies
sorted_objects = self._sort_objects_by_dependencies(objs)
# Create type directory if it doesn't exist
type_dir = os.path.join(self.output_dir, obj_type.lower())
if not os.path.exists(type_dir):
os.makedirs(type_dir)
# Write each object to a file
for obj in sorted_objects:
file_name = f"{obj.name}.sql"
file_path = os.path.join(type_dir, file_name)
with open(file_path, 'w', encoding='utf-8') as f:
# Write header
if include_header:
self._write_object_header(f, obj)
# Write object
f.write(obj.sql)
# Write footer
if include_header:
self._write_object_footer(f)
def _sort_objects_by_dependencies(self, objects: List[OracleObject]) -> List[OracleObject]:
"""Sort objects by dependencies"""
# Create dependency graph
graph = {}
for obj in objects:
graph[obj.name] = set(obj.dependencies)
# Perform topological sort
result = []
visited = set()
temp_visited = set()
def visit(node_name):
# Skip if already visited
if node_name in visited:
return
# Check for circular dependencies
if node_name in temp_visited:
# Circular dependency detected, but we'll continue anyway
return
# Mark as temporarily visited
temp_visited.add(node_name)
# Visit dependencies
for dep in graph.get(node_name, set()):
visit(dep)
# Mark as visited
temp_visited.remove(node_name)
visited.add(node_name)
# Add to result
for obj in objects:
if obj.name == node_name:
result.append(obj)
break
# Visit all nodes
for obj in objects:
if obj.name not in visited:
visit(obj.name)
return result
def _write_header(self, file) -> None:
"""Write header to file"""
timestamp = datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S")
file.write(f"""-- OraSchemaGen generated SQL script
-- Generated: {timestamp}
-- Character Set: UTF-8
--
-- This script contains DDL and DML statements for Oracle Database.
-- Execute in SQL*Plus, SQLcl, or SQL Developer.
-- Review all generated SQL before executing against production databases.
""")
def _write_footer(self, file) -> None:
"""Write footer to file"""
timestamp = datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S")
file.write(f"""
-- Export completed successfully
-- Export Timestamp: {timestamp}
""")
def _write_object_header(self, file, obj: OracleObject) -> None:
"""Write object header to file"""
timestamp = datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S")
file.write(f"""-- {obj.object_type}: {obj.name}
-- Generated: {timestamp}
-- Dependencies: {', '.join(obj.dependencies) if obj.dependencies else 'None'}
""")
def _write_object_footer(self, file) -> None:
"""Write object footer to file"""
file.write("""
-- End of object definition
""")