""" Rollback File Data Migration Script Deletes VariableValue records from target database for documents that were migrated. Uses the document mapping CSV to identify which target DocumentIDs to delete. """ import json import logging import csv import sys from datetime import datetime from db_utils import DatabaseConnection class FileDataRollback: def __init__(self, config_path='config.json', mapping_csv_path=None): """Initialize the rollback tool with configuration and mapping file.""" with open(config_path, 'r') as f: self.config = json.load(f) self.target_conn = None self.mapping_csv_path = mapping_csv_path self.target_document_ids = [] # Setup logging self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') log_filename = f"rollback_filedata_{self.timestamp}.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_filename), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) def connect_database(self): """Establish connection to target database.""" self.logger.info("Connecting to target database...") self.target_conn = DatabaseConnection(self.config['target_db']) self.logger.info("Target database connection established.") def load_document_mappings(self): """Load document mappings from CSV file and extract target DocumentIDs.""" if not self.mapping_csv_path: raise ValueError("No mapping CSV file path provided") self.logger.info(f"Loading document mappings from: {self.mapping_csv_path}") with open(self.mapping_csv_path, 'r', encoding='utf-8') as f: reader = csv.DictReader(f) # Check if required columns exist fieldnames = reader.fieldnames if 'TargetDocID' not in fieldnames: raise ValueError(f"CSV must contain 'TargetDocID' column. Found: {fieldnames}") for row in reader: target_id = row['TargetDocID'] if target_id and target_id != '': try: self.target_document_ids.append(int(target_id)) except ValueError: self.logger.warning(f"Skipping invalid TargetID: {target_id}") self.logger.info(f"Loaded {len(self.target_document_ids)} target DocumentIDs from mapping file") def preview_deletion(self): """Preview how many records will be deleted.""" self.logger.info("=" * 50) self.logger.info("Previewing deletion impact...") self.logger.info("=" * 50) # Count records that will be deleted count_query = """ SELECT COUNT(*) as RecordCount FROM VariableValue WHERE DocumentID = ? AND ProjectID = 2 """ total_records = 0 documents_with_data = 0 for doc_id in self.target_document_ids[:10]: # Preview first 10 result = self.target_conn.execute_query(count_query, (doc_id,)) count = result[0]['RecordCount'] if result else 0 if count > 0: self.logger.info(f" DocumentID {doc_id}: {count} records") total_records += count documents_with_data += 1 if len(self.target_document_ids) > 10: self.logger.info(f" ... and {len(self.target_document_ids) - 10} more documents") # Get total count for all documents self.logger.info("") self.logger.info("Calculating total impact across all documents...") # Count in batches to avoid SQL parameter limit (2100) batch_size = 2000 total_count = 0 for i in range(0, len(self.target_document_ids), batch_size): batch = self.target_document_ids[i:i + batch_size] placeholders = ','.join(['?'] * len(batch)) batch_query = f""" SELECT COUNT(*) as BatchCount FROM VariableValue WHERE DocumentID IN ({placeholders}) AND ProjectID = 2 """ result = self.target_conn.execute_query(batch_query, tuple(batch)) batch_count = result[0]['BatchCount'] if result else 0 total_count += batch_count self.logger.info("=" * 50) self.logger.info(f"TOTAL RECORDS TO BE DELETED: {total_count}") self.logger.info(f"Across {len(self.target_document_ids)} mapped documents") self.logger.info("=" * 50) return total_count def rollback_variable_values(self): """Delete VariableValue records for all mapped target DocumentIDs.""" self.logger.info("=" * 50) self.logger.info("Starting Rollback of File Data") self.logger.info("=" * 50) # Get user confirmation total_to_delete = self.preview_deletion() if total_to_delete == 0: self.logger.info("No records found to delete. Exiting.") return self.logger.info("") try: response = input(f"Are you sure you want to DELETE {total_to_delete} records? (yes/no): ").strip().lower() except EOFError: # Non-interactive mode self.logger.error("Cannot proceed without confirmation in non-interactive mode") return if response != 'yes': self.logger.info("Rollback cancelled by user") return # Perform deletion delete_query = """ DELETE FROM VariableValue WHERE DocumentID = ? AND ProjectID = 2 """ total_deleted = 0 batch_size = 100 commit_interval = 10 # Commit every 10 batches self.logger.info(f"Processing {len(self.target_document_ids)} documents in batches of {batch_size}...") try: for i in range(0, len(self.target_document_ids), batch_size): batch = self.target_document_ids[i:i + batch_size] batch_num = (i // batch_size) + 1 total_batches = (len(self.target_document_ids) + batch_size - 1) // batch_size batch_deleted = 0 for doc_id in batch: try: cursor = self.target_conn.connection.cursor() cursor.execute(delete_query, (doc_id,)) rows_deleted = cursor.rowcount batch_deleted += rows_deleted cursor.close() except Exception as e: self.logger.error(f"Failed to delete records for DocumentID {doc_id}: {e}") total_deleted += batch_deleted self.logger.info(f"Batch {batch_num}/{total_batches}: Deleted {batch_deleted} records") # Commit every N batches if batch_num % commit_interval == 0: self.target_conn.commit() self.logger.info(f"[COMMIT] Transaction committed at batch {batch_num}") # Final commit self.target_conn.commit() self.logger.info("[COMMIT] Final transaction committed") self.logger.info("=" * 50) self.logger.info(f"Rollback Complete!") self.logger.info(f"Total records deleted: {total_deleted}") self.logger.info("=" * 50) except Exception as e: self.logger.error(f"Rollback failed: {e}") self.logger.error("Attempting to rollback transaction...") self.target_conn.connection.rollback() raise def run(self): """Execute the rollback process.""" try: self.logger.info("=" * 50) self.logger.info("File Data Rollback Tool") self.logger.info("=" * 50) self.connect_database() self.load_document_mappings() self.rollback_variable_values() self.logger.info("=" * 50) self.logger.info("Rollback process completed successfully!") self.logger.info("=" * 50) except Exception as e: self.logger.error(f"Rollback process failed: {e}", exc_info=True) raise finally: if self.target_conn: self.target_conn.close() if __name__ == "__main__": # Check for CSV file argument if len(sys.argv) < 2: print("Usage: python rollback_filedata.py ") print("Example: python rollback_filedata.py mapping_documents_filedata_20251208_075312.csv") sys.exit(1) mapping_csv = sys.argv[1] rollback = FileDataRollback(config_path='config.json', mapping_csv_path=mapping_csv) rollback.run()