Commit 65bdde67 authored by Antoine Kaufmann, committed by Hejing Li
Browse files

experiments: add --profile-int flag to periodically send SIGUSR1

parent 3d7826bc
......@@ -102,6 +102,13 @@ def parse_args() -> argparse.Namespace:
default=False,
help='Dump pcap file (if supported by component simulator)'
)
parser.add_argument(
'--profile-int',
metavar='S',
type=int,
default=None,
help='Enable periodic sigusr1 to each simulator every S seconds.'
)
# arguments for the experiment environment
g_env = parser.add_argument_group('Environment')
......@@ -310,6 +317,9 @@ def main():
verbose=args.verbose, executor=executors[0]
)
if args.profile_int:
rt.enable_profiler(args.profile_int)
# load experiments
if not args.pickled:
# default: load python modules with experiments
......
......@@ -169,6 +169,11 @@ class Component(object):
await self.kill()
await self._proc.wait()
async def sigusr1(self) -> None:
    """Send ``SIGUSR1`` to the component's process if it is still running.

    Nothing is sent once a return code has been recorded for the process.
    """
    # Only signal while no return code has been recorded for the process.
    if self._proc.returncode is None:
        self._proc.send_signal(signal.SIGUSR1)
async def started(self) -> None:
    """Do nothing; this implementation is an intentional no-op.

    NOTE(review): presumably a hook invoked once the component has been
    started, for subclasses to override -- confirm against callers.
    """
......
......@@ -45,6 +45,7 @@ class ExperimentBaseRunner(ABC):
self.exp = exp
self.env = env
self.verbose = verbose
self.profile_int: tp.Optional[int] = None
self.out = ExpOutput(exp)
self.running: tp.List[tp.Tuple[Simulator, SimpleComponent]] = []
self.sockets: tp.List[tp.Tuple[Executor, str]] = []
......@@ -182,7 +183,16 @@ class ExperimentBaseRunner(ABC):
await self.after_cleanup()
return self.out
async def profiler(self) -> None:
    """Periodically send ``SIGUSR1`` to every running component.

    Sleeps for ``self.profile_int`` seconds, then calls ``sigusr1()`` on
    the component of each entry in ``self.running``, and repeats forever.
    The loop only ends when the task is cancelled or a ``sigusr1()`` call
    raises.

    Raises:
        AssertionError: If ``self.profile_int`` is unset or zero.
    """
    # Explicit check instead of an ``assert`` statement so the guard is
    # still enforced when Python runs with -O (which strips asserts).
    if not self.profile_int:
        raise AssertionError('profiler started without a profiling interval')

    while True:
        await asyncio.sleep(self.profile_int)
        # Iterate over a snapshot: awaiting may let other tasks run, and
        # those may modify self.running in the meantime.
        for _, component in list(self.running):
            await component.sigusr1()
async def run(self) -> ExpOutput:
profiler_task = None
try:
self.out.set_start()
graph = self.sim_graph()
......@@ -202,6 +212,8 @@ class ExperimentBaseRunner(ABC):
for sim in sims:
ts.done(sim)
if self.profile_int:
profiler_task = asyncio.create_task(self.profiler())
await self.before_wait()
await self.wait_for_sims()
except asyncio.CancelledError:
......@@ -212,6 +224,11 @@ class ExperimentBaseRunner(ABC):
self.out.set_failed()
traceback.print_exc()
if profiler_task:
try:
profiler_task.cancel()
except asyncio.CancelledError:
pass
# The bare except above guarantees that we always execute the following
# code, which terminates all simulators and produces a proper output
# file.
......
......@@ -81,6 +81,7 @@ class Runtime(metaclass=ABCMeta):
def __init__(self) -> None:
self._interrupted = False
"""Indicates whether interrupt has been signaled."""
self.profile_int: tp.Optional[int] = None
@abstractmethod
def add_run(self, run: Run) -> None:
......@@ -108,3 +109,6 @@ class Runtime(metaclass=ABCMeta):
if not self._interrupted:
self._interrupted = True
self.interrupt_handler()
def enable_profiler(self, profile_int: int) -> None:
    """Store the interval, in seconds, for periodic profiling signals.

    Args:
        profile_int: Interval to store in ``self.profile_int``. A value of
            0 is accepted and stored unchanged.

    Raises:
        ValueError: If ``profile_int`` is negative.
    """
    # A negative interval is truthy, so it would enable profiling, but
    # asyncio.sleep() returns immediately for non-positive delays; the
    # result would be a tight loop of signals rather than a periodic one.
    if profile_int < 0:
        raise ValueError(
            f'profiling interval must not be negative, got {profile_int}'
        )
    self.profile_int = profile_int
\ No newline at end of file
......@@ -57,6 +57,8 @@ class DistributedSimpleRuntime(Runtime):
run.env,
self.verbose
)
if self.profile_int:
runner.profile_int = self.profile_int
try:
for executor in self.executors:
......
......@@ -52,6 +52,8 @@ class LocalSimpleRuntime(Runtime):
runner = ExperimentSimpleRunner(
self.executor, run.experiment, run.env, self.verbose
)
if self.profile_int:
runner.profile_int = self.profile_int
await run.prep_dirs(self.executor)
await runner.prepare()
except asyncio.CancelledError:
......@@ -125,6 +127,8 @@ class LocalParallelRuntime(Runtime):
runner = ExperimentSimpleRunner(
self.executor, run.experiment, run.env, self.verbose
)
if self.profile_int:
runner.profile_int = self.profile_int
await run.prep_dirs(executor=self.executor)
await runner.prepare()
except asyncio.CancelledError:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment