#!/usr/bin/env python3
"""
Docker overlay2 cleanup tool.
Maps /var/lib/docker/overlay2 folders to containers/images,
identifies orphans, and provides interactive deletion.

Usage: sudo python3 docker-overlay-cleanup.py
"""
|
|
|
|
import subprocess
|
|
import json
|
|
import os
|
|
import sys
|
|
import curses
|
|
from pathlib import Path
|
|
from typing import Dict, Set, List, Tuple, Optional
|
|
|
|
# Directory whose sub-folders are listed, matched against Docker's GraphDriver
# paths, and (on user confirmation) deleted by this script.
# NOTE(review): this is hard-coded; presumably it assumes Docker's default
# data-root — confirm before running on hosts with a custom data-root.
OVERLAY2_PATH = "/var/lib/docker/overlay2"
|
|
|
|
def get_folder_size(path: str) -> int:
    """Return the size of *path* in bytes as reported by ``du -sb``.

    This is a best-effort measurement used for display and sorting only.

    Args:
        path: Directory (or file) to measure.

    Returns:
        The byte count printed by ``du``, or 0 when ``du`` cannot be started,
        exceeds the 30-second timeout, exits non-zero, or prints output that
        does not start with an integer.
    """
    try:
        result = subprocess.run(
            ["du", "-sb", path],
            capture_output=True, text=True, timeout=30,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: du missing or not executable; ValueError: path contains a
        # NUL byte; SubprocessError: timeout. KeyboardInterrupt and SystemExit
        # are deliberately NOT caught so the user can still abort a long scan.
        return 0

    if result.returncode != 0:
        return 0

    try:
        # Output has the form "<bytes>\t<path>"; the first field is the size.
        return int(result.stdout.split()[0])
    except (IndexError, ValueError):
        return 0
|
|
|
|
def human_size(size: int) -> str:
    """Format a byte count with one decimal place and a unit suffix.

    The value is divided by 1024 once per unit step, walking B, KB, MB, GB
    and TB in order; anything still >= 1024 after TB is reported in PB.
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    value = size
    position = 0
    while position < len(units):
        if value < 1024:
            return f"{value:.1f} {units[position]}"
        value = value / 1024
        position += 1
    # Ran out of named units: whatever is left is expressed in petabytes.
    return f"{value:.1f} PB"
|
|
|
|
def run_docker_cmd(cmd: List[str]) -> subprocess.CompletedProcess:
    """Execute *cmd* with captured text output and a 60-second timeout.

    Never raises for ordinary failures: if the process cannot be run (or
    times out), a synthetic CompletedProcess is returned with return code 1,
    empty stdout, and the exception text as stderr.
    """
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except Exception as err:
        # Present launch failures in the same shape as a failed command so
        # callers only ever need to inspect returncode.
        completed = subprocess.CompletedProcess(cmd, 1, "", str(err))
    return completed
|
|
|
|
def parse_jsonl(text: str) -> List[dict]:
    """Decode JSON Lines text into a list of parsed values.

    Blank lines are skipped, and lines that are not valid JSON are ignored
    rather than aborting the whole parse.
    """
    records = []
    for raw_line in text.strip().split('\n'):
        if not raw_line.strip():
            continue
        try:
            decoded = json.loads(raw_line)
        except json.JSONDecodeError:
            # Tolerate stray non-JSON lines mixed into the output.
            continue
        records.append(decoded)
    return records
|
|
|
|
def get_docker_inspect(ids: List[str]) -> List[dict]:
    """Run ``docker inspect`` on *ids* in a single call and return the objects.

    Args:
        ids: Container or image IDs to inspect. An empty list short-circuits
            without invoking docker.

    Returns:
        The JSON objects printed by ``docker inspect``. An empty list is
        returned when docker cannot be started, times out (120 s), or prints
        something that is not a JSON array.
    """
    if not ids:
        return []

    try:
        result = subprocess.run(
            ["docker", "inspect"] + ids,
            capture_output=True, text=True, timeout=120,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # docker missing / not executable, an ID containing a NUL byte, or a
        # timeout. KeyboardInterrupt is intentionally left to propagate.
        return []

    # NOTE: stdout is parsed even when the exit status is non-zero.
    # `docker inspect` is expected to report failure if any single ID cannot
    # be resolved (e.g. a container removed since it was listed) while still
    # printing the objects it did find. Discarding that output would make
    # every layer look unreferenced, which is the unsafe direction for a
    # tool that deletes "orphans".
    output = result.stdout.strip()
    if not output:
        return []

    try:
        parsed = json.loads(output)
    except json.JSONDecodeError:
        return []

    if not isinstance(parsed, list):
        return []
    # Callers use .get() on every element, so drop anything that is not an object.
    return [entry for entry in parsed if isinstance(entry, dict)]
|
|
|
|
def extract_layer_ids(graphdriver_data: dict) -> Set[str]:
    """Collect overlay2 layer directory names from GraphDriver ``Data``.

    The ``MergedDir``, ``UpperDir``, ``WorkDir`` and ``LowerDir`` entries are
    scanned; each may hold several ':'-separated paths. For every path that
    contains ``OVERLAY2_PATH + "/"``, the path component immediately after
    that prefix is taken as the layer ID.

    Args:
        graphdriver_data: The ``GraphDriver.Data`` mapping from
            ``docker inspect``. ``None`` or a non-dict value is tolerated.

    Returns:
        The set of layer IDs found. The ``l`` directory is never reported.
    """
    layer_ids: Set[str] = set()

    # A JSON null in the inspect output arrives here as None; treat it as
    # "no paths" instead of raising AttributeError.
    if not isinstance(graphdriver_data, dict):
        return layer_ids

    for key in ("MergedDir", "UpperDir", "WorkDir", "LowerDir"):
        value = graphdriver_data.get(key)
        if not value or not isinstance(value, str):
            continue

        prefix = OVERLAY2_PATH + "/"
        # LowerDir can have multiple paths separated by ':'
        for entry in value.split(":"):
            # e.g. /var/lib/docker/overlay2/abc123/diff -> tail "abc123/diff"
            _, found, tail = entry.partition(prefix)
            if not found:
                continue
            layer_id = tail.split("/", 1)[0]
            if layer_id and layer_id != "l":
                layer_ids.add(layer_id)

    return layer_ids
|
|
|
|
def _list_docker_objects(args: List[str], what: str) -> List[dict]:
    """Run a docker listing command with ``--format '{{json .}}'`` and parse it.

    On failure a warning is written to stderr and an empty list is returned,
    so an incomplete reference map is visible to the user rather than silent.
    """
    result = run_docker_cmd(["docker"] + args + ["--format", "{{json .}}"])
    if result.returncode != 0:
        detail = (result.stderr or "").strip() or f"exit status {result.returncode}"
        print(
            f"Warning: could not list Docker {what} ({detail}); "
            f"layers used by {what} may be reported as orphans",
            file=sys.stderr,
        )
        return []
    return [entry for entry in parse_jsonl(result.stdout) if isinstance(entry, dict)]


def _graphdriver_data(info: dict) -> dict:
    """Return ``info["GraphDriver"]["Data"]`` as a dict, tolerating null/missing."""
    driver = info.get("GraphDriver")
    data = driver.get("Data") if isinstance(driver, dict) else None
    return data if isinstance(data, dict) else {}


def get_referenced_layers() -> Dict[str, str]:
    """Map every overlay2 layer used by a container or image to a description.

    Containers are processed first and labelled ``"container: <name>"``.
    Image layers are labelled ``"image: <name>"`` and never overwrite a
    container label for the same layer.

    Returns:
        Dict of layer_id -> description. If a docker command fails, a warning
        is printed to stderr and the corresponding layers are simply absent.
    """
    referenced: Dict[str, str] = {}

    # ---- Containers -------------------------------------------------------
    # Short ID -> display name, in listing order (dict keys double as the
    # de-duplicated ID list handed to `docker inspect`).
    container_names: Dict[str, str] = {}
    for container in _list_docker_objects(["ps", "-a"], "containers"):
        cid = container.get("ID", "")
        if cid:
            container_names[cid] = container.get("Names", cid)

    # Inspect all containers in one call (much faster than one call each).
    for info in get_docker_inspect(list(container_names)):
        # Truncate the inspected Id to 12 characters to look it up among the
        # IDs that came from the listing.
        cid = (info.get("Id") or "")[:12]
        fallback_name = (info.get("Name") or cid).lstrip("/")
        cname = container_names.get(cid, fallback_name)
        for layer_id in extract_layer_ids(_graphdriver_data(info)):
            referenced[layer_id] = f"container: {cname}"

    # ---- Images -----------------------------------------------------------
    image_ids: Dict[str, None] = {}    # ordered, de-duplicated IDs as listed
    image_names: Dict[str, str] = {}   # normalised 12-char ID -> display name
    for img in _list_docker_objects(["images", "-a"], "images"):
        img_id = img.get("ID", "")
        if not img_id:
            continue
        repo = img.get("Repository", "")
        tag = img.get("Tag", "")
        short_id = img_id.replace("sha256:", "")[:12]
        # The same image can be listed once per tag; only inspect it once.
        image_ids[img_id] = None
        image_names[short_id] = f"{repo}:{tag}" if repo and tag else short_id

    # Inspect all images in one call.
    for info in get_docker_inspect(list(image_ids)):
        img_id = (info.get("Id") or "").replace("sha256:", "")[:12]
        img_name = image_names.get(img_id, img_id)
        for layer_id in extract_layer_ids(_graphdriver_data(info)):
            # A container label is more specific; keep it if already present.
            referenced.setdefault(layer_id, f"image: {img_name}")

    return referenced
|
|
|
|
def get_overlay_folders() -> List[Tuple[str, int]]:
    """List the directories directly under OVERLAY2_PATH with their sizes.

    Returns (folder_name, size_bytes) pairs ordered largest first. An empty
    list is returned when OVERLAY2_PATH does not exist.
    """
    root = Path(OVERLAY2_PATH)
    if not root.exists():
        return []

    sized = [
        (entry.name, get_folder_size(str(entry)))
        for entry in root.iterdir()
        # 'l' is the symlink directory, not a layer folder, so it is skipped.
        if entry.is_dir() and entry.name != "l"
    ]
    # Biggest folders first.
    return sorted(sized, key=lambda pair: pair[1], reverse=True)
|
|
|
|
class CleanupTUI:
    """Interactive curses screen for choosing overlay2 folders to delete.

    The screen lists folders (orphans only by default, or all of them after
    pressing ``a``), lets the user move a cursor and toggle a selection, and
    hands the selected folder names back to the caller when Enter is pressed.
    Nothing is deleted by this class.
    """

    # First screen row of the folder list; rows 0-2 hold the header, the help
    # line and the transient message.
    _LIST_START = 4

    def __init__(self, folders: List[Tuple[str, int]], referenced: Dict[str, str]):
        """Store the scan results and reset all view state.

        Args:
            folders: (folder_name, size_bytes) pairs in display order.
            referenced: layer/folder name -> description of what uses it.
        """
        self.folders = folders
        self.referenced = referenced
        self.selected: Set[str] = set()
        self.cursor = 0
        self.scroll_offset = 0
        self.show_all = False  # Toggle to show referenced folders too
        self.message = ""

    def get_display_folders(self) -> List[Tuple[str, int, Optional[str]]]:
        """Return (name, size, reference) rows for the current view mode.

        ``reference`` is None for folders absent from ``self.referenced``.
        Referenced folders are included only when ``show_all`` is set.
        """
        return [
            (name, size, self.referenced.get(name))
            for name, size in self.folders
            if self.show_all or self.referenced.get(name) is None
        ]

    def run(self, stdscr):
        """Run the event loop on *stdscr* (as passed by ``curses.wrapper``).

        Returns:
            The selected folder names when the user presses Enter with a
            non-empty selection, or an empty list when the user quits.
        """
        self._init_curses()

        while True:
            display = self.get_display_folders()
            try:
                self._draw(stdscr, display)
            except curses.error:
                # Raised when the terminal is too small for a row we tried to
                # write. Skip this frame; the screen is redrawn on the next
                # key press or resize event.
                pass

            key = stdscr.getch()
            outcome = self._handle_key(key, display, self._page_size(stdscr))
            if outcome is not None:
                return outcome

    @staticmethod
    def _init_curses() -> None:
        """Hide the cursor and set up colour pairs, degrading gracefully."""
        try:
            curses.curs_set(0)
        except curses.error:
            pass  # terminal cannot hide the cursor; cosmetic only
        try:
            curses.use_default_colors()
            curses.init_pair(1, curses.COLOR_GREEN, -1)   # Referenced
            curses.init_pair(2, curses.COLOR_RED, -1)     # Orphaned
            curses.init_pair(3, curses.COLOR_YELLOW, -1)  # Selected
            curses.init_pair(4, curses.COLOR_CYAN, -1)    # Header
        except curses.error:
            pass  # no colour support; fall back to the default attributes

    def _page_size(self, stdscr) -> int:
        """Number of list rows that fit on screen (always at least 1)."""
        height, _ = stdscr.getmaxyx()
        return max(1, height - self._LIST_START - 1)

    def _draw(self, stdscr, display) -> None:
        """Paint one full frame: header, help, message, list and status bar."""
        stdscr.clear()
        height, width = stdscr.getmaxyx()
        # Stay one column short of the right edge on every row.
        limit = width - 1
        pad = " " * width

        # Header
        orphan_sizes = [size for _, size, ref in display if ref is None]
        selected_size = sum(size for name, size in self.folders if name in self.selected)
        header = (
            f" Docker overlay2 Cleanup | Orphans: {len(orphan_sizes)} "
            f"({human_size(sum(orphan_sizes))}) | Selected: {len(self.selected)} "
            f"({human_size(selected_size)})"
        )
        mode = " [ALL]" if self.show_all else " [ORPHANS ONLY]"
        # Attributes are passed per call rather than via attron/attroff so a
        # failed write can never leave an attribute switched on.
        stdscr.addnstr(0, 0, header + mode + pad, limit,
                       curses.color_pair(4) | curses.A_BOLD)

        # Help line (lists every key handled by _handle_key)
        help_text = (" ↑↓:nav | Space:select | a:toggle-all | o:select-orphans"
                     " | c:clear | Enter:delete-selected | q:quit")
        stdscr.addnstr(1, 0, help_text + pad, limit)

        if self.message:
            stdscr.addnstr(2, 0, f" {self.message}" + pad, limit, curses.color_pair(3))

        # Folder list: keep the cursor row inside the visible window.
        rows = max(1, height - self._LIST_START - 1)
        if self.cursor < self.scroll_offset:
            self.scroll_offset = self.cursor
        elif self.cursor >= self.scroll_offset + rows:
            self.scroll_offset = self.cursor - rows + 1

        visible = display[self.scroll_offset:self.scroll_offset + rows]
        for offset, (name, size, ref) in enumerate(visible):
            y = self._LIST_START + offset
            if y >= height - 1:  # bottom row is reserved for the status bar
                break

            marker = "[X]" if name in self.selected else "[ ]"
            if ref:
                attr = curses.color_pair(1)
                status = f"→ {ref[:40]}"
            else:
                attr = curses.color_pair(2)
                status = "ORPHAN"
            if offset + self.scroll_offset == self.cursor:
                attr |= curses.A_REVERSE

            line = f" {marker} {human_size(size):>10} {name[:30]} {status}"
            stdscr.addnstr(y, 0, line + pad, limit, attr)

        # Status bar; the position reads 0/0 (not 1/0) when the list is empty.
        position = min(self.cursor + 1, len(display))
        status = f" {len(display)} folders | Cursor: {position}/{len(display)}"
        stdscr.addnstr(height - 1, 0, status + pad, limit)

        stdscr.refresh()

    def _handle_key(self, key: int, display, page_size: int) -> Optional[List[str]]:
        """Apply one key press to the view state.

        Args:
            key: Value returned by ``getch``.
            display: Rows currently shown (from ``get_display_folders``).
            page_size: Cursor step for Page Up / Page Down.

        Returns:
            None to keep the loop running; otherwise the list ``run`` must
            return ([] for quit, the sorted selection for Enter).
        """
        # Clamp target for downward moves; never below 0 so that an empty
        # list cannot push the cursor to -1.
        last = max(0, len(display) - 1)
        # Messages describe the previous action only; drop the stale one.
        self.message = ""

        if key in (ord('q'), 27):  # q or ESC
            return []

        if key in (curses.KEY_UP, ord('k')):
            self.cursor = max(0, self.cursor - 1)
        elif key in (curses.KEY_DOWN, ord('j')):
            self.cursor = min(last, self.cursor + 1)
        elif key == curses.KEY_PPAGE:  # Page Up
            self.cursor = max(0, self.cursor - page_size)
        elif key == curses.KEY_NPAGE:  # Page Down
            self.cursor = min(last, self.cursor + page_size)
        elif key == ord(' '):  # Space - toggle selection, then advance
            if display and self.cursor < len(display):
                name = display[self.cursor][0]
                if name in self.selected:
                    self.selected.discard(name)
                else:
                    self.selected.add(name)
                self.cursor = min(last, self.cursor + 1)
        elif key == ord('a'):  # Toggle show all
            self.show_all = not self.show_all
            self.cursor = 0
            self.scroll_offset = 0
        elif key == ord('o'):  # Select all orphans
            orphans = [name for name, _, ref in display if ref is None]
            self.selected.update(orphans)
            self.message = f"Selected all {len(orphans)} orphans"
        elif key == ord('c'):  # Clear selection
            self.selected.clear()
            self.message = "Selection cleared"
        elif key in (ord('\n'), curses.KEY_ENTER):  # Enter - delete
            if self.selected:
                # Sorted so the caller processes folders in a stable order.
                return sorted(self.selected)
            self.message = "Nothing selected!"

        return None
|
|
|
|
def delete_folders(folders: List[str]) -> Tuple[int, int]:
    """Delete the named sub-folders of OVERLAY2_PATH with ``rm -rf``.

    Args:
        folders: Plain folder names (not paths) located directly under
            OVERLAY2_PATH.

    Returns:
        (success_count, fail_count). A name is counted as a failure, and
        nothing is executed for it, when it is not a plain single-component
        name; otherwise failure means ``rm`` could not be started, exited
        non-zero, or exceeded the 120-second timeout.
    """
    success = 0
    fail = 0
    for name in folders:
        # Safety check before running `rm -rf`: an empty name would
        # resolve to OVERLAY2_PATH itself, an absolute name would replace it
        # entirely in os.path.join, and '..' or embedded separators would
        # escape it. Only a bare directory name is acceptable.
        if (
            not isinstance(name, str)
            or name in ("", ".", "..")
            or "\0" in name
            or os.path.basename(name) != name
        ):
            fail += 1
            continue

        path = os.path.join(OVERLAY2_PATH, name)
        try:
            subprocess.run(["rm", "-rf", path], check=True, timeout=120)
        except (OSError, subprocess.SubprocessError):
            # rm missing, non-zero exit (CalledProcessError) or timeout.
            # KeyboardInterrupt is left to propagate so the user can abort.
            fail += 1
        else:
            success += 1
    return success, fail
|
|
|
|
def main():
    """Scan overlay2, map folders to Docker objects, and run the cleanup UI.

    Exits with status 1 when not run as root or when OVERLAY2_PATH does not
    exist. Folders are deleted only after the user selects them in the TUI
    and then answers ``y`` at the confirmation prompt.
    """
    if os.geteuid() != 0:
        print("Error: This script must be run as root (sudo)")
        sys.exit(1)

    if not os.path.exists(OVERLAY2_PATH):
        print(f"Error: {OVERLAY2_PATH} does not exist")
        sys.exit(1)

    print("Scanning Docker overlay2 folders...")
    folders = get_overlay_folders()
    print(f"Found {len(folders)} folders")

    print("Mapping to containers and images...")
    referenced = get_referenced_layers()
    print(f"Found {len(referenced)} referenced layers")

    orphan_count = sum(1 for name, _ in folders if name not in referenced)
    print(f"Identified {orphan_count} orphaned folders")

    if not folders:
        print("No folders to clean up!")
        return

    if not referenced:
        # With no reference data every folder is classified as an orphan.
        # That is correct on a host with no containers or images, but it is
        # also what an unreachable Docker daemon looks like.
        print(
            "Warning: Docker reported no referenced layers. If the Docker "
            "daemon is not running or not reachable, folders that are in use "
            "will be listed as orphans.",
            file=sys.stderr,
        )

    # Run TUI
    tui = CleanupTUI(folders, referenced)
    to_delete = curses.wrapper(tui.run)

    if not to_delete:
        print("\nNo folders selected for deletion")
        return

    chosen = set(to_delete)  # O(1) membership while summing sizes
    total_size = sum(size for name, size in folders if name in chosen)
    print(f"\nAbout to delete {len(to_delete)} folders ({human_size(total_size)})")

    try:
        confirm = input("Confirm deletion? [y/N]: ").strip().lower()
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl-C at the prompt means "no", not a traceback.
        print()
        confirm = ""

    if confirm == 'y':
        print("Deleting...")
        success, fail = delete_folders(to_delete)
        print(f"Done! Deleted {success} folders, {fail} failures")
    else:
        print("Cancelled")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the tool only when this file is executed
    # directly, not when it is imported as a module.
    main()
|