Files
pdm/migrate_filedata.py

1081 lines
51 KiB
Python

"""
File Data Migration Script
Migrates VariableValue data for files (not projects) between SQL Server databases.
Maps DocumentIDs between the databases by full vault path (folder path + filename),
joining Documents to Projects through DocumentsInProjects.
Key differences from migrate.py:
- All files have ProjectID=2 in both databases
- Maps DocumentID instead of ProjectID
- Uses the full vault path (Projects.Path + Documents.Filename) for mapping
- Filters for file-based records (DocumentID != 1)
"""
import json
import logging
import csv
import os
import glob
from datetime import datetime
from db_utils import DatabaseConnection
class FileDataMigration:
"""Handles migration of file-based VariableValue data between databases."""
def __init__(self, config_file='config.json'):
    """Load the configuration, set up logging and open both database connections.

    Args:
        config_file: Path to a JSON file containing ``source_db`` and
            ``target_db`` sections, each with a ``database`` entry.
    """
    with open(config_file, 'r') as config_handle:
        self.config = json.load(config_handle)

    # The database names are kept so progress files can be tied to this
    # particular source/target pair.
    source_settings = self.config['source_db']
    self.source_db_name = source_settings['database']
    target_settings = self.config['target_db']
    self.target_db_name = target_settings['database']

    # One timestamp per run; it is reused in log, CSV and progress file names.
    self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(f'filedata_migration_{self.timestamp}.log'),
            logging.StreamHandler(),
        ],
    )
    self.logger = logging.getLogger(__name__)

    # Source-ID -> target-ID lookup tables, filled by the build_*_mapping methods.
    self.variable_map = {}
    self.document_map = {}
    self.configuration_map = {}
    self.state_map = {}

    self.source_conn = DatabaseConnection(source_settings)
    self.target_conn = DatabaseConnection(target_settings)
    self.logger.info("Database connections established.")
def build_variable_mapping(self):
    """Populate ``self.variable_map`` with source VariableID -> target VariableID.

    Non-deleted variables are read from both databases and matched by
    ``VariableName``. System variables (names wrapped in curly brackets) and
    rows whose name is NULL are ignored on both sides. Source variables with
    no same-named target variable are logged as warnings and left unmapped.
    """
    self.logger.info("Building variable ID mapping...")

    active_variables_sql = (
        "SELECT VariableID, VariableName FROM Variable WHERE IsDeleted = 0 OR IsDeleted IS NULL"
    )
    source_variables = self.source_conn.execute_query(active_variables_sql)
    target_variables = self.target_conn.execute_query(active_variables_sql)

    def is_user_variable(row):
        """Return True for rows with a real, non-system variable name."""
        name = row['VariableName']
        # A NULL name cannot be matched by name, and calling str methods on
        # None would abort the whole mapping step.
        if name is None:
            return False
        return not (name.startswith('{') and name.endswith('}'))

    source_user_vars = [v for v in source_variables if is_user_variable(v)]
    target_user_vars = [v for v in target_variables if is_user_variable(v)]
    self.logger.info(f"Found {len(source_user_vars)} user variables in source database")
    self.logger.info(f"Found {len(target_user_vars)} user variables in target database")

    # Name -> ID index for the target; a repeated name keeps the last ID seen.
    target_var_map = {v['VariableName']: v['VariableID'] for v in target_user_vars}

    mapped_count = 0
    unmapped_count = 0
    for source_var in source_user_vars:
        source_id = source_var['VariableID']
        var_name = source_var['VariableName']
        target_id = target_var_map.get(var_name)
        if target_id is None:
            self.logger.warning(f"Variable '{var_name}' (ID: {source_id}) not found in target database")
            unmapped_count += 1
            continue
        self.variable_map[source_id] = target_id
        mapped_count += 1

    self.logger.info(f"Variable mapping complete. Mapped {mapped_count} variables.")
    if unmapped_count > 0:
        self.logger.warning(f"Unmapped variables: {unmapped_count}")
def transform_source_path(self, source_path):
    """Return ``source_path`` re-rooted under the configured target root folder.

    The root is read from ``config['path_mapping']['target_root_folder']`` and
    falls back to ``'Citadel'``. Leading backslashes are removed from the
    source path before the root is prepended. An empty or ``None`` path
    yields ``None``.
    """
    if not source_path:
        return None

    path_settings = self.config.get('path_mapping', {})
    root_folder = path_settings.get('target_root_folder', 'Citadel')

    relative_part = source_path.lstrip('\\')
    rerooted = '\\{}\\{}'.format(root_folder, relative_part)

    # Keep a trailing separator if the source path had one.
    lost_trailing_separator = source_path.endswith('\\') and not rerooted.endswith('\\')
    return rerooted + '\\' if lost_trailing_separator else rerooted
def build_document_mapping(self):
    """Populate ``self.document_map`` with source DocumentID -> target DocumentID.

    Documents are matched on their full vault path (folder path + filename).
    Each source folder path is passed through ``transform_source_path`` and
    the result plus the filename is looked up among the non-deleted target
    documents. Matching ignores case unless
    ``config['path_mapping']['case_sensitive']`` is true.
    """
    log = self.logger
    log.info("Building document ID mapping using full vault path (folder path + filename)...")

    # Documents -> DocumentsInProjects -> Projects yields the folder path.
    source_documents = self.source_conn.execute_query("""
        SELECT
            d.DocumentID,
            d.Filename,
            p.Path AS FolderPath,
            p.Path + d.Filename AS FullVaultPath
        FROM Documents d
        INNER JOIN DocumentsInProjects dp ON d.DocumentID = dp.DocumentID
        INNER JOIN Projects p ON dp.ProjectID = p.ProjectID
    """)
    # Deleted target documents are excluded so a path that was added and
    # deleted several times does not resolve to a stale record.
    target_documents = self.target_conn.execute_query("""
        SELECT
            d.DocumentID,
            d.Filename,
            p.Path AS FolderPath,
            p.Path + d.Filename AS FullVaultPath
        FROM Documents d
        INNER JOIN DocumentsInProjects dp ON d.DocumentID = dp.DocumentID
        INNER JOIN Projects p ON dp.ProjectID = p.ProjectID
        WHERE d.Deleted = 0
    """)
    log.info(f"Found {len(source_documents)} documents in source database")
    log.info(f"Found {len(target_documents)} documents in target database")

    case_sensitive = self.config.get('path_mapping', {}).get('case_sensitive', False)

    def lookup_key(path):
        """Return the dictionary key used to compare vault paths."""
        return path if case_sensitive else path.lower()

    # Index target rows by (normalised) full path; a repeated path keeps the last row.
    target_doc_map = {}
    duplicate_count = 0
    for target_row in target_documents:
        target_path = target_row['FullVaultPath']
        if not target_path:
            continue
        key = lookup_key(target_path)
        if key in target_doc_map:
            duplicate_count += 1
        target_doc_map[key] = target_row
    log.info(f"Built target document index with {len(target_doc_map)} unique paths")
    if duplicate_count > 0:
        log.warning(f"Found {duplicate_count} duplicate paths in target (kept last occurrence)")

    mapped_count = 0
    unmapped_count = 0
    null_path_count = 0
    for source_row in source_documents:
        source_id = source_row['DocumentID']
        filename = source_row['Filename']
        folder_path = source_row['FolderPath']
        source_full_path = source_row['FullVaultPath']

        if not (source_full_path and folder_path):
            log.debug(f"Document '{filename}' (ID: {source_id}) has no path - skipping")
            null_path_count += 1
            continue

        target_folder_path = self.transform_source_path(folder_path)
        if not target_folder_path:
            log.debug(f"Document '{filename}' (ID: {source_id}) - path transformation failed")
            unmapped_count += 1
            continue

        # The folder path is expected to end with '\', so the filename is simply appended.
        target_full_path = target_folder_path + filename
        matched_row = target_doc_map.get(lookup_key(target_full_path))
        if matched_row is None:
            log.debug(f"Document '{filename}' (ID: {source_id}) - Target path not found")
            log.debug(f" Source path: [{source_full_path}]")
            log.debug(f" Transformed to: [{target_full_path}]")
            unmapped_count += 1
            continue

        self.document_map[source_id] = matched_row['DocumentID']
        mapped_count += 1

    log.info("Document mapping complete:")
    log.info(f" - Successfully mapped: {mapped_count} documents")
    log.info(f" - Unmapped (path not found): {unmapped_count} documents")
    log.info(f" - Skipped (null path): {null_path_count} documents")
    log.info(f" - Total in document_map: {len(self.document_map)} documents")
def build_configuration_mapping(self):
    """Populate ``self.configuration_map`` with source -> target ConfigurationID.

    Configurations are matched on ``ConfigurationName``. When a name occurs
    more than once in the target, the first ID returned is used and the
    duplicates are logged. Entries in
    ``config['configuration_mapping_overrides']`` (source ID -> target ID)
    take precedence over name matching; when present they are logged and the
    operator is asked to confirm them. If stdin is at EOF (no console) the
    run continues with a warning.

    NOTE: find the manual overrides by searching for all the instances where
    ConfigurationName has a duplicate in the DB.

    Raises:
        ValueError: If the operator answers anything other than ``yes``.
    """
    log = self.logger
    log.info("Building configuration ID mapping...")

    config_sql = "SELECT ConfigurationID, ConfigurationName FROM DocumentConfiguration"
    source_configs = self.source_conn.execute_query(config_sql)
    target_configs = self.target_conn.execute_query(config_sql)
    log.info(f"Found {len(source_configs)} configurations in source database")
    log.info(f"Found {len(target_configs)} configurations in target database")

    # Name -> ID index for the target; the first ID wins, later ones are
    # collected (together with the first) for the duplicate report.
    target_config_map = {}
    target_duplicates = {}
    for target_row in target_configs:
        name = target_row['ConfigurationName']
        config_id = target_row['ConfigurationID']
        if name not in target_config_map:
            target_config_map[name] = config_id
            continue
        target_duplicates.setdefault(name, [target_config_map[name]]).append(config_id)

    if target_duplicates:
        log.warning(f"Found {len(target_duplicates)} duplicate ConfigurationNames in target:")
        for name, ids in target_duplicates.items():
            log.warning(f" '{name}': IDs {ids}")

    manual_overrides = self.config.get('configuration_mapping_overrides', {})
    if manual_overrides:
        banner = "=" * 70
        log.warning(banner)
        log.warning("MANUAL CONFIGURATION MAPPING OVERRIDES DETECTED!")
        log.warning(f"Found {len(manual_overrides)} manual configuration mapping overrides in config.json")
        log.warning(banner)
        log.warning("Please verify these mappings are correct before proceeding:")

        # JSON object keys are always strings, so both sides are coerced to int.
        manual_overrides = {int(src): int(dst) for src, dst in manual_overrides.items()}
        for source_id, target_id in sorted(manual_overrides.items()):
            log.warning(f" Source ConfigurationID {source_id} -> Target ConfigurationID {target_id}")
        log.warning(banner)

        try:
            response = input("\nHave you verified these configuration mappings are correct? (yes/no): ").strip().lower()
        except EOFError:
            # No console attached: warn loudly but carry on.
            log.warning("Running in non-interactive mode - cannot prompt for confirmation")
            log.warning("PROCEEDING WITH MANUAL CONFIGURATION OVERRIDES - ENSURE THESE ARE CORRECT!")
        else:
            if response != 'yes':
                log.error("Migration cancelled by user - please verify configuration mappings in config.json")
                raise ValueError("User cancelled migration - configuration mappings not verified")
        log.info(f"Proceeding with {len(manual_overrides)} manual configuration mapping overrides")

    mapped_count = 0
    unmapped_count = 0
    override_count = 0
    for source_row in source_configs:
        source_id = source_row['ConfigurationID']
        config_name = source_row['ConfigurationName']

        # A manual override always beats the name match.
        if source_id in manual_overrides:
            target_id = manual_overrides[source_id]
            self.configuration_map[source_id] = target_id
            log.debug(f"Manual override: ConfigurationID {source_id} -> {target_id} ('{config_name}')")
            override_count += 1
            mapped_count += 1
            continue

        if config_name in target_config_map:
            target_id = target_config_map[config_name]
            self.configuration_map[source_id] = target_id
            log.debug(f"Mapped Configuration '{config_name}': {source_id} -> {target_id}")
            mapped_count += 1
            continue

        log.warning(f"Configuration '{config_name}' (ID: {source_id}) not found in target database")
        unmapped_count += 1

    log.info("Configuration mapping complete:")
    log.info(f" - Successfully mapped: {mapped_count} configurations")
    log.info(f" - Manual overrides applied: {override_count} configurations")
    log.info(f" - Unmapped: {unmapped_count} configurations")
    log.info(f" - Total in configuration_map: {len(self.configuration_map)} configurations")
def build_state_mapping(self):
    """Load explicit CurrentStatusID mappings from the config into ``self.state_map``.

    Only ``config['state_mapping_overrides']`` is consulted; nothing is read
    from either database. With no overrides configured the method logs that
    fact and returns without touching ``self.state_map``. Otherwise the
    mappings are logged and the operator is asked to confirm them. If stdin
    is at EOF (no console) the run continues with a warning.

    Raises:
        ValueError: If the operator answers anything other than ``yes``.
    """
    log = self.logger
    log.info("Building state (CurrentStatusID) mapping...")

    configured = self.config.get('state_mapping_overrides', {})
    if not configured:
        log.info("No state mapping overrides found in config.json")
        log.info("CurrentStatusID values will remain unchanged in target database")
        return

    banner = "=" * 70
    log.warning(banner)
    log.warning("MANUAL STATE (CurrentStatusID) MAPPING OVERRIDES DETECTED!")
    log.warning(f"Found {len(configured)} manual state mapping overrides in config.json")
    log.warning(banner)
    log.warning("Please verify these mappings are correct before proceeding:")

    # JSON object keys are always strings, so both sides are coerced to int.
    id_pairs = {int(src): int(dst) for src, dst in configured.items()}
    for source_id, target_id in sorted(id_pairs.items()):
        log.warning(f" Source CurrentStatusID {source_id} -> Target CurrentStatusID {target_id}")
    log.warning(banner)

    try:
        response = input("\nHave you verified these state mappings are correct? (yes/no): ").strip().lower()
    except EOFError:
        # No console attached: warn loudly but carry on.
        log.warning("Running in non-interactive mode - cannot prompt for confirmation")
        log.warning("PROCEEDING WITH MANUAL STATE OVERRIDES - ENSURE THESE ARE CORRECT!")
    else:
        if response != 'yes':
            log.error("Migration cancelled by user - please verify state mappings in config.json")
            raise ValueError("User cancelled migration - state mappings not verified")

    self.state_map = id_pairs
    log.info("State mapping complete:")
    log.info(f" - Total explicit mappings loaded: {len(self.state_map)} state mappings")
    log.info(" - Any CurrentStatusID not in this mapping will remain unchanged in target")
def export_mappings_to_csv(self):
    """Write the variable and document ID mappings to timestamped CSV files.

    Two files are created in the current working directory:
    ``mapping_variables_filedata_<timestamp>.csv`` and
    ``mapping_documents_filedata_<timestamp>.csv``. Names and vault paths are
    re-read from the databases. IDs with no matching row get ``Unknown`` in
    the descriptive columns.
    """
    self.logger.info("Exporting mappings to CSV files...")
    placeholder = 'Unknown'

    # --- variable mappings -------------------------------------------------
    var_csv_filename = f'mapping_variables_filedata_{self.timestamp}.csv'
    with open(var_csv_filename, 'w', newline='', encoding='utf-8') as handle:
        out = csv.writer(handle)
        out.writerow(['VariableName', 'SourceID', 'TargetID'])
        variable_rows = self.source_conn.execute_query(
            "SELECT VariableID, VariableName FROM Variable"
        )
        names_by_id = {row['VariableID']: row['VariableName'] for row in variable_rows}
        for source_id, target_id in self.variable_map.items():
            out.writerow([names_by_id.get(source_id, placeholder), source_id, target_id])
    self.logger.info(f"Variable mappings exported to: {var_csv_filename} ({len(self.variable_map)} mappings)")

    # --- document mappings -------------------------------------------------
    # The same path query is run against both databases.
    document_paths_sql = """
        SELECT
            d.DocumentID,
            d.Filename,
            p.Path + d.Filename AS FullVaultPath
        FROM Documents d
        INNER JOIN DocumentsInProjects dp ON d.DocumentID = dp.DocumentID
        INNER JOIN Projects p ON dp.ProjectID = p.ProjectID
    """

    def index_by_document(rows):
        """Map DocumentID -> (Filename, FullVaultPath); later rows replace earlier ones."""
        return {row['DocumentID']: (row['Filename'], row['FullVaultPath']) for row in rows}

    doc_csv_filename = f'mapping_documents_filedata_{self.timestamp}.csv'
    with open(doc_csv_filename, 'w', newline='', encoding='utf-8') as handle:
        out = csv.writer(handle)
        out.writerow(['Filename', 'SourceDocID', 'TargetDocID', 'SourcePath', 'TargetPath'])
        source_index = index_by_document(self.source_conn.execute_query(document_paths_sql))
        target_index = index_by_document(self.target_conn.execute_query(document_paths_sql))
        unknown_entry = (placeholder, placeholder)
        for source_id, target_id in self.document_map.items():
            filename, source_path = source_index.get(source_id, unknown_entry)
            _, target_path = target_index.get(target_id, unknown_entry)
            out.writerow([filename, source_id, target_id, source_path, target_path])
    self.logger.info(f"Document mappings exported to: {doc_csv_filename} ({len(self.document_map)} mappings)")
def save_migration_progress(self, batch_num, total_batches, stats):
    """Write a JSON progress snapshot so an interrupted run can be resumed.

    Args:
        batch_num: Number of the last batch that has been completed.
        total_batches: Total number of batches in this run.
        stats: Dict with ``inserted``, ``updated`` and ``errors`` counters.
    """
    progress_file = (
        f"filedata_migration_progress_{self.source_db_name}"
        f"_to_{self.target_db_name}_{self.timestamp}.json"
    )
    snapshot = dict(
        timestamp=datetime.now().isoformat(),
        migration_timestamp=self.timestamp,
        source_database=self.source_db_name,
        target_database=self.target_db_name,
        last_completed_batch=batch_num,
        total_batches=total_batches,
        records_inserted=stats['inserted'],
        records_updated=stats['updated'],
        records_errors=stats['errors'],
    )
    with open(progress_file, 'w') as handle:
        handle.write(json.dumps(snapshot, indent=2))
    self.logger.info(f"Progress saved: batch {batch_num}/{total_batches}")
def load_migration_progress(self):
    """Return the most recent saved progress for this database pair, if any.

    Progress files in the current working directory are matched by name.
    The most recently modified one is loaded and then checked against the
    configured database names.

    Returns:
        The parsed progress dict, or ``None`` when no file exists or the
        file was written for a different source/target pair.
    """
    pair = f"{self.source_db_name}_to_{self.target_db_name}"
    candidates = glob.glob(f"filedata_migration_progress_{pair}_*.json")
    if not candidates:
        self.logger.info("No previous file data migration progress found for this database pair")
        return None

    latest_progress = max(candidates, key=os.path.getmtime)
    with open(latest_progress, 'r') as handle:
        progress = json.load(handle)

    # The filename matched, but the recorded database names are the authority.
    same_pair = (
        progress.get('source_database') == self.source_db_name
        and progress.get('target_database') == self.target_db_name
    )
    if not same_pair:
        self.logger.warning("Found progress file but database names don't match. Ignoring.")
        return None

    self.logger.info(f"Found previous file data migration progress: {latest_progress}")
    self.logger.info(f" Source DB: {progress['source_database']} -> Target DB: {progress['target_database']}")
    self.logger.info(f" Last completed batch: {progress['last_completed_batch']}/{progress['total_batches']}")
    self.logger.info(f" Records inserted: {progress['records_inserted']}")
    self.logger.info(f" Records updated: {progress['records_updated']}")
    return progress
def cleanup_progress_file(self):
    """Delete every saved progress file belonging to this database pair."""
    pair = f"{self.source_db_name}_to_{self.target_db_name}"
    for stale_file in glob.glob(f"filedata_migration_progress_{pair}_*.json"):
        os.remove(stale_file)
        self.logger.info(f"Removed progress file: {stale_file}")
def preview_migration(self, num_records=20):
    """
    Preview what the first VariableValue migrations would look like.

    The preview applies the same rules as ``migrate_file_variable_values``:
    only the latest revision per DocumentID/VariableID/ConfigurationID is
    read, the VariableID, DocumentID and ConfigurationID are translated
    through the mapping tables, rows with any unmapped ID are reported as
    skipped, and the target RevisionNo is always 1. Nothing is written to
    the target database.

    Args:
        num_records: Maximum number of source rows to preview.
    """
    divider = "=" * 70
    self.logger.info(divider)
    self.logger.info(f"MIGRATION PREVIEW - First {num_records} Records")
    self.logger.info(divider)
    self.logger.info("Showing actual SQL MERGE parameters that will be executed")
    self.logger.info(divider)

    # Same latest-revision selection and ordering as the real migration,
    # limited to the first rows. int() keeps the interpolated value numeric.
    source_values = self.source_conn.execute_query(
        f"""SELECT TOP {int(num_records)} * FROM (
                SELECT *,
                    ROW_NUMBER() OVER (PARTITION BY DocumentID, VariableID, ConfigurationID
                                       ORDER BY RevisionNo DESC) as rn
                FROM VariableValue
                WHERE ProjectID = 2 AND DocumentID != 1
            ) ranked
            WHERE rn = 1
            ORDER BY DocumentID, VariableID, ConfigurationID"""
    )
    if not source_values:
        self.logger.info("No source records found to preview.")
        return
    self.logger.info(f"\nFound {len(source_values)} source records to preview\n")

    mapped_count = 0
    skipped_count = 0
    for i, record in enumerate(source_values, 1):
        source_variable_id = record.get('VariableID')
        source_document_id = record.get('DocumentID')
        source_config_id = record.get('ConfigurationID')

        target_variable_id = self.variable_map.get(source_variable_id)
        target_document_id = self.document_map.get(source_document_id)
        target_config_id = self.configuration_map.get(source_config_id)

        self.logger.info(f"Record #{i}:")
        self.logger.info(f" SOURCE VariableValue Row:")
        self.logger.info(f" VariableID : {record.get('VariableID')}")
        self.logger.info(f" DocumentID : {record.get('DocumentID')}")
        self.logger.info(f" ProjectID : {record.get('ProjectID')}")
        self.logger.info(f" RevisionNo : {record.get('RevisionNo')}")
        self.logger.info(f" ConfigurationID: {record.get('ConfigurationID')}")
        self.logger.info(f" ValueText : {record.get('ValueText')}")
        self.logger.info(f" ValueInt : {record.get('ValueInt')}")
        self.logger.info(f" ValueFloat : {record.get('ValueFloat')}")
        self.logger.info(f" ValueDate : {record.get('ValueDate')}")
        self.logger.info(f" ValueCache : {record.get('ValueCache')}")
        self.logger.info(f" IsLongText : {record.get('IsLongText')}")

        # The migration skips a row as soon as any of the three IDs is
        # unmapped; the preview lists every missing mapping.
        unmapped_reasons = []
        if target_variable_id is None:
            unmapped_reasons.append(f" - Source VariableID {source_variable_id} not mapped to target")
        if target_document_id is None:
            unmapped_reasons.append(f" - Source DocumentID {source_document_id} not mapped to target")
        if target_config_id is None:
            unmapped_reasons.append(f" - Source ConfigurationID {source_config_id} not mapped to target")

        if unmapped_reasons:
            self.logger.warning(f" TARGET: WILL BE SKIPPED (unmapped)")
            for reason in unmapped_reasons:
                self.logger.warning(reason)
            skipped_count += 1
        else:
            # Parameter tuple in the order the MERGE statement binds them.
            params = (
                target_variable_id,        # VariableID (mapped)
                target_document_id,        # DocumentID (mapped)
                2,                         # ProjectID (always 2 for files)
                1,                         # RevisionNo (always 1 for migrated data)
                target_config_id,          # ConfigurationID (mapped)
                record.get('ValueText'),   # ValueText
                record.get('ValueInt'),    # ValueInt
                record.get('ValueFloat'),  # ValueFloat
                record.get('ValueDate'),   # ValueDate
                record.get('ValueCache'),  # ValueCache
                record.get('IsLongText'),  # IsLongText
            )
            self.logger.info(f" TARGET MERGE Parameters (will be UPSERTED):")
            self.logger.info(f" VariableID : {params[0]} (mapped from source {source_variable_id})")
            self.logger.info(f" DocumentID : {params[1]} (mapped from source {source_document_id})")
            self.logger.info(f" ProjectID : {params[2]} (always 2 for files)")
            self.logger.info(f" RevisionNo : {params[3]} (always 1 for migrated data)")
            self.logger.info(f" ConfigurationID: {params[4]} (mapped from source {source_config_id})")
            self.logger.info(f" ValueText : {params[5]}")
            self.logger.info(f" ValueInt : {params[6]}")
            self.logger.info(f" ValueFloat : {params[7]}")
            self.logger.info(f" ValueDate : {params[8]}")
            self.logger.info(f" ValueCache : {params[9]}")
            self.logger.info(f" IsLongText : {params[10]}")
            mapped_count += 1
        self.logger.info("")

    self.logger.info(divider)
    self.logger.info(f"PREVIEW SUMMARY")
    self.logger.info(divider)
    self.logger.info(f"Records that WILL be migrated: {mapped_count}")
    self.logger.info(f"Records that will be SKIPPED: {skipped_count}")
    self.logger.info("")
def migrate_file_variable_values(self):
    """
    Migrate VariableValue records for files (ProjectID=2, DocumentID != 1).

    Only the highest RevisionNo per DocumentID/VariableID/ConfigurationID is
    read from the source. Each row is written to the target as RevisionNo 1
    through a MERGE (update when the key exists, insert otherwise). Rows
    whose VariableID, DocumentID or ConfigurationID has no entry in the
    mapping tables are skipped.

    Work is committed every ``migration.commit_interval`` batches of
    ``migration.batch_size`` records, and a progress file is written at each
    of those commits so an interrupted run can be resumed. Invalid (missing
    is fine, but non-numeric or < 1) settings fall back to the defaults.
    Per-record failures are logged and counted; they do not stop the run.
    """
    self.logger.info("Starting file-based VariableValue migration (UPSERT mode)...")

    migration_settings = self.config.get('migration', {})

    def positive_int_setting(name, default):
        """Return migration_settings[name] as an int >= 1, else the default."""
        raw_value = migration_settings.get(name, default)
        try:
            value = int(raw_value)
        except (TypeError, ValueError):
            value = 0
        if value < 1:
            # 0 would otherwise surface later as a ZeroDivisionError.
            self.logger.warning(f"Invalid migration.{name}={raw_value!r}; using default {default}")
            return default
        return value

    batch_size = positive_int_setting('batch_size', 500)
    commit_interval = positive_int_setting('commit_interval', 10)
    self.logger.info(f"Migration settings: batch_size={batch_size}, mode=UPSERT (insert new, update existing)")

    # Latest revision only: rank revisions per document/variable/configuration
    # and keep rank 1. ProjectID=2 selects files; DocumentID 1 is excluded
    # because it carries project-level values.
    source_values = self.source_conn.execute_query(
        """SELECT * FROM (
            SELECT *,
                ROW_NUMBER() OVER (PARTITION BY DocumentID, VariableID, ConfigurationID
                                   ORDER BY RevisionNo DESC) as rn
            FROM VariableValue
            WHERE ProjectID = 2 AND DocumentID != 1
        ) ranked
        WHERE rn = 1
        ORDER BY DocumentID, VariableID, ConfigurationID"""
    )
    self.logger.info(f"Found {len(source_values)} latest-revision VariableValue records in source (ProjectID=2, DocumentID!=1).")

    # UPSERT keyed on VariableID + DocumentID + ProjectID + RevisionNo + ConfigurationID.
    merge_query = """
        MERGE INTO VariableValue AS target
        USING (SELECT ? AS VariableID, ? AS DocumentID, ? AS ProjectID, ? AS RevisionNo,
                      ? AS ConfigurationID, ? AS ValueText, ? AS ValueInt, ? AS ValueFloat,
                      ? AS ValueDate, ? AS ValueCache, ? AS IsLongText) AS source
        ON (target.VariableID = source.VariableID
            AND target.DocumentID = source.DocumentID
            AND target.ProjectID = source.ProjectID
            AND target.RevisionNo = source.RevisionNo
            AND target.ConfigurationID = source.ConfigurationID)
        WHEN MATCHED THEN
            UPDATE SET
                ValueText = source.ValueText,
                ValueInt = source.ValueInt,
                ValueFloat = source.ValueFloat,
                ValueDate = source.ValueDate,
                ValueCache = source.ValueCache,
                IsLongText = source.IsLongText
        WHEN NOT MATCHED THEN
            INSERT (VariableID, DocumentID, ProjectID, RevisionNo, ConfigurationID, ValueText, ValueInt, ValueFloat, ValueDate, ValueCache, IsLongText)
            VALUES (source.VariableID, source.DocumentID, source.ProjectID, source.RevisionNo, source.ConfigurationID, source.ValueText, source.ValueInt, source.ValueFloat, source.ValueDate, source.ValueCache, source.IsLongText);
    """

    # Translate every source row into a MERGE parameter tuple, dropping rows
    # for which any of the three IDs cannot be mapped.
    params_list = []
    skipped_unmapped = 0
    for record in source_values:
        source_variable_id = record.get('VariableID')
        source_document_id = record.get('DocumentID')
        target_variable_id = self.variable_map.get(source_variable_id)
        target_document_id = self.document_map.get(source_document_id)

        if target_variable_id is None:
            self.logger.debug(f"Skipping record: VariableID {source_variable_id} not mapped")
            skipped_unmapped += 1
            continue
        if target_document_id is None:
            self.logger.debug(f"Skipping record: DocumentID {source_document_id} not mapped")
            skipped_unmapped += 1
            continue

        source_configID = record.get('ConfigurationID')
        target_config_id = self.configuration_map.get(source_configID)
        if target_config_id is None:
            self.logger.debug(f"Skipping record: ConfigurationID {source_configID} not mapped")
            skipped_unmapped += 1
            continue

        # The source RevisionNo is deliberately not carried over: the latest
        # source revision becomes revision 1 in the target.
        params_list.append((
            target_variable_id,        # VariableID (mapped)
            target_document_id,        # DocumentID (mapped)
            2,                         # ProjectID (always 2 for files)
            1,                         # RevisionNo (always 1 for migrated data)
            target_config_id,          # ConfigurationID (mapped)
            record.get('ValueText'),   # ValueText
            record.get('ValueInt'),    # ValueInt
            record.get('ValueFloat'),  # ValueFloat
            record.get('ValueDate'),   # ValueDate
            record.get('ValueCache'),  # ValueCache
            record.get('IsLongText'),  # IsLongText
        ))

    self.logger.info(f"Prepared {len(params_list)} records for UPSERT")
    self.logger.info(f"Skipped {skipped_unmapped} records (unmapped IDs)")

    previous_progress = self.load_migration_progress()
    total_batches = (len(params_list) + batch_size - 1) // batch_size  # ceiling division
    total_stats = {'inserted': 0, 'updated': 0, 'errors': 0}
    start_batch = 0

    if previous_progress:
        self.logger.info(f"*** Previous file data migration found ***")
        self.logger.info(f"Last completed batch: {previous_progress['last_completed_batch']}/{previous_progress['total_batches']}")
        self.logger.info(f"Records inserted: {previous_progress['records_inserted']}")
        self.logger.info(f"Records updated: {previous_progress['records_updated']}")
        try:
            print(f"\n*** Previous file data migration found ***")
            print(f"Last completed batch: {previous_progress['last_completed_batch']}/{previous_progress['total_batches']}")
            print(f"Records inserted: {previous_progress['records_inserted']}")
            print(f"Records updated: {previous_progress['records_updated']}")
            response = input(f"Resume from batch {previous_progress['last_completed_batch'] + 1}? (y/n): ").strip().lower()
        except EOFError:
            # No console attached: resuming is the non-destructive choice.
            response = 'y'
            self.logger.info("Running in non-interactive mode - automatically resuming")

        if response == 'y':
            # last_completed_batch is 1-based, so it is also the 0-based
            # index of the next batch to run.
            start_batch = previous_progress['last_completed_batch']
            total_stats['inserted'] = previous_progress['records_inserted']
            total_stats['updated'] = previous_progress['records_updated']
            total_stats['errors'] = previous_progress['records_errors']
            self.logger.info(f"Resuming from batch {start_batch + 1}")
        else:
            self.logger.info("Starting fresh migration (previous progress will be overwritten)")
            self.cleanup_progress_file()

    self.logger.info(f"Commit interval: every {commit_interval} batches")

    for batch_num in range(start_batch, total_batches):
        batch_start = batch_num * batch_size
        batch_params = params_list[batch_start:batch_start + batch_size]
        self.logger.info(f"Processing batch {batch_num + 1}/{total_batches} ({len(batch_params)} records)...")

        for params in batch_params:
            cursor = None
            try:
                cursor = self.target_conn.connection.cursor()
                cursor.execute(merge_query, params)
                # NOTE(review): the inserted/updated split is approximate. A
                # MERGE presumably reports one affected row for either
                # branch, so most rows land in 'inserted' - confirm, and
                # consider OUTPUT $action if exact counts are needed.
                if cursor.rowcount > 0:
                    total_stats['inserted'] += 1
                else:
                    total_stats['updated'] += 1
            except Exception as e:
                # Best effort: one bad record must not abort the whole run.
                total_stats['errors'] += 1
                self.logger.error(f"Error upserting record: {e}")
            finally:
                # Release the cursor on the failure path too.
                if cursor is not None:
                    try:
                        cursor.close()
                    except Exception as close_error:
                        self.logger.warning(f"Could not close cursor: {close_error}")

        self.logger.info(f"Batch {batch_num + 1} complete: inserted={total_stats['inserted']}, updated={total_stats['updated']}, errors={total_stats['errors']}")

        # Periodic commit keeps the transaction small and gives the resume
        # logic a safe point to restart from.
        if (batch_num + 1) % commit_interval == 0:
            self.target_conn.commit()
            self.save_migration_progress(batch_num + 1, total_batches, total_stats)
            self.logger.info(f"[COMMIT] Transaction committed at batch {batch_num + 1} (every {commit_interval} batches)")

    self.target_conn.commit()
    self.logger.info(f"[SUCCESS] Final transaction committed")

    # Nothing left to resume once everything is committed.
    self.cleanup_progress_file()
    self.logger.info("File-based VariableValue migration completed successfully!")
    self.logger.info(f"Total records inserted: {total_stats['inserted']}")
    self.logger.info(f"Total records updated: {total_stats['updated']}")
    self.logger.info(f"Total errors: {total_stats['errors']}")
def validate_migration(self):
    """Check that every migrated file value is present in the target database.

    Reads the latest revision of each (DocumentID, VariableID,
    ConfigurationID) combination from the source ``VariableValue`` table
    (ProjectID = 2, DocumentID != 1), translates the IDs through
    ``self.variable_map``, ``self.document_map`` and
    ``self.configuration_map``, and looks the translated key up among the
    target rows that have RevisionNo = 1.

    Rows whose variable, document or configuration has no (truthy) target
    ID are counted as ignored. Mapped rows that cannot be found in the
    target are logged as warnings and written to
    ``validation_missing_filedata_<timestamp>.csv`` in the current working
    directory. A summary is logged at the end; a success rate is reported
    as ``N/A`` when its denominator is zero (no source rows, or every row
    ignored) rather than raising ``ZeroDivisionError``.

    The mapping dictionaries are only read here, so they must have been
    populated before this method is called.

    Returns:
        None. Results are reported through the log and the CSV file.
    """
    def _rate(found, total):
        # An empty denominator means "nothing to measure", not a crash.
        return f"{(found / total) * 100:.2f}%" if total else "N/A"

    self.logger.info("Running Validation on the Target to ensure we completed migration successfully")

    # The primary key columns are VariableID, DocumentID, ProjectID, RevisionNo, ConfigurationID.
    # IMPORTANT: only the LATEST revision was migrated from source, so only validate those.
    source_values = self.source_conn.execute_query(
        """SELECT * FROM (
            SELECT VariableID, DocumentID, ProjectID, RevisionNo, ConfigurationID,
                   ROW_NUMBER() OVER (PARTITION BY DocumentID, VariableID, ConfigurationID
                                      ORDER BY RevisionNo DESC) as rn
            FROM VariableValue
            WHERE ProjectID = 2 AND DocumentID != 1
        ) ranked
        WHERE rn = 1
        ORDER BY DocumentID, VariableID, ConfigurationID"""
    )

    # Target should have all records with RevisionNo = 1.
    target_values = self.target_conn.execute_query(
        """SELECT VariableID, DocumentID, ProjectID, RevisionNo, ConfigurationID FROM VariableValue
        WHERE ProjectID = 2 AND DocumentID != 1 AND RevisionNo = 1
        ORDER BY DocumentID, VariableID, ConfigurationID"""
    )

    # Hash the target keys once so each source row is an O(1) membership test.
    self.logger.info("Building target record set for comparison...")
    target_set = {
        (
            row['VariableID'],
            row['DocumentID'],
            row['ProjectID'],
            row['RevisionNo'],
            row['ConfigurationID'],
        )
        for row in target_values
    }

    total_rows = len(source_values)
    success_count = 0   # mapped rows found in the target
    missing_count = 0   # mapped rows NOT found in the target (also written to the CSV)
    ignore_count = 0    # rows with no target Variable/Document/Configuration mapping

    # Every row we believe is missing from the target is recorded in this CSV.
    doc_csv_filename = f'validation_missing_filedata_{self.timestamp}.csv'
    with open(doc_csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Target_VariableID', 'Target_DocumentID', 'ProjectID', 'RevisionNo', 'ConfigurationID'])
        self.logger.info(f"Prepared {total_rows} records. Beginning validation with Target DB")

        for record in source_values:
            # Translate the source IDs into their target equivalents.
            target_variable_id = self.variable_map.get(record.get('VariableID'))
            target_document_id = self.document_map.get(record.get('DocumentID'))
            target_config_id = self.configuration_map.get(record.get('ConfigurationID'))

            # Some documents, variables, or configurations never mapped into the
            # Target DB; they have no (truthy) target ID, so they are ignored.
            if not target_document_id or not target_variable_id or not target_config_id:
                ignore_count += 1
                continue

            params = (
                target_variable_id,        # mapped VariableID
                target_document_id,        # mapped DocumentID
                record.get("ProjectID"),   # should be 2 for files, but use the actual value
                1,                         # RevisionNo: latest revision was migrated as Rev 1
                target_config_id,          # mapped ConfigurationID
            )

            if params in target_set:
                success_count += 1
                continue

            self.logger.warning(f"Failed to find match for {params} in Target")
            missing_count += 1
            writer.writerow(params)

    self.logger.info("=" * 50)
    self.logger.info("$ Migration Validation Completed!")
    self.logger.info("=" * 50)
    self.logger.info(f"Gross Success rate: {_rate(success_count, total_rows)}")
    self.logger.info(f"Success rate w/o Ignored Files: {_rate(success_count, total_rows - ignore_count)}")
    self.logger.info(f"{success_count} of {total_rows} Rows were found")
    self.logger.info("-" * 50)
    self.logger.info(f"MISSING ROW COUNT:{missing_count} - See CSV output for details")
    self.logger.info(f"We ignored a total of {ignore_count} rows. We couldn't map these to the TargetDB. Either bad Var, Doc, or Config ID")
def migrate_documents_status(self):
    """
    Migrate CurrentStatusID from source Documents to target Documents.

    Only updates CurrentStatusID if a state mapping exists in ``self.state_map``.
    If no mapping exists for a document's status, or the document has no
    entry in ``self.document_map``, the target DB value remains unchanged.

    Updates are sent with ``executemany()`` in batches of
    ``migration.document_status_batch_size`` (default 5000) and committed
    every ``migration.commit_interval`` batches (default 10), plus once at
    the end. Both settings are clamped to at least 1 so a zero in the
    configuration cannot cause a division by zero. A batch that raises is
    logged and counted as errors; processing continues with the next batch.

    Returns:
        None. Totals are reported through the log.
    """
    self.logger.info("Starting Documents CurrentStatusID migration...")

    # Nothing to translate without state mappings.
    if not self.state_map:
        self.logger.info("No state mappings configured - skipping CurrentStatusID migration")
        return

    # Batch settings - use the dedicated document status batch size if configured.
    migration_cfg = self.config.get('migration', {})
    batch_size = max(1, migration_cfg.get('document_status_batch_size', 5000))
    commit_interval = max(1, migration_cfg.get('commit_interval', 10))
    self.logger.info(f"Migration settings: batch_size={batch_size}, commit_interval={commit_interval}")
    self.logger.info("Using executemany() for fast batch updates")

    # Fetch CurrentStatusID from source Documents.
    source_documents = self.source_conn.execute_query("""
        SELECT DocumentID, CurrentStatusID
        FROM Documents
        WHERE CurrentStatusID IS NOT NULL
    """)
    self.logger.info(f"Found {len(source_documents)} documents with CurrentStatusID in source database")

    # Build the (target_status_id, target_doc_id) parameter tuples.
    update_params = []
    skipped_no_mapping = 0
    skipped_unmapped_doc = 0
    for doc in source_documents:
        source_doc_id = doc['DocumentID']
        source_status_id = doc['CurrentStatusID']

        target_doc_id = self.document_map.get(source_doc_id)
        if target_doc_id is None:
            # Document not mapped to target - skip.
            skipped_unmapped_doc += 1
            continue

        target_status_id = self.state_map.get(source_status_id)
        if target_status_id is None:
            # No state mapping exists - leave the target DB value unchanged.
            self.logger.debug(f"Document {source_doc_id}: No state mapping for CurrentStatusID={source_status_id} - skipping")
            skipped_no_mapping += 1
            continue

        update_params.append((target_status_id, target_doc_id))

    self.logger.info(f"Prepared {len(update_params)} documents for CurrentStatusID update")
    self.logger.info(f"Skipped {skipped_unmapped_doc} documents (not mapped to target)")
    self.logger.info(f"Skipped {skipped_no_mapping} documents (no state mapping configured)")

    if not update_params:
        self.logger.info("No documents to update - migration complete")
        return

    update_query = """
        UPDATE Documents
        SET CurrentStatusID = ?
        WHERE DocumentID = ?
    """

    total_batches = (len(update_params) + batch_size - 1) // batch_size
    total_updated = 0
    total_errors = 0

    for batch_num in range(total_batches):
        batch_start = batch_num * batch_size
        batch_params = update_params[batch_start:batch_start + batch_size]
        self.logger.info(f"Processing batch {batch_num + 1}/{total_batches} ({len(batch_params)} documents)...")

        try:
            cursor = self.target_conn.connection.cursor()
            try:
                cursor.executemany(update_query, batch_params)
                batch_updated = cursor.rowcount
                # DB-API drivers may report rowcount as -1 (or None) when the
                # count is unknown, which can happen after executemany().
                # Fall back to the number of statements submitted so the
                # running total never goes negative.
                if batch_updated is None or batch_updated < 0:
                    self.logger.debug(f"Batch {batch_num + 1}: driver did not report a row count - counting submitted updates")
                    batch_updated = len(batch_params)
                total_updated += batch_updated
            finally:
                # Release the cursor even when executemany() fails.
                cursor.close()
            self.logger.info(f"Batch {batch_num + 1} complete: {batch_updated} documents updated in this batch")
        except Exception as e:
            total_errors += len(batch_params)
            self.logger.error(f"Error updating batch {batch_num + 1}: {e}")
            self.logger.error(f"Failed to update {len(batch_params)} documents in this batch")

        # Commit every N batches.
        if (batch_num + 1) % commit_interval == 0:
            self.target_conn.commit()
            self.logger.info(f"[COMMIT] Transaction committed at batch {batch_num + 1} (every {commit_interval} batches)")

    # Final commit covers any batches after the last interval commit.
    self.target_conn.commit()
    self.logger.info("[SUCCESS] Final transaction committed")
    self.logger.info("Documents CurrentStatusID migration completed successfully!")
    self.logger.info(f"Total documents updated: {total_updated}")
    self.logger.info(f"Total errors: {total_errors}")
def run(self):
    """Execute the complete file data migration process.

    Builds the variable, document, configuration and state mappings,
    exports them to CSV for verification, then migrates the file-based
    VariableValue rows. Any exception is logged with its traceback and
    re-raised. Both database connections are closed afterwards whether the
    migration succeeded or not; the target connection is closed even if
    closing the source connection raises.

    Raises:
        Exception: whatever a migration step (or a connection close) raised.
    """
    try:
        self.logger.info("=" * 50)
        self.logger.info("Starting File Data Migration")
        self.logger.info("=" * 50)

        # Build mappings
        self.build_variable_mapping()
        self.build_document_mapping()
        self.build_configuration_mapping()
        self.build_state_mapping()

        # Export mappings for verification
        self.export_mappings_to_csv()

        # Preview first batch of migrations
        # self.preview_migration(num_records=20)

        # Perform migration
        self.migrate_file_variable_values()

        # Migrate Documents CurrentStatusID if state mappings are configured
        # TODO DONT TOUCH THIS UNTIL A NEW VAULT MIGRATION
        # TODO State Mapping via SQL does not work
        # self.migrate_documents_status()

        # Perform validation
        # self.validate_migration()

        self.logger.info("=" * 50)
        self.logger.info("File Data Migration Completed Successfully!")
        self.logger.info("=" * 50)
    except Exception as e:
        self.logger.error(f"Migration failed: {e}", exc_info=True)
        raise
    finally:
        # Nested try/finally: a failure while closing the source connection
        # must not leave the target connection open.
        try:
            self.source_conn.close()
        finally:
            self.target_conn.close()
def main():
    """Script entry point: create a migrator with its default config file and run it."""
    FileDataMigration().run()
# Start the migration only when this file is executed directly, not on import.
if __name__ == '__main__':
    main()