"""
Data Migration Script

Migrates VariableValue data from source to target database with ID mapping.
"""

import csv
import glob
import json
import logging
import os
from collections import defaultdict
from datetime import datetime

from db_utils import DatabaseConnection


class DataMigrator:
    """Copy folder-level ``VariableValue`` rows from a source DB to a target DB.

    Source ``ProjectID`` / ``VariableID`` values are translated to the target
    database's IDs before each row is upserted: projects are matched by folder
    path, variables by name. Progress is checkpointed to a JSON file so a long
    run can be resumed after a failure.
    """

    # Key columns of VariableValue as used by validate_migration(); the MERGE
    # statement below matches on exactly these columns so that the migration
    # and its validation agree on what "the same row" means.
    _KEY_COLUMNS = ('VariableID', 'DocumentID', 'ProjectID', 'RevisionNo', 'ConfigurationID')

    # UPSERT statement. "source" here is the single staged row (already mapped
    # to target IDs) supplied through the 11 positional parameters.
    # Matching on the full key (including RevisionNo and ConfigurationID)
    # keeps rows that differ only by revision/configuration from overwriting
    # each other, and avoids rewriting key columns in the UPDATE branch.
    _MERGE_QUERY = """
        MERGE INTO VariableValue AS target
        USING (SELECT ? AS VariableID, ? AS DocumentID, ? AS ProjectID, ? AS RevisionNo,
                      ? AS ConfigurationID, ? AS ValueText, ? AS ValueInt, ? AS ValueFloat,
                      ? AS ValueDate, ? AS ValueCache, ? AS IsLongText) AS source
        ON (target.VariableID = source.VariableID
            AND target.ProjectID = source.ProjectID
            AND target.DocumentID = source.DocumentID
            AND target.RevisionNo = source.RevisionNo
            AND target.ConfigurationID = source.ConfigurationID)
        WHEN MATCHED THEN
            UPDATE SET
                ValueText = source.ValueText,
                ValueInt = source.ValueInt,
                ValueFloat = source.ValueFloat,
                ValueDate = source.ValueDate,
                ValueCache = source.ValueCache,
                IsLongText = source.IsLongText
        WHEN NOT MATCHED THEN
            INSERT (VariableID, DocumentID, ProjectID, RevisionNo, ConfigurationID,
                    ValueText, ValueInt, ValueFloat, ValueDate, ValueCache, IsLongText)
            VALUES (source.VariableID, source.DocumentID, source.ProjectID, source.RevisionNo,
                    source.ConfigurationID, source.ValueText, source.ValueInt, source.ValueFloat,
                    source.ValueDate, source.ValueCache, source.IsLongText);
    """

    def __init__(self, config_path='config.json'):
        """Load the JSON configuration and set up logging.

        Args:
            config_path: Path to a JSON file that must contain ``source_db``
                and ``target_db`` sections, each with a ``database`` entry.
                Optional ``path_mapping`` and ``migration`` sections tune
                path matching and batching.
        """
        # JSON is UTF-8; do not depend on the platform default encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = json.load(f)

        self.source_conn = None  # source DB connection (see config.json)
        self.target_conn = None  # target DB connection (see config.json)
        self.project_map = {}    # source ProjectID -> target ProjectID
        self.variable_map = {}   # source VariableID -> target VariableID
        self.project_mapping_details = []   # full mapping rows for CSV export
        self.variable_mapping_details = []  # full mapping rows for CSV export

        # Database names are part of the progress-file name so that progress
        # from one source/target pair is never applied to another pair.
        self.source_db_name = self.config['source_db']['database']
        self.target_db_name = self.config['target_db']['database']

        # One timestamp per run; it tags the log, CSV and progress files.
        self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_filename = f"folderdata_migration_{self.timestamp}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                # Paths and names may contain non-ASCII characters.
                logging.FileHandler(log_filename, encoding='utf-8'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def connect_databases(self):
        """Establish connections to source and target databases."""
        self.logger.info("Connecting to source database...")
        self.source_conn = DatabaseConnection(self.config['source_db'])

        self.logger.info("Connecting to target database...")
        self.target_conn = DatabaseConnection(self.config['target_db'])

        self.logger.info("Database connections established.")

    def transform_source_path(self, source_path):
        """Return the target path expected for ``source_path``.

        The configured target root folder is prepended to the source path.

        Args:
            source_path: Backslash-separated folder path from the source DB.

        Returns:
            ``None`` for an empty/``None`` path, ``\\<root>\\`` when the path
            consists only of backslashes (the source root), otherwise
            ``\\<root>\\<path without leading backslashes>``.
        """
        if not source_path:
            return None

        # TODO this is defaulting to Citadel, need to change to make this more generic
        target_root = self.config.get('path_mapping', {}).get('target_root_folder', 'Citadel')

        relative = source_path.lstrip('\\')
        if not relative:
            # The source root maps onto the target root folder itself.
            return f'\\{target_root}\\'

        # Only leading backslashes were stripped, so a trailing backslash in
        # the source path is still present in ``relative``.
        return f'\\{target_root}\\{relative}'

    def build_project_mapping(self):
        """Fill ``project_map`` by matching source and target folder paths.

        Matching is by path rather than by name because folder names repeat
        across the tree. Deleted projects are excluded on both sides. Matching
        is case-insensitive unless ``path_mapping.case_sensitive`` is set.
        """
        self.logger.info("Building project ID mapping using path-based matching...")

        active_projects_sql = (
            "SELECT ProjectID, Name, Path FROM Projects WHERE Deleted = 0 OR Deleted IS NULL"
        )
        source_projects = self.source_conn.execute_query(active_projects_sql)
        target_projects = self.target_conn.execute_query(active_projects_sql)

        self.logger.info(f"Found {len(source_projects)} active projects in source database (deleted projects excluded)")
        self.logger.info(f"Found {len(target_projects)} active projects in target database (deleted projects excluded)")

        case_sensitive = self.config.get('path_mapping', {}).get('case_sensitive', False)

        def normalize(path):
            return path if case_sensitive else path.lower()

        # Index target projects (PDM calls folders "projects") by path.
        target_path_map = {}
        for project in target_projects:
            path = project['Path']
            if not path:
                continue
            key = normalize(path)
            if key in target_path_map:
                # Last row wins, but make the ambiguity visible.
                self.logger.warning(
                    f"Duplicate target path [{path}] (ProjectID {target_path_map[key]['ProjectID']} "
                    f"and {project['ProjectID']}) - using the latter"
                )
            target_path_map[key] = project

        self.logger.info(f"Built target path index with {len(target_path_map)} paths")

        mapped_count = 0
        unmapped_count = 0
        null_path_count = 0

        for source_project in source_projects:
            source_id = source_project['ProjectID']
            project_name = source_project['Name']
            source_path = source_project['Path']

            if not source_path:
                self.logger.warning(f"Project '{project_name}' (ID: {source_id}) has no path - skipping")
                null_path_count += 1
                continue

            # "Expected" target path = target root folder + source path.
            target_path = self.transform_source_path(source_path)
            if not target_path:
                # Precaution only; a non-empty source path always transforms.
                self.logger.warning(f"Could not transform path for project '{project_name}' (ID: {source_id})")
                unmapped_count += 1
                continue

            lookup_key = normalize(target_path)
            target_project = target_path_map.get(lookup_key)

            if target_project is None:
                self.logger.warning(f"Project '{project_name}' (ID: {source_id}) - Target path not found")
                self.logger.warning(f"  Source path: [{source_path}]")
                self.logger.warning(f"  Transformed to: [{target_path}]")
                self.logger.warning(f"  Lookup key: [{lookup_key}]")

                # Hint at typo/whitespace problems. Skipped for a missing or
                # empty name, which would either fail or match every path.
                if project_name:
                    needle = project_name.lower()
                    similar = [p for p in target_path_map if needle in p.lower()][:3]
                    if similar:
                        self.logger.warning(f"  Similar paths in target: {similar}")

                unmapped_count += 1
                continue

            target_id = target_project['ProjectID']
            self.project_map[source_id] = target_id
            # Kept for the CSV export so mappings can be reviewed by hand.
            self.project_mapping_details.append({
                'ProjectName': project_name,
                'SourceID': source_id,
                'TargetID': target_id,
                'SourcePath': source_path,
                'TargetPath': target_path,
            })
            self.logger.debug(f"Mapped Project '{project_name}': {source_id} -> {target_id} (Path: {source_path} -> {target_path})")
            mapped_count += 1

        self.logger.info("Project mapping complete:")
        self.logger.info(f"--- Successfully mapped: {mapped_count} projects")
        self.logger.info(f"--- Unmapped (path not found): {unmapped_count} projects")
        self.logger.info(f"--- Skipped (null path): {null_path_count} projects")
        self.logger.info(f"--- Total in project_map: {len(self.project_map)} projects")

    @staticmethod
    def _is_system_variable(name):
        """Return True for system variable names of the form ``{...}``."""
        return bool(name) and name.startswith('{') and name.endswith('}')

    def build_variable_mapping(self):
        """Fill ``variable_map`` by matching variables on ``VariableName``.

        Unlike projects, variables are matched by name; per the original
        author's note, variable names cannot be duplicated. Deleted variables
        and system variables (names wrapped in curly brackets) are excluded.
        """
        self.logger.info("Building variable ID mapping...")

        active_variables_sql = (
            "SELECT VariableID, VariableName FROM Variable WHERE IsDeleted = 0 OR IsDeleted IS NULL"
        )
        all_source = self.source_conn.execute_query(active_variables_sql)
        all_target = self.target_conn.execute_query(active_variables_sql)

        # User-created variables have readable names; system variables look
        # like {ASDFASD-FASDFSDF-ADFDFDFS} and are not migrated.
        source_variables = [v for v in all_source if not self._is_system_variable(v['VariableName'])]
        target_variables = [v for v in all_target if not self._is_system_variable(v['VariableName'])]

        source_system_vars_excluded = len(all_source) - len(source_variables)
        target_system_vars_excluded = len(all_target) - len(target_variables)

        self.logger.info(f"Found {len(source_variables)} user variables in source database (deleted: excluded, system variables: {source_system_vars_excluded} excluded)")
        self.logger.info(f"Found {len(target_variables)} user variables in target database (deleted: excluded, system variables: {target_system_vars_excluded} excluded)")

        target_map = {row['VariableName']: row['VariableID'] for row in target_variables}

        for source_variable in source_variables:
            source_id = source_variable['VariableID']
            variable_name = source_variable['VariableName']

            if variable_name not in target_map:
                self.logger.warning(f"Variable '{variable_name}' (ID: {source_id}) not found in target database")
                continue

            target_id = target_map[variable_name]
            self.variable_map[source_id] = target_id
            self.variable_mapping_details.append({
                'VariableName': variable_name,
                'SourceID': source_id,
                'TargetID': target_id,
            })
            self.logger.debug(f"Mapped Variable '{variable_name}': {source_id} -> {target_id}")

        self.logger.info(f"Variable mapping complete. Mapped {len(self.variable_map)} variables.")

    @staticmethod
    def _shared_targets(mapping):
        """Yield ``(target_id, [source_ids])`` for targets mapped more than once."""
        sources_by_target = defaultdict(list)
        for source_id, target_id in mapping.items():
            sources_by_target[target_id].append(source_id)
        for target_id, source_ids in sources_by_target.items():
            if len(source_ids) > 1:
                yield target_id, source_ids

    def validate_mappings(self):
        """Check that no two source IDs map to the same target ID.

        This predates path-based project matching and is not expected to
        trigger; it is kept as a safety net.

        Returns:
            A list of human-readable issue strings (empty when all is well).
        """
        self.logger.info("Validating mappings...")

        validation_issues = []

        for target_id, source_ids in self._shared_targets(self.project_map):
            issue = f"Multiple source projects ({source_ids}) map to same target project {target_id}"
            validation_issues.append(issue)
            self.logger.warning(issue)

        for target_id, source_ids in self._shared_targets(self.variable_map):
            issue = f"Multiple source variables ({source_ids}) map to same target variable {target_id}"
            validation_issues.append(issue)
            self.logger.warning(issue)

        if validation_issues:
            self.logger.warning(f"Found {len(validation_issues)} validation issues")
        else:
            self.logger.info("All mappings validated successfully - no integrity issues found")

        return validation_issues

    @staticmethod
    def _write_mapping_csv(filename, fieldnames, rows):
        """Write ``rows`` (a list of dicts) to ``filename`` with a header row."""
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

    def export_mappings_to_csv(self):
        """Export ID mappings to CSV files for manual review."""
        self.logger.info("Exporting mappings to CSV files...")

        project_csv = f"mapping_projects_{self.timestamp}.csv"
        self._write_mapping_csv(
            project_csv,
            ['ProjectName', 'SourceID', 'TargetID', 'SourcePath', 'TargetPath'],
            self.project_mapping_details,
        )
        self.logger.info(f"Project mappings exported to: {project_csv} ({len(self.project_mapping_details)} mappings)")

        variable_csv = f"mapping_variables_{self.timestamp}.csv"
        self._write_mapping_csv(
            variable_csv,
            ['VariableName', 'SourceID', 'TargetID'],
            self.variable_mapping_details,
        )
        self.logger.info(f"Variable mappings exported to: {variable_csv} ({len(self.variable_mapping_details)} mappings)")

    def _progress_glob_pattern(self):
        """Return the glob pattern matching progress files for this DB pair."""
        # Escape the names so characters such as '[' or '*' in a database
        # name are matched literally instead of being read as glob syntax.
        source = glob.escape(self.source_db_name)
        target = glob.escape(self.target_db_name)
        return f"migration_progress_{source}_to_{target}_*.json"

    def save_migration_progress(self, batch_num, total_batches, stats):
        """Persist a checkpoint so a failed migration can be resumed.

        The migration can run for hours and is subject to timeouts, so a
        checkpoint is written after each commit. The file name includes both
        database names and this run's timestamp.

        Args:
            batch_num: Number of the last batch that was committed.
            total_batches: Total number of batches in this run.
            stats: Dict with ``inserted``, ``updated`` and ``errors`` counts.
        """
        progress_file = f"migration_progress_{self.source_db_name}_to_{self.target_db_name}_{self.timestamp}.json"

        progress_data = {
            'timestamp': datetime.now().isoformat(),
            'migration_timestamp': self.timestamp,
            'source_database': self.source_db_name,
            'target_database': self.target_db_name,
            'last_completed_batch': batch_num,
            'total_batches': total_batches,
            'records_inserted': stats['inserted'],
            'records_updated': stats['updated'],
            'records_errors': stats['errors'],
        }

        # Write to a side file and swap it in, so an interruption mid-write
        # cannot leave a truncated checkpoint behind. The ".tmp" suffix keeps
        # the side file out of the "*.json" progress glob.
        temp_file = progress_file + '.tmp'
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(progress_data, f, indent=2)
        os.replace(temp_file, progress_file)

        self.logger.info(f"Progress saved: batch {batch_num}/{total_batches}")

    def load_migration_progress(self):
        """Load the most recent checkpoint for the current database pair.

        Only files named for this source/target pair are considered, so an
        interrupted A->B run is never mistaken for progress on an A->C run.

        Returns:
            The checkpoint dict, or ``None`` if there is no usable checkpoint.
        """
        progress_files = glob.glob(self._progress_glob_pattern())
        if not progress_files:
            self.logger.info("No previous migration progress found for this database pair")
            return None

        latest_progress = max(progress_files, key=os.path.getmtime)

        try:
            with open(latest_progress, 'r', encoding='utf-8') as f:
                progress = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            self.logger.warning(f"Could not read progress file {latest_progress}: {e}. Ignoring.")
            return None

        # Double-check the content against the current databases.
        if (progress.get('source_database') != self.source_db_name or
                progress.get('target_database') != self.target_db_name):
            self.logger.warning("Found progress file but database names don't match. Ignoring.")
            return None

        self.logger.info(f"Found previous migration progress: {latest_progress}")
        self.logger.info(f"  Source DB: {progress['source_database']} -> Target DB: {progress['target_database']}")
        self.logger.info(f"  Last completed batch: {progress['last_completed_batch']}/{progress['total_batches']}")
        self.logger.info(f"  Inserted so far: {progress['records_inserted']}")
        self.logger.info(f"  Updated so far: {progress['records_updated']}")
        self.logger.info(f"  Errors so far: {progress['records_errors']}")

        return progress

    def cleanup_progress_file(self):
        """Delete every progress file belonging to the current database pair.

        Called by ``migrate_variable_values`` after the final commit, and when
        the user declines to resume a previous run.
        """
        for pf in glob.glob(self._progress_glob_pattern()):
            os.remove(pf)
            self.logger.info(f"Removed progress file: {pf}")

    def _prepare_upsert_records(self, source_values):
        """Translate source rows into MERGE parameter tuples.

        Returns:
            ``(params_list, record_metadata, skipped_unmapped,
            skipped_invalid_doc)`` where ``params_list[k]`` and
            ``record_metadata[k]`` describe the same record.
        """
        params_list = []
        record_metadata = []
        skipped_unmapped = 0
        skipped_invalid_doc = 0

        for record in source_values:
            source_variable_id = record.get('VariableID')
            source_project_id = record.get('ProjectID')
            source_doc_id = record.get('DocumentID')

            # Safety net; the query already filters on DocumentID = 1.
            if source_doc_id != 1:
                self.logger.warning(f"Skipping record: ProjectID {source_project_id} - VariableID {source_variable_id} DocumentID is not == 1")
                skipped_invalid_doc += 1
                continue

            target_variable_id = self.variable_map.get(source_variable_id)
            if target_variable_id is None:
                self.logger.debug(f"Skipping record: VariableID {source_variable_id} not mapped")
                skipped_unmapped += 1
                continue

            target_project_id = self.project_map.get(source_project_id)
            if target_project_id is None:
                self.logger.debug(f"Skipping record: ProjectID {source_project_id} not mapped")
                skipped_unmapped += 1
                continue

            # Order must match the SELECT list in the MERGE USING clause.
            params_list.append((
                target_variable_id,
                source_doc_id,
                target_project_id,
                record.get('RevisionNo'),
                record.get('ConfigurationID'),
                record.get('ValueText'),
                record.get('ValueInt'),
                record.get('ValueFloat'),
                record.get('ValueDate'),
                record.get('ValueCache'),
                record.get('IsLongText'),
            ))
            record_metadata.append({
                'target_variable_id': target_variable_id,
                'target_project_id': target_project_id,
                'source_variable_id': source_variable_id,
                'source_project_id': source_project_id,
            })

        return params_list, record_metadata, skipped_unmapped, skipped_invalid_doc

    def _resolve_resume_point(self, total_batches):
        """Decide where to start, based on any saved checkpoint.

        Prompts on stdin when a checkpoint exists; without a usable stdin
        (EOF) the run resumes automatically.

        Returns:
            ``(start_batch, stats)`` - the number of batches already done and
            the running totals to continue from.
        """
        stats = {'inserted': 0, 'updated': 0, 'errors': 0}

        previous = self.load_migration_progress()
        if not previous:
            return 0, stats

        summary_lines = [
            "*** Previous migration found ***",
            f"Last completed batch: {previous['last_completed_batch']}/{previous['total_batches']}",
            f"Records inserted: {previous['records_inserted']}",
            f"Records updated: {previous['records_updated']}",
        ]
        for line in summary_lines:
            self.logger.info(line)

        try:
            print("\n" + "\n".join(summary_lines))
            response = input(f"Resume from batch {previous['last_completed_batch'] + 1}? (y/n): ").strip().lower()
        except EOFError:
            response = 'y'
            self.logger.info("Running in non-interactive mode - automatically resuming")

        if response != 'y':
            self.logger.info("Starting fresh migration (previous progress will be overwritten)")
            self.cleanup_progress_file()
            return 0, stats

        start_batch = previous['last_completed_batch']
        stats['inserted'] = previous['records_inserted']
        stats['updated'] = previous['records_updated']
        stats['errors'] = previous['records_errors']

        # Batch numbers are only comparable between runs when the record
        # list and batch size are unchanged.
        if previous.get('total_batches') != total_batches:
            self.logger.warning(
                f"Batch count changed since the previous run "
                f"({previous.get('total_batches')} -> {total_batches}); "
                f"resumed batch boundaries may not line up with the earlier run"
            )

        self.logger.info(f"Resuming from batch {start_batch + 1}")
        return start_batch, stats

    def _execute_merge(self, params):
        """Run the MERGE for one record and return the cursor's rowcount."""
        cursor = self.target_conn.connection.cursor()
        try:
            cursor.execute(self._MERGE_QUERY, params)
            return cursor.rowcount
        finally:
            # Release the cursor even when execute() raises.
            cursor.close()

    def migrate_variable_values(self):
        """Upsert folder ``VariableValue`` rows from source into target.

        Existing target rows with the same key are overwritten with the
        source's values; missing rows are inserted. Work is done in batches
        (``migration.batch_size``, default 500) and committed every
        ``migration.commit_interval`` batches (default 10), with a checkpoint
        written after each commit.

        Raises:
            ValueError: If ``batch_size`` or ``commit_interval`` is not a
                positive integer.
            Exception: Any error outside per-record execution (for example a
                failed commit) is logged and re-raised.
        """
        self.logger.info("Starting VariableValue migration (UPSERT mode)...")

        migration_cfg = self.config.get('migration', {})
        batch_size = migration_cfg.get('batch_size', 500)
        commit_interval = migration_cfg.get('commit_interval', 10)
        for option, value in (('batch_size', batch_size), ('commit_interval', commit_interval)):
            if not isinstance(value, int) or value < 1:
                raise ValueError(f"migration.{option} must be a positive integer, got {value!r}")

        self.logger.info(f"Migration settings: batch_size={batch_size}, mode=UPSERT (insert new, update existing)")

        # TODO: REMOVE DocumentID=1 if you want to try migrating all the variable values
        # TODO: Do that at your own risk
        # TODO: Probably don't do that
        # Only folder records (DocumentID = 1) are fetched. The ORDER BY
        # covers every key column so the order is the same on each run,
        # which resuming by batch number relies on.
        source_values = self.source_conn.execute_query(
            "SELECT * FROM VariableValue WHERE DocumentID = 1 "
            "ORDER BY ProjectID, VariableID, RevisionNo, ConfigurationID"
        )
        self.logger.info(f"Found {len(source_values)} VariableValue records in source (DocumentID=1 only).")

        (params_list, record_metadata,
         skipped_unmapped, skipped_invalid_doc) = self._prepare_upsert_records(source_values)

        self.logger.info(f"Prepared {len(params_list)} records for UPSERT")
        self.logger.info(f"Skipped {skipped_unmapped} records (unmapped IDs)")
        if skipped_invalid_doc > 0:
            self.logger.info(f"Skipped {skipped_invalid_doc} records (invalid DocumentID)")

        total_batches = (len(params_list) + batch_size - 1) // batch_size
        start_batch, total_stats = self._resolve_resume_point(total_batches)

        self.logger.info(f"Commit interval: every {commit_interval} batches")

        # Target ID -> display name, built once for error messages. The
        # first entry wins, as a linear search from the front would.
        variable_names = {}
        for detail in self.variable_mapping_details:
            variable_names.setdefault(detail['TargetID'], detail['VariableName'])
        project_names = {}
        for detail in self.project_mapping_details:
            project_names.setdefault(detail['TargetID'], detail['ProjectName'])

        # Both are set before the try so the except handler can always
        # report them, even if the failure happens before the first batch.
        batch_num = start_batch
        last_committed_batch = start_batch

        try:
            for offset in range(start_batch * batch_size, len(params_list), batch_size):
                batch = params_list[offset:offset + batch_size]
                batch_num = (offset // batch_size) + 1

                self.logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} records)...")

                batch_inserted = 0
                batch_updated = 0
                batch_errors = 0

                for j, params in enumerate(batch):
                    try:
                        affected = self._execute_merge(params)
                    except Exception as e:
                        # A single bad record must not abort the whole run.
                        meta = record_metadata[offset + j]
                        var_id = meta['target_variable_id']
                        proj_id = meta['target_project_id']
                        var_name = variable_names.get(var_id, f"ID:{var_id}")
                        proj_name = project_names.get(proj_id, f"ID:{proj_id}")
                        self.logger.error(f"Failed to UPSERT - Variable: '{var_name}' (ID: {var_id}), Project: '{proj_name}' (ID: {proj_id}), Error: {e}")
                        batch_errors += 1
                        continue

                    # NOTE(review): treating rowcount 1 as INSERT and 2 as
                    # UPDATE is driver/database dependent - confirm against
                    # the target server; other values are left uncounted.
                    if affected == 1:
                        batch_inserted += 1
                    elif affected == 2:
                        batch_updated += 1

                total_stats['inserted'] += batch_inserted
                total_stats['updated'] += batch_updated
                total_stats['errors'] += batch_errors

                self.logger.info(f"Batch {batch_num} complete: inserted={batch_inserted}, updated={batch_updated}, errors={batch_errors}")

                # Commit every N batches to keep the connection alive and
                # to bound how much work a failure can lose.
                if batch_num % commit_interval == 0:
                    self.target_conn.commit()
                    self.save_migration_progress(batch_num, total_batches, total_stats)
                    last_committed_batch = batch_num
                    self.logger.info(f"[COMMIT] Transaction committed at batch {batch_num} (every {commit_interval} batches)")

            # Final commit for any batches since the last interval commit.
            self.target_conn.commit()
            self.cleanup_progress_file()
            self.logger.info("[SUCCESS] Final transaction committed successfully")
            self.logger.info("[SUCCESS] Migration completed - progress file cleaned up")

        except Exception as e:
            self.logger.error(f"Migration failed at batch {batch_num}: {e}")
            self.logger.error(f"Progress has been saved. Run the migration again to resume from batch {last_committed_batch + 1}")
            # No rollback here: work that was already committed is kept.
            raise

        self.logger.info("Migration complete:")
        self.logger.info(f"  - Total inserted (new): {total_stats['inserted']}")
        self.logger.info(f"  - Total updated (existing): {total_stats['updated']}")
        self.logger.info(f"  - Unmapped skipped: {skipped_unmapped}")
        self.logger.info(f"  - Errors: {total_stats['errors']}")

    def validate_migration(self):
        """Check that every mappable source row now exists in the target.

        Each source key is translated with the ID maps and looked up in the
        set of target keys. Missing keys are logged and written to
        ``validation_missing_folderdata_<timestamp>.csv``. Rows whose project
        or variable has no mapping are counted as ignored.

        Returns:
            Dict with ``total``, ``found``, ``missing`` and ``ignored`` counts.
        """
        self.logger.info("Running Validation on the Target to ensure we completed migration successfully")

        key_query = """SELECT VariableID, DocumentID, ProjectID, RevisionNo, ConfigurationID FROM VariableValue
                    WHERE DocumentID = 1
                    ORDER BY DocumentID, VariableID, RevisionNo"""
        source_values = self.source_conn.execute_query(key_query)
        target_values = self.target_conn.execute_query(key_query)

        self.logger.info("Building target record set for comparison...")
        target_set = {
            tuple(record[column] for column in self._KEY_COLUMNS)
            for record in target_values
        }

        error_list = []    # keys expected in the target but not found
        success_count = 0  # keys found in the target
        ignore_count = 0   # rows with no project/variable mapping

        doc_csv_filename = f'validation_missing_folderdata_{self.timestamp}.csv'

        with open(doc_csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Target_VariableID', 'DocumentID', 'Target_ProjectID', 'RevisionNo', 'ConfigurationID'])
            self.logger.info(f"Prepared {len(source_values)} records. Beginning validation with Target DB")

            for record in source_values:
                target_variable_id = self.variable_map.get(record.get('VariableID'))
                target_project_id = self.project_map.get(record.get('ProjectID'))

                # Some projects never mapped (e.g. '' or 'No Project'); the
                # migration skipped them, so validation does too. Compare with
                # None, as the migration does, so an ID of 0 still counts.
                if target_project_id is None or target_variable_id is None:
                    ignore_count += 1
                    continue

                # DocumentID should always be 1 for folders, but the real
                # value is used rather than a hard-coded constant.
                expected_key = (
                    target_variable_id,
                    record.get('DocumentID'),
                    target_project_id,
                    record.get('RevisionNo'),
                    record.get('ConfigurationID'),
                )

                if expected_key in target_set:
                    success_count += 1
                else:
                    self.logger.warning(f"Failed to find match for {expected_key} in Target")
                    error_list.append(expected_key)
                    writer.writerow(list(expected_key))

        total = len(source_values)
        considered = total - ignore_count
        # Guard the ratios: an empty source, or one where every row was
        # ignored, would otherwise divide by zero.
        gross_rate = (success_count / total) * 100 if total else 0.0
        net_rate = (success_count / considered) * 100 if considered else 0.0

        self.logger.info("=" * 50)
        self.logger.info("$ Migration Validation Completed!")
        self.logger.info("=" * 50)
        self.logger.info(f"Gross Success rate: {gross_rate:.2f}%")
        self.logger.info(f"Success rate w/o Ignored Files: {net_rate:.2f}%")
        self.logger.info(f"{success_count} of {total} Rows were found")
        self.logger.info(f"MISSING ROW COUNT:{len(error_list)} - See CSV output for details")
        self.logger.info(f"We ignored a total of {ignore_count} rows. We couldn't map these to the TargetDB. Either bad Var or Proj ID")
        self.logger.info("$" * 50)

        return {
            'total': total,
            'found': success_count,
            'missing': len(error_list),
            'ignored': ignore_count,
        }

    def run(self):
        """Execute the complete migration process.

        Connects, builds and validates the ID maps, exports them, migrates the
        data and validates the result. Any failure is logged with a traceback
        and re-raised; both connections are closed either way.
        """
        try:
            self.logger.info("=" * 50)
            self.logger.info("Starting Data Migration")
            self.logger.info("=" * 50)

            self.connect_databases()
            self.build_project_mapping()
            self.build_variable_mapping()
            self.validate_mappings()
            self.export_mappings_to_csv()
            self.migrate_variable_values()
            self.validate_migration()

            self.logger.info("=" * 50)
            self.logger.info("Migration completed successfully!")
            self.logger.info("=" * 50)

        except Exception as e:
            self.logger.error(f"Migration failed: {e}", exc_info=True)
            raise

        finally:
            if self.source_conn:
                self.source_conn.close()
            if self.target_conn:
                self.target_conn.close()


if __name__ == "__main__":
    # Script entry point: run the full migration using config.json from the
    # current working directory.
    DataMigrator('config.json').run()