1. Architecture Overview
CrossFire is a single-file universal package manager, designed to run without a dependency-installation step, that orchestrates multiple system package managers (pip, npm, brew, apt, etc.) through a unified interface.
Core Design Principles
- Single File Distribution: Self-contained executable Python script
- Zero External Config: All configuration stored in ~/.crossfire/
- Multi-Manager Support: Handles 13+ package managers across platforms
- Real Search: Actual API queries to PyPI, NPM, Homebrew
- Package Tracking: SQLite database tracks installations
- Self-Updating: Can update itself over HTTPS (the download is not signature-verified; see Security Considerations)
File Structure
crossfire.py
├── Imports & Constants (lines 1-50)
├── OS Detection (lines 51-80)
├── Logging System (lines 81-120)
├── Database Layer (lines 121-200)
├── Progress/UI Components (lines 201-280)
├── Network Tools (lines 281-400)
├── Search Engine (lines 401-650)
├── Command Execution (lines 651-720)
├── Manager Integrations (lines 721-950)
├── Install/Remove Logic (lines 951-1150)
├── System Tools (lines 1151-1400)
├── CLI Parser (lines 1401-1600)
├── Main Router (lines 1601-1800)
└── Entry Point (lines 1801-1820)
2. Global Configuration & Constants
Version & URLs
__version__ = "CrossFire 1.0 - BlackBase"
DEFAULT_UPDATE_URL = "https://raw.githubusercontent.com/crossfire-pm/crossfire-launcher/main/crossfire.py"
Directory Structure
CROSSFIRE_DIR = Path.home() / ".crossfire" # ~/.crossfire/
CROSSFIRE_DB = CROSSFIRE_DIR / "packages.db" # SQLite database
CROSSFIRE_CACHE = CROSSFIRE_DIR / "cache" # Download cache
Initialization: Directories created with exist_ok=True on import. No permission checks or error handling for read-only filesystems.
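The missing error handling noted above could be addressed along the following lines. This is a minimal sketch, not CrossFire's actual code: the `ensure_dir` helper and its fallback to a directory under the system temp dir are hypothetical.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def ensure_dir(preferred: Path, fallback: Optional[Path] = None) -> Path:
    """Create `preferred` if possible; otherwise try a fallback location.

    Hypothetical helper: the default fallback (a "crossfire" directory under
    the system temp dir) is an assumption, not documented behaviour.
    """
    candidates = [preferred, fallback or Path(tempfile.gettempdir()) / "crossfire"]
    for candidate in candidates:
        try:
            candidate.mkdir(parents=True, exist_ok=True)
        except OSError:
            continue  # read-only filesystem, permission denied, etc.
        if os.access(candidate, os.W_OK):
            return candidate
    raise RuntimeError("no writable directory available for CrossFire data")
```

Calling something like `ensure_dir(Path.home() / ".crossfire")` at startup would turn a read-only home directory into an explicit, catchable condition rather than an import-time crash.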
4. Logging & Console System
Colors Class
class Colors:
    INFO = "\033[94m"     # Blue
    SUCCESS = "\033[92m"  # Green
    WARNING = "\033[93m"  # Yellow
    ERROR = "\033[91m"    # Red
    MUTED = "\033[90m"    # Gray
    BOLD = "\033[1m"      # Bold
    CYAN = "\033[96m"     # Cyan
    RESET = "\033[0m"     # Reset
Logger Implementation
class Logger:
    def __init__(self):
        self.quiet = False      # Suppress INFO/WARNING/SUCCESS
        self.verbose = False    # Show debug info & tracebacks
        self.json_mode = False  # Suppress all cprint output

    def cprint(self, text, color="INFO"):
        # JSON mode: suppress all output
        if self.json_mode:
            return
        # Quiet mode: drop INFO/WARNING/SUCCESS; ERROR (and any other level,
        # such as MUTED) still prints. The comparison is case-sensitive.
        if self.quiet and color in ["INFO", "WARNING", "SUCCESS"]:
            return
        # Non-TTY output (pipes, redirects): plain text without color codes
        if not sys.stdout.isatty():
            sys.stdout.write(f"{text}\n")
            return
        color_code = getattr(Colors, color.upper(), Colors.INFO)
        print(f"{color_code}{text}{Colors.RESET}")
Global Usage:
LOG = Logger() # Global instance
cprint = LOG.cprint # Convenience reference used throughout
Behavior Notes:
- JSON mode completely suppresses console output for machine parsing
- Quiet mode still allows ERROR messages through
- Colors automatically disabled for non-TTY output (pipes, redirects)
5. Database System
Schema Design
CREATE TABLE IF NOT EXISTS installed_packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
version TEXT,
manager TEXT NOT NULL,
install_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
install_command TEXT,
UNIQUE(name, manager)
);
PackageDB Implementation
class PackageDB:
    def __init__(self, db_path: Path = CROSSFIRE_DB):
        self.db_path = db_path
        self._init_db()  # Creates table if not exists

    def add_package(self, name: str, version: str, manager: str, command: str = ""):
        """Records successful installation with INSERT OR REPLACE"""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                INSERT OR REPLACE INTO installed_packages
                (name, version, manager, install_command)
                VALUES (?, ?, ?, ?)
            ''', (name, version or "unknown", manager, command))
            conn.commit()
        finally:
            conn.close()

    def remove_package(self, name: str, manager: str = None):
        """Removes package record(s)"""
        conn = sqlite3.connect(self.db_path)
        try:
            if manager:
                conn.execute('DELETE FROM installed_packages WHERE name = ? AND manager = ?',
                             (name, manager))
            else:
                conn.execute('DELETE FROM installed_packages WHERE name = ?', (name,))
            conn.commit()
        finally:
            conn.close()

    def get_installed_packages(self, manager: str = None) -> List[Dict]:
        """Returns list of installed packages, optionally filtered by manager"""
        conn = sqlite3.connect(self.db_path)
        try:
            if manager:
                cursor = conn.execute('''
                    SELECT name, version, manager, install_date
                    FROM installed_packages
                    WHERE manager = ?
                    ORDER BY install_date DESC
                ''', (manager,))
            else:
                cursor = conn.execute('''
                    SELECT name, version, manager, install_date
                    FROM installed_packages
                    ORDER BY install_date DESC
                ''')
            columns = [desc[0] for desc in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            conn.close()

    def is_installed(self, name: str, manager: str = None) -> bool:
        """Checks if package is recorded as installed"""
        conn = sqlite3.connect(self.db_path)
        try:
            if manager:
                cursor = conn.execute(
                    'SELECT COUNT(*) FROM installed_packages WHERE name = ? AND manager = ?',
                    (name, manager)
                )
            else:
                cursor = conn.execute(
                    'SELECT COUNT(*) FROM installed_packages WHERE name = ?',
                    (name,)
                )
            return cursor.fetchone()[0] > 0
        finally:
            conn.close()
Global Instance:
package_db = PackageDB() # Used throughout the application
Key Behaviors:
- INSERT OR REPLACE overwrites an existing entry with the same name+manager (unique constraint); the old row is deleted and re-inserted, so its id and install_date are reset
- remove_package() without manager removes ALL entries for that package name
- Database operations are not transactional across multiple packages
- No database schema versioning or migration system
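One common way to close the schema-versioning gap is SQLite's built-in `PRAGMA user_version` counter. The sketch below is an assumption about how that could look, not part of CrossFire: the `MIGRATIONS` list, the `migrate` function, and the `pinned` column in the second migration are all invented for illustration.

```python
import sqlite3

# Hypothetical migration list: entry i upgrades the schema from version i to i + 1.
MIGRATIONS = [
    """CREATE TABLE IF NOT EXISTS installed_packages (
           id INTEGER PRIMARY KEY AUTOINCREMENT,
           name TEXT NOT NULL,
           version TEXT,
           manager TEXT NOT NULL,
           install_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
           install_command TEXT,
           UNIQUE(name, manager)
       )""",
    # Illustrative only: `pinned` is an invented column, not a CrossFire field.
    "ALTER TABLE installed_packages ADD COLUMN pinned INTEGER DEFAULT 0",
]


def migrate(conn: sqlite3.Connection) -> int:
    """Apply pending migrations in order and return the resulting schema version."""
    current = conn.execute("PRAGMA user_version").fetchone()[0]
    for version in range(current, len(MIGRATIONS)):
        conn.execute(MIGRATIONS[version])
        # PRAGMA statements do not accept bound parameters; the value is a trusted int.
        conn.execute(f"PRAGMA user_version = {version + 1}")
        conn.commit()
    return conn.execute("PRAGMA user_version").fetchone()[0]
```

Running `migrate()` from `_init_db()` would make the call idempotent: a database already at the latest version is left untouched.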
6. Progress Tracking System
ProgressBar Implementation
class ProgressBar:
    def __init__(self, total, description, unit):
        self.total = total
        self.description = description
        self.unit = unit  # "B", "packages", "hosts", etc.
        self.current = 0
        self.start_time = time.time()
        self.lock = threading.Lock()  # Thread-safe updates
        self.bar_length = 50  # Character width of progress bar
        self.terminal_width = shutil.get_terminal_size((80, 20)).columns

    def update(self, step=1):
        """Thread-safe progress increment"""
        with self.lock:
            self.current = min(self.current + step, self.total)
            self._draw_bar()

    def _draw_bar(self):
        """Renders progress bar with ETA and speed calculations"""
        if LOG.json_mode or not sys.stdout.isatty():
            return
        progress = self.current / self.total if self.total > 0 else 0
        percent = progress * 100
        filled_length = int(self.bar_length * progress)
        bar = '█' * filled_length + '-' * (self.bar_length - filled_length)
        elapsed = time.time() - self.start_time
        eta_str = "N/A"
        speed_str = ""
        if progress > 0 and elapsed > 0:
            remaining = (elapsed / progress) - elapsed if progress < 1 else 0
            if remaining > 3600:
                eta_str = f"{remaining/3600:.1f}h"
            elif remaining > 60:
                eta_str = f"{remaining/60:.1f}m"
            else:
                eta_str = f"{remaining:.1f}s"
        # Speed calculation for bytes
        if self.unit == "B" and elapsed > 0:
            speed = self.current / elapsed
            if speed > 1024**2:
                speed_str = f" @ {speed/1024**2:.1f} MB/s"
            elif speed > 1024:
                speed_str = f" @ {speed/1024:.1f} KB/s"
            else:
                speed_str = f" @ {speed:.0f} B/s"
        full_msg = f"{self.description}: |{bar}| {percent:.1f}% ({self.current}/{self.total} {self.unit}){speed_str} - ETA: {eta_str}"
        # Truncate if too long for terminal
        if len(full_msg) > self.terminal_width:
            full_msg = full_msg[:self.terminal_width - 4] + "..."
        sys.stdout.write(f"\r{full_msg}")
        sys.stdout.flush()

    def finish(self):
        """Completes progress bar and moves to new line"""
        if not LOG.json_mode and sys.stdout.isatty():
            sys.stdout.write("\n")
            sys.stdout.flush()
Usage Patterns:
- Network downloads (bytes with speed calculation)
- Multi-step operations (package installations, manager updates)
- Search operations across multiple repositories
7. Network Testing & Speed Tools
SpeedTest Class
Download Speed Testing
@staticmethod
def test_download_speed(url: Optional[str] = None, duration: int = 10) -> Dict[str, Any]:
    """Tests internet download speed using public speed test files"""
    # Default test URLs. Only the first entry is used when no URL is given;
    # the remaining entries are not tried on failure.
    test_urls = [
        "http://speedtest.tele2.net/10MB.zip",
        "https://proof.ovh.net/files/10Mb.dat",
        "http://ipv4.download.thinkbroadband.com/10MB.zip"
    ]
    test_url = url or test_urls[0]
    downloaded_bytes = 0
    start_time = time.time()
    try:
        request = urllib.request.Request(test_url)
        with urllib.request.urlopen(request, timeout=30) as response:
            total_size = int(response.info().get("Content-Length", 10*1024*1024))
            tracker = ProgressBar(min(total_size, 50*1024*1024), "Speed Test", "B")
            while time.time() - start_time < duration:
                chunk = response.read(32768)  # 32KB chunks
                if not chunk:
                    break
                downloaded_bytes += len(chunk)
                tracker.update(len(chunk))
            tracker.finish()
        elapsed_time = time.time() - start_time
        # Megabits per second (decimal: 1 Mb = 1,000,000 bits)
        download_rate_mbps = (downloaded_bytes * 8) / elapsed_time / 1000000 if elapsed_time > 0 else 0
        return {
            "ok": True,
            "download_mbps": round(download_rate_mbps, 2),
            "downloaded_mb": round(downloaded_bytes / 1024 / 1024, 2),
            "elapsed_seconds": round(elapsed_time, 2),
        }
    except Exception as e:
        return {"ok": False, "error": str(e)}
Ping Testing
@staticmethod
def ping_test() -> Dict[str, Any]:
    """Tests network latency to multiple hosts"""
    hosts = ["google.com", "github.com", "cloudflare.com", "8.8.8.8"]
    results = {}
    progress = ProgressBar(len(hosts), "Ping test", "hosts")
    for host in hosts:
        try:
            # Platform-specific ping commands
            if OS_NAME == "Windows":
                command = ["ping", "-n", "1", "-w", "5000", host]
            else:
                command = ["ping", "-c", "1", "-W", "5", host]
            process = subprocess.run(command, capture_output=True, text=True, timeout=10)
            output = process.stdout
            # Parse latency from output (platform-specific regex)
            if OS_NAME == "Windows":
                latency_match = re.search(r"time[<=](\d+)ms", output)
            else:
                latency_match = re.search(r"time=(\d+\.?\d*)\s*ms", output)
            if latency_match:
                latency = float(latency_match.group(1))
                results[host] = {"ok": True, "latency_ms": latency}
            else:
                results[host] = {"ok": False, "msg": "Could not parse ping output"}
        except subprocess.TimeoutExpired:
            results[host] = {"ok": False, "msg": "Timed out"}
        except Exception as e:
            results[host] = {"ok": False, "msg": str(e)}
        progress.update()
    progress.finish()
    return results
Limitations:
- Ping regex parsing varies by OS/locale and may fail
- No packet loss measurement
- Speed test limited to HTTP downloads (no upload testing)
- Firewall/proxy configurations may interfere
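The speed test lists several URLs but only ever uses the first one. A fallback chain could be sketched as below. This is an illustration under stated assumptions: `first_reachable` and `_http_probe` are hypothetical names, and probing with an HTTP HEAD request is just one possible reachability check.

```python
import urllib.request
from typing import Callable, Iterable, Optional


def first_reachable(
    urls: Iterable[str],
    probe: Optional[Callable[[str], bool]] = None,
) -> Optional[str]:
    """Return the first URL for which `probe` succeeds, or None if none does."""
    probe = probe or _http_probe
    for url in urls:
        try:
            if probe(url):
                return url
        except Exception:
            continue  # treat any probe error as "unreachable" and move on
    return None


def _http_probe(url: str, timeout: float = 5.0) -> bool:
    """Default probe: an HTTP HEAD request that must return a 2xx status."""
    request = urllib.request.Request(url, method="HEAD")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return 200 <= response.status < 300
```

The probe is injectable so that the selection logic can be exercised without network access; the speed test would then call `first_reachable(test_urls)` before starting the download.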
8. Search Engine Implementation
SearchResult Data Model
@dataclass
class SearchResult:
    name: str
    description: str
    version: str
    manager: str  # "pip", "npm", "brew", etc.
    homepage: Optional[str] = None
    relevance_score: float = 0.0  # 0-100+ scoring for ranking

    def to_dict(self):
        return asdict(self)  # For JSON serialization
RealSearchEngine Architecture
class RealSearchEngine:
    def __init__(self):
        self.cache_timeout = 3600  # 1 hour for cached data
        self.session = requests.Session()
        # Note: requests.Session does not apply a `timeout` attribute to requests;
        # the effective timeouts are the per-request timeout= arguments below.
        self.session.timeout = 30

    def search(self, query: str, manager: Optional[str] = None, limit: int = 20) -> List[SearchResult]:
        """Orchestrates concurrent search across multiple package managers"""
        all_results = []
        installed = _detect_installed_managers()  # Only search available managers
        # Target manager selection
        if manager:
            target_managers = [manager.lower()] if installed.get(manager.lower()) else []
        else:
            target_managers = [m for m, ok in installed.items() if ok]
        if not target_managers:
            return []
        # Manager-specific search function mapping
        manager_funcs = {
            "pip": self._search_pypi,
            "npm": self._search_npm,
            "brew": self._search_brew,
            "apt": self._search_apt,
            "dnf": self._search_dnf,
            "yum": self._search_yum,
            "pacman": self._search_pacman,
            "zypper": self._search_zypper,
            "apk": self._search_apk,
            "choco": self._search_choco,
            "winget": self._search_winget,
            "snap": self._search_snap,
            "flatpak": self._search_flatpak,
        }
        # Concurrent execution with timeout
        with _fut.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_manager = {}
            for mgr in target_managers:
                func = manager_funcs.get(mgr)
                if func:
                    future_to_manager[executor.submit(func, query)] = mgr
            progress = ProgressBar(len(future_to_manager), "Searching repositories", "repos")
            for future in _fut.as_completed(future_to_manager, timeout=120):
                mgr = future_to_manager[future]
                try:
                    results = future.result() or []
                    all_results.extend(results)
                except Exception as e:
                    cprint(f"{mgr.upper()}: Search failed - {e}", "WARNING")
                finally:
                    progress.update()
            progress.finish()
        # Sort by relevance and limit results
        all_results.sort(key=lambda x: x.relevance_score, reverse=True)
        return all_results[:limit]
Repository-Specific Search Implementations
PyPI Search
def _search_pypi(self, query: str) -> List[SearchResult]:
    """PyPI direct package lookup (no full-text search available)"""
    try:
        url = f"https://pypi.org/pypi/{query}/json"
        r = self.session.get(url, timeout=10)
        if r.status_code == 200:
            return [self._parse_pypi_info(r.json())]
    except (requests.RequestException, ValueError):
        # Network failure or malformed JSON: treat as "no results"
        pass
    return []

def _parse_pypi_info(self, data: Dict[str, Any]) -> SearchResult:
    """Converts PyPI API response to SearchResult"""
    info = data.get("info", {})
    return SearchResult(
        name=info.get("name", ""),
        # "summary" may be present but null, so fall back to "" before slicing
        description=(info.get("summary") or "")[:200],  # Truncate long descriptions
        version=info.get("version", "unknown"),
        manager="pip",
        homepage=info.get("home_page") or info.get("project_url"),
        relevance_score=50  # Fixed score for direct matches
    )
NPM Search
def _search_npm(self, query: str) -> List[SearchResult]:
    """Uses NPM registry search API with quality scoring"""
    try:
        url = "https://registry.npmjs.org/-/v1/search"
        r = self.session.get(url, params={"text": query, "size": 10}, timeout=10)
        if r.status_code == 200:
            data = r.json().get("objects", [])
            return [self._parse_npm_info(p) for p in data]
    except (requests.RequestException, ValueError):
        # Network failure or malformed JSON: treat as "no results"
        pass
    return []

def _parse_npm_info(self, package: Dict[str, Any]) -> SearchResult:
    """Converts NPM API response to SearchResult"""
    pkg = package.get("package", {})
    return SearchResult(
        name=pkg.get("name", ""),
        description=(pkg.get("description") or "")[:200],
        version=pkg.get("version", "unknown"),
        manager="npm",
        homepage=pkg.get("links", {}).get("homepage"),
        # The registry's final score is scaled to a 0-100 relevance value
        relevance_score=package.get("score", {}).get("final", 0) * 100,
    )
Homebrew Search
def _search_brew(self, query: str) -> List[SearchResult]:
    """Uses Homebrew API (only exact matches for now)"""
    try:
        url = f"https://formulae.brew.sh/api/formula/{query}.json"
        r = self.session.get(url, timeout=10)
        if r.status_code == 200:
            return [self._parse_brew_info(r.json())]
    except (requests.RequestException, ValueError):
        # Network failure or malformed JSON: treat as "no results"
        pass
    return []

def _parse_brew_info(self, data: Dict[str, Any]) -> SearchResult:
    """Converts Homebrew API response to SearchResult"""
    return SearchResult(
        name=data.get("name", ""),
        description=(data.get("desc") or "")[:200],
        version=data.get("versions", {}).get("stable", "unknown"),
        manager="brew",
        homepage=data.get("homepage"),
        relevance_score=50,
    )
Other Managers (Placeholders)
def _search_apt(self, query: str) -> List[SearchResult]:
    """Placeholder: apt search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_dnf(self, query: str) -> List[SearchResult]:
    """Placeholder: dnf search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_yum(self, query: str) -> List[SearchResult]:
    """Placeholder: yum search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_pacman(self, query: str) -> List[SearchResult]:
    """Placeholder: pacman search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_zypper(self, query: str) -> List[SearchResult]:
    """Placeholder: zypper search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_apk(self, query: str) -> List[SearchResult]:
    """Placeholder: apk search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_choco(self, query: str) -> List[SearchResult]:
    """Placeholder: choco search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_winget(self, query: str) -> List[SearchResult]:
    """Placeholder: winget search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_snap(self, query: str) -> List[SearchResult]:
    """Placeholder: snap search is not implemented (always returns no results)"""
    # ... placeholder
    return []

def _search_flatpak(self, query: str) -> List[SearchResult]:
    """Placeholder: flatpak search is not implemented (always returns no results)"""
    # ... placeholder
    return []

# Missing functions for other managers...
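For the system package managers, a placeholder could be filled in by parsing the manager's own search command instead of calling a web API. The sketch below handles the `name - description` lines printed by `apt-cache search`. It is an assumption about one possible approach, not CrossFire code: `AptHit` and `parse_apt_cache_search` are hypothetical names, and relevance scoring is left out.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class AptHit:
    """Local stand-in for SearchResult so the example is self-contained."""
    name: str
    description: str


def parse_apt_cache_search(output: str, limit: int = 10) -> List[AptHit]:
    """Parse `apt-cache search` output, one "name - description" entry per line."""
    hits: List[AptHit] = []
    for line in output.splitlines():
        name, separator, description = line.partition(" - ")
        if not separator or not name.strip():
            continue  # skip blank lines and anything that is not a result line
        hits.append(AptHit(name.strip(), description.strip()[:200]))
        if len(hits) >= limit:
            break
    return hits
```

A real `_search_apt` would presumably run `apt-cache search <query>` through `subprocess` and pass the captured stdout to a parser like this one.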
9. Command Execution Framework
CommandRunner Class
class CommandRunner:
    def __init__(self, logger: Logger, verbose: bool = False):
        self.logger = logger
        self.verbose = verbose

    def run_command(self, command: List[str], cwd: Optional[Path] = None, shell: bool = False, allow_fail: bool = False) -> Dict[str, Any]:
        """Runs a command (without a shell unless shell=True) and captures output"""
        try:
            result = subprocess.run(
                command,
                cwd=cwd,
                shell=shell,
                capture_output=True,
                text=True,
                check=not allow_fail,
                timeout=300
            )
            return {
                # With allow_fail=True no exception is raised on a non-zero exit,
                # so success must be derived from the return code.
                "ok": result.returncode == 0,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "returncode": result.returncode
            }
        except subprocess.CalledProcessError as e:
            self.logger.cprint(f"Command failed with exit code {e.returncode}:\n{e.stderr}", "ERROR")
            return {
                "ok": False,
                "stdout": e.stdout,
                "stderr": e.stderr,
                "returncode": e.returncode
            }
        except FileNotFoundError:
            self.logger.cprint(f"Command not found: {command[0]}", "ERROR")
            return {"ok": False, "stdout": "", "stderr": "Command not found", "returncode": 127}
        except Exception as e:
            # Includes subprocess.TimeoutExpired after the 300-second limit
            self.logger.cprint(f"An unexpected error occurred: {e}", "ERROR")
            return {"ok": False, "stdout": "", "stderr": str(e), "returncode": 1}
Common Usage:
- Running `pip install`, `npm install`, etc.
- Checking if a manager is installed (`which pip`)
- Running system maintenance commands (`apt clean`)
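The interaction between `check` and `allow_fail` is easy to get wrong: with `check=False`, `subprocess.run` never raises for a non-zero exit, so the success flag has to come from the return code. The sketch below is a simplified stand-in for `CommandRunner.run_command` (no logging, shorter timeout); the `run` function and the `FAILING` command are illustrative names only.

```python
import subprocess
import sys
from typing import Any, Dict, List


def run(command: List[str], allow_fail: bool = False) -> Dict[str, Any]:
    """Simplified stand-in for CommandRunner.run_command (no logging)."""
    try:
        result = subprocess.run(
            command, capture_output=True, text=True, check=not allow_fail, timeout=60
        )
    except subprocess.CalledProcessError as exc:
        # Raised only when check=True and the exit code is non-zero
        return {"ok": False, "stdout": exc.stdout, "stderr": exc.stderr,
                "returncode": exc.returncode}
    # Reached for exit code 0, and for any exit code when allow_fail=True
    return {"ok": result.returncode == 0, "stdout": result.stdout,
            "stderr": result.stderr, "returncode": result.returncode}


# A command that always exits with status 3, using the current interpreter
FAILING = [sys.executable, "-c", "import sys; sys.exit(3)"]
```

Both `run(FAILING)` and `run(FAILING, allow_fail=True)` report `ok` as `False` with return code 3; the only difference is whether an exception was raised internally.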
10. Package Manager Detection
def _detect_installed_managers() -> Dict[str, bool]:
    """Checks for the presence of common package managers"""
    managers = {
        "pip": shutil.which("pip") is not None,
        "npm": shutil.which("npm") is not None,
        "brew": shutil.which("brew") is not None,
        "apt": shutil.which("apt") is not None,
        "dnf": shutil.which("dnf") is not None,
        "yum": shutil.which("yum") is not None,
        "pacman": shutil.which("pacman") is not None,
        "zypper": shutil.which("zypper") is not None,
        "apk": shutil.which("apk") is not None,
        "choco": shutil.which("choco") is not None,
        "winget": shutil.which("winget") is not None,
        "snap": shutil.which("snap") is not None,
        "flatpak": shutil.which("flatpak") is not None,
    }
    return managers
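Note: This relies solely on `shutil.which()`, which only finds executables on the current PATH under the exact name given. A manager installed outside PATH, or available only under another name (for example `pip3`, or pip reachable only as `python -m pip`, which is how the install commands invoke it), is reported as missing.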
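Detection could be made somewhat more tolerant by checking a few alternative executable names per manager. The following is a minimal sketch under that assumption; the `ALIASES` table, `which_any`, and `detect` are hypothetical, and the alternative names listed are examples rather than an exhaustive set.

```python
import shutil
from typing import Dict, Iterable, Optional

# Hypothetical alias table: manager name -> executable names to try, in order.
ALIASES: Dict[str, Iterable[str]] = {
    "pip": ("pip", "pip3"),
    "apt": ("apt", "apt-get"),
    "npm": ("npm",),
}


def which_any(names: Iterable[str]) -> Optional[str]:
    """Return the path of the first executable found on PATH, or None."""
    for name in names:
        path = shutil.which(name)
        if path:
            return path
    return None


def detect(aliases: Dict[str, Iterable[str]] = ALIASES) -> Dict[str, bool]:
    """Map each manager to True if any of its candidate executables is found."""
    return {manager: which_any(names) is not None for manager, names in aliases.items()}
```

This still depends on PATH, so it narrows the gap described above without closing it.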
11. Installation & Removal Orchestration
InstallManager Class
class InstallManager:
    def __init__(self, runner: CommandRunner, db: PackageDB, logger: Logger):
        self.runner = runner
        self.db = db
        self.logger = logger

    def install(self, package_name: str, manager_name: str, version: Optional[str] = None) -> bool:
        """Installs a package using the specified manager"""
        self.logger.cprint(f"Attempting to install '{package_name}' via {manager_name.upper()}...")
        # Mapping of manager to command templates
        commands = {
            "pip": ["python", "-m", "pip", "install", package_name],
            "npm": ["npm", "install", "-g", package_name],
            "brew": ["brew", "install", package_name],
            "apt": ["sudo", "apt-get", "install", "-y", package_name],
            "dnf": ["sudo", "dnf", "install", "-y", package_name],
            "yum": ["sudo", "yum", "install", "-y", package_name],
            "pacman": ["sudo", "pacman", "-S", "--noconfirm", package_name],
            "zypper": ["sudo", "zypper", "--non-interactive", "install", package_name],
            "apk": ["sudo", "apk", "add", package_name],
            "choco": ["choco", "install", package_name, "-y"],
            "winget": ["winget", "install", package_name, "--accept-source-agreements", "--accept-package-agreements"],
            "snap": ["sudo", "snap", "install", package_name],
            "flatpak": ["flatpak", "install", "-y", "flathub", package_name],
        }
        # Add version if specified and supported
        if version:
            if manager_name == "pip":
                commands["pip"][-1] += f"=={version}"
            # For all other managers the version argument is silently ignored
        cmd = commands.get(manager_name)
        if not cmd:
            self.logger.cprint(f"Unknown or unsupported package manager: {manager_name}", "ERROR")
            return False
        result = self.runner.run_command(cmd)
        if result["ok"]:
            self.logger.cprint(f"'{package_name}' installed successfully via {manager_name.upper()}.", "SUCCESS")
            # Record success in DB
            self.db.add_package(package_name, version, manager_name, " ".join(cmd))
            return True
        else:
            self.logger.cprint(f"Installation of '{package_name}' failed.", "ERROR")
            return False

    def remove(self, package_name: str, manager_name: str) -> bool:
        """Removes a package using the specified manager"""
        self.logger.cprint(f"Attempting to remove '{package_name}' via {manager_name.upper()}...", "WARNING")
        commands = {
            "pip": ["python", "-m", "pip", "uninstall", "-y", package_name],
            "npm": ["npm", "uninstall", "-g", package_name],
            "brew": ["brew", "uninstall", package_name],
            "apt": ["sudo", "apt-get", "remove", "-y", package_name],
            "dnf": ["sudo", "dnf", "remove", "-y", package_name],
            "yum": ["sudo", "yum", "remove", "-y", package_name],
            "pacman": ["sudo", "pacman", "-Rns", "--noconfirm", package_name],
            "zypper": ["sudo", "zypper", "--non-interactive", "remove", package_name],
            "apk": ["sudo", "apk", "del", package_name],
            "choco": ["choco", "uninstall", package_name, "-y"],
            "winget": ["winget", "uninstall", package_name, "--accept-source-agreements"],
            "snap": ["sudo", "snap", "remove", package_name],
            "flatpak": ["flatpak", "uninstall", "-y", package_name],
        }
        cmd = commands.get(manager_name)
        if not cmd:
            self.logger.cprint(f"Unknown or unsupported package manager: {manager_name}", "ERROR")
            return False
        result = self.runner.run_command(cmd, allow_fail=True)  # allow uninstall to fail gracefully
        # With allow_fail=True no exception is raised on failure, so judge by exit code
        succeeded = result.get("returncode") == 0
        if succeeded or "not installed" in (result.get("stderr") or "").lower():
            self.logger.cprint(f"'{package_name}' removed successfully via {manager_name.upper()}.", "SUCCESS")
            # Remove from DB
            self.db.remove_package(package_name, manager_name)
            return True
        else:
            self.logger.cprint(f"Removal of '{package_name}' failed.", "ERROR")
            return False
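Version pinning is only wired up for pip, but several other managers have their own syntax for it. The helper below sketches how per-manager version arguments might be built. It is an illustration, not CrossFire code: `versioned_args` is a hypothetical name, and only managers whose syntax is widely documented are covered (`name==version` for pip, `name@version` for npm, `name=version` for apt, and a `--version` flag for choco and winget).

```python
from typing import List, Optional


def versioned_args(manager: str, package: str, version: Optional[str]) -> List[str]:
    """Return the argument(s) that select `version` of `package` for `manager`.

    Raises ValueError for managers this sketch does not cover, instead of
    silently ignoring the requested version.
    """
    if not version:
        return [package]
    if manager == "pip":
        return [f"{package}=={version}"]
    if manager == "npm":
        return [f"{package}@{version}"]
    if manager == "apt":
        return [f"{package}={version}"]
    if manager in ("choco", "winget"):
        return [package, "--version", version]
    raise ValueError(f"version pinning not supported for {manager!r} in this sketch")
```

Failing loudly for unsupported managers avoids the current situation where a requested version is dropped without notice.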
12. System Maintenance & Cleanup
SystemMaintenance Class
class SystemMaintenance:
    def __init__(self, runner: CommandRunner, logger: Logger):
        self.runner = runner
        self.logger = logger

    def clean(self):
        """Runs cleanup commands for all available managers"""
        self.logger.cprint("Running system maintenance and cleanup...", "INFO")
        managers = _detect_installed_managers()
        clean_commands = {
            "apt": ["sudo", "apt-get", "autoclean", "-y"],
            "dnf": ["sudo", "dnf", "clean", "all"],
            "yum": ["sudo", "yum", "clean", "all"],
            "pacman": ["sudo", "pacman", "-Scc", "--noconfirm"],  # CAUTION: removes all cached packages
            "zypper": ["sudo", "zypper", "clean"],
            "apk": ["sudo", "apk", "cache", "--purge"],
            "npm": ["npm", "cache", "clean", "--force"],
            "pip": ["python", "-m", "pip", "cache", "purge"],
            "brew": ["brew", "cleanup"],
            "choco": ["choco", "cache", "purge"],
            "winget": ["winget", "clear", "--cache"],
        }
        for mgr, cmd in clean_commands.items():
            if managers.get(mgr):
                self.logger.cprint(f" > Cleaning cache for {mgr.upper()}...", "MUTED")
                self.runner.run_command(cmd, allow_fail=True)
        # Individual failures are not tracked; the method always reports success
        self.logger.cprint("System cleanup complete.", "SUCCESS")
        return True
13. Self-Update System
UpdateManager Class
class UpdateManager:
    def __init__(self, runner: CommandRunner, logger: Logger):
        self.runner = runner
        self.logger = logger

    def update_self(self, url: str = DEFAULT_UPDATE_URL):
        """Downloads the latest version and replaces the current script"""
        try:
            self.logger.cprint("Checking for updates...", "INFO")
            response = requests.get(url, timeout=15)
            response.raise_for_status()
            # Simple version check (no formal versioning, just a string match)
            # This is not a robust method and can be improved
            latest_version_line = next((line for line in response.text.splitlines() if "__version__" in line), None)
            if latest_version_line and latest_version_line.strip() != f'__version__ = "{__version__}"':
                self.logger.cprint("New version available. Downloading...", "INFO")
                # Replace the current script
                with open(__file__, "w", encoding="utf-8") as f:
                    f.write(response.text)
                self.logger.cprint("Update successful! Please restart the script.", "SUCCESS")
            else:
                self.logger.cprint("You are already using the latest version.", "INFO")
            return True
        except Exception as e:
            self.logger.cprint(f"Update failed: {e}", "ERROR")
            return False
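Because the update check compares whole lines for inequality, any difference counts as "new", including an older remote version. A numeric comparison could look like the sketch below. It assumes the version string keeps the documented shape (`"CrossFire 1.0 - BlackBase"`), and `parse_version` and `is_newer` are hypothetical helpers.

```python
import re
from typing import Optional, Tuple

# First dotted run of digits in the string, e.g. "1.0" in "CrossFire 1.0 - BlackBase"
_VERSION_RE = re.compile(r"(\d+(?:\.\d+)*)")


def parse_version(text: str) -> Optional[Tuple[int, ...]]:
    """Extract the first dotted numeric version from `text` as a tuple of ints."""
    match = _VERSION_RE.search(text)
    if not match:
        return None
    return tuple(int(part) for part in match.group(1).split("."))


def is_newer(remote: str, local: str) -> bool:
    """True only if both versions parse and the remote one is strictly greater."""
    remote_version, local_version = parse_version(remote), parse_version(local)
    if remote_version is None or local_version is None:
        return False  # be conservative: never "update" on unparseable input
    return remote_version > local_version
```

Comparing integer tuples means `1.10` sorts after `1.9`, which a plain string comparison would get wrong.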
15. Setup & Launcher Installation
SetupManager Class
class SetupManager:
    def __init__(self, runner: CommandRunner, logger: Logger):
        self.runner = runner
        self.logger = logger

    def install_launcher(self):
        """Installs the script as a system-wide command"""
        # This is a highly simplified and OS-specific example.
        # It needs significant improvement for cross-platform compatibility.
        self.logger.cprint("Installing CrossFire as a system-wide command...", "INFO")
        try:
            # Find a suitable bin directory
            if OS_NAME in ["Linux", "Darwin"]:
                bin_path = Path.home() / ".local" / "bin"
                os.makedirs(bin_path, exist_ok=True)
                target_path = bin_path / "crossfire"
                # Create a symlink. exists() is False for a dangling symlink,
                # so check is_symlink() as well before removing the old entry.
                if target_path.exists() or target_path.is_symlink():
                    os.remove(target_path)
                # Link to the absolute path so the symlink works from any directory
                os.symlink(Path(__file__).resolve(), target_path)
                # Prompt user to add to PATH
                self.logger.cprint(f"Successfully created symlink at {target_path}", "SUCCESS")
                self.logger.cprint("You may need to add this to your PATH environment variable:", "INFO")
                self.logger.cprint(' export PATH="$HOME/.local/bin:$PATH"', "MUTED")
            elif OS_NAME == "Windows":
                # Windows is more complex, requiring modification of the PATH or a .bat/.ps1 file
                self.logger.cprint("Automatic installation on Windows is not yet supported. Please add the script directory to your PATH manually.", "WARNING")
                return False
            return True
        except Exception as e:
            self.logger.cprint(f"Launcher installation failed: {e}", "ERROR")
            return False
16. Package Management Features
Features:
- Install: `crossfire install <package> --manager <manager> [--version <version>]`
- Uninstall: `crossfire remove <package> --manager <manager>`
- Search: `crossfire search <query> [--manager <manager>]`
- List: `crossfire list [--manager <manager>]`
- Clean: `crossfire clean`
- Update: `crossfire update`
- Self-Update: `crossfire self-update`
- Info: `crossfire info`
- Help: `crossfire --help`
17. Command Line Interface
Argument Parsing
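def parse_args():
    parser = argparse.ArgumentParser(
        description="CrossFire Universal Package Manager",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    # Subcommands are stored in args.command, which main() dispatches on
    subparsers = parser.add_subparsers(dest="command")
    # ... argument definitions for each command ...
    return parser.parse_args()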
Example Command: `crossfire install requests --manager pip`
# Inside parse_args()
install_parser = subparsers.add_parser('install', help='Install a package')
install_parser.add_argument('package', help='Name of the package to install')
install_parser.add_argument('--manager', required=True, help='The package manager to use (e.g., pip, npm, brew)')
install_parser.add_argument('--version', help='The specific version to install')
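Putting the fragments together, a self-contained parser might be sketched as follows. Several details here are assumptions rather than documented behaviour: the function name `build_parser`, the global flag spellings `--quiet`, `--verbose`, and `--json` (only the attribute names `args.quiet`, `args.verbose`, and `args.json_output` appear in this document), and the restriction to the `install` and `search` subcommands.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a minimal CrossFire-style parser with two subcommands."""
    parser = argparse.ArgumentParser(
        prog="crossfire",
        description="CrossFire Universal Package Manager",
    )
    # Global flags; the option spellings are assumed for this sketch.
    parser.add_argument("--quiet", action="store_true")
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument("--json", dest="json_output", action="store_true")

    # dest="command" makes the chosen subcommand available as args.command
    subparsers = parser.add_subparsers(dest="command", required=True)

    install = subparsers.add_parser("install", help="Install a package")
    install.add_argument("package", help="Name of the package to install")
    install.add_argument("--manager", required=True, help="Package manager to use")
    install.add_argument("--version", help="Specific version to install")

    search = subparsers.add_parser("search", help="Search for a package")
    search.add_argument("query", help="Search term")
    search.add_argument("--manager", help="Restrict the search to one manager")
    return parser
```

Returning the parser instead of parsed arguments keeps it testable: `build_parser().parse_args(["install", "requests", "--manager", "pip"])` can be called without touching `sys.argv`.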
18. Main Entry Point & Routing
def main():
    args = parse_args()
    # Global setup
    global LOG
    LOG.quiet = args.quiet
    LOG.verbose = args.verbose
    LOG.json_mode = args.json_output
    # Instantiate core components
    runner = CommandRunner(LOG, verbose=args.verbose)
    db = PackageDB()
    installer = InstallManager(runner, db, LOG)
    searcher = RealSearchEngine()
    if args.command == 'install':
        installer.install(args.package, args.manager, args.version)
    elif args.command == 'search':
        # ... search logic ...
        pass
    # ... and so on for other commands

if __name__ == "__main__":
    main()
19. Error Handling & Edge Cases
Python-Specific Issues
- `subprocess.CalledProcessError`: Handled by `CommandRunner` for non-zero exit codes.
- `FileNotFoundError`: Occurs when a package manager is not in the system PATH. `CommandRunner` catches this and provides a user-friendly message.
- `requests.exceptions.RequestException`: Caught in `RealSearchEngine` (the affected search silently returns no results) and in `UpdateManager` (the update is reported as failed).
General Limitations
- No robust rollback mechanism on failed installs.
- No dependency resolution across managers (e.g., installing a Python package with `pip` won't automatically install `gcc` via `apt` if needed).
20. Security Considerations
Key Risks
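- Arbitrary Command Execution: The script executes commands with `subprocess`. Because commands are passed as argument lists with `shell=False` by default, package names are not interpreted by a shell. A name that begins with `-` can still be parsed as an option by the underlying manager, and calling `run_command()` with `shell=True` removes the protection entirely.
- `sudo` Usage: Most Linux manager commands (apt, dnf, yum, pacman, zypper, apk, snap) are run through `sudo`. This elevates privileges for third-party install scripts and is a security risk. A more robust solution would be to prompt the user explicitly before elevating or to use a different mechanism.
- Update Integrity: The `self-update` system downloads over HTTPS, which protects the transfer itself, but performs no signature or checksum verification. If the upstream repository is compromised, or a different URL is passed to `update_self()`, a malicious `crossfire.py` would be written over the running script without any check.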
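One way to reduce the update-integrity risk is to compare the download against a known SHA-256 digest and swap the file in atomically. The sketch below is hypothetical: `verified_replace` is not part of CrossFire, and it assumes the expected digest is obtained through a separate trusted channel, since a checksum fetched from the same server as the script would not prove authenticity.

```python
import hashlib
import os
import shutil
import tempfile
from pathlib import Path


def verified_replace(target: Path, content: bytes, expected_sha256: str) -> None:
    """Replace `target` with `content` only if its SHA-256 digest matches."""
    actual = hashlib.sha256(content).hexdigest()
    if actual != expected_sha256.lower():
        raise ValueError(f"checksum mismatch: expected {expected_sha256}, got {actual}")
    # Write to a temp file in the same directory so os.replace stays on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=str(target.parent), prefix=target.name + ".")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(content)
        if target.exists():
            shutil.copymode(target, tmp_name)  # keep the executable bit, if any
        os.replace(tmp_name, target)  # atomic swap: readers see old or new, never partial
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Compared with writing directly to `__file__`, an interrupted update leaves the old script intact. A checksum only detects corruption or substitution relative to the trusted digest; signature verification would be needed for stronger guarantees.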
21. Known Bugs & Limitations
- `--version` flag for `install` only works with `pip`.
- `self-update` does not verify the integrity or authenticity of the downloaded script.
- UI/Progress bar can be corrupted by concurrent stdout from subprocesses.
- `list` command does not verify if packages are actually still installed on the system, only checks the internal database.
22. Extension Guide
To add a new package manager, follow these steps:
- Add the manager name to the `_detect_installed_managers()` function.
- Implement `_search_[manager]()` and `_parse_[manager]_info()` methods in `RealSearchEngine`.
- Add command templates to `InstallManager.install()` and `InstallManager.remove()`.
- Add a `clean` command to `SystemMaintenance.clean()`.
- Update the CLI parser in `parse_args()` if new flags are needed.
23. Complete Command Reference
crossfire install <package> --manager <manager> [--version <version>]
crossfire remove <package> --manager <manager>
crossfire search <query> [--manager <manager>]
crossfire list [--manager <manager>]
crossfire clean
crossfire update
crossfire self-update
crossfire info
crossfire --help