Source code for scitex_capture.utils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Timestamp: "2025-10-18 09:55:52 (ywatanabe)"
# File: /home/ywatanabe/proj/scitex-code/src/scitex/capture/utils.py
# ----------------------------------------
from __future__ import annotations

import os

__FILE__ = "./src/scitex/capture/utils.py"
__DIR__ = os.path.dirname(__FILE__)
# ----------------------------------------

import sys

"""
Utility functions for easy screen capture.
"""

from datetime import datetime
from pathlib import Path
from typing import Optional

from ._paths import get_screenshots_dir, get_tmp_dir
from .capture import CaptureManager, ScreenshotWorker

# Global manager instance
_manager = CaptureManager()


def _manage_cache_size(cache_dir: Path, max_size_gb: float = 1.0):
    """
    Manage cache directory size by removing old files if size exceeds limit.

    Parameters
    ----------
    cache_dir : Path
        Directory to manage
    max_size_gb : float
        Maximum size in GB (default: 1.0)
    """
    if not cache_dir.exists():
        return

    max_size_bytes = max_size_gb * 1024 * 1024 * 1024  # Convert GB to bytes

    # Get all files with their sizes and modification times
    files = []
    total_size = 0

    for file_path in cache_dir.glob("*.jpg"):
        if file_path.is_file():
            size = file_path.stat().st_size
            mtime = file_path.stat().st_mtime
            files.append((file_path, size, mtime))
            total_size += size

    # Also check PNG files
    for file_path in cache_dir.glob("*.png"):
        if file_path.is_file():
            size = file_path.stat().st_size
            mtime = file_path.stat().st_mtime
            files.append((file_path, size, mtime))
            total_size += size

    # If under limit, nothing to do
    if total_size <= max_size_bytes:
        return

    # Sort by modification time (oldest first)
    files.sort(key=lambda x: x[2])

    # Remove oldest files until under limit
    for file_path, size, _ in files:
        if total_size <= max_size_bytes:
            break
        try:
            file_path.unlink()
            total_size -= size
        except:
            pass  # File might be in use


[docs] def capture( message: str = None, path: str = None, quality: int = 85, auto_categorize: bool = True, verbose: bool = True, monitor_id: int = 0, capture_all: bool = False, all: bool = False, app: str = None, url: str = None, url_wait: int = 3, url_width: int = 1920, url_height: int = 1080, max_cache_gb: float = 1.0, ) -> str: """ Take a screenshot - monitor, window, browser, or everything. Parameters ---------- message : str, optional Message to include in filename path : str, optional Output path. Defaults to the canonical ``$SCITEX_DIR/capture/runtime/screenshots/`` directory when unset. quality : int JPEG quality (1-100) all : bool Capture all monitors app : str, optional App name to capture (e.g., "chrome", "code") url : str, optional URL to capture via browser (e.g., "http://127.0.0.1:8000/") url_wait : int Seconds to wait for page load (default: 3) url_width : int Browser window width for URL capture (default: 1920) url_height : int Browser window height for URL capture (default: 1080) monitor_id : int Monitor to capture (0-based, default: 0) Returns ------- str Path to saved screenshot Examples -------- >>> from scitex import capture >>> >>> capture.snap() # Current monitor >>> capture.snap(all=True) # All monitors >>> capture.snap(app="chrome") # Chrome window >>> capture.snap(url="http://localhost:8000") # Browser page """ # Handle URL capture if url: # Auto-add http:// if no protocol specified if not url.startswith(("http://", "https://", "file://")): url = f"http://{url}" # Try Playwright first (headless, non-interfering) try: from playwright.sync_api import sync_playwright if path is None: timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S") url_slug = ( url.replace("://", "_").replace("/", "_").replace(":", "_")[:30] ) path = str( get_screenshots_dir() / f"{timestamp_str}-url-{url_slug}.jpg" ) path = os.path.expanduser(path) if verbose: print(f"📸 Capturing URL: {url}") # Check if DISPLAY is set (WSL with X11 forward causes visible browser) import os as _os original_display = _os.environ.get("DISPLAY") # Force headless by unsetting DISPLAY temporarily if original_display: _os.environ.pop("DISPLAY", None) try: with sync_playwright() as p: # Use stealth args from scitex.browser stealth_args = [ "--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled", "--window-size=1920,1080", ] browser = p.chromium.launch(headless=True, args=stealth_args) context = browser.new_context( viewport={"width": url_width, "height": url_height} ) page = context.new_page() # Use domcontentloaded for faster capture, with longer timeout page.goto(url, wait_until="domcontentloaded", timeout=30000) # Wait additional time for rendering page.wait_for_timeout(url_wait * 1000) page.screenshot( path=path, type="jpeg", quality=quality, full_page=False, ) browser.close() finally: # Restore DISPLAY if original_display: _os.environ["DISPLAY"] = original_display if Path(path).exists(): if verbose: print(f"📸 URL: {path}") return path except ImportError: if verbose: print( "⚠️ Playwright not installed: pip install 'scitex[capture-browser]'" ) pass # Try PowerShell fallback except Exception as e: if verbose: print(f"⚠️ Playwright failed: {e}") pass # Try PowerShell fallback # For WSL: Fallback to Windows-side browser if sys.platform == "linux" and "microsoft" in os.uname().release.lower(): try: import base64 import json import subprocess # Generate output path if path is None: timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S") url_slug = ( url.replace("://", "_").replace("/", "_").replace(":", "_")[:30] ) path = str( get_screenshots_dir() / f"{timestamp_str}-url-{url_slug}.jpg" ) path = os.path.expanduser(path) if verbose: print(f"📸 Capturing URL on Windows: {url}") # Use PowerShell script on Windows host script_dir = Path(__file__).parent / "powershell" script_path = script_dir / "capture_url.ps1" if script_path.exists(): # Find PowerShell ps_paths = [ "powershell.exe", "/mnt/c/Windows/System32/WindowsPowerShell/v1.0/powershell.exe", ] ps_exe = None for p in ps_paths: try: result = subprocess.run( [p, "-Command", "echo test"], capture_output=True, timeout=1, ) if result.returncode == 0: ps_exe = p break except: continue if ps_exe: cmd = [ ps_exe, "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(script_path), "-Url", url, "-WaitSeconds", str(url_wait), "-WindowWidth", str(url_width), "-WindowHeight", str(url_height), ] result = subprocess.run( cmd, capture_output=True, text=True, timeout=30 ) if result.returncode == 0 and result.stdout.strip(): # Parse JSON lines = result.stdout.strip().split("\n") for line in lines: if line.strip().startswith("{"): data = json.loads(line) if data.get("Success"): img_data = base64.b64decode( data.get("Base64Data", "") ) # Save as JPEG try: import io from PIL import Image img = Image.open(io.BytesIO(img_data)) if img.mode == "RGBA": rgb_img = Image.new( "RGB", img.size, (255, 255, 255), ) rgb_img.paste(img, mask=img.split()[3]) img = rgb_img img.save( path, "JPEG", quality=quality, optimize=True, ) if verbose: print(f"📸 URL: {path}") return path except ImportError: # Save as PNG fallback with open( path.replace(".jpg", ".png"), "wb", ) as f: f.write(img_data) return path.replace(".jpg", ".png") break except Exception as e: if verbose: print(f"⚠️ PowerShell URL capture failed: {e}") # If all methods failed if verbose: print( "❌ URL capture failed - Playwright not available and PowerShell failed" ) return None # Handle app-specific capture if app: info = _manager.get_info() windows = info.get("Windows", {}).get("Details", []) # Search for matching window app_lower = app.lower() matching_window = None for win in windows: process_name = win.get("ProcessName", "").lower() title = win.get("Title", "").lower() if app_lower in process_name or app_lower in title: matching_window = win break if matching_window: handle = matching_window.get("Handle") result_path = _manager.capture_window(handle, path) if result_path and verbose: print(f"📸 {matching_window.get('ProcessName')}: {result_path}") return result_path else: if verbose: print(f"❌ App '{app}' not found in visible windows") return None # Handle 'all' shorthand if all: capture_all = True # Take screenshot first to analyze it timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] temp_dir = str(get_tmp_dir()) # Take screenshot to temp location use_jpeg = ( True if path is None or path.lower().endswith((".jpg", ".jpeg")) else False ) worker = ScreenshotWorker( output_dir=temp_dir, use_jpeg=use_jpeg, jpeg_quality=quality, verbose=verbose, # Use the verbose parameter passed by user ) worker.session_id = "capture" worker.screenshot_count = 0 worker.monitor = monitor_id worker.capture_all = capture_all temp_path = worker._take_screenshot() if not temp_path: return None # Detect category if auto_categorize enabled category = "stdout" if auto_categorize: # First check if we're in an exception context if _is_in_exception_context(): category = "stderr" # Add exception info to message import traceback exc_info = traceback.format_exc(limit=3) if message: message = f"{message}\n{exc_info}" else: message = exc_info else: # Try visual detection category = _detect_category(temp_path) # Build monitor/scope tag for filename scope_tag = "" if capture_all: scope_tag = "-all-monitors" elif monitor_id > 0: scope_tag = f"-monitor{monitor_id}" # monitor_id=0 (primary) gets no tag for cleaner default names # Normalize message for filename normalized_msg = "" if message: # Remove special chars, keep only alphanumeric and spaces import re normalized = re.sub(r"[^\w\s-]", "", message.split("\n")[0]) # First line only normalized = re.sub(r"[-\s]+", "-", normalized).strip("-") normalized_msg = f"-{normalized[:50]}" if normalized else "" # Limit length # Add category suffix category_suffix = f"-{category}" # Handle path with category and message if path is None: # Include monitor/scope info in filename path = str( get_screenshots_dir() / "<timestamp><scope><message><category_suffix>.jpg" ) # Expand user home path = os.path.expanduser(path) # Replace placeholders if "<timestamp>" in path: path = path.replace("<timestamp>", timestamp) if "<scope>" in path: path = path.replace("<scope>", scope_tag) if "<message>" in path: path = path.replace("<message>", normalized_msg) if "<category_suffix>" in path: path = path.replace("<category_suffix>", category_suffix) # Ensure directory exists output_dir = Path(path).parent output_dir.mkdir(parents=True, exist_ok=True) # Move to final location final_path = Path(path) Path(temp_path).rename(final_path) # Add message with category as metadata if message or category != "stdout": metadata = ( f"[{category.upper()}] {message}" if message else f"[{category.upper()}]" ) _add_message_metadata(str(final_path), metadata) # Manage cache size (remove old files if needed) cache_dir = get_screenshots_dir() if cache_dir.exists(): _manage_cache_size(cache_dir, max_cache_gb) # Print path for user feedback (useful in interactive sessions) final_path_str = str(final_path) if verbose: try: if category == "stderr": print(f"📸 stderr: {final_path_str}") else: print(f"📸 stdout: {final_path_str}") except: # In case print fails in some environments pass return final_path_str
def take_screenshot( output_path: str = None, jpeg: bool = True, quality: int = 85 ) -> Optional[str]: """ Take a single screenshot (simple interface). Parameters ---------- output_path : str, optional Where to save the screenshot jpeg : bool Use JPEG format (True) or PNG (False) quality : int JPEG quality (1-100) Returns ------- str or None Path to saved screenshot """ return _manager.take_single_screenshot(output_path, jpeg, quality) def start_monitor( output_dir: Optional[str] = None, interval: float = 1.0, jpeg: bool = True, quality: int = 60, on_capture=None, on_error=None, verbose: bool = True, monitor_id: int = 0, capture_all: bool = False, ) -> ScreenshotWorker: """ Start continuous screenshot monitoring. Parameters ---------- output_dir : str, optional Directory for screenshots. Defaults to the canonical ``$SCITEX_DIR/capture/runtime/screenshots/`` location when unset. interval : float Seconds between captures jpeg : bool Use JPEG compression quality : int JPEG quality (1-100) on_capture : callable, optional Function called with filepath after each capture on_error : callable, optional Function called with exception on errors verbose : bool Print status messages monitor_id : int Monitor number to capture (0-based index, default: 0 for primary monitor) capture_all : bool If True, capture all monitors combined (default: False) Returns ------- ScreenshotWorker The worker instance Examples -------- >>> # Simple monitoring >>> capture.start() >>> # With event hooks >>> capture.start( ... on_capture=lambda path: print(f"Saved: {path}"), ... on_error=lambda e: logging.error(e) ... ) >>> # Detect specific screen content >>> def check_error_dialog(path): ... if "error" in analyze_image(path): ... send_alert(f"Error detected: {path}") >>> capture.start(on_capture=check_error_dialog) """ # Resolve default output directory lazily to honour $SCITEX_DIR. if output_dir is None: output_dir = str(get_screenshots_dir()) else: output_dir = os.path.expanduser(output_dir) return _manager.start_capture( output_dir=output_dir, interval=interval, jpeg=jpeg, quality=quality, on_capture=on_capture, on_error=on_error, verbose=verbose, monitor_id=monitor_id, capture_all=capture_all, ) def stop_monitor(): """Stop continuous screenshot monitoring.""" _manager.stop_capture() def _is_in_exception_context() -> bool: """ Check if we're currently in an exception handler. """ import sys # Check if there's an active exception exc_info = sys.exc_info() return exc_info[0] is not None def _detect_category(filepath: str) -> str: """ Detect screenshot category based on content. Simple heuristic based on common error indicators. """ try: # Try OCR-based detection if available from PIL import Image img = Image.open(filepath) # Simple color-based heuristics # Red dominant = likely error # Yellow/orange dominant = likely warning pixels = img.convert("RGB").getdata() red_count = sum(1 for r, g, b in pixels if r > 200 and g < 100 and b < 100) yellow_count = sum(1 for r, g, b in pixels if r > 200 and g > 150 and b < 100) total_pixels = len(pixels) red_ratio = red_count / total_pixels if total_pixels > 0 else 0 yellow_ratio = yellow_count / total_pixels if total_pixels > 0 else 0 # Thresholds for detection if red_ratio > 0.05: # More than 5% red pixels return "error" elif yellow_ratio > 0.05: # More than 5% yellow pixels return "warning" except: pass # Check filename for common error keywords filename_lower = str(filepath).lower() if any(word in filename_lower for word in ["error", "fail", "exception", "crash"]): return "stderr" elif any(word in filename_lower for word in ["warn", "alert", "caution"]): return "stderr" # Warnings also go to stderr return "stdout" def _add_message_metadata(filepath: str, message: str): """Add message as metadata to image file.""" try: # Try to add EXIF comment using PIL from PIL import Image img = Image.open(filepath) # Add comment to image metadata exif = img.getexif() exif[0x9286] = message # UserComment EXIF tag # Save with metadata img.save(filepath, exif=exif) except: # If PIL not available, create companion text file text_path = Path(filepath).with_suffix(".txt") text_path.write_text(f"{datetime.now().isoformat()}: {message}\n") # Convenience exports __all__ = [ "capture", "take_screenshot", "start_monitor", "stop_monitor", ] # EOF