Commit abc4d0e9 authored by Jonas Kaufmann's avatar Jonas Kaufmann Committed by Antoine Kaufmann
Browse files

orchestration: eliminate unnecessary output buffer copies

drastically speeds up simulation performance in the presence of a lot of output
parent 000cc16f
...@@ -81,14 +81,14 @@ class Component(object): ...@@ -81,14 +81,14 @@ class Component(object):
ls = self._parse_buf(self.stdout_buf, data) ls = self._parse_buf(self.stdout_buf, data)
if len(ls) > 0 or eof: if len(ls) > 0 or eof:
await self.process_out(ls, eof=eof) await self.process_out(ls, eof=eof)
self.stdout = self.stdout + ls self.stdout.extend(ls)
async def _consume_err(self, data: bytes): async def _consume_err(self, data: bytes):
eof = len(data) == 0 eof = len(data) == 0
ls = self._parse_buf(self.stderr_buf, data) ls = self._parse_buf(self.stderr_buf, data)
if len(ls) > 0 or eof: if len(ls) > 0 or eof:
await self.process_err(ls, eof=eof) await self.process_err(ls, eof=eof)
self.stderr = self.stderr + ls self.stderr.extend(ls)
async def _read_stream(self, stream: asyncio.StreamReader, fn): async def _read_stream(self, stream: asyncio.StreamReader, fn):
while True: while True:
......
...@@ -71,6 +71,11 @@ class DistributedSimpleRuntime(Runtime): ...@@ -71,6 +71,11 @@ class DistributedSimpleRuntime(Runtime):
run.output = await runner.run() # already handles CancelledError run.output = await runner.run() # already handles CancelledError
self.complete.append(run) self.complete.append(run)
# if the log is huge, this step takes some time
if self.verbose:
print(
f'Writing collected output of run {run.name()} to JSON file ...'
)
pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True) pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
with open(run.outpath, 'w', encoding='utf-8') as f: with open(run.outpath, 'w', encoding='utf-8') as f:
f.write(run.output.dumps()) f.write(run.output.dumps())
......
...@@ -63,6 +63,11 @@ class LocalSimpleRuntime(Runtime): ...@@ -63,6 +63,11 @@ class LocalSimpleRuntime(Runtime):
run.output = await runner.run() # already handles CancelledError run.output = await runner.run() # already handles CancelledError
self.complete.append(run) self.complete.append(run)
# if the log is huge, this step takes some time
if self.verbose:
print(
f'Writing collected output of run {run.name()} to JSON file ...'
)
pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True) pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
with open(run.outpath, 'w', encoding='utf-8') as f: with open(run.outpath, 'w', encoding='utf-8') as f:
f.write(run.output.dumps()) f.write(run.output.dumps())
...@@ -134,6 +139,11 @@ class LocalParallelRuntime(Runtime): ...@@ -134,6 +139,11 @@ class LocalParallelRuntime(Runtime):
print('starting run ', run.name()) print('starting run ', run.name())
run.output = await runner.run() # already handles CancelledError run.output = await runner.run() # already handles CancelledError
# if the log is huge, this step takes some time
if self.verbose:
print(
f'Writing collected output of run {run.name()} to JSON file ...'
)
pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True) pathlib.Path(run.outpath).parent.mkdir(parents=True, exist_ok=True)
with open(run.outpath, 'w', encoding='utf-8') as f: with open(run.outpath, 'w', encoding='utf-8') as f:
f.write(run.output.dumps()) f.write(run.output.dumps())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment