829 lines
28 KiB
Python
829 lines
28 KiB
Python
"""
|
|
Batch Copy Tree Export for PDM Vault
|
|
=====================================
|
|
This module provides a framework for:
|
|
1. Logging into a PDM vault via API
|
|
2. Reading part numbers from a CSV file (no extensions)
|
|
3. Running the PDM Copy Tree function for each part
|
|
4. Exporting each part's file tree to its own subfolder on a local path
|
|
|
|
Usage:
|
|
python batch_copy_tree.py -c parts.csv -o "C:\\Temp\\Output" --vault "IDSVault"
|
|
"""
|
|
|
|
import logging
|
|
import argparse
|
|
import getpass
|
|
import os
|
|
import ctypes
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
import win32com.client
|
|
|
|
# PDM API Type Library Constants
|
|
EdmObject_File = 1
|
|
EdmObject_Folder = 2
|
|
EdmSearch_FileName = 1 # Search by filename
|
|
EdmGet_Simple = 1 # Simple get (latest version)
|
|
|
|
|
|
# =============================================================================
|
|
# CONFIGURATION - Can be overridden via command line
|
|
# =============================================================================
|
|
|
|
VAULT_NAME = "IDSVault" # Default vault name
|
|
|
|
|
|
|
|
# =============================================================================
|
|
# LOGGING SETUP
|
|
# =============================================================================
|
|
|
|
def setup_logging(log_file: Optional[str] = None) -> logging.Logger:
|
|
"""Configure logging for the batch process."""
|
|
if log_file is None:
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
log_file = f"batch_copy_tree_{timestamp}.log"
|
|
|
|
logger = logging.getLogger("batch_copy_tree")
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
# File handler
|
|
fh = logging.FileHandler(log_file)
|
|
fh.setLevel(logging.DEBUG)
|
|
|
|
# Console handler
|
|
ch = logging.StreamHandler()
|
|
ch.setLevel(logging.INFO)
|
|
|
|
# Formatter
|
|
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|
fh.setFormatter(formatter)
|
|
ch.setFormatter(formatter)
|
|
|
|
logger.addHandler(fh)
|
|
logger.addHandler(ch)
|
|
|
|
return logger
|
|
|
|
|
|
# =============================================================================
|
|
# PDM VAULT CONNECTION
|
|
# =============================================================================
|
|
|
|
class PDMVaultConnection:
|
|
"""Handles connection and authentication to the PDM vault."""
|
|
|
|
def __init__(self, vault_name: str):
|
|
self.vault_name = vault_name
|
|
self.vault = None
|
|
self.is_connected = False
|
|
self.logger = logging.getLogger("batch_copy_tree")
|
|
self._username = None
|
|
self._password = None
|
|
|
|
def connect(self, username: str, password: str) -> bool:
|
|
"""
|
|
Connect and log into the PDM vault with username/password.
|
|
|
|
Args:
|
|
username: PDM username
|
|
password: PDM password
|
|
|
|
Returns:
|
|
True if connection successful, False otherwise
|
|
"""
|
|
try:
|
|
# Create the vault interface
|
|
self.vault = win32com.client.Dispatch("ConisioLib.EdmVault")
|
|
|
|
# Login with credentials
|
|
self.vault.Login(username, password, self.vault_name)
|
|
|
|
self.is_connected = True
|
|
self._username = username
|
|
self._password = password
|
|
self.logger.info(f"Successfully connected to vault: {self.vault_name}")
|
|
self.logger.info(f"Logged in as: {username}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to connect to vault '{self.vault_name}': {e}")
|
|
self.is_connected = False
|
|
return False
|
|
|
|
def disconnect(self) -> None:
|
|
"""Disconnect from the PDM vault."""
|
|
if self.vault is not None:
|
|
try:
|
|
# Clear the vault reference
|
|
self.vault = None
|
|
self.is_connected = False
|
|
self.logger.info("Disconnected from vault")
|
|
except Exception as e:
|
|
self.logger.warning(f"Error during disconnect: {e}")
|
|
|
|
def search_file_by_name(self, filename: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Search for a file in the vault by filename (supports wildcards).
|
|
|
|
Args:
|
|
filename: The filename to search for (e.g., "part001.*" or "part001.sldprt")
|
|
|
|
Returns:
|
|
List of dicts with 'file_obj', 'path', 'folder_path', 'folder_obj' for each match
|
|
"""
|
|
if not self.is_connected:
|
|
self.logger.error("Not connected to vault")
|
|
return []
|
|
|
|
results = []
|
|
|
|
try:
|
|
# Create search object
|
|
search = self.vault.CreateSearch()
|
|
|
|
# Set search filename
|
|
search.FileName = filename
|
|
self.logger.debug(f"Search pattern: '{filename}'")
|
|
|
|
# Execute search
|
|
search_result = search.GetFirstResult()
|
|
|
|
while search_result is not None:
|
|
try:
|
|
file_id = search_result.ID
|
|
folder_id = search_result.ParentFolderID
|
|
|
|
# Get the file and folder objects
|
|
file_obj = self.vault.GetObject(EdmObject_File, file_id)
|
|
folder_obj = self.vault.GetObject(EdmObject_Folder, folder_id)
|
|
|
|
if file_obj is not None and folder_obj is not None:
|
|
folder_path = folder_obj.LocalPath
|
|
# Use the actual filename from the file object
|
|
actual_name = search_result.Name
|
|
full_path = str(Path(folder_path) / actual_name)
|
|
|
|
results.append({
|
|
"file_obj": file_obj,
|
|
"file_id": file_id,
|
|
"folder_id": folder_id,
|
|
"path": full_path,
|
|
"folder_path": folder_path,
|
|
"folder_obj": folder_obj,
|
|
"filename": actual_name
|
|
})
|
|
except Exception as e:
|
|
self.logger.warning(f"Error processing search result: {e}")
|
|
|
|
search_result = search.GetNextResult()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error searching for '{filename}': {e}")
|
|
|
|
self.logger.debug(f"Search for '{filename}' returned {len(results)} result(s)")
|
|
return results
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry - note: call connect() separately with credentials."""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit."""
|
|
self.disconnect()
|
|
|
|
|
|
# =============================================================================
|
|
# FILE LIST HANDLING
|
|
# =============================================================================
|
|
|
|
def load_part_numbers_from_csv(csv_path: str) -> List[str]:
|
|
"""
|
|
Load list of part numbers from a CSV file (single column, no header).
|
|
|
|
Args:
|
|
csv_path: Path to the CSV file with one part number per line (no extensions)
|
|
|
|
Returns:
|
|
List of part numbers
|
|
"""
|
|
logger = logging.getLogger("batch_copy_tree")
|
|
part_numbers = []
|
|
|
|
try:
|
|
with open(csv_path, 'r', encoding='utf-8-sig') as f:
|
|
for line_num, line in enumerate(f, start=1):
|
|
part_number = line.strip()
|
|
if part_number: # Skip empty lines
|
|
part_numbers.append(part_number)
|
|
|
|
logger.info(f"Loaded {len(part_numbers)} part numbers from {csv_path}")
|
|
|
|
except FileNotFoundError:
|
|
logger.error(f"CSV file not found: {csv_path}")
|
|
except Exception as e:
|
|
logger.error(f"Error reading CSV file: {e}")
|
|
|
|
return part_numbers
|
|
|
|
|
|
def search_and_resolve_parts(
|
|
vault: PDMVaultConnection,
|
|
part_numbers: List[str],
|
|
extension: str
|
|
) -> Dict[str, List]:
|
|
"""
|
|
Search for part numbers in the vault and resolve each to a root file.
|
|
|
|
Part numbers are searched with the user-specified extension
|
|
(e.g., "PART001.SLDASM") since the CSV does not include file extensions.
|
|
|
|
Args:
|
|
vault: Active vault connection
|
|
part_numbers: List of part numbers (no extensions)
|
|
extension: File extension including the dot (e.g., ".SLDASM")
|
|
|
|
Returns:
|
|
Dict with:
|
|
- 'valid': list of file info dicts (ready for copy tree)
|
|
- 'not_found': list of part numbers not found in vault
|
|
- 'ambiguous': list of dicts with part_number and all found paths
|
|
"""
|
|
logger = logging.getLogger("batch_copy_tree")
|
|
results = {"valid": [], "not_found": [], "ambiguous": []}
|
|
|
|
total = len(part_numbers)
|
|
for i, part_number in enumerate(part_numbers, 1):
|
|
logger.info(f"[{i}/{total}] Searching for: {part_number}{extension}")
|
|
|
|
search_results = vault.search_file_by_name(f"{part_number}{extension}")
|
|
|
|
if len(search_results) == 0:
|
|
results["not_found"].append(part_number)
|
|
logger.warning(f" NOT FOUND: {part_number}")
|
|
|
|
else:
|
|
match = search_results[0]
|
|
logger.info(f" FOUND: {match['path']}")
|
|
|
|
if len(search_results) > 1:
|
|
logger.warning(f" Multiple matches found, using first result:")
|
|
for r in search_results:
|
|
logger.warning(f" - {r['path']}")
|
|
|
|
results["valid"].append({
|
|
"part_number": part_number,
|
|
"filename": match["filename"],
|
|
"path": match["path"],
|
|
"file_obj": match["file_obj"],
|
|
"file_id": match["file_id"],
|
|
"folder_id": match["folder_id"],
|
|
"folder_obj": match["folder_obj"],
|
|
})
|
|
|
|
return results
|
|
|
|
|
|
# =============================================================================
|
|
# COPY TREE OPERATIONS
|
|
# =============================================================================
|
|
|
|
def get_window_handle() -> int:
|
|
"""Get a window handle for PDM API calls. Returns console handle or 0."""
|
|
try:
|
|
hwnd = ctypes.windll.kernel32.GetConsoleWindow()
|
|
return hwnd if hwnd else 0
|
|
except Exception:
|
|
return 0
|
|
|
|
|
|
def _collect_references(file_obj, folder_id: int, vault, logger) -> List[Dict[str, Any]]:
|
|
"""
|
|
Collect all referenced files from an assembly using IEdmReference5.
|
|
|
|
Uses IEdmFile5.GetReferenceTree() to get the reference tree root, then
|
|
recursively traverses all levels of the reference tree using
|
|
GetFirstChildPosition/GetNextChild on each node.
|
|
|
|
Args:
|
|
file_obj: IEdmFile5 COM object (the root assembly)
|
|
folder_id: Folder ID of the root file
|
|
vault: The IEdmVault COM object
|
|
logger: Logger instance
|
|
|
|
Returns:
|
|
List of dicts with 'file_id', 'folder_id', 'name' for each
|
|
unique referenced file (including the root file itself)
|
|
"""
|
|
collected = {} # keyed by file ID to deduplicate
|
|
|
|
# Add the root file itself
|
|
root_name = file_obj.Name
|
|
root_id = file_obj.ID
|
|
collected[root_id] = {
|
|
"file_id": root_id,
|
|
"folder_id": folder_id,
|
|
"name": root_name,
|
|
}
|
|
logger.debug(f" Root: {root_name}")
|
|
|
|
def _traverse_children(ref_node, depth=0):
|
|
"""Recursively traverse all children of a reference node."""
|
|
indent = " " * (depth + 1)
|
|
try:
|
|
result = ref_node.GetFirstChildPosition("", True, True, 0)
|
|
|
|
if isinstance(result, tuple):
|
|
child_pos = result[0]
|
|
else:
|
|
child_pos = result
|
|
|
|
while child_pos is not None:
|
|
try:
|
|
if hasattr(child_pos, 'IsNull') and child_pos.IsNull:
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
child_ref = ref_node.GetNextChild(child_pos)
|
|
if child_ref is None:
|
|
break
|
|
|
|
if isinstance(child_ref, tuple):
|
|
child_ref = child_ref[0]
|
|
|
|
child_file_id = child_ref.FileID
|
|
child_folder_id = child_ref.FolderID
|
|
child_name = child_ref.Name
|
|
|
|
if child_file_id not in collected:
|
|
collected[child_file_id] = {
|
|
"file_id": child_file_id,
|
|
"folder_id": child_folder_id,
|
|
"name": child_name,
|
|
}
|
|
logger.debug(f"{indent}Child: {child_name}")
|
|
|
|
# Recurse into this child to get its children
|
|
_traverse_children(child_ref, depth + 1)
|
|
|
|
except StopIteration:
|
|
break
|
|
except Exception as child_err:
|
|
logger.debug(f"{indent}Error reading child reference: {child_err}")
|
|
break
|
|
|
|
except Exception as e:
|
|
logger.debug(f"{indent}Error traversing children at depth {depth}: {e}")
|
|
|
|
try:
|
|
# Get the reference tree (IEdmReference5)
|
|
ref_tree = file_obj.GetReferenceTree(folder_id, 0)
|
|
|
|
if ref_tree is None:
|
|
logger.warning(f" GetReferenceTree returned None")
|
|
return list(collected.values())
|
|
|
|
_traverse_children(ref_tree)
|
|
|
|
except Exception as e:
|
|
logger.warning(f" Error traversing reference tree: {e}")
|
|
logger.debug(f" Full error:", exc_info=True)
|
|
|
|
return list(collected.values())
|
|
|
|
|
|
def execute_copy_tree(
|
|
vault: PDMVaultConnection,
|
|
file_info: Dict[str, Any],
|
|
output_dir: str,
|
|
part_name: str,
|
|
dry_run: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Export an assembly and all its referenced files to a local subfolder.
|
|
|
|
Uses IEdmFile5.GetReferenceTree() to traverse references, then
|
|
IEdmFile5.GetFileCopy() to download each file to the output folder.
|
|
|
|
Args:
|
|
vault: Active vault connection
|
|
file_info: Dict with file_obj, file_id, folder_id, path, etc.
|
|
output_dir: Base output directory
|
|
part_name: Part number used as subfolder name
|
|
dry_run: If True, build tree but don't copy files
|
|
|
|
Returns:
|
|
Dict with 'status' ('success'/'failed'), 'file_count', 'dest_path',
|
|
'source_path', and 'error' (if failed)
|
|
"""
|
|
logger = logging.getLogger("batch_copy_tree")
|
|
dest_path = os.path.join(output_dir, part_name)
|
|
# GetFileCopy requires destination path to end with backslash
|
|
dest_path_trailing = dest_path if dest_path.endswith("\\") else dest_path + "\\"
|
|
result = {
|
|
"status": "failed",
|
|
"file_count": 0,
|
|
"dest_path": dest_path,
|
|
"source_path": file_info["path"],
|
|
"error": None
|
|
}
|
|
|
|
try:
|
|
# Create the output subfolder
|
|
os.makedirs(dest_path, exist_ok=True)
|
|
logger.debug(f"Output folder: {dest_path}")
|
|
|
|
hwnd = get_window_handle()
|
|
file_obj = file_info["file_obj"]
|
|
folder_id = file_info["folder_id"]
|
|
|
|
# Step 1: Traverse the reference tree to find all files
|
|
logger.info(f" Building reference tree...")
|
|
ref_files = _collect_references(file_obj, folder_id, vault.vault, logger)
|
|
file_count = len(ref_files)
|
|
result["file_count"] = file_count
|
|
logger.info(f" Reference tree: {file_count} file(s) found")
|
|
|
|
if dry_run:
|
|
for idx, ref in enumerate(ref_files, 1):
|
|
logger.info(f" [{idx}/{file_count}] {ref['name']}")
|
|
logger.info(f" DRY RUN: Would copy {file_count} file(s) to {dest_path}")
|
|
result["status"] = "dry_run"
|
|
return result
|
|
|
|
# Step 2: Copy each file to the output directory using GetFileCopy
|
|
# Confirmed signature: GetFileCopy(lParentWnd, poVersionNoOrRevisionName, poPathOrFolderID, lEdmGetFlags, bsNewName)
|
|
# lParentWnd = window handle (0 for headless)
|
|
# poVersionNoOrRevisionName = version number (0 = latest)
|
|
# poPathOrFolderID = destination folder path (must end with \)
|
|
# lEdmGetFlags = EdmGet flags (EdmGet_Simple = 1)
|
|
# bsNewName = new filename or empty string to keep original name
|
|
copied = 0
|
|
for idx, ref in enumerate(ref_files, 1):
|
|
ref_name = ref["name"]
|
|
logger.debug(f" [{idx}/{file_count}] Copying {ref_name}...")
|
|
|
|
try:
|
|
# Get the file object from the vault by ID
|
|
ref_file_obj = vault.vault.GetObject(EdmObject_File, ref["file_id"])
|
|
if ref_file_obj is None:
|
|
logger.warning(f" Could not get file object for {ref_name} (ID: {ref['file_id']})")
|
|
continue
|
|
|
|
ref_file_obj.GetFileCopy(hwnd, 0, dest_path_trailing, EdmGet_Simple, "")
|
|
copied += 1
|
|
except Exception as copy_err:
|
|
logger.warning(f" Failed to copy {ref_name}: {copy_err}")
|
|
|
|
result["file_count"] = copied
|
|
result["status"] = "success"
|
|
logger.info(f" SUCCESS: {copied}/{file_count} file(s) exported to {dest_path}")
|
|
|
|
except Exception as e:
|
|
result["error"] = str(e)
|
|
logger.error(f" FAILED: {e}")
|
|
logger.debug(f" Full error details:", exc_info=True)
|
|
|
|
return result
|
|
|
|
|
|
def batch_copy_tree(
|
|
vault: PDMVaultConnection,
|
|
file_list: List[Dict[str, Any]],
|
|
output_dir: str,
|
|
dry_run: bool = False
|
|
) -> Dict[str, List]:
|
|
"""
|
|
Execute Copy Tree for multiple parts.
|
|
|
|
Args:
|
|
vault: Active vault connection
|
|
file_list: List of file info dicts (from search_and_resolve_parts)
|
|
output_dir: Base output directory
|
|
dry_run: If True, build trees but don't execute copies
|
|
|
|
Returns:
|
|
Dict with 'success' and 'failed' lists of result dicts
|
|
"""
|
|
logger = logging.getLogger("batch_copy_tree")
|
|
results = {"success": [], "failed": []}
|
|
|
|
total = len(file_list)
|
|
logger.info(f"Starting batch copy tree for {total} parts")
|
|
logger.info("=" * 60)
|
|
|
|
for i, file_info in enumerate(file_list, 1):
|
|
part_number = file_info["part_number"]
|
|
logger.info(f"[{i}/{total}] Processing copy tree for: {part_number} ({file_info['filename']})")
|
|
|
|
result = execute_copy_tree(vault, file_info, output_dir, part_number, dry_run)
|
|
|
|
if result["status"] in ("success", "dry_run"):
|
|
results["success"].append(result)
|
|
else:
|
|
results["failed"].append(result)
|
|
|
|
return results
|
|
|
|
|
|
def test_copy_tree_api(vault: PDMVaultConnection, file_info: Dict[str, Any]) -> bool:
|
|
"""
|
|
Test reference tree traversal on a single file without copying.
|
|
|
|
Args:
|
|
vault: Active vault connection
|
|
file_info: Dict with file_obj, file_id, folder_id, path
|
|
|
|
Returns:
|
|
True if reference traversal succeeds, False otherwise
|
|
"""
|
|
logger = logging.getLogger("batch_copy_tree")
|
|
logger.info("=" * 60)
|
|
logger.info("TESTING REFERENCE TREE TRAVERSAL")
|
|
logger.info("=" * 60)
|
|
logger.info(f"Test file: {file_info['path']}")
|
|
|
|
try:
|
|
file_obj = file_info["file_obj"]
|
|
folder_id = file_info["folder_id"]
|
|
|
|
# Step 1: Test GetReferenceTree
|
|
logger.info("Step 1: Calling GetReferenceTree...")
|
|
ref_tree = file_obj.GetReferenceTree(folder_id, 0)
|
|
if ref_tree is None:
|
|
logger.error(" GetReferenceTree returned None")
|
|
return False
|
|
logger.info(f" OK - Got reference tree object: {type(ref_tree)}")
|
|
|
|
# Step 2: Introspect reference tree object
|
|
logger.info("Step 2: Inspecting IEdmReference5 COM object...")
|
|
try:
|
|
type_info = ref_tree._oleobj_.GetTypeInfo(0, 0)
|
|
type_attr = type_info.GetTypeAttr()
|
|
methods = []
|
|
for i in range(type_attr.cFuncs):
|
|
func_desc = type_info.GetFuncDesc(i)
|
|
names = type_info.GetNames(func_desc.memid)
|
|
methods.append(f"{names[0]}({', '.join(names[1:])})" if len(names) > 1 else names[0])
|
|
logger.info(f" IEdmReference methods ({len(methods)}):")
|
|
for m in methods:
|
|
logger.info(f" - {m}")
|
|
except Exception as intro_err:
|
|
logger.warning(f" Could not introspect: {intro_err}")
|
|
|
|
# Step 3: Traverse children
|
|
logger.info("Step 3: Traversing reference tree children...")
|
|
ref_files = _collect_references(file_obj, folder_id, vault.vault, logger)
|
|
logger.info(f" Found {len(ref_files)} file(s) in reference tree:")
|
|
for idx, ref in enumerate(ref_files, 1):
|
|
logger.info(f" [{idx}] {ref['name']}")
|
|
|
|
# Step 4: Test GetFileCopy on root file (introspect only, don't copy)
|
|
logger.info("Step 4: Inspecting IEdmFile5 GetFileCopy method...")
|
|
try:
|
|
type_info = file_obj._oleobj_.GetTypeInfo(0, 0)
|
|
type_attr = type_info.GetTypeAttr()
|
|
for i in range(type_attr.cFuncs):
|
|
func_desc = type_info.GetFuncDesc(i)
|
|
names = type_info.GetNames(func_desc.memid)
|
|
if "copy" in names[0].lower() or "get" in names[0].lower():
|
|
sig = f"{names[0]}({', '.join(names[1:])})" if len(names) > 1 else names[0]
|
|
logger.info(f" - {sig}")
|
|
except Exception as intro_err:
|
|
logger.warning(f" Could not introspect file object: {intro_err}")
|
|
|
|
logger.info("=" * 60)
|
|
logger.info("API TEST PASSED")
|
|
logger.info("=" * 60)
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"API TEST FAILED: {e}")
|
|
logger.debug("Full error details:", exc_info=True)
|
|
logger.info("=" * 60)
|
|
return False
|
|
|
|
|
|
# =============================================================================
|
|
# COMMAND LINE INTERFACE
|
|
# =============================================================================
|
|
|
|
def parse_arguments():
|
|
"""Parse command line arguments."""
|
|
parser = argparse.ArgumentParser(
|
|
description="Batch Copy Tree export for SolidWorks PDM Professional",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
python batch_copy_tree.py -c parts.csv -o "C:\\Temp\\Output"
|
|
python batch_copy_tree.py -c parts.csv -o "C:\\Temp\\Output" -v "IDSVault" --dry-run
|
|
python batch_copy_tree.py -c parts.csv -o "C:\\Temp\\Output" --test
|
|
|
|
CSV format (one part number per line, no extensions, no header):
|
|
UDS.00056
|
|
WIDGET.00123
|
|
BRACKET.00789
|
|
|
|
Each part number's Copy Tree output goes to its own subfolder:
|
|
C:\\Temp\\Output\\UDS.00056\\{files...}
|
|
C:\\Temp\\Output\\WIDGET.00123\\{files...}
|
|
"""
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-v", "--vault",
|
|
default=VAULT_NAME,
|
|
help=f"PDM vault name (default: {VAULT_NAME})"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-c", "--csv",
|
|
required=True,
|
|
help="Path to CSV file containing part numbers (one per line, no extensions)"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-o", "--output-dir",
|
|
required=True,
|
|
help="Base output directory for exported files (e.g., C:\\Temp\\Output)"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-u", "--username",
|
|
help="PDM username (will prompt if not provided)"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--log-file",
|
|
help="Custom log file path (default: auto-generated with timestamp)"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--dry-run",
|
|
action="store_true",
|
|
help="Build copy trees and show what would be copied, but don't execute"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--test",
|
|
action="store_true",
|
|
help="Test Copy Tree API calls on the first part only, then exit"
|
|
)
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN EXECUTION
|
|
# =============================================================================
|
|
|
|
def main():
|
|
"""Main entry point for batch copy tree processing."""
|
|
# Parse arguments
|
|
args = parse_arguments()
|
|
|
|
# Setup logging
|
|
logger = setup_logging(args.log_file)
|
|
logger.info("=" * 60)
|
|
logger.info("PDM BATCH COPY TREE EXPORT")
|
|
logger.info("=" * 60)
|
|
logger.info(f"Vault: {args.vault}")
|
|
logger.info(f"CSV File: {args.csv}")
|
|
logger.info(f"Output Directory: {args.output_dir}")
|
|
if args.dry_run:
|
|
logger.info("Mode: DRY RUN (no files will be copied)")
|
|
if args.test:
|
|
logger.info("Mode: API TEST (testing on first part only)")
|
|
|
|
# Get credentials
|
|
username = args.username
|
|
if not username:
|
|
username = input("PDM Username: ")
|
|
|
|
password = getpass.getpass("PDM Password: ")
|
|
|
|
# Get file extension from user
|
|
ext_input = input("Enter the file extension to search for (e.g., SLDASM, SLDDRW, SLDPRT): ").strip()
|
|
ext_input = ext_input.lstrip(".") # Remove leading dot if user included one
|
|
if not ext_input:
|
|
logger.error("No extension provided. Exiting.")
|
|
return 1
|
|
extension = f".{ext_input}"
|
|
logger.info(f"File extension: {extension}")
|
|
|
|
# Load part numbers from CSV
|
|
part_numbers = load_part_numbers_from_csv(args.csv)
|
|
|
|
if not part_numbers:
|
|
logger.error("No part numbers loaded from CSV. Exiting.")
|
|
return 1
|
|
|
|
logger.info(f"Loaded {len(part_numbers)} part numbers from CSV")
|
|
|
|
# Connect to vault
|
|
vault = PDMVaultConnection(args.vault)
|
|
|
|
if not vault.connect(username, password):
|
|
logger.error("Failed to connect to vault. Exiting.")
|
|
return 1
|
|
|
|
try:
|
|
# Search for parts in vault
|
|
logger.info("Searching for parts in vault...")
|
|
logger.info("=" * 60)
|
|
validation = search_and_resolve_parts(vault, part_numbers, extension)
|
|
|
|
valid_count = len(validation["valid"])
|
|
not_found_count = len(validation["not_found"])
|
|
|
|
# Summary
|
|
logger.info("=" * 60)
|
|
logger.info("SEARCH RESULTS SUMMARY")
|
|
logger.info("=" * 60)
|
|
logger.info(f"Parts ready to process: {valid_count}")
|
|
logger.info(f"Parts not found: {not_found_count}")
|
|
|
|
# Report not found
|
|
if not_found_count > 0:
|
|
logger.warning("\nParts not found in vault:")
|
|
for pn in validation["not_found"]:
|
|
logger.warning(f" - {pn}")
|
|
|
|
# Save not found list
|
|
not_found_file = f"not_found_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
|
|
with open(not_found_file, 'w') as f:
|
|
for pn in validation["not_found"]:
|
|
f.write(pn + "\n")
|
|
logger.info(f"Not found list saved to: {not_found_file}")
|
|
|
|
if valid_count == 0:
|
|
logger.error("No valid parts to process. Exiting.")
|
|
return 1
|
|
|
|
# API test mode - test on first part then exit
|
|
if args.test:
|
|
test_passed = test_copy_tree_api(vault, validation["valid"][0])
|
|
return 0 if test_passed else 1
|
|
|
|
# Verify output directory is accessible
|
|
try:
|
|
os.makedirs(args.output_dir, exist_ok=True)
|
|
except OSError as e:
|
|
logger.error(f"Cannot create output directory '{args.output_dir}': {e}")
|
|
return 1
|
|
|
|
# Confirm before proceeding
|
|
mode_label = "DRY RUN copy tree" if args.dry_run else "copy tree"
|
|
print(f"\nReady to {mode_label} for {valid_count} parts to '{args.output_dir}'")
|
|
confirm = input("Proceed? (yes/no): ").strip().lower()
|
|
|
|
if confirm != "yes":
|
|
logger.info("Operation cancelled by user")
|
|
return 0
|
|
|
|
# Execute batch copy tree
|
|
results = batch_copy_tree(
|
|
vault,
|
|
validation["valid"],
|
|
args.output_dir,
|
|
dry_run=args.dry_run
|
|
)
|
|
|
|
# Final report
|
|
logger.info("=" * 60)
|
|
logger.info("BATCH COPY TREE COMPLETE")
|
|
logger.info("=" * 60)
|
|
logger.info(f"Total parts processed: {valid_count}")
|
|
logger.info(f"Successful: {len(results['success'])}")
|
|
logger.info(f"Failed: {len(results['failed'])}")
|
|
|
|
if results["success"]:
|
|
total_files = sum(r["file_count"] for r in results["success"])
|
|
logger.info(f"Total files exported: {total_files}")
|
|
|
|
if results["failed"]:
|
|
logger.warning("\nFailed parts:")
|
|
for r in results["failed"]:
|
|
logger.warning(f" - {r['source_path']}: {r['error']}")
|
|
|
|
# Write failed parts to a separate file for retry
|
|
failed_file = f"failed_copies_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
|
|
with open(failed_file, 'w') as f:
|
|
for r in results["failed"]:
|
|
f.write(f"{r['source_path']}\t{r['error']}\n")
|
|
logger.info(f"Failed parts list saved to: {failed_file}")
|
|
|
|
return 0 if not results["failed"] else 1
|
|
|
|
finally:
|
|
vault.disconnect()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
exit(main())
|