Files
pdm/db_utils.py

258 lines
8.2 KiB
Python

"""
Database connection utilities for SQL Server.
"""
import pyodbc
import logging
class DatabaseConnection:
    """Wrapper class for SQL Server database connections.

    Opens a pyodbc connection on construction with autocommit turned off, so
    changes run inside a transaction. ``execute_non_query`` and
    ``execute_many`` commit on success and roll back on failure;
    ``execute_non_query_no_commit`` and ``execute_many_with_duplicate_handling``
    leave the transaction open for the caller to ``commit()`` / ``rollback()``.

    Usable as a context manager: leaving the ``with`` block closes the
    connection, rolling back first if an exception escaped. A clean exit does
    NOT commit.
    """

    def __init__(self, config):
        """
        Initialize database connection.

        Args:
            config (dict): Database configuration with keys:
                - driver: ODBC driver name
                - server: Server name/address
                - database: Database name
                - username: Username (if not using trusted connection)
                - password: Password (if not using trusted connection)
                - trusted_connection: Boolean for Windows authentication

        Raises:
            pyodbc.Error: If the connection cannot be established (logged first).
            KeyError: If a required configuration key is missing.
        """
        self.config = config
        self.connection = None
        self.logger = logging.getLogger(__name__)
        self._connect()

    @staticmethod
    def _odbc_value(value):
        """Return *value* made safe to embed in an ODBC connection string.

        A value containing ``;``, ``{`` or ``}`` is wrapped in braces with every
        ``}`` doubled (ODBC connection-string quoting); unquoted, a ``;`` inside
        e.g. a password would end the attribute early. A value that is already
        wrapped in braces (such as a driver configured as
        ``{ODBC Driver 17 for SQL Server}``) is passed through unchanged so
        existing configurations keep working.
        """
        text = str(value)
        already_braced = text.startswith('{') and text.endswith('}')
        if already_braced or not any(ch in text for ch in ';{}'):
            return text
        return '{' + text.replace('}', '}}') + '}'

    def _build_connection_string(self):
        """Build the ODBC connection string from ``self.config``.

        Uses Windows authentication when ``trusted_connection`` is truthy,
        otherwise SQL Server authentication with ``username``/``password``.

        Raises:
            KeyError: If a required configuration key is missing.
        """
        parts = [
            f"DRIVER={self._odbc_value(self.config['driver'])};",
            f"SERVER={self._odbc_value(self.config['server'])};",
            f"DATABASE={self._odbc_value(self.config['database'])};",
        ]
        if self.config.get('trusted_connection', False):
            # Windows Authentication
            parts.append("Trusted_Connection=yes;")
        else:
            # SQL Server Authentication
            parts.append(f"UID={self._odbc_value(self.config['username'])};")
            parts.append(f"PWD={self._odbc_value(self.config['password'])};")
        return ''.join(parts)

    def _connect(self):
        """Establish the database connection with autocommit disabled.

        Raises:
            pyodbc.Error: If the driver rejects the connection (logged first).
            KeyError: If a required configuration key is missing.
        """
        connection_string = self._build_connection_string()
        try:
            self.connection = pyodbc.connect(connection_string)
            self.connection.autocommit = False  # Use transactions
            self.logger.info("Connected to %s on %s",
                             self.config['database'], self.config['server'])
        except pyodbc.Error as e:
            self.logger.error("Database connection failed: %s", e)
            raise

    @staticmethod
    def _execute(cursor, query, params):
        """Run *query* on *cursor*, passing *params* only when it is truthy."""
        if params:
            cursor.execute(query, params)
        else:
            cursor.execute(query)

    def _close_cursor(self, cursor):
        """Close *cursor* if one was opened, logging (not raising) a close failure.

        Called from ``finally`` blocks, where raising would hide the error that
        is already propagating.
        """
        if cursor is None:
            return
        try:
            cursor.close()
        except pyodbc.Error as e:
            self.logger.warning("Failed to close cursor: %s", e)

    def _safe_rollback(self):
        """Roll back the current transaction, logging (not raising) a failure.

        Used in error paths so the caller sees the original error rather than
        a secondary rollback error.
        """
        try:
            self.connection.rollback()
        except pyodbc.Error as e:
            self.logger.error("Rollback failed: %s", e)

    def execute_query(self, query, params=None):
        """
        Execute a SELECT query and return results as list of dictionaries.

        Does not commit or roll back.

        Args:
            query (str): SQL SELECT query
            params (tuple): Query parameters (omitted from the call when falsy)

        Returns:
            list: List of dictionaries (column name -> value) representing rows;
                empty when the statement produces no result set.

        Raises:
            pyodbc.Error: On execution failure (error and query are logged).
        """
        cursor = None
        try:
            cursor = self.connection.cursor()
            self._execute(cursor, query, params)
            # description is None for statements that return no rows
            if cursor.description is None:
                return []
            columns = [column[0] for column in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        except pyodbc.Error as e:
            self.logger.error("Query execution failed: %s", e)
            self.logger.error("Query: %s", query)
            raise
        finally:
            self._close_cursor(cursor)

    def execute_non_query(self, query, params=None):
        """
        Execute an INSERT, UPDATE, or DELETE query and commit it.

        Args:
            query (str): SQL query
            params (tuple): Query parameters (omitted from the call when falsy)

        Returns:
            int: Number of affected rows (the cursor's rowcount)

        Raises:
            pyodbc.Error: On failure; the transaction is rolled back first.
        """
        cursor = None
        try:
            cursor = self.connection.cursor()
            self._execute(cursor, query, params)
            affected_rows = cursor.rowcount
            self.connection.commit()
            return affected_rows
        except pyodbc.Error as e:
            self.logger.error("Non-query execution failed: %s", e)
            self.logger.error("Query: %s", query)
            self._safe_rollback()
            raise
        finally:
            self._close_cursor(cursor)

    def execute_many(self, query, params_list):
        """
        Execute a query multiple times with different parameters (batch insert)
        and commit.

        Args:
            query (str): SQL query
            params_list (list): List of parameter tuples; empty/None is a no-op

        Returns:
            int: Number of affected rows as reported by the cursor's rowcount
                (0 for an empty params_list)

        Raises:
            pyodbc.Error: On failure; the transaction is rolled back first.
        """
        if not params_list:
            # Treat an empty batch as a no-op, as
            # execute_many_with_duplicate_handling does; pyodbc has historically
            # rejected an empty parameter sequence, which here would also roll
            # back the caller's pending work.
            return 0
        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.executemany(query, params_list)
            affected_rows = cursor.rowcount
            self.connection.commit()
            return affected_rows
        except pyodbc.Error as e:
            self.logger.error("Batch execution failed: %s", e)
            self.logger.error("Query: %s", query)
            self._safe_rollback()
            raise
        finally:
            self._close_cursor(cursor)

    def execute_many_with_duplicate_handling(self, query, params_list, duplicate_handling='ignore'):
        """
        Execute batch insert with duplicate key handling.

        Tries a single ``executemany`` first. If that raises an integrity error
        (e.g. a duplicate key), every row is retried individually so the
        non-duplicate rows still go in. Nothing is committed here; the caller
        owns the transaction.

        Args:
            query (str): SQL INSERT query
            params_list (iterable): Parameter tuples. Any iterable (including a
                generator) is accepted; it is materialized once so the per-row
                fallback can re-read it. None/empty returns all-zero stats.
            duplicate_handling (str): 'ignore' or 'update'. Both count a
                duplicate as 'skipped' ('update' is handled by the migration
                logic, not here); any other value behaves like 'ignore'.

        Returns:
            dict: Statistics with 'inserted', 'updated', 'skipped', 'errors'.
                'updated' is always 0 in this method.

        Raises:
            pyodbc.Error: Non-integrity errors from the batch attempt (logged first).
        """
        stats = {'inserted': 0, 'updated': 0, 'skipped': 0, 'errors': 0}
        # Materialize once: a generator would otherwise be (partly) consumed by
        # the failed executemany and the fallback would silently lose rows.
        rows = list(params_list or [])
        if not rows:
            return stats
        cursor = None
        try:
            # Try batch insert first
            cursor = self.connection.cursor()
            cursor.executemany(query, rows)
            stats['inserted'] = cursor.rowcount
            return stats
        except pyodbc.IntegrityError:
            # NOTE(review): rows the failed batch inserted before reaching the
            # duplicate are not rolled back here (the caller owns the
            # transaction). If the driver executes the batch row by row, those
            # rows hit a duplicate-key error on the retry below and are counted
            # as 'skipped' even though they were inserted — confirm against the
            # executemany mode in use.
            self.logger.warning("Batch insert encountered duplicates, falling back to individual inserts")
        except pyodbc.Error as e:
            self.logger.error("Unexpected error during batch insert: %s", e)
            raise
        finally:
            self._close_cursor(cursor)
        self._insert_rows_individually(query, rows, duplicate_handling, stats)
        return stats

    def _insert_rows_individually(self, query, rows, duplicate_handling, stats):
        """Insert *rows* one at a time, updating *stats* in place.

        Integrity errors count as 'skipped'; any other pyodbc error counts as
        'errors' and is logged. No per-row error is re-raised.
        """
        for params in rows:
            cursor = None
            try:
                cursor = self.connection.cursor()
                cursor.execute(query, params)
                stats['inserted'] += cursor.rowcount
            except pyodbc.IntegrityError:
                # Duplicate key violation
                stats['skipped'] += 1
                if duplicate_handling == 'update':
                    # For update, we need to construct an UPDATE or MERGE statement.
                    # This is handled in the migration logic.
                    self.logger.debug("Duplicate record found (update not implemented in this method)")
                else:
                    self.logger.debug("Skipped duplicate record")
            except pyodbc.Error as e:
                stats['errors'] += 1
                self.logger.error("Error inserting record: %s", e)
            finally:
                self._close_cursor(cursor)

    def execute_non_query_no_commit(self, query, params=None):
        """
        Execute an INSERT, UPDATE, or DELETE query without committing.
        Useful for manual transaction control.

        Args:
            query (str): SQL query
            params (tuple): Query parameters (omitted from the call when falsy)

        Returns:
            int: Number of affected rows

        Raises:
            pyodbc.Error: On failure (logged; no rollback is performed here).
        """
        cursor = None
        try:
            cursor = self.connection.cursor()
            self._execute(cursor, query, params)
            return cursor.rowcount
        except pyodbc.Error as e:
            self.logger.error("Non-query execution failed: %s", e)
            self.logger.error("Query: %s", query)
            raise
        finally:
            self._close_cursor(cursor)

    def commit(self):
        """Commit current transaction."""
        self.connection.commit()

    def rollback(self):
        """Rollback current transaction."""
        self.connection.rollback()

    def close(self):
        """Close the database connection; safe to call more than once.

        Uncommitted work is not committed here (the driver presumably discards
        it on close — confirm for the pyodbc version in use).
        """
        if self.connection is not None:
            self.connection.close()
            # Drop the reference so a second close() is a no-op instead of a
            # driver error on an already-closed connection.
            self.connection = None
            self.logger.info("Connection to %s closed", self.config['database'])

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: roll back if an exception escaped, then close.

        A rollback failure is logged rather than raised so that the connection
        is still closed and the original exception propagates (not suppressed).
        """
        try:
            if exc_type is not None and self.connection is not None:
                self._safe_rollback()
        finally:
            self.close()