Commit a13abb80 authored by PengGao, committed by GitHub

Improve error logging in distributed inference worker and reset current task after broadcasting. Handle exceptions when clearing task event. (#296)

## Contributing Guidelines

We have prepared a `pre-commit` hook to enforce consistent code
formatting across the project. If your code complies with the standards,
you should not see any errors. Otherwise, you can clean up your code by
following the steps below:

1. Install the required dependencies:

```shell
    pip install ruff pre-commit
```

2. Then, run the following command before committing:

```shell
    pre-commit run --all-files
```

3. Finally, please double-check your code to ensure it complies with the
following additional specifications as much as possible:
- Avoid hard-coding local paths: Make sure your submissions do not
include hard-coded local paths, as these paths are specific to
individual development environments and can cause compatibility issues.
Use relative paths or configuration files instead.
- Clear error handling: Implement clear error-handling mechanisms in
your code so that error messages can accurately indicate the location of
the problem, possible causes, and suggested solutions, facilitating
quick debugging.
- Detailed comments and documentation: Add comments to complex code
sections and provide comprehensive documentation to explain the
functionality of the code, input-output requirements, and potential
error scenarios.
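
As a rough illustration of the first two points above, the sketch below resolves a file relative to a base directory instead of a hard-coded local path, and raises an error that names the location, a possible cause, and a suggested fix. The function name `load_config_text`, the `configs` directory, and the `base_dir` parameter are hypothetical and are not part of this project.

```python
from pathlib import Path


def load_config_text(relative_name, base_dir=None):
    """Read a config file located under <base_dir>/configs (hypothetical layout)."""
    # Resolve against a base directory rather than an absolute, machine-specific path.
    base = Path(base_dir) if base_dir is not None else Path.cwd()
    config_path = base / "configs" / relative_name
    if not config_path.is_file():
        # State where the problem is, why it may have happened, and how to fix it.
        raise FileNotFoundError(
            f"Config file not found: {config_path}. "
            f"Possible cause: the file name is wrong or the file was never created. "
            f"Suggested fix: add the file under {base / 'configs'} or pass a valid name."
        )
    return config_path.read_text(encoding="utf-8")
```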

Thank you for your contributions!
parent d7c99b0c
@@ -226,6 +226,11 @@ def _distributed_inference_worker(rank, world_size, master_addr, master_port, ar
         task_data = shared_data.get("current_task")
         if task_data:
             worker.dist_manager.broadcast_task_data(task_data)
+            shared_data["current_task"] = None
+            try:
+                task_event.clear()
+            except Exception:
+                pass
         else:
             continue
     else:
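
The hunk above hands the pending task to the other ranks, then clears the shared slot and the event so the same task is not picked up again. A minimal, self-contained sketch of that pattern follows. It assumes a plain `dict` and a `threading.Event` as stand-ins for the project's shared objects, and `hand_off_task` and `broadcast` are hypothetical names used only for illustration.

```python
import threading


def hand_off_task(shared_data, task_event, broadcast):
    """Broadcast the pending task once, then reset the shared slot and the event."""
    task_data = shared_data.get("current_task")
    if not task_data:
        return None
    broadcast(task_data)
    # Reset the slot so the next loop iteration does not re-broadcast this task.
    shared_data["current_task"] = None
    try:
        task_event.clear()
    except Exception:
        # As in the diff, a failure to clear the event is deliberately ignored.
        pass
    return task_data


# Stand-ins for the real shared state (assumption: the project uses its own objects).
sent = []
shared_data = {"current_task": {"task_id": "demo-1"}}
task_event = threading.Event()
task_event.set()

first = hand_off_task(shared_data, task_event, sent.append)
second = hand_off_task(shared_data, task_event, sent.append)
```

The second call returns `None` because the slot was reset by the first, which is the behavior the added lines aim for.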
@@ -255,7 +260,7 @@ def _distributed_inference_worker(rank, world_size, master_addr, master_port, ar
             logger.info(f"Task {task_data['task_id']} success")
         except Exception as e:
-            logger.error(f"Process {rank} error occurred while processing task: {str(e)}")
+            logger.exception(f"Process {rank} error occurred while processing task: {str(e)}")
             worker.dist_manager.barrier()
......
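
The switch from `logger.error` to `logger.exception` matters because, in Python's standard `logging` module, `exception` logs at ERROR level and also appends the active traceback. The sketch below shows the difference using the standard library only. Which logging library the project's `logger` comes from is not shown in this commit, so treat this as an assumption; the `capture` helper is hypothetical.

```python
import io
import logging


def capture(method_name):
    """Log a caught ValueError with the given logger method and return the output."""
    stream = io.StringIO()
    logger = logging.getLogger(f"demo.{method_name}")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = logging.StreamHandler(stream)
    logger.addHandler(handler)
    try:
        raise ValueError("bad input")
    except Exception as e:
        # Same message shape as in the diff, with a fixed rank for the demo.
        getattr(logger, method_name)(f"Process 0 error occurred while processing task: {e}")
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


error_output = capture("error")          # message only
exception_output = capture("exception")  # message plus traceback
```

With `error`, only the formatted message is written; with `exception`, the output also contains the `Traceback (most recent call last):` section, which makes the failing line easier to locate.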