"""
Batch Copy Tree Export for PDM Vault
=====================================

This module provides a framework for:
1. Logging into a PDM vault via API
2. Reading part numbers from a CSV file (no extensions)
3. Running the PDM Copy Tree function for each part
4. Exporting each part's file tree to its own subfolder on a local path

Usage:
    python batch_copy_tree.py -c parts.csv -o "C:\\Temp\\Output" --vault "IDSVault"
"""

import logging
import argparse
import getpass
import os
import ctypes
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any

import win32com.client

# PDM API Type Library Constants
EdmObject_File = 1
EdmObject_Folder = 2
EdmSearch_FileName = 1  # Search by filename
EdmGet_Simple = 1  # Simple get (latest version)

# =============================================================================
# CONFIGURATION - Can be overridden via command line
# =============================================================================

VAULT_NAME = "IDSVault"  # Default vault name


# =============================================================================
# LOGGING SETUP
# =============================================================================

def setup_logging(log_file: Optional[str] = None) -> logging.Logger:
    """Configure and return the shared "batch_copy_tree" logger.

    The logger writes DEBUG and above to a log file and INFO and above to
    the console, both using the same timestamped format.

    Calling this function again replaces the handlers installed by the
    previous call instead of stacking new ones, so every record is emitted
    exactly once per destination.

    Args:
        log_file: Path of the log file. When None, a name of the form
            "batch_copy_tree_<YYYYmmdd_HHMMSS>.log" is generated in the
            current working directory.

    Returns:
        The configured "batch_copy_tree" logger.
    """
    if log_file is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = f"batch_copy_tree_{timestamp}.log"

    logger = logging.getLogger("batch_copy_tree")
    logger.setLevel(logging.DEBUG)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # File handler: full detail. Explicit UTF-8 so names containing
    # non-ASCII characters do not depend on the locale's default encoding.
    fh = logging.FileHandler(log_file, encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)

    # Console handler: progress-level messages only.
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)

    # Drop handlers left over from an earlier call; otherwise each call
    # would add another pair and duplicate every log line.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    logger.addHandler(fh)
    logger.addHandler(ch)

    return logger


# =============================================================================
# PDM VAULT CONNECTION
# =============================================================================
class PDMVaultConnection:
    """Handles connection and authentication to the PDM vault."""

    def __init__(self, vault_name: str):
        """Create an unconnected wrapper for the vault named *vault_name*."""
        self.vault_name = vault_name
        self.vault = None  # COM vault object, set by connect()
        self.is_connected = False
        self.logger = logging.getLogger("batch_copy_tree")
        self._username = None
        self._password = None  # kept for compatibility; never populated

    def connect(self, username: str, password: str) -> bool:
        """
        Connect and log into the PDM vault with username/password.

        Any exception raised while creating the COM object or logging in is
        logged and reported through the return value rather than propagated.

        Args:
            username: PDM username
            password: PDM password

        Returns:
            True if connection successful, False otherwise
        """
        try:
            # Create the vault interface
            self.vault = win32com.client.Dispatch("ConisioLib.EdmVault")

            # Login with credentials
            self.vault.Login(username, password, self.vault_name)

            self.is_connected = True
            self._username = username
            # The password is deliberately NOT stored on the instance:
            # nothing in this module reads it back after Login().

            self.logger.info(f"Successfully connected to vault: {self.vault_name}")
            self.logger.info(f"Logged in as: {username}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to vault '{self.vault_name}': {e}")
            self.is_connected = False
            return False

    def disconnect(self) -> None:
        """Disconnect from the PDM vault by dropping the vault reference.

        Does nothing when no vault object is held.
        """
        if self.vault is None:
            return
        # Clear the vault reference
        self.vault = None
        self.is_connected = False
        self.logger.info("Disconnected from vault")

    def search_file_by_name(self, filename: str) -> List[Dict[str, Any]]:
        """
        Search for a file in the vault by filename (supports wildcards).

        Errors on an individual search result are logged as warnings and that
        result is skipped; an error in the search itself is logged and the
        results gathered so far are returned.

        Args:
            filename: The filename to search for (e.g., "part001.*" or "part001.sldprt")

        Returns:
            List of dicts with 'file_obj', 'file_id', 'folder_id', 'path',
            'folder_path', 'folder_obj' and 'filename' for each match.
            Empty when not connected.
        """
        if not self.is_connected:
            self.logger.error("Not connected to vault")
            return []

        results = []
        try:
            # Create search object
            search = self.vault.CreateSearch()

            # Set search filename
            search.FileName = filename
            self.logger.debug(f"Search pattern: '{filename}'")

            # Execute search
            search_result = search.GetFirstResult()
            while search_result is not None:
                try:
                    file_id = search_result.ID
                    folder_id = search_result.ParentFolderID

                    # Get the file and folder objects
                    file_obj = self.vault.GetObject(EdmObject_File, file_id)
                    folder_obj = self.vault.GetObject(EdmObject_Folder, folder_id)

                    if file_obj is not None and folder_obj is not None:
                        folder_path = folder_obj.LocalPath
                        # Use the actual filename reported by the search result
                        actual_name = search_result.Name
                        full_path = str(Path(folder_path) / actual_name)
                        results.append({
                            "file_obj": file_obj,
                            "file_id": file_id,
                            "folder_id": folder_id,
                            "path": full_path,
                            "folder_path": folder_path,
                            "folder_obj": folder_obj,
                            "filename": actual_name,
                        })
                except Exception as e:
                    self.logger.warning(f"Error processing search result: {e}")

                # Always advance, even after a bad result, so the loop ends.
                search_result = search.GetNextResult()
        except Exception as e:
            self.logger.error(f"Error searching for '{filename}': {e}")

        self.logger.debug(f"Search for '{filename}' returned {len(results)} result(s)")
        return results

    def __enter__(self):
        """Context manager entry - note: call connect() separately with credentials."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: disconnect; exceptions are not suppressed."""
        self.disconnect()


# =============================================================================
# FILE LIST HANDLING
# =============================================================================

def load_part_numbers_from_csv(csv_path: str) -> List[str]:
    """
    Load list of part numbers from a CSV file (single column, no header).

    Each line is stripped of surrounding whitespace; blank lines are skipped.
    The file is read as UTF-8 and a leading byte-order mark is ignored.
    A missing or unreadable file is logged, not raised.

    Args:
        csv_path: Path to the CSV file with one part number per line (no extensions)

    Returns:
        List of part numbers (whatever was read before any error occurred)
    """
    logger = logging.getLogger("batch_copy_tree")
    part_numbers = []

    try:
        with open(csv_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                part_number = line.strip()
                if part_number:  # Skip empty lines
                    part_numbers.append(part_number)
        logger.info(f"Loaded {len(part_numbers)} part numbers from {csv_path}")
    except FileNotFoundError:
        logger.error(f"CSV file not found: {csv_path}")
    except Exception as e:
        logger.error(f"Error reading CSV file: {e}")

    return part_numbers


def search_and_resolve_parts(
    vault: PDMVaultConnection,
    part_numbers: List[str],
    extension: str
) -> Dict[str, List]:
    """
    Search for part numbers in the vault and resolve each to a root file.

    Part numbers are searched with the user-specified extension
    (e.g., "PART001.SLDASM") since the CSV does not include file extensions.
    When a search returns several matches, the first one is used and the
    part is additionally recorded under 'ambiguous'.

    Args:
        vault: Active vault connection
        part_numbers: List of part numbers (no extensions)
        extension: File extension including the dot (e.g., ".SLDASM")

    Returns:
        Dict with:
        - 'valid': list of file info dicts (ready for copy tree)
        - 'not_found': list of part numbers not found in vault
        - 'ambiguous': list of dicts with 'part_number' and 'paths'
          (all found paths) for parts that matched more than once
    """
    logger = logging.getLogger("batch_copy_tree")
    results = {"valid": [], "not_found": [], "ambiguous": []}
    total = len(part_numbers)

    for i, part_number in enumerate(part_numbers, 1):
        logger.info(f"[{i}/{total}] Searching for: {part_number}{extension}")
        search_results = vault.search_file_by_name(f"{part_number}{extension}")

        if not search_results:
            results["not_found"].append(part_number)
            logger.warning(f" NOT FOUND: {part_number}")
            continue

        match = search_results[0]
        logger.info(f" FOUND: {match['path']}")

        if len(search_results) > 1:
            logger.warning(" Multiple matches found, using first result:")
            for r in search_results:
                logger.warning(f" - {r['path']}")
            # Record the ambiguity so callers can report it; the first
            # match is still treated as valid below.
            results["ambiguous"].append({
                "part_number": part_number,
                "paths": [r["path"] for r in search_results],
            })

        results["valid"].append({
            "part_number": part_number,
            "filename": match["filename"],
            "path": match["path"],
            "file_obj": match["file_obj"],
            "file_id": match["file_id"],
            "folder_id": match["folder_id"],
            "folder_obj": match["folder_obj"],
        })

    return results


# =============================================================================
# COPY TREE OPERATIONS
# =============================================================================

def get_window_handle() -> int:
    """Get a window handle for PDM API calls. Returns console handle or 0."""
    try:
        hwnd = ctypes.windll.kernel32.GetConsoleWindow()
        return hwnd if hwnd else 0
    except Exception:
        # No console window API available (or the call failed): use 0.
        return 0


def _collect_references(file_obj, folder_id: int, vault, logger) -> List[Dict[str, Any]]:
    """
    Collect all referenced files from an assembly using IEdmReference5.

    Uses IEdmFile5.GetReferenceTree() to get the reference tree root, then
    recursively traverses the tree using GetFirstChildPosition/GetNextChild
    on each node. Each file is recorded once (keyed by file ID) and its
    children are only descended into the first time it is seen, which keeps
    the traversal bounded when the same file is referenced in several places.

    Traversal errors are logged and swallowed; the files collected up to
    that point are still returned.

    Args:
        file_obj: IEdmFile5 COM object (the root assembly)
        folder_id: Folder ID of the root file
        vault: The IEdmVault COM object (currently unused by this function)
        logger: Logger instance

    Returns:
        List of dicts with 'file_id', 'folder_id', 'name' for each
        unique referenced file (including the root file itself)
    """
    collected = {}  # keyed by file ID to deduplicate

    # Add the root file itself
    root_name = file_obj.Name
    root_id = file_obj.ID
    collected[root_id] = {
        "file_id": root_id,
        "folder_id": folder_id,
        "name": root_name,
    }
    logger.debug(f" Root: {root_name}")

    def _traverse_children(ref_node, depth=0):
        """Recursively traverse all children of a reference node."""
        indent = " " * (depth + 1)
        try:
            result = ref_node.GetFirstChildPosition("", True, True, 0)
            # The COM call may hand back a tuple (presumably because of
            # by-reference arguments); the position is its first element.
            child_pos = result[0] if isinstance(result, tuple) else result

            while child_pos is not None:
                # Best-effort end-of-list probe; a position object without a
                # usable IsNull is simply treated as "keep going".
                try:
                    if hasattr(child_pos, 'IsNull') and child_pos.IsNull:
                        break
                except Exception:
                    pass

                try:
                    child_ref = ref_node.GetNextChild(child_pos)
                    if child_ref is None:
                        break
                    if isinstance(child_ref, tuple):
                        child_ref = child_ref[0]

                    child_file_id = child_ref.FileID
                    child_folder_id = child_ref.FolderID
                    child_name = child_ref.Name

                    if child_file_id not in collected:
                        collected[child_file_id] = {
                            "file_id": child_file_id,
                            "folder_id": child_folder_id,
                            "name": child_name,
                        }
                        logger.debug(f"{indent}Child: {child_name}")

                        # Recurse into this child to get its children
                        _traverse_children(child_ref, depth + 1)
                except StopIteration:
                    break
                except Exception as child_err:
                    logger.debug(f"{indent}Error reading child reference: {child_err}")
                    break
        except Exception as e:
            logger.debug(f"{indent}Error traversing children at depth {depth}: {e}")

    try:
        # Get the reference tree (IEdmReference5)
        ref_tree = file_obj.GetReferenceTree(folder_id, 0)
        if ref_tree is None:
            logger.warning(" GetReferenceTree returned None")
            return list(collected.values())

        _traverse_children(ref_tree)
    except Exception as e:
        logger.warning(f" Error traversing reference tree: {e}")
        logger.debug(" Full error:", exc_info=True)

    return list(collected.values())


def execute_copy_tree(
    vault: PDMVaultConnection,
    file_info: Dict[str, Any],
    output_dir: str,
    part_name: str,
    dry_run: bool = False
) -> Dict[str, Any]:
    """
    Export an assembly and all its referenced files to a local subfolder.

    Uses IEdmFile5.GetReferenceTree() to traverse references, then
    IEdmFile5.GetFileCopy() to download each file to the output folder.
    Every file is attempted even if an earlier one fails; the part is only
    reported as successful when all of its files were copied.

    Args:
        vault: Active vault connection
        file_info: Dict with file_obj, file_id, folder_id, path, etc.
        output_dir: Base output directory
        part_name: Part number used as subfolder name
        dry_run: If True, build tree but don't copy files (and don't
            create the output subfolder)

    Returns:
        Dict with 'status' ('success', 'failed', or 'dry_run'),
        'file_count' (files copied; files found for a dry run),
        'dest_path', 'source_path', and 'error' (set when failed)
    """
    logger = logging.getLogger("batch_copy_tree")
    dest_path = os.path.join(output_dir, part_name)
    # GetFileCopy requires destination path to end with backslash
    dest_path_trailing = dest_path if dest_path.endswith("\\") else dest_path + "\\"

    result = {
        "status": "failed",
        "file_count": 0,
        "dest_path": dest_path,
        "source_path": file_info["path"],
        "error": None,
    }

    try:
        file_obj = file_info["file_obj"]
        folder_id = file_info["folder_id"]

        # Step 1: Traverse the reference tree to find all files
        logger.info(" Building reference tree...")
        ref_files = _collect_references(file_obj, folder_id, vault.vault, logger)
        file_count = len(ref_files)
        result["file_count"] = file_count
        logger.info(f" Reference tree: {file_count} file(s) found")

        if dry_run:
            for idx, ref in enumerate(ref_files, 1):
                logger.info(f" [{idx}/{file_count}] {ref['name']}")
            logger.info(f" DRY RUN: Would copy {file_count} file(s) to {dest_path}")
            result["status"] = "dry_run"
            return result

        # Create the output subfolder (only when actually copying)
        os.makedirs(dest_path, exist_ok=True)
        logger.debug(f"Output folder: {dest_path}")
        hwnd = get_window_handle()

        # Step 2: Copy each file to the output directory using GetFileCopy
        # Confirmed signature: GetFileCopy(lParentWnd, poVersionNoOrRevisionName, poPathOrFolderID, lEdmGetFlags, bsNewName)
        #   lParentWnd = window handle (0 for headless)
        #   poVersionNoOrRevisionName = version number (0 = latest)
        #   poPathOrFolderID = destination folder path (must end with \)
        #   lEdmGetFlags = EdmGet flags (EdmGet_Simple = 1)
        #   bsNewName = new filename or empty string to keep original name
        copied = 0
        not_copied = []  # names of files that could not be exported
        for idx, ref in enumerate(ref_files, 1):
            ref_name = ref["name"]
            logger.debug(f" [{idx}/{file_count}] Copying {ref_name}...")
            try:
                # Get the file object from the vault by ID
                ref_file_obj = vault.vault.GetObject(EdmObject_File, ref["file_id"])
                if ref_file_obj is None:
                    logger.warning(f" Could not get file object for {ref_name} (ID: {ref['file_id']})")
                    not_copied.append(ref_name)
                    continue

                ref_file_obj.GetFileCopy(hwnd, 0, dest_path_trailing, EdmGet_Simple, "")
                copied += 1
            except Exception as copy_err:
                logger.warning(f" Failed to copy {ref_name}: {copy_err}")
                not_copied.append(ref_name)

        result["file_count"] = copied
        if not_copied:
            # An incomplete tree must not be reported as a success, so the
            # part ends up in the caller's failed list for retry.
            result["error"] = (
                f"{len(not_copied)} of {file_count} file(s) could not be copied: "
                + ", ".join(not_copied)
            )
            logger.error(f" FAILED: {result['error']}")
        else:
            result["status"] = "success"
            logger.info(f" SUCCESS: {copied}/{file_count} file(s) exported to {dest_path}")

    except Exception as e:
        result["error"] = str(e)
        logger.error(f" FAILED: {e}")
        logger.debug(" Full error details:", exc_info=True)

    return result


def batch_copy_tree(
    vault: PDMVaultConnection,
    file_list: List[Dict[str, Any]],
    output_dir: str,
    dry_run: bool = False
) -> Dict[str, List]:
    """
    Execute Copy Tree for multiple parts.

    Each part is exported into a subfolder of *output_dir* named after its
    part number. Results with status 'success' or 'dry_run' are grouped
    under 'success'; everything else goes under 'failed'.

    Args:
        vault: Active vault connection
        file_list: List of file info dicts (from search_and_resolve_parts)
        output_dir: Base output directory
        dry_run: If True, build trees but don't execute copies

    Returns:
        Dict with 'success' and 'failed' lists of result dicts
    """
    logger = logging.getLogger("batch_copy_tree")
    results = {"success": [], "failed": []}
    total = len(file_list)

    logger.info(f"Starting batch copy tree for {total} parts")
    logger.info("=" * 60)

    for i, file_info in enumerate(file_list, 1):
        part_number = file_info["part_number"]
        logger.info(f"[{i}/{total}] Processing copy tree for: {part_number} ({file_info['filename']})")

        result = execute_copy_tree(vault, file_info, output_dir, part_number, dry_run)

        bucket = "success" if result["status"] in ("success", "dry_run") else "failed"
        results[bucket].append(result)

    return results


def _format_com_signature(names) -> str:
    """Render a COM member's name list as "Name(arg1, arg2)" (or just "Name")."""
    if len(names) > 1:
        return f"{names[0]}({', '.join(names[1:])})"
    return names[0]


def test_copy_tree_api(vault: PDMVaultConnection, file_info: Dict[str, Any]) -> bool:
    """
    Test reference tree traversal on a single file without copying.

    Logs the members exposed by the reference-tree and file COM objects
    (best effort; introspection failures are only warnings) and the files
    found by the reference traversal.

    Args:
        vault: Active vault connection
        file_info: Dict with file_obj, file_id, folder_id, path

    Returns:
        True if reference traversal succeeds, False otherwise
    """
    logger = logging.getLogger("batch_copy_tree")

    logger.info("=" * 60)
    logger.info("TESTING REFERENCE TREE TRAVERSAL")
    logger.info("=" * 60)
    logger.info(f"Test file: {file_info['path']}")

    try:
        file_obj = file_info["file_obj"]
        folder_id = file_info["folder_id"]

        # Step 1: Test GetReferenceTree
        logger.info("Step 1: Calling GetReferenceTree...")
        ref_tree = file_obj.GetReferenceTree(folder_id, 0)
        if ref_tree is None:
            logger.error(" GetReferenceTree returned None")
            return False
        logger.info(f" OK - Got reference tree object: {type(ref_tree)}")

        # Step 2: Introspect reference tree object
        logger.info("Step 2: Inspecting IEdmReference5 COM object...")
        try:
            type_info = ref_tree._oleobj_.GetTypeInfo(0, 0)
            type_attr = type_info.GetTypeAttr()
            methods = []
            for i in range(type_attr.cFuncs):
                func_desc = type_info.GetFuncDesc(i)
                names = type_info.GetNames(func_desc.memid)
                methods.append(_format_com_signature(names))
            logger.info(f" IEdmReference methods ({len(methods)}):")
            for m in methods:
                logger.info(f" - {m}")
        except Exception as intro_err:
            logger.warning(f" Could not introspect: {intro_err}")

        # Step 3: Traverse children
        logger.info("Step 3: Traversing reference tree children...")
        ref_files = _collect_references(file_obj, folder_id, vault.vault, logger)
        logger.info(f" Found {len(ref_files)} file(s) in reference tree:")
        for idx, ref in enumerate(ref_files, 1):
            logger.info(f" [{idx}] {ref['name']}")

        # Step 4: Test GetFileCopy on root file (introspect only, don't copy)
        logger.info("Step 4: Inspecting IEdmFile5 GetFileCopy method...")
        try:
            type_info = file_obj._oleobj_.GetTypeInfo(0, 0)
            type_attr = type_info.GetTypeAttr()
            for i in range(type_attr.cFuncs):
                func_desc = type_info.GetFuncDesc(i)
                names = type_info.GetNames(func_desc.memid)
                lowered = names[0].lower()
                if "copy" in lowered or "get" in lowered:
                    logger.info(f" - {_format_com_signature(names)}")
        except Exception as intro_err:
            logger.warning(f" Could not introspect file object: {intro_err}")

        logger.info("=" * 60)
        logger.info("API TEST PASSED")
        logger.info("=" * 60)
        return True

    except Exception as e:
        logger.error(f"API TEST FAILED: {e}")
        logger.debug("Full error details:", exc_info=True)
        logger.info("=" * 60)
        return False


# Despite its name this is a runtime diagnostic, not a unit test; the marker
# keeps pytest from collecting it when this module's names are imported.
test_copy_tree_api.__test__ = False


# =============================================================================
# COMMAND LINE INTERFACE
# =============================================================================

def parse_arguments():
    """Parse command line arguments.

    Returns:
        The argparse namespace with vault, csv, output_dir, username,
        log_file, dry_run and test attributes.
    """
    parser = argparse.ArgumentParser(
        description="Batch Copy Tree export for SolidWorks PDM Professional",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # RawDescriptionHelpFormatter prints this text verbatim, so the
        # line breaks and indentation below are what the user sees.
        epilog="""
Examples:
    python batch_copy_tree.py -c parts.csv -o "C:\\Temp\\Output"
    python batch_copy_tree.py -c parts.csv -o "C:\\Temp\\Output" -v "IDSVault" --dry-run
    python batch_copy_tree.py -c parts.csv -o "C:\\Temp\\Output" --test

CSV format (one part number per line, no extensions, no header):
    UDS.00056
    WIDGET.00123
    BRACKET.00789

Each part number's Copy Tree output goes to its own subfolder:
    C:\\Temp\\Output\\UDS.00056\\{files...}
    C:\\Temp\\Output\\WIDGET.00123\\{files...}
        """
    )

    parser.add_argument(
        "-v", "--vault",
        default=VAULT_NAME,
        help=f"PDM vault name (default: {VAULT_NAME})"
    )
    parser.add_argument(
        "-c", "--csv",
        required=True,
        help="Path to CSV file containing part numbers (one per line, no extensions)"
    )
    parser.add_argument(
        "-o", "--output-dir",
        required=True,
        help="Base output directory for exported files (e.g., C:\\Temp\\Output)"
    )
    parser.add_argument(
        "-u", "--username",
        help="PDM username (will prompt if not provided)"
    )
    parser.add_argument(
        "--log-file",
        help="Custom log file path (default: auto-generated with timestamp)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Build copy trees and show what would be copied, but don't execute"
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="Test Copy Tree API calls on the first part only, then exit"
    )

    return parser.parse_args()


# =============================================================================
# MAIN EXECUTION
# =============================================================================

def main() -> int:
    """Main entry point for batch copy tree processing.

    Returns:
        Process exit code: 0 on success (or user cancellation), 1 when
        setup fails, nothing can be processed, or any part fails.
    """
    # Parse arguments
    args = parse_arguments()

    # Setup logging
    logger = setup_logging(args.log_file)

    logger.info("=" * 60)
    logger.info("PDM BATCH COPY TREE EXPORT")
    logger.info("=" * 60)
    logger.info(f"Vault: {args.vault}")
    logger.info(f"CSV File: {args.csv}")
    logger.info(f"Output Directory: {args.output_dir}")

    if args.dry_run:
        logger.info("Mode: DRY RUN (no files will be copied)")
    if args.test:
        logger.info("Mode: API TEST (testing on first part only)")

    # Get credentials
    username = args.username
    if not username:
        username = input("PDM Username: ")
    password = getpass.getpass("PDM Password: ")

    # Get file extension from user
    ext_input = input("Enter the file extension to search for (e.g., SLDASM, SLDDRW, SLDPRT): ").strip()
    ext_input = ext_input.lstrip(".")  # Remove leading dot if user included one
    if not ext_input:
        logger.error("No extension provided. Exiting.")
        return 1
    extension = f".{ext_input}"
    logger.info(f"File extension: {extension}")

    # Load part numbers from CSV
    part_numbers = load_part_numbers_from_csv(args.csv)
    if not part_numbers:
        logger.error("No part numbers loaded from CSV. Exiting.")
        return 1

    logger.info(f"Loaded {len(part_numbers)} part numbers from CSV")

    # Connect to vault
    vault = PDMVaultConnection(args.vault)
    if not vault.connect(username, password):
        logger.error("Failed to connect to vault. Exiting.")
        return 1

    try:
        # Search for parts in vault
        logger.info("Searching for parts in vault...")
        logger.info("=" * 60)
        validation = search_and_resolve_parts(vault, part_numbers, extension)

        valid_count = len(validation["valid"])
        not_found_count = len(validation["not_found"])
        ambiguous_count = len(validation.get("ambiguous", []))

        # Summary
        logger.info("=" * 60)
        logger.info("SEARCH RESULTS SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Parts ready to process: {valid_count}")
        logger.info(f"Parts not found: {not_found_count}")
        if ambiguous_count:
            logger.warning(f"Parts with multiple matches (first match used): {ambiguous_count}")

        # Report not found
        if not_found_count > 0:
            logger.warning("\nParts not found in vault:")
            for pn in validation["not_found"]:
                logger.warning(f" - {pn}")

            # Save not found list
            not_found_file = f"not_found_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
            with open(not_found_file, 'w', encoding='utf-8') as f:
                for pn in validation["not_found"]:
                    f.write(pn + "\n")
            logger.info(f"Not found list saved to: {not_found_file}")

        if valid_count == 0:
            logger.error("No valid parts to process. Exiting.")
            return 1

        # API test mode - test on first part then exit
        if args.test:
            test_passed = test_copy_tree_api(vault, validation["valid"][0])
            return 0 if test_passed else 1

        # Verify output directory is accessible
        try:
            os.makedirs(args.output_dir, exist_ok=True)
        except OSError as e:
            logger.error(f"Cannot create output directory '{args.output_dir}': {e}")
            return 1

        # Confirm before proceeding
        mode_label = "DRY RUN copy tree" if args.dry_run else "copy tree"
        print(f"\nReady to {mode_label} for {valid_count} parts to '{args.output_dir}'")
        confirm = input("Proceed? (yes/no): ").strip().lower()
        if confirm != "yes":
            logger.info("Operation cancelled by user")
            return 0

        # Execute batch copy tree
        results = batch_copy_tree(
            vault,
            validation["valid"],
            args.output_dir,
            dry_run=args.dry_run
        )

        # Final report
        logger.info("=" * 60)
        logger.info("BATCH COPY TREE COMPLETE")
        logger.info("=" * 60)
        logger.info(f"Total parts processed: {valid_count}")
        logger.info(f"Successful: {len(results['success'])}")
        logger.info(f"Failed: {len(results['failed'])}")

        if results["success"]:
            total_files = sum(r["file_count"] for r in results["success"])
            logger.info(f"Total files exported: {total_files}")

        if results["failed"]:
            logger.warning("\nFailed parts:")
            for r in results["failed"]:
                logger.warning(f" - {r['source_path']}: {r['error']}")

            # Write failed parts to a separate file for retry
            failed_file = f"failed_copies_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
            with open(failed_file, 'w', encoding='utf-8') as f:
                for r in results["failed"]:
                    f.write(f"{r['source_path']}\t{r['error']}\n")
            logger.info(f"Failed parts list saved to: {failed_file}")

        return 0 if not results["failed"] else 1

    finally:
        # Always release the vault, whichever path returned or raised.
        vault.disconnect()


if __name__ == "__main__":
    # SystemExit carries main()'s return code without relying on the
    # interactive-only exit() helper.
    raise SystemExit(main())