Unverified Commit bd517eb9 authored by Nick Hill's avatar Nick Hill Committed by GitHub
Browse files

[BugFix] Fix DP Coordinator incorrect debug log message (#19624)


Signed-off-by: default avatarNick Hill <nhill@redhat.com>
parent d65668b4
...@@ -183,11 +183,12 @@ class CoordinatorProc: ...@@ -183,11 +183,12 @@ class CoordinatorProc:
# engines are paused, so that we can wake the other # engines are paused, so that we can wake the other
# engines. # engines.
engine_to_exclude, wave = msgspec.msgpack.decode(buffer) engine_to_exclude, wave = msgspec.msgpack.decode(buffer)
if wave < self.current_wave:
# If the wave number is stale, ensure the message is
# handled by all the engines.
engine_to_exclude = None
if not self.engines_running: if not self.engines_running:
if wave < self.current_wave:
# If the wave number is stale, ensure the message
# is handled by all the engines.
engine_to_exclude = None
self.engines_running = True self.engines_running = True
self.stats_changed = True self.stats_changed = True
self._send_start_wave(publish_back, self.current_wave, self._send_start_wave(publish_back, self.current_wave,
...@@ -203,22 +204,24 @@ class CoordinatorProc: ...@@ -203,22 +204,24 @@ class CoordinatorProc:
assert outputs.utility_output is None assert outputs.utility_output is None
eng_index = outputs.engine_index eng_index = outputs.engine_index
if outputs.scheduler_stats: scheduler_stats = outputs.scheduler_stats
if scheduler_stats:
# 1. Updated request load stats - update our local # 1. Updated request load stats - update our local
# state with these. # state with these.
stats = self.engines[eng_index].request_counts stats = self.engines[eng_index].request_counts
stats[0] = outputs.scheduler_stats.num_waiting_reqs stats[0] = scheduler_stats.num_waiting_reqs
stats[1] = outputs.scheduler_stats.num_running_reqs stats[1] = scheduler_stats.num_running_reqs
self.stats_changed = True self.stats_changed = True
if (wave := outputs.wave_complete) is not None: if (wave := outputs.wave_complete) is not None:
# 2. Notification from rank 0 engine that we've # 2. Notification from rank 0 engine that we've
# moved into the global paused state # moved into the global paused state
# (engines_running==False) # (engines_running==False).
if self.current_wave <= wave: if self.current_wave <= wave:
new_wave = wave + 1
logger.debug("Moving DP wave from %d to %d.", logger.debug("Moving DP wave from %d to %d.",
self.current_wave, wave) self.current_wave, new_wave)
self.current_wave = wave + 1 self.current_wave = new_wave
self.engines_running = False self.engines_running = False
self.stats_changed = True self.stats_changed = True
elif (wave := outputs.start_wave) is not None and ( elif (wave := outputs.start_wave) is not None and (
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment