Files
pdm/helpers/batch_workflows_paths.py

865 lines
31 KiB
Python

"""
Batch Workflow Processing for PDM Vault (Path-based)
=====================================================
This module provides a framework for:
1. Logging into a PDM vault via API
2. Processing a list of files (by full vault path) through a specified workflow transition
Usage:
python batch_workflows_paths.py --vault "MyVault" --csv "files.csv" --transition "citadel_set_production released"
"""
import logging
import argparse
import getpass
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
import ctypes
import pythoncom
import win32com.client
import comtypes
import comtypes.automation
from comtypes import COMMETHOD, GUID, HRESULT
from comtypes.automation import IDispatch as _CT_IDispatch
# PDM API Type Library Constants
# Object-type selectors; EdmObject_File is the one this module passes to
# vault.GetObject() when fetching a file by ID.
EdmObject_File = 1
EdmObject_Folder = 2
EdmObject_Workflow = 6
# ---------------------------------------------------------------------------
# comtypes interface for IEdmFile13 — enables vtable call to ChangeState3.
#
# Confirmed from gen_py stubs (IEdmFile13.py):
# IID : {DB0646C9-9E3F-4EA2-93AA-EB6584D268E2}
# oVft : 432 → slot 54 (IUnknown[0-2] + IDispatch[3-6] + 47 placeholders[7-53])
# ---------------------------------------------------------------------------
# 47 argument-less dummy methods named _ph0.._ph46. They exist only to occupy
# vtable slots 7-53 so that the method declared after them lands on slot 54;
# nothing in this module calls them.
_IEdmFile13_phs = [COMMETHOD([], HRESULT, f"_ph{i}") for i in range(47)]
# Pointer-to-VARIANT type used for the two by-reference VARIANT arguments of
# ChangeState3.
_VARIANT_p = ctypes.POINTER(comtypes.automation.VARIANT)
class _IEdmFile13_CT(_CT_IDispatch):
    """Minimal comtypes declaration of IEdmFile13 that exposes only ChangeState3.

    The method table is the placeholder list followed by ChangeState3, so the
    order of ``_methods_`` IS the vtable layout and must not be rearranged.

    The two string parameters are declared as ``comtypes.BSTR`` rather than
    ``ctypes.c_wchar_p``. A ``c_wchar_p`` argument hands the callee a plain
    NUL-terminated buffer with no BSTR length prefix; any callee code that
    asks for the BSTR length would read memory in front of the buffer. A BSTR
    is also a valid NUL-terminated wide string, so this declaration is safe
    whichever way the callee reads it. Callers keep passing ordinary ``str``
    values; comtypes allocates and frees the BSTR around the call.
    NOTE(review): the ``bs`` prefixes indicate BSTR parameters — confirm
    against the gen_py stub's argument type codes.
    """

    _iid_ = GUID("{DB0646C9-9E3F-4EA2-93AA-EB6584D268E2}")
    _idlflags_ = ["dual", "oleautomation"]
    _methods_ = _IEdmFile13_phs + [
        COMMETHOD(
            [], HRESULT, "ChangeState3",
            (["in"], _VARIANT_p, "poStateIdOrName"),
            (["in"], _VARIANT_p, "poTransitionIdOrName"),
            (["in"], ctypes.c_long, "lFolderID"),
            (["in"], comtypes.BSTR, "bsComment"),
            (["in"], ctypes.c_long, "lParentWnd"),
            (["in"], ctypes.c_long, "lEdmStateFlags"),
            (["in"], comtypes.BSTR, "bsPasswd"),
        ),
    ]
def _make_i4_variant(val: int) -> comtypes.automation.VARIANT:
    """Build a VARIANT tagged VT_I4 whose payload is ``int(val)``.

    The type tag is set through the ``vt`` field and the payload is then
    written directly into the structure's memory as a C int.
    """
    vt_i4 = 3  # VT_I4
    payload_offset = 8  # byte offset inside the VARIANT where the value is written
    variant = comtypes.automation.VARIANT()
    variant.vt = vt_i4
    payload = ctypes.cast(
        ctypes.byref(variant, payload_offset),
        ctypes.POINTER(ctypes.c_int),
    )
    payload[0] = int(val)
    return variant
class _Phase2AVError(OSError):
    """Signal that ChangeState3 hit an access violation on every attempt.

    It is a dedicated OSError subclass so that callers can tell a persistent
    Phase-2 access violation apart from any other OSError. The batch loop
    uses that distinction to count consecutive Phase-2 failures and to force
    a vault reconnect, which resets PDM's in-process state once corruption
    keeps recurring.
    """
# Escalating backoff for Phase-2 access violations (seconds between attempts).
# PDM's in-process DLL can take progressively longer to clean up after state
# corruption accumulates; a longer final sleep gives it a real chance to settle.
# _changestate3 also indexes this tuple by attempt number for its Phase-1
# retries, and derives its total attempt count from the tuple's length
# (1 initial attempt + one retry per entry).
_PHASE2_BACKOFF = (3, 10, 30)
def _changestate3(vault_obj, file_id: int, to_state_id: int,
                  transition_id: int, folder_id: int, comment: str,
                  password: str) -> None:
    """
    Call IEdmFile13::ChangeState3 via comtypes vtable to transition a file
    using a *specific* transition ID, bypassing the ambiguous ChangeState.

    Uses the primary win32com vault for GetObject so that the returned COM
    proxy is in the primary connection's context. Bridging to comtypes is
    done by reading the IEdmFile13* stored inside the pythoncom PyIBase
    wrapper at CPython object offset 16, then calling QueryInterface to get
    an AddRef'd comtypes pointer.

    Reads from _oleobj_ directly (not from a secondary QI(IID_IUnknown) result)
    because for aggregated COM objects the controlling IUnknown can be at a
    different address with a shorter lifetime than the IEdmFile13* itself.

    Reference counting: ``ctypes.cast`` produces a comtypes pointer WITHOUT
    calling AddRef, but comtypes pointers call Release when they are garbage
    collected. The borrowed pointer is therefore AddRef'd explicitly right
    after the cast; without that, every call would release one reference more
    than it acquired.

    Retries up to 3 additional times on Phase-1 access-violation or
    misaligned-pointer failures, and on Phase-2 access violations inside
    ChangeState3 itself (with escalating backoff of 3s, 10s, 30s between
    attempts to give PDM's in-process DLL time to clean up corrupted state).

    Args:
        vault_obj: Logged-in win32com vault object used for GetObject.
        file_id: ID of the file to transition.
        to_state_id: ID of the destination state.
        transition_id: ID of the transition to execute.
        folder_id: ID of the file's parent folder.
        comment: Transition comment.
        password: Password passed through to ChangeState3.

    Raises:
        _Phase2AVError: every attempt ended in an access violation inside
            ChangeState3, so batch_transition can trigger a vault reconnect
            after repeated failures.
        OSError, RuntimeError: Phase-1 failure on the last attempt, a
            non-retryable Phase-1 OSError, or a Phase-2 OSError that is not an
            access violation.
    """
    logger = logging.getLogger("batch_workflows_paths")
    max_attempts = 1 + len(_PHASE2_BACKOFF)  # initial + 3 retries
    for attempt in range(max_attempts):
        if attempt > 0:
            logger.debug(f" [CS3] Retry {attempt} for file ID {file_id}")
        file_obj = None
        py_disp = None
        try:
            # Fresh COM wrapper each attempt — primary vault, no competing refs.
            file_obj = win32com.client.CastTo(
                vault_obj.GetObject(EdmObject_File, file_id), 'IEdmFile13'
            )
            # CastTo('IEdmFile13') calls QI(IID_IEdmFile13) so _oleobj_ already
            # holds the IEdmFile13* directly. Read it at offset 16 in the
            # CPython object struct (ob_refcnt[8] + ob_type[8] + m_pUnknown[8]).
            py_disp = file_obj._oleobj_
            raw_ptr = ctypes.c_uint64.from_address(id(py_disp) + 16).value
            logger.debug(f" [CS3] raw_ptr={raw_ptr:#018x} (& 7 == {raw_ptr & 7})")
            if not raw_ptr or (raw_ptr & 0x7) != 0:
                raise RuntimeError(f"Misaligned IEdmFile13* at offset 16: {raw_ptr:#x}")
            ct_unk = ctypes.cast(raw_ptr, ctypes.POINTER(comtypes.IUnknown))
            # The cast only borrows pythoncom's reference; take our own so the
            # Release that comtypes issues when ct_unk is collected is balanced.
            ct_unk.AddRef()
            file13 = ct_unk.QueryInterface(_IEdmFile13_CT)  # AddRefs independently
            logger.debug(f" [CS3] QI OK (attempt {attempt})")
        except (OSError, RuntimeError) as exc:
            # Drop whatever part of the win32com wrapper was obtained.
            file_obj = None
            py_disp = None
            is_retryable = (
                (isinstance(exc, OSError) and 'access violation' in str(exc).lower())
                or isinstance(exc, RuntimeError)
            )
            if is_retryable and attempt < max_attempts - 1:
                sleep_s = _PHASE2_BACKOFF[attempt]
                logger.debug(
                    f" [CS3] Phase-1 failure ({exc}); sleeping {sleep_s}s then retrying"
                )
                time.sleep(sleep_s)
                continue
            raise
        # Release win32com wrapper — file13 holds its own AddRef'd reference.
        # py_disp must go too: it refers to the same pythoncom object and
        # would otherwise keep the wrapper alive across the ChangeState3 call.
        del file_obj, py_disp
        # --- Phase 2: call ChangeState3 ---
        v_state = _make_i4_variant(to_state_id)
        v_trans = _make_i4_variant(transition_id)
        try:
            file13.ChangeState3(
                ctypes.byref(v_state),
                ctypes.byref(v_trans),
                ctypes.c_long(folder_id),
                comment,
                ctypes.c_long(0),
                ctypes.c_long(0),
                password,
            )
            return  # success
        except OSError as exc:
            # Access violation inside ChangeState3 (PDM in-process DLL crashes while
            # accessing internal state left over from a recent transition). An
            # escalating sleep (3s, 10s, 30s) lets PDM's post-transition cleanup
            # finish, then we retry with a fresh COM wrapper. After all retries
            # are exhausted, raise _Phase2AVError so batch_transition can count
            # consecutive failures and reconnect the vault.
            if 'access violation' in str(exc).lower():
                if attempt < max_attempts - 1:
                    sleep_s = _PHASE2_BACKOFF[attempt]
                    logger.debug(
                        f" [CS3] Phase-2 access violation ({exc}); "
                        f"sleeping {sleep_s}s for PDM cleanup then retrying"
                    )
                    time.sleep(sleep_s)
                    continue
                raise _Phase2AVError(
                    f"ChangeState3 access violation after {max_attempts} attempts: {exc}"
                ) from exc
            raise
# =============================================================================
# CONFIGURATION - Can be overridden via command line
# =============================================================================
# Both values are only used as argparse defaults in parse_arguments()
# (for --vault and --transition respectively).
VAULT_NAME = "IDSVault" # Default vault name
DEFAULT_TRANSITION = "Citadel_mig_Set Proto Released" # Default transition name
# =============================================================================
# LOGGING SETUP
# =============================================================================
def setup_logging(log_file: Optional[str] = None) -> logging.Logger:
    """Configure and return the shared "batch_workflows_paths" logger.

    The logger gets a DEBUG-level file handler and an INFO-level console
    handler sharing one format. Calling this function again replaces the
    handlers installed by the previous call instead of stacking duplicates
    (which would emit every record several times).

    Args:
        log_file: Path of the log file. When None, a name of the form
            ``batch_workflow_paths_<YYYYmmdd_HHMMSS>.log`` is generated in
            the current directory.

    Returns:
        The configured logger.
    """
    if log_file is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = f"batch_workflow_paths_{timestamp}.log"
    logger = logging.getLogger("batch_workflows_paths")
    logger.setLevel(logging.DEBUG)
    # Remove (and close) handlers left over from an earlier call.
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    # File handler. UTF-8 so that vault paths read from the UTF-8 CSV can
    # always be written, whatever the machine's locale code page is.
    fh = logging.FileHandler(log_file, encoding="utf-8")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    # Console handler
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger
# =============================================================================
# PDM VAULT CONNECTION
# =============================================================================
class PDMVaultConnection:
    """Handles connection and authentication to the PDM vault.

    Also remembers the login credentials so that reconnect() can log in again
    without prompting, and carries the ``_last_was_phase2_av`` flag that
    transition_file() sets and batch_transition() reads.
    """

    def __init__(self, vault_name: str):
        self.vault_name = vault_name
        self.vault = None
        self.is_connected = False
        self.logger = logging.getLogger("batch_workflows_paths")
        self._username: Optional[str] = None
        self._password: Optional[str] = None
        # Set by transition_file() when the last transition ended in a
        # persistent Phase-2 access violation. Initialised here so the
        # attribute always exists instead of being created from outside.
        self._last_was_phase2_av = False

    def connect(self, username: str, password: str) -> bool:
        """
        Connect and log into the PDM vault with username/password.

        Args:
            username: PDM username
            password: PDM password

        Returns:
            True if connection successful, False otherwise
        """
        try:
            self.vault = win32com.client.Dispatch("ConisioLib.EdmVault")
            # Login with credentials
            self.vault.Login(username, password, self.vault_name)
            self.is_connected = True
            self._username = username
            self._password = password
            self.logger.info(f"Successfully connected to vault: {self.vault_name}")
            self.logger.info(f"Logged in as: {username}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to vault '{self.vault_name}': {e}")
            # Do not keep a vault object that never logged in.
            self.vault = None
            self.is_connected = False
            return False

    def disconnect(self) -> None:
        """Disconnect from the PDM vault by dropping the vault COM object."""
        if self.vault is not None:
            self.vault = None
            self.is_connected = False
            self.logger.info("Disconnected from vault")

    def reconnect(self) -> bool:
        """Force a full disconnect + re-login using the stored credentials.

        Used to reset PDM's in-process DLL state after repeated Phase-2 access
        violations indicate the vault connection's internal data structures have
        been corrupted. Returns True if the re-login succeeded, False when it
        failed or when no credentials from an earlier successful connect()
        are stored.
        """
        if self._username is None or self._password is None:
            self.logger.error("Cannot reconnect: no stored credentials")
            return False
        username, password = self._username, self._password
        self.logger.info("Reconnecting vault to reset PDM internal state...")
        self.disconnect()
        # Give the in-process DLL a moment to release any lingering state.
        time.sleep(2)
        return self.connect(username, password)

    def get_file(self, file_path: str) -> Optional[Dict[str, Any]]:
        """
        Get a file object from the vault by full path.

        Args:
            file_path: Full path to the file in the vault

        Returns:
            Dict with 'file_obj', 'folder_obj', 'path', or None when not
            connected, when the folder or file is not found, or when the
            lookup raises.
        """
        if not self.is_connected:
            self.logger.error("Not connected to vault")
            return None
        try:
            folder_path = str(Path(file_path).parent)
            folder_obj = self.vault.GetFolderFromPath(folder_path)
            if folder_obj is None:
                self.logger.warning(f"Folder not found: {folder_path}")
                return None
            # GetFileFromPath may come back as a tuple whose first element is
            # the file object, or as the bare file object.
            result = self.vault.GetFileFromPath(file_path, folder_obj)
            file_obj = result[0] if isinstance(result, tuple) else result
            if file_obj is None:
                return None
            # Re-fetch via GetObject, then cast to IEdmFile13 so CurrentState and
            # transition methods are accessible regardless of gen_py stub state.
            file_obj = self.vault.GetObject(EdmObject_File, file_obj.ID)
            file_obj = win32com.client.CastTo(file_obj, 'IEdmFile13')
            return {
                "file_obj": file_obj,
                "folder_obj": folder_obj,
                "path": file_path
            }
        except Exception as e:
            self.logger.error(f"Error getting file '{file_path}': {e}")
            return None

    def __enter__(self):
        """Context manager entry; returns the connection itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; disconnects and lets exceptions propagate."""
        self.disconnect()
# =============================================================================
# FILE LIST HANDLING
# =============================================================================
def load_file_list_from_csv(csv_path: str) -> List[str]:
    """
    Load list of file paths from a CSV file (single column, no header).

    Each line is stripped of surrounding whitespace; blank lines are skipped.
    A line wrapped in one pair of double quotes (as spreadsheet tools write
    fields containing commas) is unquoted, since a literal double quote can
    never be part of a Windows path. The file is read as UTF-8 and a leading
    BOM is ignored.

    Args:
        csv_path: Path to the CSV file with one file path per line

    Returns:
        List of file paths. Errors are logged rather than raised; the list
        then holds whatever was read before the error (empty when the file
        could not be opened).
    """
    logger = logging.getLogger("batch_workflows_paths")
    file_paths: List[str] = []
    try:
        with open(csv_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                file_path = line.strip()
                if len(file_path) >= 2 and file_path[0] == '"' and file_path[-1] == '"':
                    file_path = file_path[1:-1].strip()
                if file_path:  # Skip empty lines
                    file_paths.append(file_path)
        logger.info(f"Loaded {len(file_paths)} file paths from {csv_path}")
    except FileNotFoundError:
        logger.error(f"CSV file not found: {csv_path}")
    except (OSError, UnicodeError) as e:
        logger.error(f"Error reading CSV file: {e}")
    return file_paths
def validate_files(
    vault: PDMVaultConnection,
    file_paths: List[str]
) -> Dict[str, List]:
    """
    Validate that files exist in the vault by their full paths.

    Args:
        vault: Active vault connection
        file_paths: List of full vault paths to validate

    Returns:
        Dict with:
        - 'valid': list of dicts with 'path' and 'current_state' (the state
          name, or "Unknown" when it could not be read)
        - 'not_found': list of paths for which vault.get_file() returned None
    """
    logger = logging.getLogger("batch_workflows_paths")
    results = {"valid": [], "not_found": []}
    total = len(file_paths)
    for i, file_path in enumerate(file_paths, 1):
        logger.info(f"[{i}/{total}] Validating: {file_path}")
        file_info = vault.get_file(file_path)
        if file_info is None:
            results["not_found"].append(file_path)
            logger.warning(f" NOT FOUND: {file_path}")
            continue
        # Read state now, then release the COM wrappers immediately.
        # Keeping file_obj alive across the full validation pass leaves
        # stale COM proxies in memory. After any file is transitioned,
        # PDM's server may invalidate proxies for other files, causing
        # access violations later. transition_file() fetches its own
        # fresh wrapper just before it needs it.
        current_state = None
        try:
            current_state = file_info["file_obj"].CurrentState
            state_name = current_state.Name if current_state else "Unknown"
        except Exception as e:
            logger.warning(f" Could not get state: {e}")
            state_name = "Unknown"
        results["valid"].append({"path": file_path, "current_state": state_name})
        logger.info(f" FOUND (State: {state_name})")
        # Loop locals do NOT go out of scope at the end of an iteration; they
        # would keep this file's wrappers alive while the next file is being
        # fetched (and the last file's until return). Drop them explicitly.
        file_info = None
        current_state = None
    return results
# =============================================================================
# WORKFLOW PROCESSING
# =============================================================================
def get_available_transitions(vault: PDMVaultConnection, file_obj) -> List[Dict[str, Any]]:
    """
    List the transitions that leave the file's current workflow state.

    Args:
        vault: Active vault connection (accepted for interface symmetry; the
            lookup itself only uses ``file_obj``)
        file_obj: IEdmFile object

    Returns:
        One dict per transition with 'name', 'id' and 'target_state'
        ("Unknown" when the transition has no ToState). If an error occurs
        it is logged and the entries gathered so far are returned.
    """
    log = logging.getLogger("batch_workflows_paths")
    found: List[Dict[str, Any]] = []
    try:
        state = file_obj.CurrentState
        if state is not None:
            cursor = state.GetFirstTransitionPosition()
            while not cursor.IsNull:
                item = state.GetNextTransition(cursor)
                item_name = item.Name
                item_id = item.ID
                if item.ToState:
                    target_name = item.ToState.Name
                else:
                    target_name = "Unknown"
                found.append(
                    {"name": item_name, "id": item_id, "target_state": target_name}
                )
    except Exception as e:
        log.error(f"Error getting transitions: {e}")
    return found
def transition_file(
    vault: PDMVaultConnection,
    file_info: Dict[str, Any],
    transition_name: str,
    comment: str = ""
) -> str:
    """
    Transition a single file using a named transition.

    Args:
        vault: Active vault connection
        file_info: Dict containing 'path' (and optionally 'current_state' for logging)
        transition_name: Name of the transition to execute (matched
            case-insensitively against the transitions of the current state)
        comment: Optional transition comment

    Returns:
        One of "success", "not_available", or "failed".
        - "success": transition completed and state verified
        - "not_available": named transition is not valid from the file's current
          state (typically means the file is already in the target state from a
          prior run — not a real failure, just a no-op)
        - "failed": real failure (access violation, missing file, state
          unchanged after call, etc.) — worth retrying

    Side effects:
        Sets ``vault._last_was_phase2_av`` to True when _changestate3 raised
        _Phase2AVError and to False when it returned normally.
    """
    logger = logging.getLogger("batch_workflows_paths")
    file_path = file_info["path"]
    try:
        wanted_name = transition_name.lower()
        # Fetch a fresh COM wrapper right now — not from validation.
        # By the time this file is processed, earlier transitions may have
        # caused PDM to invalidate COM proxies obtained during the validation
        # pass. A fresh GetObject/CastTo gives a clean proxy every time.
        fresh = vault.get_file(file_path)
        if fresh is None:
            logger.error(f"File no longer accessible in vault: {file_path}")
            return "failed"
        file_obj = fresh["file_obj"]
        folder_obj = fresh["folder_obj"]
        # Get current state and find the transition
        current_state = file_obj.CurrentState
        if current_state is None:
            logger.error(f"File has no workflow state: {file_path}")
            return "failed"
        # Find the transition by name
        target_transition = None
        transition = None
        trans_pos = current_state.GetFirstTransitionPosition()
        while not trans_pos.IsNull:
            transition = current_state.GetNextTransition(trans_pos)
            if transition.Name.lower() == wanted_name:
                target_transition = transition
                break
        if target_transition is None:
            available = get_available_transitions(vault, file_obj)
            available_names = [t["name"] for t in available]
            logger.error(
                f"Transition '{transition_name}' not available for {file_path}. "
                f"Current state: {current_state.Name}. "
                f"Available transitions: {available_names}"
            )
            return "not_available"
        # Collect everything we need from the COM objects, then release them
        # before calling _changestate3. _changestate3 fetches its own wrapper
        # internally, so having the lookup wrapper alive simultaneously would
        # create competing COM references and corrupt PDM's internal state.
        old_state_name = current_state.Name
        expected_state_name = target_transition.ToState.Name
        to_state_id = target_transition.ToState.ID
        trans_id = target_transition.ID
        file_id = file_obj.ID
        folder_id = folder_obj.ID
        logger.info(
            f" Transition: '{target_transition.Name}' (ID: {trans_id}) | "
            f"ToState: '{expected_state_name}' (ID: {to_state_id}) | "
            f"Folder ID: {folder_id} | File ID: {file_id}"
        )
        # ↓ Release all COM wrappers from the lookup phase before the vtable call.
        # `transition` is included: it is the loop variable and still refers to
        # the same object as target_transition, so leaving it bound would keep
        # that wrapper alive.
        del (file_obj, folder_obj, fresh, current_state,
             target_transition, transition, trans_pos)
        try:
            _changestate3(
                vault.vault,
                file_id,
                to_state_id,
                trans_id,
                folder_id,
                comment,
                vault._password or "",
            )
        except _Phase2AVError as exc:
            # Persistent access violation — flag for batch_transition to count
            # against the consecutive-failure threshold for reconnect.
            vault._last_was_phase2_av = True
            logger.error(f"Failed to transition {file_path}: {exc}")
            return "failed"
        else:
            vault._last_was_phase2_av = False
        # Verify the state actually changed — re-fetch and cast to IEdmFile13
        fresh_file = win32com.client.CastTo(
            vault.vault.GetObject(EdmObject_File, file_id), 'IEdmFile13'
        )
        new_state = fresh_file.CurrentState
        actual_state_name = new_state.Name if new_state else "Unknown"
        if actual_state_name.lower() == expected_state_name.lower():
            logger.info(
                f"SUCCESS: {file_path} | "
                f"{old_state_name} -> {actual_state_name}"
            )
            return "success"
        logger.error(
            f"FAILED (state unchanged): {file_path} | "
            f"Expected: {expected_state_name}, Actual: {actual_state_name}"
        )
        return "failed"
    except Exception as e:
        logger.error(f"Failed to transition {file_path}: {e}")
        return "failed"
def batch_transition(
    vault: PDMVaultConnection,
    file_list: List[Dict[str, Any]],
    transition_name: str,
    comment: str = ""
) -> Dict[str, List[str]]:
    """
    Transition multiple files using a named transition.

    Args:
        vault: Active vault connection
        file_list: List of file info dicts (from validate_files)
        transition_name: Name of the transition to execute
        comment: Optional transition comment

    Returns:
        Dict with 'success', 'failed', and 'not_available' file lists.
        - 'success': transition completed
        - 'failed': real failure worth retrying (access violation, etc.).
          If the batch is aborted because a vault reconnect failed, every
          file that was not reached is listed here as well, so that no input
          file is missing from the results.
        - 'not_available': transition not valid from current state — typically
          means the file is already in the target state from a prior run
    """
    logger = logging.getLogger("batch_workflows_paths")
    results = {"success": [], "failed": [], "not_available": []}
    total = len(file_list)
    logger.info(f"Starting batch transition: '{transition_name}' for {total} files")
    logger.info("=" * 60)
    # Consecutive Phase-2 access-violation counter. When it hits the threshold,
    # the vault connection's in-process state is likely corrupted past the
    # point where sleeping will help, so force a full disconnect + re-login.
    consecutive_phase2_av = 0
    PHASE2_AV_RECONNECT_THRESHOLD = 3
    for i, file_info in enumerate(file_list, 1):
        file_path = file_info["path"]
        logger.info(f"[{i}/{total}] Processing: {file_path}")
        # Reset the flag so a stale value from the previous file is never read.
        vault._last_was_phase2_av = False
        status = transition_file(vault, file_info, transition_name, comment)
        if status == "success":
            results["success"].append(file_path)
            consecutive_phase2_av = 0
        elif status == "not_available":
            results["not_available"].append(file_path)
            consecutive_phase2_av = 0
        else:  # "failed"
            results["failed"].append(file_path)
            if getattr(vault, "_last_was_phase2_av", False):
                consecutive_phase2_av += 1
                logger.warning(
                    f" Phase-2 AV streak: {consecutive_phase2_av}/"
                    f"{PHASE2_AV_RECONNECT_THRESHOLD}"
                )
                if consecutive_phase2_av >= PHASE2_AV_RECONNECT_THRESHOLD:
                    logger.warning(
                        f"{consecutive_phase2_av} consecutive Phase-2 access "
                        "violations — forcing vault reconnect"
                    )
                    if vault.reconnect():
                        logger.info("Vault reconnected successfully")
                    else:
                        logger.error(
                            "Vault reconnect failed — aborting remaining batch"
                        )
                        # i is 1-based, so file_list[i:] is everything after
                        # the current file. Record those as failed so they
                        # show up in the retry list instead of vanishing.
                        unprocessed = [info["path"] for info in file_list[i:]]
                        if unprocessed:
                            results["failed"].extend(unprocessed)
                            logger.error(
                                f"{len(unprocessed)} unprocessed file(s) "
                                "recorded as failed for retry"
                            )
                        break
                    consecutive_phase2_av = 0
            else:
                consecutive_phase2_av = 0
    return results
# =============================================================================
# COMMAND LINE INTERFACE
# =============================================================================
def parse_arguments():
    """Build the command-line parser and return the parsed arguments.

    Options: -v/--vault, -c/--csv (required), -t/--transition, --comment,
    -u/--username and --log-file.
    """
    usage_examples = """
Examples:
python batch_workflows_paths.py --vault "MyVault" --csv "files.csv" --transition "citadel_set_production released"
python batch_workflows_paths.py -v "MyVault" -c "wip_files.csv" -t "citadel_set_wip"
CSV format (one full vault path per line):
C:\\IDSVault\\Parts\\widget.sldprt
C:\\IDSVault\\Parts\\bracket.sldprt
C:\\IDSVault\\Assemblies\\main_assy.sldasm
"""
    cli = argparse.ArgumentParser(
        description="Batch workflow transitions for SolidWorks PDM Professional (path-based)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    # (flags, keyword options) in the order they should appear in --help.
    option_table = [
        (("-v", "--vault"), {
            "default": VAULT_NAME,
            "help": f"PDM vault name (default: {VAULT_NAME})",
        }),
        (("-c", "--csv"), {
            "required": True,
            "help": "Path to CSV file containing full vault paths",
        }),
        (("-t", "--transition"), {
            "default": DEFAULT_TRANSITION,
            "help": f"Workflow transition name to execute (default: {DEFAULT_TRANSITION})",
        }),
        (("--comment",), {
            "default": "Batch workflow transition",
            "help": "Comment for the workflow transition",
        }),
        (("-u", "--username"), {
            "help": "PDM username (will prompt if not provided)",
        }),
        (("--log-file",), {
            "help": "Custom log file path (default: auto-generated with timestamp)",
        }),
    ]
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args()
# =============================================================================
# MAIN EXECUTION
# =============================================================================
def _write_path_list(list_file: str, paths: List[str]) -> None:
    """Write one path per line to ``list_file`` as UTF-8.

    UTF-8 matches the encoding the input CSV is read with, so any path that
    could be loaded can also be written, independent of the locale code page.
    """
    with open(list_file, 'w', encoding='utf-8') as out:
        out.writelines(path + "\n" for path in paths)


def main():
    """Main entry point for batch workflow processing.

    Parses the command line, prompts for credentials, loads the CSV, connects
    to the vault, validates the files, runs the batch transition and writes
    the not-found / failed / not-available lists to timestamped text files.

    Returns:
        Process exit code: 0 when no transition failed, 1 when the CSV
        yielded no paths, the vault login failed, no file validated, or at
        least one transition failed.
    """
    # Parse arguments
    args = parse_arguments()
    # Setup logging
    logger = setup_logging(args.log_file)
    logger.info("=" * 60)
    logger.info("PDM BATCH WORKFLOW PROCESSOR (PATH-BASED)")
    logger.info("=" * 60)
    logger.info(f"Vault: {args.vault}")
    logger.info(f"CSV File: {args.csv}")
    logger.info(f"Transition: {args.transition}")
    logger.info(f"Comment: {args.comment}")
    # Get credentials
    username = args.username
    if not username:
        username = input("PDM Username: ")
    password = getpass.getpass("PDM Password: ")
    # Load files from CSV
    file_paths = load_file_list_from_csv(args.csv)
    if not file_paths:
        logger.error("No files loaded from CSV. Exiting.")
        return 1
    logger.info(f"Loaded {len(file_paths)} file paths from CSV")
    # Connect to vault
    vault = PDMVaultConnection(args.vault)
    if not vault.connect(username, password):
        logger.error("Failed to connect to vault. Exiting.")
        return 1
    try:
        # Validate files exist in vault
        logger.info("Validating files in vault...")
        logger.info("=" * 60)
        validation = validate_files(vault, file_paths)
        valid_count = len(validation["valid"])
        not_found_count = len(validation["not_found"])
        # Summary
        logger.info("=" * 60)
        logger.info("VALIDATION SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Files ready to process: {valid_count}")
        logger.info(f"Files not found: {not_found_count}")
        # Report not found
        if not_found_count > 0:
            logger.warning("\nFiles not found in vault:")
            for missing in validation["not_found"]:
                logger.warning(f" - {missing}")
            # Save not found list
            not_found_file = f"not_found_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
            _write_path_list(not_found_file, validation["not_found"])
            logger.info(f"Not found list saved to: {not_found_file}")
        if valid_count == 0:
            logger.error("No valid files to process. Exiting.")
            return 1
        # Execute batch transition
        results = batch_transition(
            vault,
            validation["valid"],
            args.transition,
            comment=args.comment
        )
        # Final report
        logger.info("=" * 60)
        logger.info("BATCH PROCESS COMPLETE")
        logger.info("=" * 60)
        logger.info(f"Total files processed: {valid_count}")
        logger.info(f"Successful transitions: {len(results['success'])}")
        logger.info(f"Failed transitions: {len(results['failed'])}")
        logger.info(
            f"Transition not available (likely already in target state): "
            f"{len(results['not_available'])}"
        )
        # One timestamp shared by both result files of this run.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if results["failed"]:
            logger.warning("\nFailed files (real failures — retry these):")
            for failed_path in results["failed"]:
                logger.warning(f" - {failed_path}")
            failed_file = f"failed_transitions_{timestamp}.txt"
            _write_path_list(failed_file, results["failed"])
            logger.info(f"Failed file list saved to: {failed_file}")
        if results["not_available"]:
            not_avail_file = f"not_available_{timestamp}.txt"
            _write_path_list(not_avail_file, results["not_available"])
            logger.info(
                f"Not-available file list saved to: {not_avail_file} "
                f"(likely already in target state — not retried)"
            )
        return 0 if not results["failed"] else 1
    finally:
        # Always drop the vault connection, including on early return.
        vault.disconnect()
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code. SystemExit is
    # raised directly because the bare `exit` name is injected by the `site`
    # module for interactive use and is not guaranteed to exist (for example
    # under `python -S` or in frozen executables).
    raise SystemExit(main())