228 lines
8.9 KiB
Python
228 lines
8.9 KiB
Python
"""
|
|
Pre-migration check script to identify variable name clashes between source and target databases.
|
|
|
|
This script compares Variable names from both databases and identifies any matching names,
|
|
which could cause conflicts during migration. Results are exported to a CSV file for review.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import csv
|
|
from datetime import datetime
|
|
from db_utils import DatabaseConnection
|
|
|
|
|
|
class VariableClashChecker:
|
|
"""Checks for variable name clashes between source and target databases."""
|
|
|
|
def __init__(self, config_file='config.json'):
|
|
"""Initialize with configuration file."""
|
|
# Load configuration
|
|
with open(config_file, 'r') as f:
|
|
self.config = json.load(f)
|
|
|
|
# Setup logging
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
log_filename = f'variable_clash_check_{timestamp}.log'
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(log_filename),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
self.logger = logging.getLogger(__name__)
|
|
self.timestamp = timestamp
|
|
|
|
def is_system_variable(self, variable_name):
|
|
"""Check if a variable is a system variable (name in curly brackets)."""
|
|
if not variable_name:
|
|
return False
|
|
return variable_name.startswith('{') and variable_name.endswith('}')
|
|
|
|
def check_variable_clashes(self):
|
|
"""
|
|
Check for variable name clashes between source and target databases.
|
|
|
|
Returns:
|
|
list: List of dictionaries containing clash information
|
|
"""
|
|
self.logger.info("=" * 50)
|
|
self.logger.info("Variable Clash Check")
|
|
self.logger.info("=" * 50)
|
|
|
|
# Connect to databases
|
|
self.logger.info("Connecting to source database...")
|
|
source_conn = DatabaseConnection(self.config['source_db'])
|
|
source_db_name = self.config['source_db']['database']
|
|
|
|
self.logger.info("Connecting to target database...")
|
|
target_conn = DatabaseConnection(self.config['target_db'])
|
|
target_db_name = self.config['target_db']['database']
|
|
|
|
try:
|
|
# Fetch all variables from source
|
|
self.logger.info("Fetching variables from source database...")
|
|
source_variables = source_conn.execute_query(
|
|
"SELECT VariableID, VariableName, IsDeleted FROM Variable"
|
|
)
|
|
self.logger.info(f"Found {len(source_variables)} variables in source database")
|
|
|
|
# Fetch all variables from target
|
|
self.logger.info("Fetching variables from target database...")
|
|
target_variables = target_conn.execute_query(
|
|
"SELECT VariableID, VariableName, IsDeleted FROM Variable"
|
|
)
|
|
self.logger.info(f"Found {len(target_variables)} variables in target database")
|
|
|
|
# Create name-to-variable mapping for target (case-insensitive)
|
|
target_var_map = {}
|
|
for var in target_variables:
|
|
var_name = var['VariableName']
|
|
if var_name:
|
|
# Use lowercase for case-insensitive comparison
|
|
key = var_name.lower()
|
|
if key not in target_var_map:
|
|
target_var_map[key] = []
|
|
target_var_map[key].append(var)
|
|
|
|
# Find clashes
|
|
clashes = []
|
|
for source_var in source_variables:
|
|
source_name = source_var['VariableName']
|
|
if not source_name:
|
|
continue
|
|
|
|
# Check if this name exists in target
|
|
key = source_name.lower()
|
|
if key in target_var_map:
|
|
# Found a clash - could be multiple targets with same name
|
|
for target_var in target_var_map[key]:
|
|
clash = {
|
|
'SourceDatabase': source_db_name,
|
|
'TargetDatabase': target_db_name,
|
|
'VariableName': source_name,
|
|
'SourceID': source_var['VariableID'],
|
|
'TargetID': target_var['VariableID'],
|
|
'TargetName': target_var['VariableName'], # Show actual casing
|
|
'IsSystemVariable': self.is_system_variable(source_name),
|
|
'SourceIsDeleted': bool(source_var.get('IsDeleted', 0)),
|
|
'TargetIsDeleted': bool(target_var.get('IsDeleted', 0))
|
|
}
|
|
clashes.append(clash)
|
|
|
|
# Summary statistics
|
|
total_clashes = len(clashes)
|
|
active_clashes = [c for c in clashes if not c['SourceIsDeleted'] and not c['TargetIsDeleted']]
|
|
system_clashes = [c for c in clashes if c['IsSystemVariable']]
|
|
user_clashes = [c for c in clashes if not c['IsSystemVariable']]
|
|
active_user_clashes = [c for c in active_clashes if not c['IsSystemVariable']]
|
|
|
|
self.logger.info("")
|
|
self.logger.info("=" * 50)
|
|
self.logger.info("CLASH SUMMARY")
|
|
self.logger.info("=" * 50)
|
|
self.logger.info(f"Total variable clashes found: {total_clashes}")
|
|
self.logger.info(f" - Active clashes (both non-deleted): {len(active_clashes)}")
|
|
self.logger.info(f" - System variable clashes: {len(system_clashes)}")
|
|
self.logger.info(f" - User variable clashes: {len(user_clashes)}")
|
|
self.logger.info(f" - Active user variable clashes: {len(active_user_clashes)}")
|
|
self.logger.info("")
|
|
|
|
if len(active_user_clashes) > 0:
|
|
self.logger.warning("WARNING: Active user variable clashes detected!")
|
|
self.logger.warning("These will cause the migration to map source variables to existing target variables.")
|
|
self.logger.warning("Review the output CSV file to determine if this is expected.")
|
|
else:
|
|
self.logger.info("No active user variable clashes found - migration should proceed safely.")
|
|
|
|
return clashes
|
|
|
|
finally:
|
|
# Close connections
|
|
source_conn.close()
|
|
target_conn.close()
|
|
|
|
def export_clashes_to_csv(self, clashes):
|
|
"""
|
|
Export clashes to CSV file.
|
|
|
|
Args:
|
|
clashes (list): List of clash dictionaries
|
|
"""
|
|
if not clashes:
|
|
self.logger.info("No clashes to export.")
|
|
return
|
|
|
|
csv_filename = f'Variable_Clashes_{self.timestamp}.csv'
|
|
|
|
with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
|
|
fieldnames = [
|
|
'SourceDatabase',
|
|
'TargetDatabase',
|
|
'VariableName',
|
|
'SourceID',
|
|
'TargetID',
|
|
'TargetName',
|
|
'IsSystemVariable',
|
|
'SourceIsDeleted',
|
|
'TargetIsDeleted',
|
|
'ConflictSeverity'
|
|
]
|
|
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
|
|
|
writer.writeheader()
|
|
|
|
for clash in clashes:
|
|
# Determine conflict severity
|
|
if clash['SourceIsDeleted'] or clash['TargetIsDeleted']:
|
|
severity = 'LOW (deleted)'
|
|
elif clash['IsSystemVariable']:
|
|
severity = 'LOW (system)'
|
|
else:
|
|
severity = 'HIGH (active user variable)'
|
|
|
|
writer.writerow({
|
|
'SourceDatabase': clash['SourceDatabase'],
|
|
'TargetDatabase': clash['TargetDatabase'],
|
|
'VariableName': clash['VariableName'],
|
|
'SourceID': clash['SourceID'],
|
|
'TargetID': clash['TargetID'],
|
|
'TargetName': clash['TargetName'],
|
|
'IsSystemVariable': 'Yes' if clash['IsSystemVariable'] else 'No',
|
|
'SourceIsDeleted': 'Yes' if clash['SourceIsDeleted'] else 'No',
|
|
'TargetIsDeleted': 'Yes' if clash['TargetIsDeleted'] else 'No',
|
|
'ConflictSeverity': severity
|
|
})
|
|
|
|
self.logger.info(f"Clashes exported to: {csv_filename}")
|
|
|
|
def run(self):
|
|
"""Run the complete clash check process."""
|
|
try:
|
|
clashes = self.check_variable_clashes()
|
|
self.export_clashes_to_csv(clashes)
|
|
|
|
self.logger.info("")
|
|
self.logger.info("=" * 50)
|
|
self.logger.info("Variable clash check completed successfully!")
|
|
self.logger.info("=" * 50)
|
|
|
|
return len(clashes)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error during clash check: {e}", exc_info=True)
|
|
raise
|
|
|
|
|
|
def main():
|
|
"""Main entry point."""
|
|
checker = VariableClashChecker()
|
|
checker.run()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|