CrossFire

Complete Technical Documentation

Version: CrossFire 1.0 - BlackBase©

Table of Contents

1. Architecture Overview

CrossFire is a single-file, zero-dependency-install universal package manager that orchestrates multiple system package managers (pip, npm, brew, apt, etc.) through a unified interface.

Core Design Principles

File Structure

crossfire.py
├── Imports & Constants (lines 1-50)
├── OS Detection (lines 51-80)
├── Logging System (lines 81-120)
├── Database Layer (lines 121-200)
├── Progress/UI Components (lines 201-280)
├── Network Tools (lines 281-400)
├── Search Engine (lines 401-650)
├── Command Execution (lines 651-720)
├── Manager Integrations (lines 721-950)
├── Install/Remove Logic (lines 951-1150)
├── System Tools (lines 1151-1400)
├── CLI Parser (lines 1401-1600)
├── Main Router (lines 1601-1800)
└── Entry Point (lines 1801-1820)

2. Global Configuration & Constants

Version & URLs

__version__ = "CrossFire 1.0 - BlackBase"
DEFAULT_UPDATE_URL = "https://raw.githubusercontent.com/crossfire-pm/crossfire-launcher/main/crossfire.py"

Directory Structure

CROSSFIRE_DIR = Path.home() / ".crossfire"        # ~/.crossfire/
CROSSFIRE_DB = CROSSFIRE_DIR / "packages.db"      # SQLite database
CROSSFIRE_CACHE = CROSSFIRE_DIR / "cache"         # Download cache

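Initialization: Both directories are created with exist_ok=True at import time. There are no permission checks and no error handling for read-only filesystems, so an unwritable home directory raises an exception before any command runs.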
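
A minimal sketch of a guarded version of this initialization, assuming the desired behavior is to fall back to the system temp directory when the home directory is not writable. The helper name `ensure_dirs` and the fallback location are hypothetical and not part of CrossFire.

```python
import tempfile
from pathlib import Path
from typing import Optional


def ensure_dirs(base: Optional[Path] = None) -> Path:
    """Create the data and cache directories, falling back to a temp dir.

    Hypothetical helper: CrossFire itself creates the directories
    unconditionally at import time.
    """
    base = base or Path.home() / ".crossfire"
    try:
        (base / "cache").mkdir(parents=True, exist_ok=True)
    except OSError:
        # Read-only or permission-restricted home: use the system temp dir.
        base = Path(tempfile.gettempdir()) / "crossfire"
        (base / "cache").mkdir(parents=True, exist_ok=True)
    return base
```

Returning the directory that was actually used lets callers derive the database and cache paths from it instead of from a module-level constant.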

3. OS & Platform Detection

Core Detection Logic

OS_NAME = platform.system()          # "Windows", "Darwin", "Linux"
ARCH = platform.architecture()[0]    # "64bit" or "32bit" (bitness of the Python interpreter, not the CPU)

# Distribution Detection (with fallback)
try:
    import distro
    DISTRO_NAME = distro.id() or "linux"
    DISTRO_VERSION = distro.version() or ""
except Exception:
    # Fallback logic for each OS
    if OS_NAME == "Darwin":
        DISTRO_NAME = "macOS"
        DISTRO_VERSION = platform.mac_ver()[0]
    elif OS_NAME == "Windows":
        DISTRO_NAME = "Windows"  
        DISTRO_VERSION = platform.version()
    else:
        DISTRO_NAME = OS_NAME.lower()
        DISTRO_VERSION = ""
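
The same fallback can be wrapped in a function, which avoids module-level side effects and makes the logic testable. This is a sketch only: the function name `detect_distro` is hypothetical, and unlike the listing above it catches only ImportError rather than every exception.

```python
import platform
from typing import Tuple


def detect_distro() -> Tuple[str, str]:
    """Return (name, version), preferring the optional `distro` package."""
    try:
        import distro  # third-party package; may not be installed
        return distro.id() or "linux", distro.version() or ""
    except ImportError:
        pass
    system = platform.system()
    if system == "Darwin":
        return "macOS", platform.mac_ver()[0]
    if system == "Windows":
        return "Windows", platform.version()
    return system.lower(), ""


name, version = detect_distro()
```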

Usage Throughout Code:

4. Logging & Console System

Colors Class

class Colors:
    INFO = "\033[94m"      # Blue
    SUCCESS = "\033[92m"   # Green  
    WARNING = "\033[93m"   # Yellow
    ERROR = "\033[91m"     # Red
    MUTED = "\033[90m"     # Gray
    BOLD = "\033[1m"       # Bold
    CYAN = "\033[96m"      # Cyan
    RESET = "\033[0m"      # Reset

Logger Implementation

class Logger:
    def __init__(self):
        self.quiet = False      # Suppress INFO/WARNING/SUCCESS
        self.verbose = False    # Show debug info & tracebacks  
        self.json_mode = False  # Suppress all cprint output

    def cprint(self, text, color="INFO"):
        # JSON mode: suppress all output
        if self.json_mode:
            return
        
        # Quiet mode: suppress INFO, WARNING, and SUCCESS (ERROR and all other colors still print)
        if self.quiet and color in ["INFO", "WARNING", "SUCCESS"]:
            return
            
        # TTY detection for colors
        if not sys.stdout.isatty():
            sys.stdout.write(f"{text}\n")
            return
            
        color_code = getattr(Colors, color.upper(), Colors.INFO)
        print(f"{color_code}{text}{Colors.RESET}")
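
The filtering order in cprint (JSON mode first, then quiet mode) can be isolated as a pure function. This is an illustrative sketch; `should_print` is a hypothetical name and does not exist in CrossFire.

```python
def should_print(color: str, quiet: bool = False, json_mode: bool = False) -> bool:
    """Mirror the filtering order used by Logger.cprint."""
    if json_mode:
        return False  # JSON mode suppresses everything, including errors
    if quiet and color in ("INFO", "WARNING", "SUCCESS"):
        return False  # quiet mode drops only these three levels
    return True
```

Under these rules, MUTED, BOLD, and CYAN messages still appear in quiet mode, and ERROR messages are hidden in JSON mode.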

Global Usage:

LOG = Logger()  # Global instance
cprint = LOG.cprint  # Convenience reference used throughout

Behavior Notes:

5. Database System

Schema Design

CREATE TABLE IF NOT EXISTS installed_packages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    version TEXT,
    manager TEXT NOT NULL,
    install_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    install_command TEXT,
    UNIQUE(name, manager)
);
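
A short sketch of how the UNIQUE(name, manager) constraint interacts with INSERT OR REPLACE, using an in-memory database. The package names and version numbers are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE installed_packages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        version TEXT,
        manager TEXT NOT NULL,
        install_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        install_command TEXT,
        UNIQUE(name, manager)
    )
""")
insert = ("INSERT OR REPLACE INTO installed_packages "
          "(name, version, manager) VALUES (?, ?, ?)")
conn.execute(insert, ("requests", "2.31.0", "pip"))
conn.execute(insert, ("requests", "2.32.0", "pip"))  # same (name, manager): replaces the row
conn.execute(insert, ("requests", "1.0.0", "npm"))   # different manager: separate row
rows = conn.execute(
    "SELECT name, version, manager FROM installed_packages ORDER BY manager"
).fetchall()
conn.close()
```

Because REPLACE deletes the conflicting row and inserts a new one, the id and install_date of a reinstalled package are reset rather than preserved.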

PackageDB Implementation

class PackageDB:
    def __init__(self, db_path: Path = CROSSFIRE_DB):
        self.db_path = db_path
        self._init_db()  # Creates table if not exists
    
    def add_package(self, name: str, version: str, manager: str, command: str = ""):
        """Records successful installation with INSERT OR REPLACE"""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                INSERT OR REPLACE INTO installed_packages 
                (name, version, manager, install_command) 
                VALUES (?, ?, ?, ?)
            ''', (name, version or "unknown", manager, command))
            conn.commit()
        finally:
            conn.close()
    
    def remove_package(self, name: str, manager: str = None):
        """Removes package record(s)"""
        conn = sqlite3.connect(self.db_path)
        try:
            if manager:
                conn.execute('DELETE FROM installed_packages WHERE name = ? AND manager = ?', 
                           (name, manager))
            else:
                conn.execute('DELETE FROM installed_packages WHERE name = ?', (name,))
            conn.commit()
        finally:
            conn.close()
    
    def get_installed_packages(self, manager: str = None) -> List[Dict]:
        """Returns list of installed packages, optionally filtered by manager"""
        conn = sqlite3.connect(self.db_path)
        try:
            if manager:
                cursor = conn.execute('''
                    SELECT name, version, manager, install_date 
                    FROM installed_packages 
                    WHERE manager = ? 
                    ORDER BY install_date DESC
                ''', (manager,))
            else:
                cursor = conn.execute('''
                    SELECT name, version, manager, install_date 
                    FROM installed_packages 
                    ORDER BY install_date DESC
                ''')
            
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            conn.close()
    
    def is_installed(self, name: str, manager: str = None) -> bool:
        """Checks if package is recorded as installed"""
        conn = sqlite3.connect(self.db_path)
        try:
            if manager:
                cursor = conn.execute(
                    'SELECT COUNT(*) FROM installed_packages WHERE name = ? AND manager = ?',
                    (name, manager)
                )
            else:
                cursor = conn.execute(
                    'SELECT COUNT(*) FROM installed_packages WHERE name = ?',
                    (name,)
                )
            return cursor.fetchone()[0] > 0
        finally:
            conn.close()

Global Instance:

package_db = PackageDB()  # Used throughout the application

Key Behaviors:

6. Progress Tracking System

ProgressBar Implementation

class ProgressBar:
    def __init__(self, total, description, unit):
        self.total = total
        self.description = description  
        self.unit = unit               # "B", "packages", "hosts", etc.
        self.current = 0
        self.start_time = time.time()
        self.lock = threading.Lock()   # Thread-safe updates
        self.bar_length = 50           # Character width of progress bar
        self.terminal_width = shutil.get_terminal_size((80, 20)).columns

    def update(self, step=1):
        """Thread-safe progress increment"""
        with self.lock:
            self.current = min(self.current + step, self.total)
            self._draw_bar()

    def _draw_bar(self):
        """Renders progress bar with ETA and speed calculations"""
        if LOG.json_mode or not sys.stdout.isatty():
            return
        
        progress = self.current / self.total if self.total > 0 else 0
        percent = progress * 100
        filled_length = int(self.bar_length * progress)
        bar = '█' * filled_length + '-' * (self.bar_length - filled_length)
        
        elapsed = time.time() - self.start_time
        eta_str = "N/A"
        speed_str = ""
        
        if progress > 0 and elapsed > 0:
            remaining = (elapsed / progress) - elapsed if progress < 1 else 0
            if remaining > 3600:
                eta_str = f"{remaining/3600:.1f}h"
            elif remaining > 60:
                eta_str = f"{remaining/60:.1f}m"
            else:
                eta_str = f"{remaining:.1f}s"
            
            # Speed calculation for bytes
            if self.unit == "B" and elapsed > 0:
                speed = self.current / elapsed
                if speed > 1024**2:
                    speed_str = f" @ {speed/1024**2:.1f} MB/s"
                elif speed > 1024:
                    speed_str = f" @ {speed/1024:.1f} KB/s"
                else:
                    speed_str = f" @ {speed:.0f} B/s"
        
        full_msg = f"{self.description}: |{bar}| {percent:.1f}% ({self.current}/{self.total} {self.unit}){speed_str} - ETA: {eta_str}"
        
        # Truncate if too long for terminal
        if len(full_msg) > self.terminal_width:
            full_msg = full_msg[:self.terminal_width - 4] + "..."
        
        sys.stdout.write(f"\r{full_msg}")
        sys.stdout.flush()

    def finish(self):
        """Completes progress bar and moves to new line"""
        if not LOG.json_mode and sys.stdout.isatty():
            sys.stdout.write("\n")
            sys.stdout.flush()

Usage Patterns:

7. Network Testing & Speed Tools

SpeedTest Class

Download Speed Testing

@staticmethod
def test_download_speed(url: Optional[str] = None, duration: int = 10) -> Dict[str, Any]:
    """Tests internet download speed using public speed test files"""
    
    # Candidate test URLs; only the first is used unless a URL is passed in
    test_urls = [
        "http://speedtest.tele2.net/10MB.zip",
        "https://proof.ovh.net/files/10Mb.dat", 
        "http://ipv4.download.thinkbroadband.com/10MB.zip"
    ]
    
    test_url = url or test_urls[0]
    downloaded_bytes = 0
    start_time = time.time()
    
    try:
        request = urllib.request.Request(test_url)
        with urllib.request.urlopen(request, timeout=30) as response:
            total_size = int(response.info().get("Content-Length", 10*1024*1024))
            tracker = ProgressBar(min(total_size, 50*1024*1024), "Speed Test", "B")
            
            while time.time() - start_time < duration:
                chunk = response.read(32768)  # 32KB chunks
                if not chunk:
                    break
                downloaded_bytes += len(chunk)
                tracker.update(len(chunk))
                
            tracker.finish()
        
        elapsed_time = time.time() - start_time
        download_rate_mbps = (downloaded_bytes * 8) / elapsed_time / 1000000 if elapsed_time > 0 else 0
        
        return {
            "ok": True,
            "download_mbps": round(download_rate_mbps, 2),
            "downloaded_mb": round(downloaded_bytes / 1024 / 1024, 2),
            "elapsed_seconds": round(elapsed_time, 2),
        }
        
    except Exception as e:
        return {"ok": False, "error": str(e)}
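
The listing defines three test URLs but only ever requests the first one. A sketch of how the list could serve as a real fallback chain is shown here. Both `first_successful` and the `probe` callback are hypothetical; in CrossFire the probe could be the download test itself.

```python
from typing import Any, Callable, Dict, Iterable


def first_successful(urls: Iterable[str],
                     probe: Callable[[str], Dict[str, Any]]) -> Dict[str, Any]:
    """Try each URL in order and return the first result with ok=True."""
    last_error = "no URLs given"
    for url in urls:
        result = probe(url)
        if result.get("ok"):
            return {**result, "url": url}  # record which mirror answered
        last_error = result.get("error", "unknown error")
    return {"ok": False, "error": last_error}
```

The result dictionary keeps the same "ok"/"error" shape used by the speed test, so callers do not need to change.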

Ping Testing

@staticmethod
def ping_test() -> Dict[str, Any]:
    """Tests network latency to multiple hosts"""
    
    hosts = ["google.com", "github.com", "cloudflare.com", "8.8.8.8"]
    results = {}
    progress = ProgressBar(len(hosts), "Ping test", "hosts")
    
    for host in hosts:
        try:
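            # Platform-specific ping commands
            if OS_NAME == "Windows":
                command = ["ping", "-n", "1", "-w", "5000", host]   # -w: timeout in milliseconds
            elif OS_NAME == "Darwin":
                command = ["ping", "-c", "1", "-W", "5000", host]   # macOS -W: wait time in milliseconds
            else:
                command = ["ping", "-c", "1", "-W", "5", host]      # Linux -W: timeout in seconds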
                
            process = subprocess.run(command, capture_output=True, text=True, timeout=10)
            output = process.stdout
            
            # Parse latency from output (platform-specific regex)
            if OS_NAME == "Windows":
                latency_match = re.search(r"time[<=](\d+)ms", output)
            else:
                latency_match = re.search(r"time=(\d+\.?\d*)\s*ms", output)
            
            if latency_match:
                latency = float(latency_match.group(1))
                results[host] = {"ok": True, "latency_ms": latency}
            else:
                results[host] = {"ok": False, "msg": "Could not parse ping output"}
                
        except subprocess.TimeoutExpired:
            results[host] = {"ok": False, "msg": "Timed out"}
        except Exception as e:
            results[host] = {"ok": False, "msg": str(e)}
        
        progress.update()
        
    progress.finish()
    return results
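
The latency parsing can be exercised without running ping at all. The sketch below reuses the two regular expressions from the listing; the helper name `parse_latency_ms` and the sample output lines are illustrative assumptions.

```python
import re
from typing import Optional


def parse_latency_ms(output: str, windows: bool = False) -> Optional[float]:
    """Extract the round-trip time in milliseconds, or None if absent."""
    pattern = r"time[<=](\d+)ms" if windows else r"time=(\d+\.?\d*)\s*ms"
    match = re.search(pattern, output)
    return float(match.group(1)) if match else None


linux_ms = parse_latency_ms("64 bytes from 8.8.8.8: icmp_seq=1 ttl=117 time=12.3 ms")
windows_ms = parse_latency_ms("Reply from 8.8.8.8: bytes=32 time=14ms TTL=117", windows=True)
```

Both patterns depend on the English word "time", so localized ping output may fail to parse and end up reported as "Could not parse ping output".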

Limitations:

8. Search Engine Implementation

SearchResult Data Model

@dataclass
class SearchResult:
    name: str
    description: str  
    version: str
    manager: str                    # "pip", "npm", "brew", etc.
    homepage: Optional[str] = None
    relevance_score: float = 0.0    # 0-100+ scoring for ranking
    
    def to_dict(self):
        return asdict(self)  # For JSON serialization

RealSearchEngine Architecture

class RealSearchEngine:
    def __init__(self):
        self.cache_timeout = 3600  # 1 hour for cached data
        self.session = requests.Session()
        self.session.timeout = 30  # no effect: requests has no session-wide timeout, so each request passes timeout= itself
    
    def search(self, query: str, manager: Optional[str] = None, limit: int = 20) -> List[SearchResult]:
        """Orchestrates concurrent search across multiple package managers"""
        
        all_results = []
        installed = _detect_installed_managers()  # Only search available managers
        
        # Target manager selection
        if manager:
            target_managers = [manager.lower()] if installed.get(manager.lower()) else []
        else:
            target_managers = [m for m, ok in installed.items() if ok]
        
        if not target_managers:
            return []
        
        # Manager-specific search function mapping
        manager_funcs = {
            "pip": self._search_pypi,
            "npm": self._search_npm, 
            "brew": self._search_brew,
            "apt": self._search_apt,
            "dnf": self._search_dnf,
            "yum": self._search_yum,
            "pacman": self._search_pacman,
            "zypper": self._search_zypper,
            "apk": self._search_apk,
            "choco": self._search_choco,
            "winget": self._search_winget,
            "snap": self._search_snap,
            "flatpak": self._search_flatpak,
        }
        
        # Concurrent execution with timeout
        with _fut.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_manager = {}
            for mgr in target_managers:
                func = manager_funcs.get(mgr)
                if func:
                    future_to_manager[executor.submit(func, query)] = mgr
            
            progress = ProgressBar(len(future_to_manager), "Searching repositories", "repos")
            for future in _fut.as_completed(future_to_manager, timeout=120):
                mgr = future_to_manager[future]
                try:
                    results = future.result() or []
                    all_results.extend(results)
                except Exception as e:
                    cprint(f"{mgr.upper()}: Search failed - {e}", "WARNING")
                finally:
                    progress.update()
            progress.finish()
        
        # Sort by relevance and limit results
        all_results.sort(key=lambda x: x.relevance_score, reverse=True)
        return all_results[:limit]
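
The fan-out pattern used by search() can be reduced to a few lines. This sketch assumes that `_fut` in the listing is an alias for concurrent.futures. The function `fan_out`, the tuple-based result type, and the backend callables are hypothetical stand-ins for the real search methods.

```python
import concurrent.futures as futures
from typing import Callable, Dict, List, Tuple

Result = Tuple[str, float]  # (package name, relevance score)


def fan_out(query: str,
            backends: Dict[str, Callable[[str], List[Result]]],
            limit: int = 20) -> Tuple[List[Result], Dict[str, str]]:
    """Run every backend concurrently and merge results by score."""
    results: List[Result] = []
    errors: Dict[str, str] = {}
    with futures.ThreadPoolExecutor(max_workers=5) as pool:
        pending = {pool.submit(fn, query): name for name, fn in backends.items()}
        for fut in futures.as_completed(pending):
            name = pending[fut]
            try:
                results.extend(fut.result() or [])
            except Exception as exc:  # one failing backend must not abort the rest
                errors[name] = str(exc)
    results.sort(key=lambda r: r[1], reverse=True)
    return results[:limit], errors
```

Collecting errors per backend instead of printing them keeps the function usable in JSON mode, where console output is suppressed.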

Repository-Specific Search Implementations

PyPI Search

def _search_pypi(self, query: str) -> List[SearchResult]:
    """PyPI direct package lookup (no full-text search available)"""
    try:
        url = f"https://pypi.org/pypi/{query}/json"
        r = self.session.get(url, timeout=10)
        if r.status_code == 200:
            return [self._parse_pypi_info(r.json())]
    except Exception:
        pass  # network errors and malformed JSON are treated as "no results"
    return []

def _parse_pypi_info(self, data: Dict[str, Any]) -> SearchResult:
    """Converts PyPI API response to SearchResult"""
    info = data.get("info", {})
    return SearchResult(
        name=info.get("name", ""),
        description=(info.get("summary") or "")[:200],  # summary may be null; truncate long descriptions
        version=info.get("version", "unknown"),
        manager="pip",
        homepage=info.get("home_page") or info.get("project_url"),
        relevance_score=50  # Fixed score for direct matches
    )

NPM Search

def _search_npm(self, query: str) -> List[SearchResult]:
    """Uses NPM registry search API with quality scoring"""
    try:
        url = "https://registry.npmjs.org/-/v1/search"
        r = self.session.get(url, params={"text": query, "size": 10}, timeout=10)
        if r.status_code == 200:
            data = r.json().get("objects", [])
            return [self._parse_npm_info(p) for p in data]
    except Exception:
        pass  # network errors and malformed JSON are treated as "no results"
    return []

def _parse_npm_info(self, package: Dict[str, Any]) -> SearchResult:
    """Converts NPM API response to SearchResult"""
    pkg = package.get("package", {})
    return SearchResult(
        name=pkg.get("name", ""),
        description=pkg.get("description", "")[:200],
        version=pkg.get("version", "unknown"),
        manager="npm",
        homepage=pkg.get("links", {}).get("homepage"),
        relevance_score=package.get("score", {}).get("final", 0) * 100,
    )
    
Homebrew Search

def _search_brew(self, query: str) -> List[SearchResult]:
    """Uses Homebrew API (only exact matches for now)"""
    try:
        url = f"https://formulae.brew.sh/api/formula/{query}.json"
        r = self.session.get(url, timeout=10)
        if r.status_code == 200:
            return [self._parse_brew_info(r.json())]
    except Exception:
        pass  # network errors and malformed JSON are treated as "no results"
    return []
    
def _parse_brew_info(self, data: Dict[str, Any]) -> SearchResult:
    """Converts Homebrew API response to SearchResult"""
    return SearchResult(
        name=data.get("name", ""),
        description=data.get("desc", "")[:200],
        version=data.get("versions", {}).get("stable", "unknown"),
        manager="brew",
        homepage=data.get("homepage"),
        relevance_score=50,
    )
    
def _search_apt(self, query: str) -> List[SearchResult]:
    """Simulates a search for apt (no real API)"""
    # ... placeholder
    return []

def _search_dnf(self, query: str) -> List[SearchResult]:
    """Simulates a search for dnf (no real API)"""
    # ... placeholder
    return []

def _search_yum(self, query: str) -> List[SearchResult]:
    """Simulates a search for yum (no real API)"""
    # ... placeholder
    return []

def _search_pacman(self, query: str) -> List[SearchResult]:
    """Simulates a search for pacman (no real API)"""
    # ... placeholder
    return []
    
def _search_zypper(self, query: str) -> List[SearchResult]:
    """Simulates a search for zypper (no real API)"""
    # ... placeholder
    return []
    
def _search_apk(self, query: str) -> List[SearchResult]:
    """Simulates a search for apk (no real API)"""
    # ... placeholder
    return []
    
def _search_choco(self, query: str) -> List[SearchResult]:
    """Simulates a search for choco (no real API)"""
    # ... placeholder
    return []

def _search_winget(self, query: str) -> List[SearchResult]:
    """Simulates a search for winget (no real API)"""
    # ... placeholder
    return []

def _search_snap(self, query: str) -> List[SearchResult]:
    """Simulates a search for snap (no real API)"""
    # ... placeholder
    return []

def _search_flatpak(self, query: str) -> List[SearchResult]:
    """Simulates a search for flatpak (no real API)"""
    # ... placeholder
    return []
    
# Missing functions for other managers...
      

9. Command Execution Framework

CommandRunner Class

class CommandRunner:
    def __init__(self, logger: Logger, verbose: bool = False):
        self.logger = logger
        self.verbose = verbose
    
    def run_command(self, command: List[str], cwd: Optional[Path] = None, shell: bool = False, allow_fail: bool = False) -> Dict[str, Any]:
        """Runs a shell command and captures output"""
        try:
            result = subprocess.run(
                command,
                cwd=cwd,
                shell=shell,
                capture_output=True,
                text=True,
                check=not allow_fail,
                timeout=300
            )
            return {
                "ok": result.returncode == 0,  # with allow_fail=True a non-zero exit is returned, not raised
                "stdout": result.stdout,
                "stderr": result.stderr,
                "returncode": result.returncode
            }
        except subprocess.CalledProcessError as e:
            self.logger.cprint(f"Command failed with exit code {e.returncode}:\n{e.stderr}", "ERROR")
            return {
                "ok": False,
                "stdout": e.stdout,
                "stderr": e.stderr,
                "returncode": e.returncode
            }
        except FileNotFoundError:
            self.logger.cprint(f"Command not found: {command[0]}", "ERROR")
            return {"ok": False, "stderr": "Command not found", "returncode": 127}
        except Exception as e:
            self.logger.cprint(f"An unexpected error occurred: {e}", "ERROR")
            return {"ok": False, "stderr": str(e), "returncode": 1}
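
A trimmed sketch of the same idea in which every code path returns the same four keys and "ok" always reflects the exit code. The function name `run` is hypothetical, and the return code 124 for timeouts is an assumption borrowed from the coreutils timeout convention.

```python
import subprocess
import sys
from typing import Any, Dict, List


def run(command: List[str], timeout: int = 300) -> Dict[str, Any]:
    """Run a command without a shell and report a uniform result dict."""
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
    except FileNotFoundError:
        return {"ok": False, "stdout": "", "stderr": "Command not found", "returncode": 127}
    except subprocess.TimeoutExpired:
        return {"ok": False, "stdout": "", "stderr": "Timed out", "returncode": 124}
    return {"ok": proc.returncode == 0, "stdout": proc.stdout,
            "stderr": proc.stderr, "returncode": proc.returncode}


result = run([sys.executable, "-c", "print('hello')"])
missing = run(["crossfire-no-such-binary"])
```

Passing the command as a list and leaving shell=False avoids shell interpretation of package names supplied by the user.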

Common Usage:

10. Package Manager Detection

def _detect_installed_managers() -> Dict[str, bool]:
    """Checks for the presence of common package managers"""
    
    managers = {
        "pip": shutil.which("pip") is not None,
        "npm": shutil.which("npm") is not None,
        "brew": shutil.which("brew") is not None,
        "apt": shutil.which("apt") is not None,
        "dnf": shutil.which("dnf") is not None,
        "yum": shutil.which("yum") is not None,
        "pacman": shutil.which("pacman") is not None,
        "zypper": shutil.which("zypper") is not None,
        "apk": shutil.which("apk") is not None,
        "choco": shutil.which("choco") is not None,
        "winget": shutil.which("winget") is not None,
        "snap": shutil.which("snap") is not None,
        "flatpak": shutil.which("flatpak") is not None,
    }
    return managers

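Note: Detection relies solely on `shutil.which()`, which searches only the directories on PATH. Managers installed in non-standard locations are reported as missing. The check for pip also looks for a `pip` executable, while installation runs `python -m pip`, so the two can disagree.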
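
The same detection can be written as a dictionary comprehension over a single tuple of names, so that adding a manager is a one-word change. This is a sketch; the constant name `KNOWN_MANAGERS` is hypothetical.

```python
import shutil
from typing import Dict

KNOWN_MANAGERS = ("pip", "npm", "brew", "apt", "dnf", "yum", "pacman",
                  "zypper", "apk", "choco", "winget", "snap", "flatpak")


def detect_installed_managers() -> Dict[str, bool]:
    """Map each known manager name to whether its executable is on PATH."""
    return {name: shutil.which(name) is not None for name in KNOWN_MANAGERS}


available = detect_installed_managers()
```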

11. Installation & Removal Orchestration

InstallManager Class

class InstallManager:
    def __init__(self, runner: CommandRunner, db: PackageDB, logger: Logger):
        self.runner = runner
        self.db = db
        self.logger = logger
        
    def install(self, package_name: str, manager_name: str, version: Optional[str] = None) -> bool:
        """Installs a package using the specified manager"""
        self.logger.cprint(f"Attempting to install '{package_name}' via {manager_name.upper()}...")
        
        # Mapping of manager to command templates
        commands = {
            "pip": ["python", "-m", "pip", "install", package_name],
            "npm": ["npm", "install", "-g", package_name],
            "brew": ["brew", "install", package_name],
            "apt": ["sudo", "apt-get", "install", "-y", package_name],
            "dnf": ["sudo", "dnf", "install", "-y", package_name],
            "yum": ["sudo", "yum", "install", "-y", package_name],
            "pacman": ["sudo", "pacman", "-S", "--noconfirm", package_name],
            "zypper": ["sudo", "zypper", "--non-interactive", "install", package_name],
            "apk": ["sudo", "apk", "add", package_name],
            "choco": ["choco", "install", package_name, "-y"],
            "winget": ["winget", "install", package_name, "--accept-source-agreements", "--accept-package-agreements"],
            "snap": ["sudo", "snap", "install", package_name],
            "flatpak": ["flatpak", "install", "-y", "flathub", package_name],
        }
        
        # Add version if specified and supported
        if version:
            if manager_name == "pip":
                commands["pip"][-1] += f"=={version}"
            # Version pinning is not implemented for other managers; the version argument is ignored for them
            
        cmd = commands.get(manager_name)
        if not cmd:
            self.logger.cprint(f"Unknown or unsupported package manager: {manager_name}", "ERROR")
            return False
            
        result = self.runner.run_command(cmd)
        
        if result["ok"]:
            self.logger.cprint(f"'{package_name}' installed successfully via {manager_name.upper()}.", "SUCCESS")
            # Record success in DB
            self.db.add_package(package_name, version, manager_name, " ".join(cmd))
            return True
        else:
            self.logger.cprint(f"Installation of '{package_name}' failed.", "ERROR")
            return False

    def remove(self, package_name: str, manager_name: str) -> bool:
        """Removes a package using the specified manager"""
        self.logger.cprint(f"Attempting to remove '{package_name}' via {manager_name.upper()}...", "WARNING")
        
        commands = {
            "pip": ["python", "-m", "pip", "uninstall", "-y", package_name],
            "npm": ["npm", "uninstall", "-g", package_name],
            "brew": ["brew", "uninstall", package_name],
            "apt": ["sudo", "apt-get", "remove", "-y", package_name],
            "dnf": ["sudo", "dnf", "remove", "-y", package_name],
            "yum": ["sudo", "yum", "remove", "-y", package_name],
            "pacman": ["sudo", "pacman", "-Rns", "--noconfirm", package_name],
            "zypper": ["sudo", "zypper", "--non-interactive", "remove", package_name],
            "apk": ["sudo", "apk", "del", package_name],
            "choco": ["choco", "uninstall", package_name, "-y"],
            "winget": ["winget", "uninstall", package_name, "--accept-source-agreements"],
            "snap": ["sudo", "snap", "remove", package_name],
            "flatpak": ["flatpak", "uninstall", "-y", package_name],
        }
        
        cmd = commands.get(manager_name)
        if not cmd:
            self.logger.cprint(f"Unknown or unsupported package manager: {manager_name}", "ERROR")
            return False
        
        result = self.runner.run_command(cmd, allow_fail=True) # allow uninstall to fail gracefully
        
        if result["ok"] or "not installed" in result["stderr"].lower():
            self.logger.cprint(f"'{package_name}' removed successfully via {manager_name.upper()}.", "SUCCESS")
            # Remove from DB
            self.db.remove_package(package_name, manager_name)
            return True
        else:
            self.logger.cprint(f"Removal of '{package_name}' failed.", "ERROR")
            return False
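
Version pinning in install() currently covers pip only. A sketch of a small helper that builds the package specifier is given here. The helper name `package_spec` is hypothetical, and the npm branch (`name@version`) is an addition that the listing above does not contain.

```python
from typing import Optional


def package_spec(name: str, manager: str, version: Optional[str] = None) -> str:
    """Return the package argument, with a version pin where the syntax is known."""
    if not version:
        return name
    if manager == "pip":
        return f"{name}=={version}"   # pip requirement specifier
    if manager == "npm":
        return f"{name}@{version}"    # npm package@version syntax
    return name  # other managers: the version is ignored in this sketch
```

Building the specifier before the command table is constructed avoids mutating a list inside the dictionary after the fact.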

12. System Maintenance & Cleanup

Maintenance Class

class SystemMaintenance:
    def __init__(self, runner: CommandRunner, logger: Logger):
        self.runner = runner
        self.logger = logger
        
    def clean(self):
        """Runs cleanup commands for all available managers"""
        self.logger.cprint("Running system maintenance and cleanup...", "INFO")
        managers = _detect_installed_managers()
        
        clean_commands = {
            "apt": ["sudo", "apt-get", "autoclean", "-y"],
            "dnf": ["sudo", "dnf", "clean", "all"],
            "yum": ["sudo", "yum", "clean", "all"],
            "pacman": ["sudo", "pacman", "-Scc", "--noconfirm"], # CAUTION: removes all cached packages
            "zypper": ["sudo", "zypper", "clean"],
            "apk": ["sudo", "apk", "cache", "--purge"],
            "npm": ["npm", "cache", "clean", "--force"],
            "pip": ["python", "-m", "pip", "cache", "purge"],
            "brew": ["brew", "cleanup"],
            "choco": ["choco", "cache", "purge"],
            "winget": ["winget", "clear", "--cache"],
        }
        
        for mgr, cmd in clean_commands.items():
            if managers.get(mgr):
                self.logger.cprint(f"  > Cleaning cache for {mgr.upper()}...", "MUTED")
                self.runner.run_command(cmd, allow_fail=True)
                
        self.logger.cprint("System cleanup complete.", "SUCCESS")
        return True

13. Self-Update System

UpdateManager Class

class UpdateManager:
    def __init__(self, runner: CommandRunner, logger: Logger):
        self.runner = runner
        self.logger = logger
        
    def update_self(self, url: str = DEFAULT_UPDATE_URL):
        """Downloads the latest version and replaces the current script"""
        try:
            self.logger.cprint("Checking for updates...", "INFO")
            response = requests.get(url, timeout=15)
            response.raise_for_status()
            
            # Simple version check (no formal versioning, just a string match)
            # This is not a robust method and can be improved
            latest_version_line = next((line for line in response.text.splitlines() if "__version__" in line), None)
            
            if latest_version_line and latest_version_line.strip() != f'__version__ = "{__version__}"':
                self.logger.cprint("New version available. Downloading...", "INFO")
                # Replace the current script
                with open(__file__, "w", encoding="utf-8") as f:
                    f.write(response.text)
                self.logger.cprint("Update successful! Please restart the script.", "SUCCESS")
            else:
                self.logger.cprint("You are already using the latest version.", "INFO")
            
            return True
        except Exception as e:
            self.logger.cprint(f"Update failed: {e}", "ERROR")
            return False
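
The update above overwrites the running script in place, so an interrupted write can leave a truncated file. A sketch of an atomic alternative follows: write to a temporary file in the same directory, then swap it in with os.replace. The helper name `atomic_write` is hypothetical, and the sketch does not verify the integrity or origin of the downloaded content.

```python
import os
import shutil
import tempfile
from pathlib import Path


def atomic_write(target: Path, content: str) -> None:
    """Write content to a temp file next to target, then swap it in."""
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(content)
        if target.exists():
            shutil.copymode(target, tmp_name)  # keep the executable bit, if any
        os.replace(tmp_name, target)  # atomic when both paths share a filesystem
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)  # do not leave partial files behind
        raise
```

Creating the temporary file in the target's own directory is what keeps the final rename on a single filesystem.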

14. System Information & Health Checks

SysInfo Class

class SysInfo:
    @staticmethod
    def get_info() -> Dict[str, Any]:
        """Gathers basic system information"""
        return {
            "system": OS_NAME,
            "distribution": DISTRO_NAME,
            "distribution_version": DISTRO_VERSION,
            "architecture": ARCH,
            "python_version": platform.python_version(),
            "user": os.getlogin(),
            "home_dir": str(Path.home()),
            "crossfire_dir": str(CROSSFIRE_DIR),
            "version": __version__,
        }
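
os.getlogin() raises OSError when the process has no controlling terminal, which can happen under cron, in some containers, and in CI jobs. A sketch of a more tolerant lookup follows; the helper name `current_user` and the "unknown" fallback value are assumptions.

```python
import getpass
import os


def current_user() -> str:
    """Best-effort user name that never raises."""
    try:
        return os.getlogin()
    except OSError:
        pass  # no controlling terminal
    try:
        return getpass.getuser()  # checks environment variables, then the password database
    except Exception:
        return "unknown"


user = current_user()
```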

15. Setup & Launcher Installation

SetupManager Class

class SetupManager:
    def __init__(self, runner: CommandRunner, logger: Logger):
        self.runner = runner
        self.logger = logger
        
    def install_launcher(self):
        """Installs the script as a system-wide command"""
        # This is a highly simplified and OS-specific example.
        # It needs significant improvement for cross-platform compatibility.
        self.logger.cprint("Installing CrossFire as a system-wide command...", "INFO")
        
        try:
            # Find a suitable bin directory
            if OS_NAME in ["Linux", "Darwin"]:
                bin_path = Path.home() / ".local" / "bin"
                os.makedirs(bin_path, exist_ok=True)
                target_path = bin_path / "crossfire"
                
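                # Create a symlink (is_symlink() also catches a dangling link left by an earlier install)
                if target_path.exists() or target_path.is_symlink():
                    os.remove(target_path)
                os.symlink(os.path.abspath(__file__), target_path)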
                
                # Prompt user to add to PATH
                self.logger.cprint(f"Successfully created symlink at {target_path}", "SUCCESS")
                self.logger.cprint("You may need to add this to your PATH environment variable:", "INFO")
                self.logger.cprint('  export PATH="$HOME/.local/bin:$PATH"', "MUTED")
            
            elif OS_NAME == "Windows":
                # Windows is more complex, requiring modification of the PATH or a .bat/.ps1 file
                self.logger.cprint("Automatic installation on Windows is not yet supported. Please add the script directory to your PATH manually.", "WARNING")
                return False
                
            return True
        except Exception as e:
            self.logger.cprint(f"Launcher installation failed: {e}", "ERROR")
            return False

16. Package Management Features

Features:

17. Command Line Interface

Argument Parsing

def parse_args():
    parser = argparse.ArgumentParser(
        description="CrossFire Universal Package Manager",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    
    # ... argument definitions for each command ...
    
    return parser.parse_args()

Example Command: `crossfire install requests --manager pip`

# Inside parse_args()
subparsers = parser.add_subparsers(dest='command')  # main() routes on args.command
install_parser = subparsers.add_parser('install', help='Install a package')
install_parser.add_argument('package', help='Name of the package to install')
install_parser.add_argument('--manager', required=True, help='The package manager to use (e.g., pip, npm, brew)')
install_parser.add_argument('--version', help='The specific version to install')
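
A self-contained sketch of how the pieces above might fit together. The attribute names quiet, verbose, and json_output come from main(). The flag spellings `--quiet`, `--verbose`, and `--json`, and the arguments of the search subcommand, are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with global flags and two subcommands."""
    parser = argparse.ArgumentParser(
        prog="crossfire", description="CrossFire Universal Package Manager")
    parser.add_argument("--quiet", action="store_true")
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument("--json", dest="json_output", action="store_true")
    subparsers = parser.add_subparsers(dest="command", required=True)

    install = subparsers.add_parser("install", help="Install a package")
    install.add_argument("package")
    install.add_argument("--manager", required=True)
    install.add_argument("--version")

    search = subparsers.add_parser("search", help="Search for a package")
    search.add_argument("query")
    search.add_argument("--manager")
    return parser


args = build_parser().parse_args(["install", "requests", "--manager", "pip"])
```

With this layout the global flags must appear before the subcommand name, for example `crossfire --quiet install requests --manager pip`.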

18. Main Entry Point & Routing

def main():
    args = parse_args()
    
    # Global setup
    global LOG
    LOG.quiet = args.quiet
    LOG.verbose = args.verbose
    LOG.json_mode = args.json_output
    
    # Instantiate core components
    runner = CommandRunner(LOG, verbose=args.verbose)
    db = PackageDB()
    installer = InstallManager(runner, db, LOG)
    searcher = RealSearchEngine()
    
    if args.command == 'install':
        installer.install(args.package, args.manager, args.version)
    elif args.command == 'search':
        ...  # search logic
    # ... and so on for other commands
    
if __name__ == "__main__":
    main()

19. Error Handling & Edge Cases

Python-Specific Issues

General Limitations

20. Security Considerations

Key Risks

21. Known Bugs & Limitations

22. Extension Guide

To add a new package manager, follow these steps:

  1. Add the manager name to the `_detect_installed_managers()` function.
  2. Implement `_search_[manager]()` and `_parse_[manager]_info()` methods in `RealSearchEngine`, and register the search method in the `manager_funcs` mapping inside `search()`.
  3. Add command templates to `InstallManager.install()` and `InstallManager.remove()`.
  4. Add a `clean` command to `SystemMaintenance.clean()`.
  5. Update the CLI parser in `parse_args()` if new flags are needed.
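
The steps above touch several separate dictionaries. One possible way to reduce that, shown purely as a sketch, is a single registry that holds the per-manager command prefixes. `ManagerSpec`, `REGISTRY`, and `install_command` are hypothetical names; the commands themselves are copied from the listings in sections 11 and 12.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ManagerSpec:
    install: List[str]                  # command prefix; the package name is appended
    remove: List[str]
    clean: Optional[List[str]] = None   # None when the manager has no cleanup command


REGISTRY: Dict[str, ManagerSpec] = {
    "pip": ManagerSpec(["python", "-m", "pip", "install"],
                       ["python", "-m", "pip", "uninstall", "-y"],
                       ["python", "-m", "pip", "cache", "purge"]),
    "brew": ManagerSpec(["brew", "install"], ["brew", "uninstall"], ["brew", "cleanup"]),
}


def install_command(manager: str, package: str) -> List[str]:
    """Build the full install command for a registered manager."""
    return [*REGISTRY[manager].install, package]
```

With such a registry, supporting a new manager would mean adding one entry rather than editing three methods, although managers whose flags follow the package name (such as choco) would need a slightly richer template.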

23. Complete Command Reference

crossfire install <package> --manager <manager> [--version <version>]
crossfire remove <package> --manager <manager>
crossfire search <query> [--manager <manager>]
crossfire list [--manager <manager>]
crossfire clean
crossfire update
crossfire self-update
crossfire info
crossfire --help