Unverified Commit 95d19478 authored by fishyds's avatar fishyds Committed by GitHub
Browse files

Fix a race condition issue in trial_keeper for reading log from pipe (#578)

* Fix a race condition issue in trial_keeper for reading log from pipe
parent 0a3cc459
...@@ -129,13 +129,15 @@ class PipeLogReader(threading.Thread): ...@@ -129,13 +129,15 @@ class PipeLogReader(threading.Thread):
self.pipeReader = os.fdopen(self.fdRead) self.pipeReader = os.fdopen(self.fdRead)
self.orig_stdout = sys.__stdout__ self.orig_stdout = sys.__stdout__
self._is_read_completed = False self._is_read_completed = False
self.process_exit = False
def _populateQueue(stream, queue): def _populateQueue(stream, queue):
''' '''
Collect lines from 'stream' and put them in 'quque'. Collect lines from 'stream' and put them in 'quque'.
''' '''
time.sleep(5) time.sleep(5)
while True: while True:
cur_process_exit = self.process_exit
try: try:
line = self.queue.get(True, 5) line = self.queue.get(True, 5)
try: try:
...@@ -144,9 +146,10 @@ class PipeLogReader(threading.Thread): ...@@ -144,9 +146,10 @@ class PipeLogReader(threading.Thread):
self.orig_stdout.flush() self.orig_stdout.flush()
except Exception as e: except Exception as e:
pass pass
except Exception as e: except Exception as e:
self._is_read_completed = True if cur_process_exit == True:
break self._is_read_completed = True
break
self.pip_log_reader_thread = threading.Thread(target = _populateQueue, self.pip_log_reader_thread = threading.Thread(target = _populateQueue,
args = (self.pipeReader, self.queue)) args = (self.pipeReader, self.queue))
...@@ -175,4 +178,8 @@ class PipeLogReader(threading.Thread): ...@@ -175,4 +178,8 @@ class PipeLogReader(threading.Thread):
def is_read_completed(self): def is_read_completed(self):
"""Return if read is completed """Return if read is completed
""" """
return self._is_read_completed return self._is_read_completed
\ No newline at end of file
def set_process_exit(self):
self.process_exit = True
return self.process_exit
\ No newline at end of file
...@@ -65,7 +65,7 @@ def main_loop(args): ...@@ -65,7 +65,7 @@ def main_loop(args):
while True: while True:
retCode = process.poll() retCode = process.poll()
# child worker process exits and all stdout data is read # child worker process exits and all stdout data is read
if retCode is not None and log_pipe_stdout.is_read_completed == True: if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode)) nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
if args.pai_hdfs_output_dir is not None: if args.pai_hdfs_output_dir is not None:
# Copy local directory to hdfs for OpenPAI # Copy local directory to hdfs for OpenPAI
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment