"""
Database connection utilities for SQL Server.
"""

import contextlib
import logging

import pyodbc


class DatabaseConnection:
    """Wrapper class for SQL Server database connections.

    The connection is opened in the constructor with autocommit disabled, so
    every statement runs inside a transaction. ``execute_non_query`` and
    ``execute_many`` commit on success and roll back on failure; the other
    ``execute_*`` methods leave transaction control to the caller (see
    ``commit`` / ``rollback``).

    Instances can be used as context managers: leaving the ``with`` block
    rolls back if an exception is propagating and always closes the
    connection. A clean exit does NOT commit.
    """

    def __init__(self, config):
        """
        Initialize database connection.

        Args:
            config (dict): Database configuration with keys:
                - driver: ODBC driver name
                - server: Server name/address
                - database: Database name
                - username: Username (if not using trusted connection)
                - password: Password (if not using trusted connection)
                - trusted_connection: Boolean for Windows authentication

        Raises:
            KeyError: If a required configuration key is missing.
            pyodbc.Error: If the connection cannot be established.
        """
        self.config = config
        self.connection = None
        self.logger = logging.getLogger(__name__)
        self._connect()

    def _connect(self):
        """Establish database connection.

        Builds the ODBC connection string from ``self.config`` and opens the
        connection with autocommit turned off.

        Raises:
            KeyError: If a required configuration key is missing.
            pyodbc.Error: If the driver rejects the connection (logged first).
        """
        try:
            # Parts shared by both authentication modes.
            parts = [
                f"DRIVER={self.config['driver']};",
                f"SERVER={self.config['server']};",
                f"DATABASE={self.config['database']};",
            ]
            if self.config.get('trusted_connection', False):
                # Windows Authentication
                parts.append("Trusted_Connection=yes;")
            else:
                # SQL Server Authentication
                parts.append(f"UID={self.config['username']};")
                parts.append(f"PWD={self.config['password']};")

            self.connection = pyodbc.connect("".join(parts))
            self.connection.autocommit = False  # Use transactions

            self.logger.info(f"Connected to {self.config['database']} on {self.config['server']}")

        except pyodbc.Error as e:
            self.logger.error(f"Database connection failed: {e}")
            raise

    def _require_connection(self):
        """Return the open connection, or raise if it has been closed.

        Raises:
            pyodbc.ProgrammingError: If ``close()`` was already called. A
                pyodbc error type is used so callers that catch
                ``pyodbc.Error`` keep working.
        """
        if self.connection is None:
            raise pyodbc.ProgrammingError("Attempt to use a closed connection.")
        return self.connection

    def _cursor(self):
        """Return a context manager yielding a new cursor, closed on exit.

        ``contextlib.closing`` is used rather than the cursor's own context
        manager protocol so that leaving the block only closes the cursor and
        never touches the transaction.
        """
        return contextlib.closing(self._require_connection().cursor())

    @staticmethod
    def _execute(cursor, query, params):
        """Run ``query`` on ``cursor``, passing ``params`` only when non-empty."""
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)

    def _rollback_after_failure(self):
        """Best-effort rollback used inside error handlers.

        A failing rollback is logged instead of raised so that the error
        which triggered the rollback is the one the caller sees.
        """
        if self.connection is None:
            return
        try:
            self.connection.rollback()
        except pyodbc.Error as rollback_error:
            self.logger.error(f"Rollback failed: {rollback_error}")

    def execute_query(self, query, params=None):
        """
        Execute a SELECT query and return results as list of dictionaries.

        Args:
            query (str): SQL SELECT query
            params (tuple): Query parameters

        Returns:
            list: List of dictionaries representing rows, keyed by column
                name. Empty when the statement produces no result set.

        Raises:
            pyodbc.Error: If execution fails (logged together with the query).
        """
        try:
            with self._cursor() as cursor:
                self._execute(cursor, query, params)

                # description is None when the statement returned no result
                # set; there is nothing to fetch in that case.
                if cursor.description is None:
                    return []

                # Get column names
                columns = [column[0] for column in cursor.description]

                # Fetch all rows and convert to dictionaries
                return [dict(zip(columns, row)) for row in cursor.fetchall()]

        except pyodbc.Error as e:
            self.logger.error(f"Query execution failed: {e}")
            self.logger.error(f"Query: {query}")
            raise

    def execute_non_query(self, query, params=None):
        """
        Execute an INSERT, UPDATE, or DELETE query.

        Commits on success; rolls back and re-raises on failure.

        Args:
            query (str): SQL query
            params (tuple): Query parameters

        Returns:
            int: Number of affected rows

        Raises:
            pyodbc.Error: If execution or commit fails.
        """
        try:
            with self._cursor() as cursor:
                self._execute(cursor, query, params)
                affected_rows = cursor.rowcount

            self.connection.commit()
            return affected_rows

        except pyodbc.Error as e:
            self.logger.error(f"Non-query execution failed: {e}")
            self.logger.error(f"Query: {query}")
            self._rollback_after_failure()
            raise

    def execute_many(self, query, params_list):
        """
        Execute a query multiple times with different parameters (batch insert).

        Commits on success; rolls back and re-raises on failure. An empty
        ``params_list`` is a no-op that returns 0 without touching the
        database or the current transaction.

        Args:
            query (str): SQL query
            params_list (list): List of parameter tuples

        Returns:
            int: The cursor's ``rowcount`` after the batch.
                NOTE(review): pyodbc generally reports -1 here after
                ``executemany`` - confirm before relying on this value.

        Raises:
            pyodbc.Error: If execution or commit fails.
        """
        if not params_list:
            return 0

        try:
            with self._cursor() as cursor:
                cursor.executemany(query, params_list)
                affected_rows = cursor.rowcount

            self.connection.commit()
            return affected_rows

        except pyodbc.Error as e:
            self.logger.error(f"Batch execution failed: {e}")
            self.logger.error(f"Query: {query}")
            self._rollback_after_failure()
            raise

    def execute_many_with_duplicate_handling(self, query, params_list, duplicate_handling='ignore'):
        """
        Execute batch insert with duplicate key handling.

        The whole batch is tried first. If it fails with an integrity error,
        every parameter tuple is retried individually and duplicates are
        counted instead of aborting the run.

        This method neither commits nor rolls back; the caller owns the
        transaction.
        NOTE(review): rows the failed batch managed to insert before hitting
        the duplicate presumably stay in the open transaction, in which case
        the retry counts them as 'skipped' rather than 'inserted' - confirm
        against the driver/server settings in use.

        Args:
            query (str): SQL INSERT query
            params_list (list): List of parameter tuples
            duplicate_handling (str): 'ignore' or 'update'. Updating is not
                implemented here; in both modes a duplicate is counted as
                skipped and only the debug message differs.

        Returns:
            dict: Statistics with 'inserted', 'updated', 'skipped', 'errors'

        Raises:
            pyodbc.Error: If the batch fails for a reason other than an
                integrity violation.
        """
        stats = {'inserted': 0, 'updated': 0, 'skipped': 0, 'errors': 0}

        if not params_list:
            return stats

        try:
            # Try batch insert first
            with self._cursor() as cursor:
                cursor.executemany(query, params_list)
                inserted = cursor.rowcount

            # A negative rowcount means "unknown". The batch succeeded, so
            # count one inserted row per parameter tuple instead of
            # reporting a negative number.
            if inserted < 0 and hasattr(params_list, '__len__'):
                inserted = len(params_list)
            stats['inserted'] = inserted
            return stats

        except pyodbc.IntegrityError:
            # Batch failed due to duplicate keys, fall back to individual inserts
            self.logger.warning("Batch insert encountered duplicates, falling back to individual inserts")
            for params in params_list:
                self._insert_single(query, params, duplicate_handling, stats)

        except pyodbc.Error as e:
            self.logger.error(f"Unexpected error during batch insert: {e}")
            raise

        return stats

    def _insert_single(self, query, params, duplicate_handling, stats):
        """Insert one record and record the outcome in ``stats`` (in place).

        Integrity violations count as 'skipped'; any other pyodbc error
        counts as 'errors' and is logged. Nothing is raised for either.
        """
        try:
            with self._cursor() as cursor:
                cursor.execute(query, params)
                stats['inserted'] += cursor.rowcount

        except pyodbc.IntegrityError:
            # Duplicate key violation. Every mode counts it as skipped so
            # the statistics always account for each record.
            stats['skipped'] += 1
            if duplicate_handling == 'update':
                # For update, we need to construct an UPDATE or MERGE statement
                # This is handled in the migration logic
                self.logger.debug("Duplicate record found (update not implemented in this method)")
            else:
                self.logger.debug("Skipped duplicate record")

        except pyodbc.Error as e:
            stats['errors'] += 1
            self.logger.error(f"Error inserting record: {e}")

    def execute_non_query_no_commit(self, query, params=None):
        """
        Execute an INSERT, UPDATE, or DELETE query without committing.
        Useful for manual transaction control.

        Args:
            query (str): SQL query
            params (tuple): Query parameters

        Returns:
            int: Number of affected rows

        Raises:
            pyodbc.Error: If execution fails. No rollback is performed; that
                decision is left to the caller.
        """
        try:
            with self._cursor() as cursor:
                self._execute(cursor, query, params)
                return cursor.rowcount

        except pyodbc.Error as e:
            self.logger.error(f"Non-query execution failed: {e}")
            self.logger.error(f"Query: {query}")
            raise

    def commit(self):
        """Commit current transaction.

        Raises:
            pyodbc.ProgrammingError: If the connection has been closed.
        """
        self._require_connection().commit()

    def rollback(self):
        """Rollback current transaction.

        Raises:
            pyodbc.ProgrammingError: If the connection has been closed.
        """
        self._require_connection().rollback()

    def close(self):
        """Close database connection.

        Safe to call more than once: the handle is dropped after the first
        call, so later calls do nothing.
        """
        if self.connection is None:
            return

        try:
            self.connection.close()
        finally:
            # Drop the handle even if close() failed so it is never reused.
            self.connection = None
        self.logger.info(f"Connection to {self.config['database']} closed")

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit.

        Rolls back when an exception is propagating, then closes the
        connection. The connection is closed even if the rollback itself
        fails. Exceptions from the ``with`` body are never suppressed.
        """
        try:
            if exc_type and self.connection is not None:
                self.rollback()
        finally:
            self.close()