"""
Batch Workflow Processing for PDM Vault (Path-based)
=====================================================
This module provides a framework for:
1. Logging into a PDM vault via API
2. Processing a list of files (by full vault path) through a specified workflow transition

Usage:
    python batch_workflows_paths.py --vault "MyVault" --csv "files.csv" --transition "citadel_set_production released"
"""

import logging
import argparse
import getpass
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
import ctypes

import pythoncom
import win32com.client
import comtypes
import comtypes.automation
from comtypes import COMMETHOD, GUID, HRESULT
from comtypes.automation import IDispatch as _CT_IDispatch

# PDM API Type Library Constants
EdmObject_File = 1
EdmObject_Folder = 2
EdmObject_Workflow = 6

# ---------------------------------------------------------------------------
# comtypes interface for IEdmFile13 — enables vtable call to ChangeState3.
#
# Confirmed from gen_py stubs (IEdmFile13.py):
#   IID  : {DB0646C9-9E3F-4EA2-93AA-EB6584D268E2}
#   oVft : 432 → slot 54 (IUnknown[0-2] + IDispatch[3-6] + 47 placeholders[7-53])
# ---------------------------------------------------------------------------

# 47 dummy vtable entries so that ChangeState3 lands in the slot noted above.
_IEdmFile13_phs = [COMMETHOD([], HRESULT, f"_ph{i}") for i in range(47)]
_VARIANT_p = ctypes.POINTER(comtypes.automation.VARIANT)


class _IEdmFile13_CT(_CT_IDispatch):
    """Minimal comtypes declaration of IEdmFile13 exposing only ChangeState3."""

    _iid_ = GUID("{DB0646C9-9E3F-4EA2-93AA-EB6584D268E2}")
    _idlflags_ = ["dual", "oleautomation"]
    _methods_ = _IEdmFile13_phs + [
        COMMETHOD(
            [], HRESULT, "ChangeState3",
            (["in"], _VARIANT_p, "poStateIdOrName"),
            (["in"], _VARIANT_p, "poTransitionIdOrName"),
            (["in"], ctypes.c_long, "lFolderID"),
            (["in"], ctypes.c_wchar_p, "bsComment"),
            (["in"], ctypes.c_long, "lParentWnd"),
            (["in"], ctypes.c_long, "lEdmStateFlags"),
            (["in"], ctypes.c_wchar_p, "bsPasswd"),
        ),
    ]


def _make_i4_variant(val: int) -> comtypes.automation.VARIANT:
    """Return a VARIANT with vt=VT_I4 containing the given integer."""
    v = comtypes.automation.VARIANT()
    v.vt = 3  # VT_I4
    # Write the 32-bit payload directly at byte offset 8 of the VARIANT struct.
    ctypes.cast(ctypes.byref(v, 8), ctypes.POINTER(ctypes.c_int))[0] = int(val)
    return v


class _Phase2AVError(OSError):
    """Raised when ChangeState3 crashes with an access violation after all retries.

    Distinct from generic OSError so batch_transition can count consecutive
    Phase-2 AV failures and trigger a vault reconnect to reset PDM's in-process
    state after repeated corruption.
    """


# Escalating backoff for Phase-2 access violations (seconds between attempts).
# PDM's in-process DLL can take progressively longer to clean up after state
# corruption accumulates; a longer final sleep gives it a real chance to settle.
_PHASE2_BACKOFF = (3, 10, 30)


def _changestate3(vault_obj, file_id: int, to_state_id: int,
                  transition_id: int, folder_id: int,
                  comment: str, password: str) -> None:
    """
    Call IEdmFile13::ChangeState3 via comtypes vtable to transition a file
    using a *specific* transition ID, bypassing the ambiguous ChangeState.

    Uses the primary win32com vault for GetObject so that the returned COM
    proxy is in the primary connection's context. Bridging to comtypes is done
    by reading the IEdmFile13* stored inside the pythoncom PyIBase wrapper at
    CPython object offset 16, then calling QueryInterface to get an AddRef'd
    comtypes pointer. Reads from _oleobj_ directly (not from a secondary
    QI(IID_IUnknown) result) because for aggregated COM objects the controlling
    IUnknown can be at a different address with a shorter lifetime than the
    IEdmFile13* itself.

    Retries up to 3 additional times on Phase-1 access-violation or
    misaligned-pointer failures, and on Phase-2 access violations inside
    ChangeState3 itself (with escalating backoff of 3s, 10s, 30s between
    attempts to give PDM's in-process DLL time to clean up corrupted state).
    If all Phase-2 attempts fail, raises _Phase2AVError so batch_transition
    can trigger a vault reconnect after repeated failures.

    Raises:
        _Phase2AVError: ChangeState3 kept raising access violations.
        OSError / RuntimeError: a Phase-1 failure that was not retryable or
            that persisted through every attempt, or a non-AV OSError from
            ChangeState3.
    """
    logger = logging.getLogger("batch_workflows_paths")

    max_attempts = 1 + len(_PHASE2_BACKOFF)  # initial + 3 retries
    for attempt in range(max_attempts):
        if attempt > 0:
            logger.debug(f" [CS3] Retry {attempt} for file ID {file_id}")

        # --- Phase 1: obtain an independent comtypes IEdmFile13 pointer ---
        file_obj = None
        try:
            # Fresh COM wrapper each attempt — primary vault, no competing refs.
            file_obj = win32com.client.CastTo(
                vault_obj.GetObject(EdmObject_File, file_id), 'IEdmFile13'
            )

            # CastTo('IEdmFile13') calls QI(IID_IEdmFile13) so _oleobj_ already
            # holds the IEdmFile13* directly. Read it at offset 16 in the
            # CPython object struct (ob_refcnt[8] + ob_type[8] + m_pUnknown[8]).
            py_disp = file_obj._oleobj_
            raw_ptr = ctypes.c_uint64.from_address(id(py_disp) + 16).value
            logger.debug(f" [CS3] raw_ptr={raw_ptr:#018x} (& 7 == {raw_ptr & 7})")
            if not raw_ptr or (raw_ptr & 0x7) != 0:
                raise RuntimeError(f"Misaligned IEdmFile13* at offset 16: {raw_ptr:#x}")

            ct_unk = ctypes.cast(raw_ptr, ctypes.POINTER(comtypes.IUnknown))
            file13 = ct_unk.QueryInterface(_IEdmFile13_CT)  # AddRefs independently
            logger.debug(f" [CS3] QI OK (attempt {attempt})")
        except (OSError, RuntimeError) as exc:
            if file_obj is not None:
                del file_obj
            is_retryable = (
                (isinstance(exc, OSError) and 'access violation' in str(exc).lower())
                or isinstance(exc, RuntimeError)
            )
            if is_retryable and attempt < max_attempts - 1:
                sleep_s = _PHASE2_BACKOFF[attempt]
                logger.debug(
                    f" [CS3] Phase-1 failure ({exc}); sleeping {sleep_s}s then retrying"
                )
                time.sleep(sleep_s)
                continue
            raise

        # Release win32com wrapper — file13 holds its own AddRef'd reference.
        del file_obj

        # --- Phase 2: call ChangeState3 ---
        v_state = _make_i4_variant(to_state_id)
        v_trans = _make_i4_variant(transition_id)
        try:
            file13.ChangeState3(
                ctypes.byref(v_state),
                ctypes.byref(v_trans),
                ctypes.c_long(folder_id),
                comment,
                ctypes.c_long(0),
                ctypes.c_long(0),
                password,
            )
            return  # success
        except OSError as exc:
            # Access violation inside ChangeState3 (PDM in-process DLL crashes while
            # accessing internal state left over from a recent transition). An
            # escalating sleep (3s, 10s, 30s) lets PDM's post-transition cleanup
            # finish, then we retry with a fresh COM wrapper. After all retries
            # are exhausted, raise _Phase2AVError so batch_transition can count
            # consecutive failures and reconnect the vault.
            if 'access violation' in str(exc).lower():
                if attempt < max_attempts - 1:
                    sleep_s = _PHASE2_BACKOFF[attempt]
                    logger.debug(
                        f" [CS3] Phase-2 access violation ({exc}); "
                        f"sleeping {sleep_s}s for PDM cleanup then retrying"
                    )
                    time.sleep(sleep_s)
                    continue
                raise _Phase2AVError(
                    f"ChangeState3 access violation after {max_attempts} attempts: {exc}"
                ) from exc
            raise


# =============================================================================
# CONFIGURATION - Can be overridden via command line
# =============================================================================
VAULT_NAME = "IDSVault"  # Default vault name
DEFAULT_TRANSITION = "Citadel_mig_Set Proto Released"  # Default transition name


# =============================================================================
# LOGGING SETUP
# =============================================================================
def setup_logging(log_file: Optional[str] = None) -> logging.Logger:
    """Configure logging for the batch process.

    Args:
        log_file: Path of the log file. When None, a timestamped file name
            ``batch_workflow_paths_<YYYYmmdd_HHMMSS>.log`` is used.

    Returns:
        The "batch_workflows_paths" logger, with a DEBUG-level file handler
        and an INFO-level console handler attached.
    """
    if log_file is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = f"batch_workflow_paths_{timestamp}.log"

    logger = logging.getLogger("batch_workflows_paths")
    logger.setLevel(logging.DEBUG)

    # Calling this function twice used to stack a second pair of handlers and
    # duplicate every message; drop whatever an earlier call installed first.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    # File handler — explicit UTF-8 so vault paths with non-ASCII characters
    # can be logged regardless of the console code page.
    fh = logging.FileHandler(log_file, encoding="utf-8")
    fh.setLevel(logging.DEBUG)

    # Console handler
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)

    # Formatter
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)

    return logger


# =============================================================================
# PDM VAULT CONNECTION
# =============================================================================
class PDMVaultConnection:
    """Handles connection and authentication to the PDM vault."""

    def __init__(self, vault_name: str):
        self.vault_name = vault_name
        self.vault = None
        self.is_connected = False
        self.logger = logging.getLogger("batch_workflows_paths")
        # Credentials are kept after a successful login so reconnect() can
        # log in again without prompting.
        self._username = None
        self._password = None
        # Set by transition_file() and read by batch_transition(); declared
        # here so the attribute always exists instead of appearing on first use.
        self._last_was_phase2_av = False

    def connect(self, username: str, password: str) -> bool:
        """
        Connect and log into the PDM vault with username/password.

        Args:
            username: PDM username
            password: PDM password

        Returns:
            True if connection successful, False otherwise
        """
        try:
            self.vault = win32com.client.Dispatch("ConisioLib.EdmVault")

            # Login with credentials
            self.vault.Login(username, password, self.vault_name)

            self.is_connected = True
            self._username = username
            self._password = password
            self.logger.info(f"Successfully connected to vault: {self.vault_name}")
            self.logger.info(f"Logged in as: {username}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to vault '{self.vault_name}': {e}")
            self.is_connected = False
            return False

    def disconnect(self) -> None:
        """Disconnect from the PDM vault by dropping the COM vault reference."""
        if self.vault is not None:
            # Plain attribute assignments cannot raise, so no try/except is
            # needed here.
            self.vault = None
            self.is_connected = False
            self.logger.info("Disconnected from vault")

    def reconnect(self) -> bool:
        """Force a full disconnect + re-login using the stored credentials.

        Used to reset PDM's in-process DLL state after repeated Phase-2 access
        violations indicate the vault connection's internal data structures
        have been corrupted. Returns True if the re-login succeeded.
        """
        if self._username is None or self._password is None:
            self.logger.error("Cannot reconnect: no stored credentials")
            return False
        username, password = self._username, self._password
        self.logger.info("Reconnecting vault to reset PDM internal state...")
        self.disconnect()
        # Give the in-process DLL a moment to release any lingering state.
        time.sleep(2)
        return self.connect(username, password)

    def get_file(self, file_path: str) -> Optional[Dict[str, Any]]:
        """
        Get a file object from the vault by full path.

        Args:
            file_path: Full path to the file in the vault

        Returns:
            Dict with 'file_obj', 'folder_obj', 'path', or None if the vault
            is not connected, the folder or file is not found, or the lookup
            raises.
        """
        if not self.is_connected:
            self.logger.error("Not connected to vault")
            return None

        try:
            folder_path = str(Path(file_path).parent)
            folder_obj = self.vault.GetFolderFromPath(folder_path)
            if folder_obj is None:
                self.logger.warning(f"Folder not found: {folder_path}")
                return None

            # GetFileFromPath returns (file_obj, file_id) tuple
            result = self.vault.GetFileFromPath(file_path, folder_obj)

            # Handle tuple return value
            if isinstance(result, tuple):
                file_obj = result[0]
            else:
                file_obj = result

            if file_obj is None:
                return None

            # Re-fetch via GetObject, then cast to IEdmFile13 so CurrentState and
            # transition methods are accessible regardless of gen_py stub state.
            file_obj = self.vault.GetObject(EdmObject_File, file_obj.ID)
            file_obj = win32com.client.CastTo(file_obj, 'IEdmFile13')

            return {
                "file_obj": file_obj,
                "folder_obj": folder_obj,
                "path": file_path
            }
        except Exception as e:
            self.logger.error(f"Error getting file '{file_path}': {e}")
            return None

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        self.disconnect()


# =============================================================================
# FILE LIST HANDLING
# =============================================================================
def load_file_list_from_csv(csv_path: str) -> List[str]:
    """
    Load list of file paths from a CSV file (single column, no header).

    Args:
        csv_path: Path to the CSV file with one file path per line

    Returns:
        List of file paths (blank lines skipped). Empty if the file is
        missing or unreadable; the error is logged rather than raised.
    """
    logger = logging.getLogger("batch_workflows_paths")
    file_paths = []

    try:
        # utf-8-sig transparently strips a BOM written by Excel/Notepad.
        with open(csv_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                file_path = line.strip()
                if file_path:  # Skip empty lines
                    file_paths.append(file_path)

        logger.info(f"Loaded {len(file_paths)} file paths from {csv_path}")
    except FileNotFoundError:
        logger.error(f"CSV file not found: {csv_path}")
    except Exception as e:
        logger.error(f"Error reading CSV file: {e}")

    return file_paths


def validate_files(
    vault: PDMVaultConnection,
    file_paths: List[str]
) -> Dict[str, List]:
    """
    Validate that files exist in the vault by their full paths.

    Args:
        vault: Active vault connection
        file_paths: List of full vault paths to validate

    Returns:
        Dict with:
        - 'valid': list of file info dicts (ready for transition)
        - 'not_found': list of paths not found in vault
    """
    logger = logging.getLogger("batch_workflows_paths")
    results = {"valid": [], "not_found": []}

    total = len(file_paths)
    for i, file_path in enumerate(file_paths, 1):
        logger.info(f"[{i}/{total}] Validating: {file_path}")

        file_info = vault.get_file(file_path)
        if file_info is None:
            results["not_found"].append(file_path)
            logger.warning(f" NOT FOUND: {file_path}")
        else:
            # Read state now, then release the COM wrappers immediately.
            # Keeping file_obj alive across the full validation pass leaves
            # stale COM proxies in memory. After any file is transitioned,
            # PDM's server may invalidate proxies for other files, causing
            # access violations later. transition_file() fetches its own
            # fresh wrapper just before it needs it.
            try:
                current_state = file_info["file_obj"].CurrentState
                state_name = current_state.Name if current_state else "Unknown"
            except Exception as e:
                logger.warning(f" Could not get state: {e}")
                state_name = "Unknown"

            results["valid"].append({"path": file_path, "current_state": state_name})
            logger.info(f" FOUND (State: {state_name})")
            # file_info (and its file_obj / folder_obj) goes out of scope here

    return results


# =============================================================================
# WORKFLOW PROCESSING
# =============================================================================
def get_available_transitions(vault: PDMVaultConnection, file_obj) -> List[Dict[str, Any]]:
    """
    Get list of available transitions for a file in its current state.

    Args:
        vault: Active vault connection
        file_obj: IEdmFile object

    Returns:
        List of transition info dicts with 'name', 'id' and 'target_state'.
        Errors are logged and whatever was collected so far is returned.
    """
    logger = logging.getLogger("batch_workflows_paths")
    transitions = []

    try:
        current_state = file_obj.CurrentState
        if current_state is None:
            return transitions

        trans_pos = current_state.GetFirstTransitionPosition()
        while not trans_pos.IsNull:
            transition = current_state.GetNextTransition(trans_pos)
            transitions.append({
                "name": transition.Name,
                "id": transition.ID,
                "target_state": transition.ToState.Name if transition.ToState else "Unknown"
            })
    except Exception as e:
        logger.error(f"Error getting transitions: {e}")

    return transitions


def transition_file(
    vault: PDMVaultConnection,
    file_info: Dict[str, Any],
    transition_name: str,
    comment: str = ""
) -> str:
    """
    Transition a single file using a named transition.

    Args:
        vault: Active vault connection
        file_info: Dict containing 'path' (and optionally 'current_state' for logging)
        transition_name: Name of the transition to execute
        comment: Optional transition comment

    Returns:
        One of "success", "not_available", or "failed".
        - "success": transition completed and state verified
        - "not_available": named transition is not valid from the file's
          current state (typically means the file is already in the target
          state from a prior run — not a real failure, just a no-op)
        - "failed": real failure (access violation, missing file, state
          unchanged after call, etc.) — worth retrying

    Side effects:
        Sets ``vault._last_was_phase2_av`` to True when the failure was a
        persistent ChangeState3 access violation, False when the call
        returned normally.
    """
    logger = logging.getLogger("batch_workflows_paths")
    file_path = file_info["path"]

    try:
        # Fetch a fresh COM wrapper right now — not from validation.
        # By the time this file is processed, earlier transitions may have
        # caused PDM to invalidate COM proxies obtained during the validation
        # pass. A fresh GetObject/CastTo gives a clean proxy every time.
        fresh = vault.get_file(file_path)
        if fresh is None:
            logger.error(f"File no longer accessible in vault: {file_path}")
            return "failed"

        file_obj = fresh["file_obj"]
        folder_obj = fresh["folder_obj"]

        # Get current state and find the transition
        current_state = file_obj.CurrentState
        if current_state is None:
            logger.error(f"File has no workflow state: {file_path}")
            return "failed"

        # Find the transition by name
        target_transition = None
        trans_pos = current_state.GetFirstTransitionPosition()
        while not trans_pos.IsNull:
            transition = current_state.GetNextTransition(trans_pos)
            if transition.Name.lower() == transition_name.lower():
                target_transition = transition
                break

        if target_transition is None:
            available = get_available_transitions(vault, file_obj)
            available_names = [t["name"] for t in available]
            logger.error(
                f"Transition '{transition_name}' not available for {file_path}. "
                f"Current state: {current_state.Name}. "
                f"Available transitions: {available_names}"
            )
            return "not_available"

        # Collect everything we need from the COM objects, then release them
        # before calling _changestate3. _changestate3 fetches its own wrapper
        # internally, so having the lookup wrapper alive simultaneously would
        # create competing COM references and corrupt PDM's internal state.
        old_state_name = current_state.Name
        expected_state_name = target_transition.ToState.Name
        to_state_id = target_transition.ToState.ID
        trans_id = target_transition.ID
        file_id = file_obj.ID
        folder_id = folder_obj.ID

        logger.info(
            f" Transition: '{target_transition.Name}' (ID: {trans_id}) | "
            f"ToState: '{expected_state_name}' (ID: {to_state_id}) | "
            f"Folder ID: {folder_id} | File ID: {file_id}"
        )

        # ↓ Release all COM wrappers from the lookup phase before the vtable call.
        # `transition` is the loop variable aliasing target_transition; it must
        # be deleted too, otherwise the transition COM object stays referenced.
        # It is always bound here because target_transition was found in the loop.
        del (file_obj, folder_obj, fresh, current_state,
             target_transition, transition, trans_pos)

        try:
            _changestate3(
                vault.vault, file_id,
                to_state_id, trans_id, folder_id,
                comment, vault._password or "",
            )
        except _Phase2AVError as exc:
            # Persistent access violation — flag for batch_transition to count
            # against the consecutive-failure threshold for reconnect.
            vault._last_was_phase2_av = True
            logger.error(f"Failed to transition {file_path}: {exc}")
            return "failed"
        else:
            vault._last_was_phase2_av = False

        # Verify the state actually changed — re-fetch and cast to IEdmFile13
        fresh_file = win32com.client.CastTo(
            vault.vault.GetObject(EdmObject_File, file_id), 'IEdmFile13'
        )
        new_state = fresh_file.CurrentState
        actual_state_name = new_state.Name if new_state else "Unknown"

        if actual_state_name.lower() == expected_state_name.lower():
            logger.info(
                f"SUCCESS: {file_path} | "
                f"{old_state_name} -> {actual_state_name}"
            )
            return "success"
        else:
            logger.error(
                f"FAILED (state unchanged): {file_path} | "
                f"Expected: {expected_state_name}, Actual: {actual_state_name}"
            )
            return "failed"

    except Exception as e:
        logger.error(f"Failed to transition {file_path}: {e}")
        return "failed"


def batch_transition(
    vault: PDMVaultConnection,
    file_list: List[Dict[str, Any]],
    transition_name: str,
    comment: str = ""
) -> Dict[str, List[str]]:
    """
    Transition multiple files using a named transition.

    Args:
        vault: Active vault connection
        file_list: List of file info dicts (from validate_files)
        transition_name: Name of the transition to execute
        comment: Optional transition comment

    Returns:
        Dict with 'success', 'failed', and 'not_available' file lists.
        - 'success': transition completed
        - 'failed': real failure worth retrying (access violation, etc.).
          If the batch is aborted because a vault reconnect failed, every
          file that was not reached is recorded here as well, so that it
          shows up in the report and in the retry list.
        - 'not_available': transition not valid from current state —
          typically means the file is already in the target state from
          a prior run
    """
    logger = logging.getLogger("batch_workflows_paths")
    results = {"success": [], "failed": [], "not_available": []}

    total = len(file_list)
    logger.info(f"Starting batch transition: '{transition_name}' for {total} files")
    logger.info("=" * 60)

    # Consecutive Phase-2 access-violation counter. When it hits the threshold,
    # the vault connection's in-process state is likely corrupted past the
    # point where sleeping will help, so force a full disconnect + re-login.
    consecutive_phase2_av = 0
    PHASE2_AV_RECONNECT_THRESHOLD = 3

    for i, file_info in enumerate(file_list, 1):
        file_path = file_info["path"]
        logger.info(f"[{i}/{total}] Processing: {file_path}")

        vault._last_was_phase2_av = False
        status = transition_file(vault, file_info, transition_name, comment)

        if status == "success":
            results["success"].append(file_path)
            consecutive_phase2_av = 0
        elif status == "not_available":
            results["not_available"].append(file_path)
            consecutive_phase2_av = 0
        else:  # "failed"
            results["failed"].append(file_path)
            if getattr(vault, "_last_was_phase2_av", False):
                consecutive_phase2_av += 1
                logger.warning(
                    f" Phase-2 AV streak: {consecutive_phase2_av}/"
                    f"{PHASE2_AV_RECONNECT_THRESHOLD}"
                )
                if consecutive_phase2_av >= PHASE2_AV_RECONNECT_THRESHOLD:
                    logger.warning(
                        f"{consecutive_phase2_av} consecutive Phase-2 access "
                        "violations — forcing vault reconnect"
                    )
                    if vault.reconnect():
                        logger.info("Vault reconnected successfully")
                    else:
                        logger.error(
                            "Vault reconnect failed — aborting remaining batch"
                        )
                        # `i` is 1-based, so file_list[i:] is exactly the part
                        # of the batch that was never attempted. Without this
                        # those files would vanish from every result list.
                        unprocessed = [info["path"] for info in file_list[i:]]
                        if unprocessed:
                            results["failed"].extend(unprocessed)
                            logger.error(
                                f"{len(unprocessed)} unprocessed file(s) "
                                "recorded as failed"
                            )
                        break
                    consecutive_phase2_av = 0
            else:
                consecutive_phase2_av = 0

    return results


# =============================================================================
# COMMAND LINE INTERFACE
# =============================================================================
def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Batch workflow transitions for SolidWorks PDM Professional (path-based)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python batch_workflows_paths.py --vault "MyVault" --csv "files.csv" --transition "citadel_set_production released"
    python batch_workflows_paths.py -v "MyVault" -c "wip_files.csv" -t "citadel_set_wip"

CSV format (one full vault path per line):
    C:\\IDSVault\\Parts\\widget.sldprt
    C:\\IDSVault\\Parts\\bracket.sldprt
    C:\\IDSVault\\Assemblies\\main_assy.sldasm
        """
    )

    parser.add_argument(
        "-v", "--vault",
        default=VAULT_NAME,
        help=f"PDM vault name (default: {VAULT_NAME})"
    )

    parser.add_argument(
        "-c", "--csv",
        required=True,
        help="Path to CSV file containing full vault paths"
    )

    parser.add_argument(
        "-t", "--transition",
        default=DEFAULT_TRANSITION,
        help=f"Workflow transition name to execute (default: {DEFAULT_TRANSITION})"
    )

    parser.add_argument(
        "--comment",
        default="Batch workflow transition",
        help="Comment for the workflow transition"
    )

    parser.add_argument(
        "-u", "--username",
        help="PDM username (will prompt if not provided)"
    )

    parser.add_argument(
        "--log-file",
        help="Custom log file path (default: auto-generated with timestamp)"
    )

    return parser.parse_args()


# =============================================================================
# MAIN EXECUTION
# =============================================================================
def _write_path_list(out_file: str, paths: List[str]) -> None:
    """Write one path per line to out_file, UTF-8 encoded.

    The input CSV is read as UTF-8, so the output must be able to hold the
    same characters; the platform default encoding may not.
    """
    with open(out_file, 'w', encoding='utf-8') as handle:
        for path in paths:
            handle.write(path + "\n")


def main():
    """Main entry point for batch workflow processing.

    Returns:
        Process exit code: 0 when every attempted transition succeeded or was
        not applicable, 1 on setup errors or when any transition failed.
    """
    # Parse arguments
    args = parse_arguments()

    # Setup logging
    logger = setup_logging(args.log_file)

    logger.info("=" * 60)
    logger.info("PDM BATCH WORKFLOW PROCESSOR (PATH-BASED)")
    logger.info("=" * 60)
    logger.info(f"Vault: {args.vault}")
    logger.info(f"CSV File: {args.csv}")
    logger.info(f"Transition: {args.transition}")
    logger.info(f"Comment: {args.comment}")

    # Get credentials
    username = args.username
    if not username:
        username = input("PDM Username: ")
    password = getpass.getpass("PDM Password: ")

    # Load files from CSV
    file_paths = load_file_list_from_csv(args.csv)
    if not file_paths:
        logger.error("No files loaded from CSV. Exiting.")
        return 1

    logger.info(f"Loaded {len(file_paths)} file paths from CSV")

    # Connect to vault
    vault = PDMVaultConnection(args.vault)
    if not vault.connect(username, password):
        logger.error("Failed to connect to vault. Exiting.")
        return 1

    try:
        # Validate files exist in vault
        logger.info("Validating files in vault...")
        logger.info("=" * 60)
        validation = validate_files(vault, file_paths)

        valid_count = len(validation["valid"])
        not_found_count = len(validation["not_found"])

        # Summary
        logger.info("=" * 60)
        logger.info("VALIDATION SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Files ready to process: {valid_count}")
        logger.info(f"Files not found: {not_found_count}")

        # Report not found
        if not_found_count > 0:
            logger.warning("\nFiles not found in vault:")
            for missing in validation["not_found"]:
                logger.warning(f" - {missing}")

            # Save not found list
            not_found_file = f"not_found_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
            _write_path_list(not_found_file, validation["not_found"])
            logger.info(f"Not found list saved to: {not_found_file}")

        if valid_count == 0:
            logger.error("No valid files to process. Exiting.")
            return 1

        # Execute batch transition
        results = batch_transition(
            vault,
            validation["valid"],
            args.transition,
            comment=args.comment
        )

        # Final report
        logger.info("=" * 60)
        logger.info("BATCH PROCESS COMPLETE")
        logger.info("=" * 60)
        logger.info(f"Total files processed: {valid_count}")
        logger.info(f"Successful transitions: {len(results['success'])}")
        logger.info(f"Failed transitions: {len(results['failed'])}")
        logger.info(
            f"Transition not available (likely already in target state): "
            f"{len(results['not_available'])}"
        )

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        if results["failed"]:
            logger.warning("\nFailed files (real failures — retry these):")
            for failed_path in results["failed"]:
                logger.warning(f" - {failed_path}")

            failed_file = f"failed_transitions_{timestamp}.txt"
            _write_path_list(failed_file, results["failed"])
            logger.info(f"Failed file list saved to: {failed_file}")

        if results["not_available"]:
            not_avail_file = f"not_available_{timestamp}.txt"
            _write_path_list(not_avail_file, results["not_available"])
            logger.info(
                f"Not-available file list saved to: {not_avail_file} "
                f"(likely already in target state — not retried)"
            )

        return 0 if not results["failed"] else 1

    finally:
        vault.disconnect()


if __name__ == "__main__":
    # `exit` is injected by the `site` module and is not guaranteed to exist
    # (e.g. under `python -S` or in frozen builds); SystemExit always is.
    raise SystemExit(main())