Commit a0096a09 authored by Baber
Browse files

fix gpu queue

parent 1412e0c6
......@@ -8,8 +8,10 @@ As each GPU finishes evaluating a model, it automatically picks up the next one
import argparse
import json
import queue
import subprocess
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
......@@ -322,6 +324,73 @@ def load_failed_models(output_path: str) -> list[str]:
return models
def gpu_worker(
    gpu_id: int,
    model_queue: queue.Queue,
    results: list[dict[str, Any]],
    results_lock: threading.Lock,
    pbar: tqdm,
) -> None:
    """Drain models from a shared queue, evaluating each one on a single GPU.

    The worker keeps pulling models until the queue is empty, so a GPU that
    finishes early picks up the next pending model instead of idling.

    Args:
        gpu_id: The GPU ID (0, 1, 2, ...) passed to ``run_evaluation``.
        model_queue: Queue containing the models still to be processed.
        results: Shared list that receives one result dict per model
            (protected by ``results_lock``).
        results_lock: Lock guarding ``results`` and the progress-bar update.
        pbar: Progress bar advanced by one for every processed model.
    """
    while True:
        try:
            # Non-blocking get: an empty queue means the work is finished.
            model = model_queue.get_nowait()
        except queue.Empty:
            break

        try:
            try:
                # Only the evaluation itself is guarded, so a bookkeeping
                # failure can never record the same model twice.
                result = run_evaluation(model, gpu_id)
            except Exception as e:
                print(f"[GPU {gpu_id}] Unexpected error processing {model}: {e}")
                result = {
                    "model": model,
                    "gpu_id": gpu_id,
                    "status": "exception",
                    "error": str(e),
                    "timestamp": datetime.now().isoformat(),
                }

            # Append and recount under the lock so the shared list is never
            # scanned while another worker is mutating it, and so the
            # displayed totals always match the recorded results.
            with results_lock:
                results.append(result)
                succeeded = sum(1 for r in results if r.get("status") == "success")
                pbar.set_postfix({"✓": succeeded, "✗": len(results) - succeeded})
                pbar.update(1)
        finally:
            # Always balance the get() above, even if bookkeeping fails.
            model_queue.task_done()
def main():
"""Main execution function."""
# Parse command-line arguments
......@@ -376,59 +445,42 @@ def main():
print("No models to evaluate. Exiting.")
return 0
# Create a queue of (model, gpu_id) pairs
# We cycle through GPUs as we assign models
model_gpu_pairs = [
(model, gpu_id % NUM_GPUS) for gpu_id, model in enumerate(models_to_run)
]
# Create a queue and populate it with models
model_queue = queue.Queue()
for model in models_to_run:
model_queue.put(model)
# Shared data structures
results = []
success_count = 0
failed_count = 0
# Use ThreadPoolExecutor to run evaluations in parallel
# max_workers = NUM_GPUS ensures we don't oversubscribe GPUs
with ThreadPoolExecutor(max_workers=NUM_GPUS) as executor:
# Submit all jobs
future_to_model = {
executor.submit(run_evaluation, model, gpu_id): (model, gpu_id)
for model, gpu_id in model_gpu_pairs
}
# Process completed jobs as they finish with a progress bar
with tqdm(
total=len(models_to_run), desc="Evaluating models", unit="model"
) as pbar:
for future in as_completed(future_to_model):
model, gpu_id = future_to_model[future]
results_lock = threading.Lock()
# Use ThreadPoolExecutor with one worker per GPU
# Each worker will dynamically pull models from the queue
with tqdm(total=len(models_to_run), desc="Evaluating models", unit="model") as pbar:
with ThreadPoolExecutor(max_workers=NUM_GPUS) as executor:
# Submit one worker per GPU
futures = [
executor.submit(
gpu_worker, gpu_id, model_queue, results, results_lock, pbar
)
for gpu_id in range(NUM_GPUS)
]
# Wait for all workers to complete
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
if result["status"] == "success":
success_count += 1
else:
failed_count += 1
future.result() # This will raise any exceptions from the worker
except Exception as e:
print(f"Unexpected error processing {model}: {e}")
results.append(
{
"model": model,
"gpu_id": gpu_id,
"status": "exception",
"error": str(e),
}
)
failed_count += 1
# Update progress bar with current statistics
pbar.set_postfix({"✓": success_count, "✗": failed_count})
pbar.update(1)
print(f"Worker thread error: {e}")
# Print summary
print("\n" + "=" * 80)
print("EVALUATION SUMMARY")
print("=" * 80)
success_count = sum(1 for r in results if r["status"] == "success")
failed_count = len(results) - success_count
print(f"Total models: {len(models_to_run)}")
print(f"Successful: {success_count}")
print(f"Failed: {failed_count}")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment