#!/usr/bin/env python3
"""
Docker overlay2 cleanup tool.

Maps /var/lib/docker/overlay2 folders to containers/images, identifies
orphans, and provides interactive deletion.

Usage: sudo python3 docker-overlay-cleanup.py
"""

import curses
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

OVERLAY2_PATH = "/var/lib/docker/overlay2"


def get_folder_size(path: str) -> int:
    """Return the size of *path* in bytes as reported by ``du -sb``.

    Returns 0 when ``du`` cannot be run, exits non-zero, exceeds the
    30 second timeout, or prints output that cannot be parsed, so that a
    single unreadable folder does not abort the whole scan.
    """
    try:
        result = subprocess.run(
            # "--" keeps a path starting with "-" from being read as an option.
            ["du", "-sb", "--", path],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (subprocess.SubprocessError, OSError):
        return 0
    if result.returncode != 0:
        return 0
    try:
        return int(result.stdout.split()[0])
    except (IndexError, ValueError):
        return 0


def human_size(size: int) -> str:
    """Format a byte count with one decimal and a B/KB/MB/GB/TB/PB unit."""
    value = float(size)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if value < 1024:
            return f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.1f} PB"


def run_docker_cmd(cmd: List[str]) -> subprocess.CompletedProcess:
    """Run *cmd* with a 60 second timeout and return the completed process.

    If the command cannot be started or times out, a synthetic
    ``CompletedProcess`` with return code 1, empty stdout and the error
    text in stderr is returned instead of raising.
    """
    try:
        return subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except (subprocess.SubprocessError, OSError) as err:
        return subprocess.CompletedProcess(cmd, 1, "", str(err))


def parse_jsonl(text: str) -> List[dict]:
    """Parse JSON Lines text and return the JSON objects it contains.

    Blank lines, lines that are not valid JSON, and lines whose JSON value
    is not an object are skipped.
    """
    records: List[dict] = []
    # Split on "\n" only: str.splitlines() would also break on characters
    # that may legally appear unescaped inside a JSON string.
    for raw_line in text.strip().split("\n"):
        line = raw_line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(record, dict):
            records.append(record)
    return records


def get_docker_inspect(ids: List[str]) -> List[dict]:
    """Run ``docker inspect`` on *ids* and return the parsed objects.

    Whatever JSON array is printed on stdout is used even when the command
    exits non-zero, so one vanished ID does not discard the data for all
    the others. Returns an empty list when *ids* is empty, the command
    cannot be run, or the output is not a JSON array.
    """
    if not ids:
        return []
    try:
        result = subprocess.run(
            ["docker", "inspect"] + list(ids),
            capture_output=True,
            text=True,
            timeout=120,
        )
    except (subprocess.SubprocessError, OSError):
        return []
    output = result.stdout.strip()
    if not output:
        return []
    try:
        parsed = json.loads(output)
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, list):
        return []
    return [entry for entry in parsed if isinstance(entry, dict)]


def extract_layer_ids(graphdriver_data: dict) -> Set[str]:
    """Extract overlay2 layer IDs from a ``GraphDriver.Data`` mapping.

    Reads the ``MergedDir``, ``UpperDir``, ``WorkDir`` and ``LowerDir``
    entries (``LowerDir`` may hold several ':'-separated paths) and returns
    the first path component found after ``OVERLAY2_PATH + "/"`` in each
    path. The ``l`` link directory is never reported. Missing, ``None`` or
    non-string values are ignored.
    """
    layer_ids: Set[str] = set()
    if not isinstance(graphdriver_data, dict):
        return layer_ids
    prefix = OVERLAY2_PATH + "/"
    for key in ("MergedDir", "UpperDir", "WorkDir", "LowerDir"):
        value = graphdriver_data.get(key)
        if not isinstance(value, str):
            continue
        for entry in value.split(":"):
            # Path looks like /var/lib/docker/overlay2/abc123/diff
            _, found, remainder = entry.strip().partition(prefix)
            if not found:
                continue
            layer_id = remainder.split("/", 1)[0]
            if layer_id and layer_id != "l":
                layer_ids.add(layer_id)
    return layer_ids


def _docker_list(cmd: List[str], what: str) -> List[dict]:
    """Run a docker listing command and parse its JSON Lines output."""
    result = run_docker_cmd(cmd)
    if result.returncode != 0:
        detail = (result.stderr or "").strip() or "unknown error"
        raise RuntimeError(f"could not list Docker {what}: {detail}")
    return parse_jsonl(result.stdout)


def _image_label(image: dict, image_id: str) -> str:
    """Build a display name for an image row from ``docker images``."""
    repo = image.get("Repository") or ""
    tag = image.get("Tag") or ""
    if repo in ("", "<none>"):
        return image_id[:12]
    if tag in ("", "<none>"):
        return repo
    return f"{repo}:{tag}"


def get_referenced_layers() -> Dict[str, str]:
    """Map every overlay2 layer ID used by a container or image to a label.

    Returns:
        Dict of layer_id -> description (``"container: NAME"`` or
        ``"image: NAME"``). A layer used by both keeps the container label.

    Raises:
        RuntimeError: if containers or images cannot be listed, or if
            ``docker inspect`` yields nothing for a non-empty ID list. An
            incomplete map would present in-use layers as orphans, so
            failing is safer than returning partial data.
    """
    referenced: Dict[str, str] = {}

    # Containers: short ID -> name (dict keys also de-duplicate the IDs).
    container_names: Dict[str, str] = {}
    for row in _docker_list(
        ["docker", "ps", "-a", "--format", "{{json .}}"], "containers"
    ):
        cid = row.get("ID", "")
        if cid:
            container_names[cid] = row.get("Names", cid)

    if container_names:
        # Inspect all containers in one call (much faster than one each).
        inspected = get_docker_inspect(list(container_names))
        if not inspected:
            raise RuntimeError("docker inspect returned no data for containers")
        for info in inspected:
            cid = (info.get("Id") or "")[:12]
            cname = container_names.get(cid)
            if cname is None:
                cname = (info.get("Name") or cid).lstrip("/")
            data = (info.get("GraphDriver") or {}).get("Data") or {}
            for layer_id in extract_layer_ids(data):
                referenced[layer_id] = f"container: {cname}"

    # Images: short ID -> label; the first tag seen for an ID wins.
    image_names: Dict[str, str] = {}
    for row in _docker_list(
        ["docker", "images", "-a", "--format", "{{json .}}"], "images"
    ):
        img_id = row.get("ID", "")
        if img_id and img_id not in image_names:
            image_names[img_id] = _image_label(row, img_id)

    if image_names:
        inspected = get_docker_inspect(list(image_names))
        if not inspected:
            raise RuntimeError("docker inspect returned no data for images")
        for info in inspected:
            img_id = (info.get("Id") or "").replace("sha256:", "")[:12]
            img_name = image_names.get(img_id, img_id)
            data = (info.get("GraphDriver") or {}).get("Data") or {}
            for layer_id in extract_layer_ids(data):
                # Container labels take precedence over image labels.
                referenced.setdefault(layer_id, f"image: {img_name}")

    return referenced


def get_overlay_folders() -> List[Tuple[str, int]]:
    """List the overlay2 layer folders with their sizes.

    Returns:
        List of (folder_name, size_bytes) sorted by size, largest first.
        The ``l`` link directory is skipped. Empty when ``OVERLAY2_PATH``
        does not exist.
    """
    overlay_path = Path(OVERLAY2_PATH)
    if not overlay_path.exists():
        return []
    folders = [
        (item.name, get_folder_size(str(item)))
        for item in overlay_path.iterdir()
        if item.is_dir() and item.name != "l"
    ]
    folders.sort(key=lambda entry: entry[1], reverse=True)
    return folders


class CleanupTUI:
    """Curses interface for browsing overlay2 folders and selecting some.

    Keys: arrows or j/k and PgUp/PgDn move, Space toggles the selection,
    ``a`` switches between orphans-only and all folders, ``o`` selects all
    visible orphans, ``c`` clears the selection, Enter returns the
    selection, ``q``/ESC quits without selecting anything.
    """

    _LIST_START = 4  # first screen row used by the folder list
    _HELP_TEXT = (
        " ↑↓:nav | Space:select | a:show-all/orphans | o:select-orphans"
        " | c:clear | Enter:delete-selected | q:quit"
    )

    def __init__(self, folders: List[Tuple[str, int]], referenced: Dict[str, str]):
        self.folders = folders
        self.referenced = referenced
        self.selected: Set[str] = set()
        self.cursor = 0
        self.scroll_offset = 0
        self.show_all = False  # Toggle to show referenced folders too
        self.message = ""

    def get_display_folders(self) -> List[Tuple[str, int, Optional[str]]]:
        """Return (name, size, reference) for the folders currently shown.

        Referenced folders are included only while ``show_all`` is set;
        ``reference`` is ``None`` for orphans.
        """
        rows = []
        for name, size in self.folders:
            ref = self.referenced.get(name)
            if self.show_all or ref is None:
                rows.append((name, size, ref))
        return rows

    def run(self, stdscr) -> List[str]:
        """Run the event loop on *stdscr* (for use with ``curses.wrapper``).

        Returns the selected folder names, sorted, when Enter is pressed
        with a non-empty selection, or an empty list when the user quits.
        """
        self._setup_curses()
        while True:
            display = self.get_display_folders()
            self._clamp_cursor(len(display))
            list_height = self._draw(stdscr, display)
            outcome = self._handle_key(stdscr.getch(), display, list_height)
            if outcome is not None:
                return outcome

    @staticmethod
    def _setup_curses() -> None:
        """Hide the cursor and set up colours where the terminal allows."""
        try:
            curses.curs_set(0)
        except curses.error:
            pass  # terminal cannot hide the cursor
        try:
            curses.use_default_colors()
            curses.init_pair(1, curses.COLOR_GREEN, -1)   # Referenced
            curses.init_pair(2, curses.COLOR_RED, -1)     # Orphaned
            curses.init_pair(3, curses.COLOR_YELLOW, -1)  # Messages
            curses.init_pair(4, curses.COLOR_CYAN, -1)    # Header
        except curses.error:
            pass  # no colour support; pairs fall back to the default

    def _clamp_cursor(self, count: int) -> None:
        """Keep the cursor inside [0, count - 1], or 0 for an empty list."""
        self.cursor = max(0, min(self.cursor, count - 1))

    @staticmethod
    def _put(stdscr, y: int, text: str, attr: int = 0) -> None:
        """Write one full-width row, ignoring rows that do not fit."""
        height, width = stdscr.getmaxyx()
        if not 0 <= y < height or width < 2:
            return
        try:
            # Stop one column early: writing the last cell can raise.
            stdscr.addnstr(y, 0, text.ljust(width), width - 1, attr)
        except curses.error:
            pass

    def _draw(self, stdscr, display: List[Tuple[str, int, Optional[str]]]) -> int:
        """Redraw the whole screen; return the folder-list height in rows."""
        stdscr.erase()
        height, _ = stdscr.getmaxyx()
        # At least 1 so paging keeps moving in the right direction.
        list_height = max(1, height - self._LIST_START - 1)

        orphan_sizes = [size for _, size, ref in display if ref is None]
        selected_size = sum(
            size for name, size in self.folders if name in self.selected
        )
        header = (
            f" Docker overlay2 Cleanup | Orphans: {len(orphan_sizes)}"
            f" ({human_size(sum(orphan_sizes))})"
            f" | Selected: {len(self.selected)} ({human_size(selected_size)})"
        )
        mode = " [ALL]" if self.show_all else " [ORPHANS ONLY]"
        self._put(stdscr, 0, header + mode, curses.color_pair(4) | curses.A_BOLD)
        self._put(stdscr, 1, self._HELP_TEXT)
        if self.message:
            self._put(stdscr, 2, f" {self.message}", curses.color_pair(3))

        # Scroll so that the cursor row stays visible.
        if self.cursor < self.scroll_offset:
            self.scroll_offset = self.cursor
        elif self.cursor >= self.scroll_offset + list_height:
            self.scroll_offset = self.cursor - list_height + 1

        visible = display[self.scroll_offset:self.scroll_offset + list_height]
        for row, (name, size, ref) in enumerate(visible):
            y = self._LIST_START + row
            if y >= height - 1:
                break
            marker = "[X]" if name in self.selected else "[ ]"
            if ref:
                attr = curses.color_pair(1)
                status = f"→ {ref[:40]}"
            else:
                attr = curses.color_pair(2)
                status = "ORPHAN"
            if row + self.scroll_offset == self.cursor:
                attr |= curses.A_REVERSE
            line = f" {marker} {human_size(size):>10} {name[:30]} {status}"
            self._put(stdscr, y, line, attr)

        position = min(self.cursor + 1, len(display))
        footer = f" {len(display)} folders | Cursor: {position}/{len(display)}"
        self._put(stdscr, height - 1, footer)
        stdscr.refresh()
        return list_height

    def _handle_key(
        self,
        key: int,
        display: List[Tuple[str, int, Optional[str]]],
        list_height: int,
    ) -> Optional[List[str]]:
        """Apply one key press; return a list to leave the loop, else None."""
        self.message = ""  # messages last until the next key press
        last = max(0, len(display) - 1)

        if key in (ord("q"), 27):  # q or ESC
            return []
        if key in (curses.KEY_UP, ord("k")):
            self.cursor = max(0, self.cursor - 1)
        elif key in (curses.KEY_DOWN, ord("j")):
            self.cursor = min(last, self.cursor + 1)
        elif key == curses.KEY_PPAGE:  # Page Up
            self.cursor = max(0, self.cursor - list_height)
        elif key == curses.KEY_NPAGE:  # Page Down
            self.cursor = min(last, self.cursor + list_height)
        elif key == ord(" "):  # Space - toggle selection, then move down
            if display and self.cursor < len(display):
                name = display[self.cursor][0]
                if name in self.selected:
                    self.selected.discard(name)
                else:
                    self.selected.add(name)
                self.cursor = min(last, self.cursor + 1)
        elif key == ord("a"):  # Toggle between orphans-only and all
            self.show_all = not self.show_all
            self.cursor = 0
            self.scroll_offset = 0
        elif key == ord("o"):  # Select all visible orphans
            orphans = [name for name, _, ref in display if ref is None]
            self.selected.update(orphans)
            self.message = f"Selected all {len(orphans)} orphans"
        elif key == ord("c"):  # Clear selection
            self.selected.clear()
            self.message = "Selection cleared"
        elif key in (ord("\n"), ord("\r"), curses.KEY_ENTER):  # Enter - delete
            if self.selected:
                return sorted(self.selected)
            self.message = "Nothing selected!"
        return None


def _is_safe_layer_name(name: str) -> bool:
    """Return True if *name* is a plain folder name directly under the root."""
    if not isinstance(name, str) or name in ("", ".", "..", "l"):
        return False
    return "/" not in name and os.sep not in name and "\0" not in name


def delete_folders(folders: List[str]) -> Tuple[int, int]:
    """Delete the named folders under ``OVERLAY2_PATH`` with ``rm -rf``.

    Names that are empty, ``.``, ``..``, ``l`` or that contain a path
    separator are refused and counted as failures, so a bad entry can
    never resolve to the overlay2 root or to a location outside it.

    Returns:
        (success_count, fail_count).
    """
    success = 0
    fail = 0
    for name in folders:
        if not _is_safe_layer_name(name):
            fail += 1
            continue
        path = os.path.join(OVERLAY2_PATH, name)
        try:
            subprocess.run(["rm", "-rf", "--", path], check=True, timeout=120)
        except (subprocess.SubprocessError, OSError):
            fail += 1
        else:
            success += 1
    return success, fail


def main():
    """Scan, show the TUI, confirm, and delete the selected folders."""
    if os.geteuid() != 0:
        print("Error: This script must be run as root (sudo)")
        sys.exit(1)

    if not os.path.exists(OVERLAY2_PATH):
        print(f"Error: {OVERLAY2_PATH} does not exist")
        sys.exit(1)

    print("Scanning Docker overlay2 folders...")
    folders = get_overlay_folders()
    print(f"Found {len(folders)} folders")
    if not folders:
        print("No folders to clean up!")
        return

    print("Mapping to containers and images...")
    try:
        referenced = get_referenced_layers()
    except RuntimeError as err:
        # Without a reliable reference map every folder would look orphaned.
        print(f"Error: {err}", file=sys.stderr)
        print("Cannot tell which layers are in use; aborting.", file=sys.stderr)
        sys.exit(1)
    print(f"Found {len(referenced)} referenced layers")

    orphan_count = sum(1 for name, _ in folders if name not in referenced)
    print(f"Identified {orphan_count} orphaned folders")

    # Run TUI
    tui = CleanupTUI(folders, referenced)
    to_delete = curses.wrapper(tui.run)

    if not to_delete:
        print("\nNo folders selected for deletion")
        return

    chosen = set(to_delete)
    total_size = sum(size for name, size in folders if name in chosen)
    print(f"\nAbout to delete {len(to_delete)} folders ({human_size(total_size)})")

    # Referenced folders can be selected in ALL mode; call them out loudly.
    in_use = sorted(name for name in chosen if name in referenced)
    if in_use:
        print(f"WARNING: {len(in_use)} selected folder(s) are still referenced:")
        for name in in_use:
            print(f"  {name} ({referenced[name]})")

    try:
        confirm = input("Confirm deletion? [y/N]: ").strip().lower()
    except (EOFError, KeyboardInterrupt):
        confirm = ""
        print()
    if confirm == "y":
        print("Deleting...")
        success, fail = delete_folders(to_delete)
        print(f"Done! Deleted {success} folders, {fail} failures")
    else:
        print("Cancelled")


if __name__ == "__main__":
    main()