Notes
September 22, 2025 · 5 min

MemProfilerX: how it works under the hood

I created MemProfilerX because I needed to debug memory leaks in production and existing tools were either too heavy or didn't have the features I needed. Here's how each part works.

3-Layer Architecture

┌──────────────────────────────────────────┐
│  Integration Layer                       │
│  CLI, pytest plugin, web middlewares     │
├──────────────────────────────────────────┤
│  Advanced Layer                          │
│  tracemalloc, snapshots, leak detection  │
├──────────────────────────────────────────┤
│  Core Layer                              │
│  psutil tracking, basic decorators       │
└──────────────────────────────────────────┘

Each layer solves a different problem. Let's go bottom-up.

Core: The Decorator that Monitors in Background

The @track_memory decorator is deceptively simple:

@track_memory(interval=1.0)
def process_data():
    return [i**2 for i in range(10_000_000)]

Under the hood, it:

  1. Spawns a daemon thread that runs in parallel
  2. The thread collects process.memory_info().rss at each interval
  3. When the function ends, the thread stops
  4. Returns the collected samples together with the original result

# Simplified
import threading
import time

import psutil

def track_memory(interval=1.0):
    def decorator(func):
        def wrapper(*args, **kwargs):
            samples = []
            stop_event = threading.Event()

            def monitor():
                while not stop_event.is_set():
                    mem = psutil.Process().memory_info().rss
                    samples.append((time.time(), mem))
                    time.sleep(interval)

            thread = threading.Thread(target=monitor, daemon=True)
            thread.start()

            try:
                result = func(*args, **kwargs)  # Execute the real function
            finally:
                stop_event.set()  # Stop sampling even if func raises
                thread.join()

            return {'result': result, 'memory_usage': samples}
        return wrapper
    return decorator

The key is the daemon thread — it dies automatically when the main process ends. No manual cleanup, no resource leaks.
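
A quick look at the return shape (keys as defined in the wrapper above):

stats = process_data()
samples = stats['memory_usage']  # List of (timestamp, rss_bytes) tuples
peak_mb = max(rss for _, rss in samples) / 1024 / 1024
print(f"{len(samples)} samples, peak RSS: {peak_mb:.1f} MB")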

Advanced: tracemalloc — Python's X-Ray

psutil gives you total process consumption. But what if you want to know which line of code is allocating memory?

Enter tracemalloc — an allocation tracer implemented in C and built into Python's standard library:

with AdvancedMemoryProfiler() as profiler:
    data = [x for x in range(1_000_000)]

for alloc in profiler.get_top_allocations(5):
    print(f"{alloc.filename}:{alloc.lineno} - {alloc.size_mb:.2f}MB")

Output:

script.py:2 - 38.15MB  # The list comprehension!

How it works:

import tracemalloc

class AdvancedMemoryProfiler:
    def __enter__(self):
        tracemalloc.start(25)  # Track up to 25 stack frames
        self._start_snapshot = tracemalloc.take_snapshot()
        return self

    def __exit__(self, *args):
        self._end_snapshot = tracemalloc.take_snapshot()
        tracemalloc.stop()

    def get_top_allocations(self, n=10):
        stats = self._end_snapshot.compare_to(
            self._start_snapshot,
            'lineno'  # Group by file:line
        )
        # Simplified: these are raw tracemalloc.StatisticDiff entries; the
        # real class exposes them as the filename/lineno/size_mb records
        # shown above
        return stats[:n]

tracemalloc.start() instructs Python to record every allocation together with its traceback. The overhead is roughly 10-15%, but the information is surgical.
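
For reference, everything the profiler wraps comes straight from the standard library:

import tracemalloc

tracemalloc.start(25)  # Keep up to 25 frames per allocation
data = [x for x in range(1_000_000)]
snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()

for stat in snapshot.statistics('lineno')[:3]:
    print(stat)  # file:line, total size, allocation count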

Snapshots: Photographing the Heap

Sometimes you want to compare memory state at two points in time. Snapshots do this.

ProcessSnapshot — system metrics:

snapshot = ProcessSnapshot.capture()
# RSS, VMS, CPU%, threads, file descriptors
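
A minimal sketch of what such a capture can look like with psutil (capture_process_metrics is a hypothetical stand-in, not the library's actual code):

import psutil

def capture_process_metrics():
    proc = psutil.Process()
    mem = proc.memory_info()
    return {
        'rss': mem.rss,                     # Resident set size, bytes
        'vms': mem.vms,                     # Virtual memory size, bytes
        'cpu_percent': proc.cpu_percent(),  # CPU% since the previous call
        'threads': proc.num_threads(),
        'fds': proc.num_fds(),              # File descriptors (Unix-only)
    }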

ObjectSnapshot — X-ray of Python objects:

snapshot = ObjectSnapshot.capture()

This one is interesting. It uses gc.get_objects() to traverse every live object tracked by the garbage collector:

import gc
import sys
from collections import defaultdict

def capture():
    counts = defaultdict(int)
    sizes = defaultdict(int)

    for obj in gc.get_objects():  # Every object tracked by the GC
        obj_type = type(obj).__name__
        obj_size = sys.getsizeof(obj)  # Shallow size; referenced objects not included
        counts[obj_type] += 1
        sizes[obj_type] += obj_size

    return ObjectSnapshot(counts, sizes)

With two snapshots, you calculate deltas:

before = ObjectSnapshot.capture()
# ... suspicious code ...
after = ObjectSnapshot.capture()

# Which types grew?
for obj_type in after.counts:
    delta = after.counts[obj_type] - before.counts.get(obj_type, 0)  # Absent before -> 0
    if delta > 1000:
        print(f"⚠️ {obj_type}: +{delta} instances")

Leak Detection: The Heuristics

Detecting leaks automatically is hard. MemProfilerX uses a combination of heuristics:

detector = LeakDetector(interval=0.5)
detector.start_monitoring(duration=30)
report = detector.analyze_leaks()

The algorithm:

  1. Collects snapshots every interval seconds
  2. Calculates the growth rate for each object type (sketched after this list)
  3. Confidence scoring:
if growth_rate > 0.1 and instances > 1000:    # > 0.1 MB/s
    confidence = 'high'
elif growth_rate > 0.01 and instances > 100:  # > 0.01 MB/s
    confidence = 'medium'
else:
    confidence = 'low'
  4. Generates recommendations based on type:
if 'dict' in suspects:
    recommendations.append("Check for unbounded caches")
if 'list' in suspects:
    recommendations.append("Look for append() without cleanup")
if 'socket' in suspects:
    recommendations.append("Ensure connections are closed")
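
A minimal sketch of step 2, reusing the counts/sizes fields from ObjectSnapshot above (growth_rate_mb_s is a hypothetical helper, not the library's API):

def growth_rate_mb_s(before, after, elapsed_s):
    """Per-type size growth between two ObjectSnapshots, in MB/s."""
    rates = {}
    for obj_type, size in after.sizes.items():
        delta_bytes = size - before.sizes.get(obj_type, 0)
        rates[obj_type] = delta_bytes / (1024 * 1024) / elapsed_s
    return rates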

It's not perfect — false positives happen. But it's a good starting point.

Context Managers: Sync and Async

The synchronous version uses threading:

class MemoryContext:
    def __enter__(self):
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._monitor, daemon=True)
        self._thread.start()
        return self  # So `with MemoryContext() as ctx:` works

    def __exit__(self, *args):
        self._stop_event.set()
        self._thread.join()

The async version uses asyncio.Task:

class AsyncMemoryContext:
    async def __aenter__(self):
        self._stop_event = asyncio.Event()
        self._task = asyncio.create_task(self._monitor())
        return self

    async def __aexit__(self, *args):
        self._stop_event.set()
        await self._task  # Wait for the monitor to finish cleanly

    async def _monitor(self):
        while not self._stop_event.is_set():
            # Collect metric here
            try:
                await asyncio.wait_for(
                    self._stop_event.wait(),
                    timeout=self.interval
                )
            except asyncio.TimeoutError:
                pass  # Interval elapsed; continue monitoring

The difference is subtle but important: async doesn't block the event loop.
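
Here is that wait pattern in a self-contained demo (standalone sketch, not library code). The monitor yields at every await, so other tasks keep running on the same loop:

import asyncio
import time

async def monitor(stop: asyncio.Event, interval: float):
    while not stop.is_set():
        print(f"sample at {time.monotonic():.1f}s")  # Collect a metric here
        try:
            # Wake up on stop OR after the interval, without blocking the loop
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # Interval elapsed; sample again

async def main():
    stop = asyncio.Event()
    task = asyncio.create_task(monitor(stop, 0.2))
    await asyncio.sleep(1)  # "Real work" shares the loop with the monitor
    stop.set()
    await task

asyncio.run(main())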

Web Middlewares: Monitoring Each Request

For FastAPI (async):

import psutil
from starlette.middleware.base import BaseHTTPMiddleware

class FastAPIMemoryMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        mem_before = psutil.Process().memory_info().rss

        response = await call_next(request)

        mem_after = psutil.Process().memory_info().rss
        delta_mb = (mem_after - mem_before) / 1024 / 1024

        response.headers['X-Memory-Delta-MB'] = f"{delta_mb:.2f}"
        return response

Each response gets headers with the memory delta. Useful for identifying problematic endpoints.
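
Wiring it in is the standard FastAPI registration (app here is just an example):

from fastapi import FastAPI

app = FastAPI()
app.add_middleware(FastAPIMemoryMiddleware)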

CLI: Rich for Pretty Output

The CLI uses the Rich library for colorful tables:

memx check my_script.py --duration 10
┌─────────────────────────────────────────┐
│         Memory Leak Analysis            │
├────────────┬────────────┬───────────────┤
│ Type       │ Growth     │ Confidence    │
├────────────┼────────────┼───────────────┤
│ list       │ +15.2 MB   │ 🔴 high       │
│ dict       │ +3.1 MB    │ 🟡 medium     │
│ str        │ +0.5 MB    │ 🟢 low        │
└────────────┴────────────┴───────────────┘
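
Under the hood, a table like this takes only a few lines of Rich (a sketch of the idea, not the actual CLI code):

from rich.console import Console
from rich.table import Table

table = Table(title="Memory Leak Analysis")
table.add_column("Type")
table.add_column("Growth", justify="right")
table.add_column("Confidence")
table.add_row("list", "+15.2 MB", "🔴 high")
table.add_row("dict", "+3.1 MB", "🟡 medium")
table.add_row("str", "+0.5 MB", "🟢 low")

Console().print(table)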

Overhead by Module

Module                  | Overhead | When to use
------------------------|----------|----------------------
@track_memory           | ~1-2%    | Continuous monitoring
AdvancedMemoryProfiler  | ~10-15%  | Specific debugging
ObjectSnapshot          | ~5%      | Periodic analysis
LeakDetector            | ~5-10%   | Investigation

Why the overhead? tracemalloc records a traceback for every allocation, gc.get_objects() walks the entire heap, and the sampling thread wakes up once per interval. You pay roughly in proportion to the detail you ask for.

The Complete Stack

Collection:     psutil, tracemalloc, gc, sys.getsizeof
Concurrency:    threading (sync), asyncio (async)
Serialization:  json, csv, pickle
Visualization:  Rich (terminal), matplotlib (graphs)
Integration:    pytest hooks, ASGI/WSGI middlewares

TL;DR

MemProfilerX combines:

  - psutil sampling for cheap, continuous tracking
  - tracemalloc for line-level allocation data
  - gc-based snapshots for object counts and deltas
  - heuristic leak detection with confidence scoring
  - sync/async context managers, web middlewares, and a Rich CLI

All packaged in an API of decorators and context managers that is (hopefully) easy to use.

The code is on GitHub if you want to explore the implementation.