Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
ycai
simbricks
Commits
05bfbefb
Commit
05bfbefb
authored
Jul 05, 2023
by
Jonas Kaufmann
Committed by
Antoine Kaufmann
Nov 05, 2023
Browse files
orchestration: add type annotations throughout
parent
85c94472
Changes
14
Hide whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
387 additions
and
339 deletions
+387
-339
experiments/pyexps/dist_netperf.py
experiments/pyexps/dist_netperf.py
+1
-1
experiments/simbricks/orchestration/exectools.py
experiments/simbricks/orchestration/exectools.py
+73
-54
experiments/simbricks/orchestration/experiment/experiment_environment.py
...bricks/orchestration/experiment/experiment_environment.py
+25
-17
experiments/simbricks/orchestration/experiment/experiment_output.py
...s/simbricks/orchestration/experiment/experiment_output.py
+15
-9
experiments/simbricks/orchestration/experiments.py
experiments/simbricks/orchestration/experiments.py
+12
-12
experiments/simbricks/orchestration/nodeconfig.py
experiments/simbricks/orchestration/nodeconfig.py
+79
-79
experiments/simbricks/orchestration/proxy.py
experiments/simbricks/orchestration/proxy.py
+31
-26
experiments/simbricks/orchestration/runners.py
experiments/simbricks/orchestration/runners.py
+19
-17
experiments/simbricks/orchestration/runtime/common.py
experiments/simbricks/orchestration/runtime/common.py
+4
-4
experiments/simbricks/orchestration/runtime/distributed.py
experiments/simbricks/orchestration/runtime/distributed.py
+7
-5
experiments/simbricks/orchestration/runtime/local.py
experiments/simbricks/orchestration/runtime/local.py
+11
-11
experiments/simbricks/orchestration/runtime/slurm.py
experiments/simbricks/orchestration/runtime/slurm.py
+4
-4
experiments/simbricks/orchestration/simulator_utils.py
experiments/simbricks/orchestration/simulator_utils.py
+3
-3
experiments/simbricks/orchestration/simulators.py
experiments/simbricks/orchestration/simulators.py
+103
-97
No files found.
experiments/pyexps/dist_netperf.py
View file @
05bfbefb
...
...
@@ -122,7 +122,7 @@ for host_type in host_types:
e
.
assign_sim_host
(
c
.
pcidevs
[
0
],
k
)
if
k
!=
0
:
cp
.
add_nic
(
c
.
pcidev
s
[
0
])
cp
.
add_nic
(
c
.
nic
s
[
0
])
k
=
(
k
+
1
)
%
2
# add to experiments
...
...
experiments/simbricks/orchestration/exectools.py
View file @
05bfbefb
...
...
@@ -20,6 +20,7 @@
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import
abc
import
asyncio
import
os
import
pathlib
...
...
@@ -35,9 +36,9 @@ class Component(object):
def
__init__
(
self
,
cmd_parts
:
tp
.
List
[
str
],
with_stdin
=
False
):
self
.
is_ready
=
False
self
.
stdout
=
[]
self
.
stdout
:
tp
.
List
[
str
]
=
[]
self
.
stdout_buf
=
bytearray
()
self
.
stderr
=
[]
self
.
stderr
:
tp
.
List
[
str
]
=
[]
self
.
stderr_buf
=
bytearray
()
self
.
cmd_parts
=
cmd_parts
#print(cmd_parts)
...
...
@@ -46,7 +47,7 @@ class Component(object):
self
.
_proc
:
Process
self
.
_terminate_future
:
asyncio
.
Task
def
_parse_buf
(
self
,
buf
,
data
)
:
def
_parse_buf
(
self
,
buf
:
bytearray
,
data
:
bytes
)
->
tp
.
List
[
str
]
:
if
data
is
not
None
:
buf
.
extend
(
data
)
lines
=
[]
...
...
@@ -62,14 +63,14 @@ class Component(object):
lines
.
append
(
buf
.
decode
(
'utf-8'
))
return
lines
async
def
_consume_out
(
self
,
data
:
bytes
):
async
def
_consume_out
(
self
,
data
:
bytes
)
->
None
:
eof
=
len
(
data
)
==
0
ls
=
self
.
_parse_buf
(
self
.
stdout_buf
,
data
)
if
len
(
ls
)
>
0
or
eof
:
await
self
.
process_out
(
ls
,
eof
=
eof
)
self
.
stdout
.
extend
(
ls
)
async
def
_consume_err
(
self
,
data
:
bytes
):
async
def
_consume_err
(
self
,
data
:
bytes
)
->
None
:
eof
=
len
(
data
)
==
0
ls
=
self
.
_parse_buf
(
self
.
stderr_buf
,
data
)
if
len
(
ls
)
>
0
or
eof
:
...
...
@@ -85,7 +86,7 @@ class Component(object):
await
fn
(
bs
)
return
async
def
_waiter
(
self
):
async
def
_waiter
(
self
)
->
None
:
stdout_handler
=
asyncio
.
create_task
(
self
.
_read_stream
(
self
.
_proc
.
stdout
,
self
.
_consume_out
)
)
...
...
@@ -96,12 +97,12 @@ class Component(object):
await
asyncio
.
wait
([
stdout_handler
,
stderr_handler
])
await
self
.
terminated
(
rc
)
async
def
send_input
(
self
,
bs
,
eof
=
False
):
async
def
send_input
(
self
,
bs
:
bytes
,
eof
=
False
)
->
None
:
self
.
_proc
.
stdin
.
write
(
bs
)
if
eof
:
self
.
_proc
.
stdin
.
close
()
async
def
start
(
self
):
async
def
start
(
self
)
->
None
:
if
self
.
with_stdin
:
stdin
=
asyncio
.
subprocess
.
PIPE
else
:
...
...
@@ -116,7 +117,7 @@ class Component(object):
self
.
_terminate_future
=
asyncio
.
create_task
(
self
.
_waiter
())
await
self
.
started
()
async
def
wait
(
self
):
async
def
wait
(
self
)
->
None
:
"""
Wait for running process to finish and output to be collected.
...
...
@@ -125,22 +126,22 @@ class Component(object):
"""
await
asyncio
.
shield
(
self
.
_terminate_future
)
async
def
interrupt
(
self
):
async
def
interrupt
(
self
)
->
None
:
"""Sends an interrupt signal."""
if
self
.
_proc
.
returncode
is
None
:
self
.
_proc
.
send_signal
(
signal
.
SIGINT
)
async
def
terminate
(
self
):
async
def
terminate
(
self
)
->
None
:
"""Sends a terminate signal."""
if
self
.
_proc
.
returncode
is
None
:
self
.
_proc
.
terminate
()
async
def
kill
(
self
):
async
def
kill
(
self
)
->
None
:
"""Sends a kill signal."""
if
self
.
_proc
.
returncode
is
None
:
self
.
_proc
.
kill
()
async
def
int_term_kill
(
self
,
delay
=
5
)
:
async
def
int_term_kill
(
self
,
delay
:
int
=
5
)
->
None
:
"""Attempts to stop this component by sending signals in the following
order: interrupt, terminate, kill."""
await
self
.
interrupt
()
...
...
@@ -168,41 +169,47 @@ class Component(object):
await
self
.
_proc
.
wait
()
async
def
started
(
self
):
async
def
started
(
self
)
->
None
:
pass
async
def
terminated
(
self
,
rc
):
async
def
terminated
(
self
,
rc
)
->
None
:
pass
async
def
process_out
(
self
,
lines
,
eof
)
:
async
def
process_out
(
self
,
lines
:
tp
.
List
[
str
],
eof
:
bool
)
->
None
:
pass
async
def
process_err
(
self
,
lines
,
eof
)
:
async
def
process_err
(
self
,
lines
:
tp
.
List
[
str
],
eof
:
bool
)
->
None
:
pass
class
SimpleComponent
(
Component
):
def
__init__
(
self
,
label
,
cmd_parts
,
*
args
,
verbose
=
True
,
canfail
=
False
,
**
kwargs
):
self
,
label
:
str
,
cmd_parts
:
tp
.
List
[
str
],
*
args
,
verbose
=
True
,
canfail
=
False
,
**
kwargs
)
->
None
:
self
.
label
=
label
self
.
verbose
=
verbose
self
.
canfail
=
canfail
self
.
cmd_parts
=
cmd_parts
super
().
__init__
(
cmd_parts
,
*
args
,
**
kwargs
)
async
def
process_out
(
self
,
lines
,
eof
)
:
async
def
process_out
(
self
,
lines
:
tp
.
List
[
str
],
eof
:
bool
)
->
None
:
if
self
.
verbose
:
for
_
in
lines
:
print
(
self
.
label
,
'OUT:'
,
lines
,
flush
=
True
)
async
def
process_err
(
self
,
lines
,
eof
)
:
async
def
process_err
(
self
,
lines
:
tp
.
List
[
str
],
eof
:
bool
)
->
None
:
if
self
.
verbose
:
for
_
in
lines
:
print
(
self
.
label
,
'ERR:'
,
lines
,
flush
=
True
)
async
def
terminated
(
self
,
rc
)
:
async
def
terminated
(
self
,
rc
:
int
)
->
None
:
if
self
.
verbose
:
print
(
self
.
label
,
'TERMINATED:'
,
rc
,
flush
=
True
)
if
not
self
.
canfail
and
rc
!=
0
:
...
...
@@ -213,14 +220,14 @@ class SimpleRemoteComponent(SimpleComponent):
def
__init__
(
self
,
host_name
,
label
,
cmd_parts
,
host_name
:
str
,
label
:
str
,
cmd_parts
:
tp
.
List
[
str
]
,
*
args
,
cwd
=
None
,
ssh_extra_args
=
None
,
cwd
:
tp
.
Optional
[
str
]
=
None
,
ssh_extra_args
:
tp
.
Optional
[
tp
.
List
[
str
]]
=
None
,
**
kwargs
):
)
->
None
:
if
ssh_extra_args
is
None
:
ssh_extra_args
=
[]
...
...
@@ -246,7 +253,7 @@ class SimpleRemoteComponent(SimpleComponent):
self
.
_pid_fut
:
tp
.
Optional
[
asyncio
.
Future
]
=
None
def
_ssh_cmd
(
self
,
parts
)
:
def
_ssh_cmd
(
self
,
parts
:
tp
.
List
[
str
])
->
tp
.
List
[
str
]
:
"""SSH invocation of command for this host."""
return
[
'ssh'
,
...
...
@@ -256,13 +263,13 @@ class SimpleRemoteComponent(SimpleComponent):
'StrictHostKeyChecking=no'
]
+
self
.
extra_flags
+
[
self
.
host_name
,
'--'
]
+
parts
async
def
start
(
self
):
async
def
start
(
self
)
->
None
:
"""Start this command (includes waiting for its pid)."""
self
.
_pid_fut
=
asyncio
.
get_running_loop
().
create_future
()
await
super
().
start
()
await
self
.
_pid_fut
async
def
process_out
(
self
,
lines
,
eof
)
:
async
def
process_out
(
self
,
lines
:
tp
.
List
[
str
],
eof
:
bool
)
->
None
:
"""Scans output and set PID future once PID line found."""
if
not
self
.
_pid_fut
.
done
():
newlines
=
[]
...
...
@@ -282,7 +289,7 @@ class SimpleRemoteComponent(SimpleComponent):
self
.
_pid_fut
.
cancel
()
await
super
().
process_out
(
lines
,
eof
)
async
def
_kill_cmd
(
self
,
sig
)
:
async
def
_kill_cmd
(
self
,
sig
:
str
)
->
None
:
"""Send signal to command by running ssh kill -$sig $PID."""
cmd_parts
=
self
.
_ssh_cmd
([
'kill'
,
'-'
+
sig
,
str
(
self
.
_pid_fut
.
result
())
...
...
@@ -290,43 +297,47 @@ class SimpleRemoteComponent(SimpleComponent):
proc
=
await
asyncio
.
create_subprocess_exec
(
*
cmd_parts
)
await
proc
.
wait
()
async
def
interrupt
(
self
):
async
def
interrupt
(
self
)
->
None
:
await
self
.
_kill_cmd
(
'INT'
)
async
def
terminate
(
self
):
async
def
terminate
(
self
)
->
None
:
await
self
.
_kill_cmd
(
'TERM'
)
async
def
kill
(
self
):
async
def
kill
(
self
)
->
None
:
await
self
.
_kill_cmd
(
'KILL'
)
class
Executor
(
abc
.
ABC
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
self
.
ip
=
None
@
abc
.
abstractmethod
def
create_component
(
self
,
label
,
parts
,
**
kwargs
)
->
SimpleComponent
:
def
create_component
(
self
,
label
:
str
,
parts
:
tp
.
List
[
str
],
**
kwargs
)
->
SimpleComponent
:
pass
@
abc
.
abstractmethod
async
def
await_file
(
self
,
path
,
delay
=
0.05
,
verbose
=
False
)
->
None
:
async
def
await_file
(
self
,
path
:
str
,
delay
=
0.05
,
verbose
=
False
)
->
None
:
pass
@
abc
.
abstractmethod
async
def
send_file
(
self
,
path
,
verbose
=
False
)
->
None
:
async
def
send_file
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
pass
@
abc
.
abstractmethod
async
def
mkdir
(
self
,
path
,
verbose
=
False
)
->
None
:
async
def
mkdir
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
pass
@
abc
.
abstractmethod
async
def
rmtree
(
self
,
path
,
verbose
=
False
)
->
None
:
async
def
rmtree
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
pass
# runs the list of commands as strings sequentially
async
def
run_cmdlist
(
self
,
label
,
cmds
,
verbose
=
True
):
async
def
run_cmdlist
(
self
,
label
:
str
,
cmds
:
tp
.
List
[
str
],
verbose
=
True
)
->
None
:
i
=
0
for
cmd
in
cmds
:
cmd_c
=
self
.
create_component
(
...
...
@@ -335,7 +346,7 @@ class Executor(abc.ABC):
await
cmd_c
.
start
()
await
cmd_c
.
wait
()
async
def
await_files
(
self
,
paths
,
*
args
,
**
kwargs
):
async
def
await_files
(
self
,
paths
:
tp
.
List
[
str
]
,
*
args
,
**
kwargs
)
->
None
:
xs
=
[]
for
p
in
paths
:
waiter
=
asyncio
.
create_task
(
self
.
await_file
(
p
,
*
args
,
**
kwargs
))
...
...
@@ -346,10 +357,14 @@ class Executor(abc.ABC):
class
LocalExecutor
(
Executor
):
def
create_component
(
self
,
label
,
parts
,
**
kwargs
):
def
create_component
(
self
,
label
:
str
,
parts
:
tp
.
List
[
str
],
**
kwargs
)
->
SimpleComponent
:
return
SimpleComponent
(
label
,
parts
,
**
kwargs
)
async
def
await_file
(
self
,
path
,
delay
=
0.05
,
verbose
=
False
,
timeout
=
30
):
async
def
await_file
(
self
,
path
:
str
,
delay
=
0.05
,
verbose
=
False
,
timeout
=
30
)
->
None
:
if
verbose
:
print
(
f
'await_file(
{
path
}
)'
)
t
=
0
...
...
@@ -359,14 +374,14 @@ class LocalExecutor(Executor):
await
asyncio
.
sleep
(
delay
)
t
+=
delay
async
def
send_file
(
self
,
path
,
verbose
)
:
async
def
send_file
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
# locally we do not need to do anything
pass
async
def
mkdir
(
self
,
path
,
verbose
=
False
):
async
def
mkdir
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
pathlib
.
Path
(
path
).
mkdir
(
parents
=
True
,
exist_ok
=
True
)
async
def
rmtree
(
self
,
path
,
verbose
=
False
):
async
def
rmtree
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
if
os
.
path
.
isdir
(
path
):
shutil
.
rmtree
(
path
,
ignore_errors
=
True
)
elif
os
.
path
.
exists
(
path
):
...
...
@@ -375,7 +390,7 @@ class LocalExecutor(Executor):
class
RemoteExecutor
(
Executor
):
def
__init__
(
self
,
host_name
,
workdir
)
:
def
__init__
(
self
,
host_name
:
str
,
workdir
:
str
)
->
None
:
super
().
__init__
()
self
.
host_name
=
host_name
...
...
@@ -383,7 +398,9 @@ class RemoteExecutor(Executor):
self
.
ssh_extra_args
=
[]
self
.
scp_extra_args
=
[]
def
create_component
(
self
,
label
,
parts
,
**
kwargs
):
def
create_component
(
self
,
label
:
str
,
parts
:
tp
.
List
[
str
],
**
kwargs
)
->
SimpleRemoteComponent
:
return
SimpleRemoteComponent
(
self
.
host_name
,
label
,
...
...
@@ -393,7 +410,9 @@ class RemoteExecutor(Executor):
**
kwargs
)
async
def
await_file
(
self
,
path
,
delay
=
0.05
,
verbose
=
False
,
timeout
=
30
):
async
def
await_file
(
self
,
path
:
str
,
delay
=
0.05
,
verbose
=
False
,
timeout
=
30
)
->
None
:
if
verbose
:
print
(
f
'
{
self
.
host_name
}
.await_file(
{
path
}
) started'
)
...
...
@@ -416,7 +435,7 @@ class RemoteExecutor(Executor):
# TODO: Implement opitimized await_files()
async
def
send_file
(
self
,
path
,
verbose
)
:
async
def
send_file
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
parts
=
[
'scp'
,
'-o'
,
...
...
@@ -433,7 +452,7 @@ class RemoteExecutor(Executor):
await
sc
.
start
()
await
sc
.
wait
()
async
def
mkdir
(
self
,
path
,
verbose
=
False
):
async
def
mkdir
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
sc
=
self
.
create_component
(
f
"
{
self
.
host_name
}
.mkdir('
{
path
}
')"
,
[
'mkdir'
,
'-p'
,
path
],
canfail
=
False
,
...
...
@@ -442,7 +461,7 @@ class RemoteExecutor(Executor):
await
sc
.
start
()
await
sc
.
wait
()
async
def
rmtree
(
self
,
path
,
verbose
=
False
):
async
def
rmtree
(
self
,
path
:
str
,
verbose
=
False
)
->
None
:
sc
=
self
.
create_component
(
f
'
{
self
.
host_name
}
.rmtree("
{
path
}
")'
,
[
'rm'
,
'-rf'
,
path
],
canfail
=
False
,
...
...
experiments/simbricks/orchestration/experiment/experiment_environment.py
View file @
05bfbefb
...
...
@@ -21,12 +21,16 @@
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import
os
import
typing
as
tp
if
tp
.
TYPE_CHECKING
:
# prevent cyclic import
from
simbricks.orchestration
import
simulators
class
ExpEnv
(
object
):
"""Manages the experiment environment."""
def
__init__
(
self
,
repo_path
,
workdir
,
cpdir
):
def
__init__
(
self
,
repo_path
,
workdir
,
cpdir
)
->
None
:
self
.
create_cp
=
False
"""Whether a checkpoint should be created."""
self
.
restore_cp
=
False
...
...
@@ -53,50 +57,54 @@ class ExpEnv(object):
f
'
{
simics_project_base
}
/targets/qsp-x86/qsp-modern-core.simics'
)
def
gem5_path
(
self
,
variant
)
:
def
gem5_path
(
self
,
variant
:
str
)
->
str
:
return
f
'
{
self
.
repodir
}
/sims/external/gem5/build/X86/gem5.
{
variant
}
'
def
hdcopy_path
(
self
,
sim
)
:
def
hdcopy_path
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
workdir
}
/hdcopy.
{
sim
.
name
}
'
def
hd_path
(
self
,
hd_name
)
:
def
hd_path
(
self
,
hd_name
:
str
)
->
str
:
return
f
'
{
self
.
repodir
}
/images/output-
{
hd_name
}
/
{
hd_name
}
'
def
hd_raw_path
(
self
,
hd_name
)
:
def
hd_raw_path
(
self
,
hd_name
:
str
)
->
str
:
return
f
'
{
self
.
repodir
}
/images/output-
{
hd_name
}
/
{
hd_name
}
.raw'
def
cfgtar_path
(
self
,
sim
)
:
def
cfgtar_path
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
workdir
}
/cfg.
{
sim
.
name
}
.tar'
def
dev_pci_path
(
self
,
sim
):
def
dev_pci_path
(
self
,
sim
)
->
str
:
return
f
'
{
self
.
workdir
}
/dev.pci.
{
sim
.
name
}
'
def
dev_mem_path
(
self
,
sim
)
:
def
dev_mem_path
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
workdir
}
/dev.mem.
{
sim
.
name
}
'
def
nic_eth_path
(
self
,
sim
)
:
def
nic_eth_path
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
workdir
}
/nic.eth.
{
sim
.
name
}
'
def
dev_shm_path
(
self
,
sim
)
:
def
dev_shm_path
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
shm_base
}
/dev.shm.
{
sim
.
name
}
'
def
n2n_eth_path
(
self
,
sim_l
,
sim_c
):
def
n2n_eth_path
(
self
,
sim_l
:
'simulators.Simulator'
,
sim_c
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
workdir
}
/n2n.eth.
{
sim_l
.
name
}
.
{
sim_c
.
name
}
'
def
net2host_eth_path
(
self
,
sim_n
,
sim_h
):
def
net2host_eth_path
(
self
,
sim_n
,
sim_h
)
->
str
:
return
f
'
{
self
.
workdir
}
/n2h.eth.
{
sim_n
.
name
}
.
{
sim_h
.
name
}
'
def
net2host_shm_path
(
self
,
sim_n
,
sim_h
):
def
net2host_shm_path
(
self
,
sim_n
:
'simulators.Simulator'
,
sim_h
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
workdir
}
/n2h.shm.
{
sim_n
.
name
}
.
{
sim_h
.
name
}
'
def
proxy_shm_path
(
self
,
sim
)
:
def
proxy_shm_path
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
shm_base
}
/proxy.shm.
{
sim
.
name
}
'
def
gem5_outdir
(
self
,
sim
)
:
def
gem5_outdir
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
workdir
}
/gem5-out.
{
sim
.
name
}
'
def
gem5_cpdir
(
self
,
sim
)
:
def
gem5_cpdir
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
cpdir
}
/gem5-cp.
{
sim
.
name
}
'
def
simics_cpfile
(
self
,
sim
)
:
def
simics_cpfile
(
self
,
sim
:
'simulators.Simulator'
)
->
str
:
return
f
'
{
self
.
cpdir
}
/simics-cp.
{
sim
.
name
}
'
experiments/simbricks/orchestration/experiment/experiment_output.py
View file @
05bfbefb
...
...
@@ -23,36 +23,42 @@
import
json
import
pathlib
import
time
import
typing
as
tp
from
simbricks.orchestration.experiments
import
Experiment
if
tp
.
TYPE_CHECKING
:
# prevent cyclic import
from
simbricks.orchestration
import
exectools
,
simulators
class
ExpOutput
(
object
):
"""Manages an experiment's output."""
def
__init__
(
self
,
exp
:
Experiment
):
def
__init__
(
self
,
exp
:
Experiment
)
->
None
:
self
.
exp_name
=
exp
.
name
self
.
metadata
=
exp
.
metadata
self
.
start_time
=
None
self
.
end_time
=
None
self
.
sims
=
{}
self
.
sims
:
tp
.
Dict
[
str
,
tp
.
Dict
[
str
,
tp
.
Union
[
str
,
tp
.
List
[
str
]]]]
=
{}
self
.
success
=
True
self
.
interrupted
=
False
def
set_start
(
self
):
def
set_start
(
self
)
->
None
:
self
.
start_time
=
time
.
time
()
def
set_end
(
self
):
def
set_end
(
self
)
->
None
:
self
.
end_time
=
time
.
time
()
def
set_failed
(
self
):
def
set_failed
(
self
)
->
None
:
self
.
success
=
False
def
set_interrupted
(
self
):
def
set_interrupted
(
self
)
->
None
:
self
.
success
=
False
self
.
interrupted
=
True
def
add_sim
(
self
,
sim
,
comp
):
def
add_sim
(
self
,
sim
:
'simulators.Simulator'
,
comp
:
'exectools.Component'
)
->
None
:
obj
=
{
'class'
:
sim
.
__class__
.
__name__
,
'cmd'
:
comp
.
cmd_parts
,
...
...
@@ -61,12 +67,12 @@ class ExpOutput(object):
}
self
.
sims
[
sim
.
full_name
()]
=
obj
def
dump
(
self
,
outpath
:
str
):
def
dump
(
self
,
outpath
:
str
)
->
None
:
pathlib
.
Path
(
outpath
).
parent
.
mkdir
(
parents
=
True
,
exist_ok
=
True
)
with
open
(
outpath
,
'w'
,
encoding
=
'utf-8'
)
as
file
:
json
.
dump
(
self
.
__dict__
,
file
)
def
load
(
self
,
file
:
str
):
def
load
(
self
,
file
:
str
)
->
None
:
with
open
(
file
,
'r'
,
encoding
=
'utf-8'
)
as
fp
:
for
k
,
v
in
json
.
load
(
fp
).
items
():
self
.
__dict__
[
k
]
=
v
experiments/simbricks/orchestration/experiments.py
View file @
05bfbefb
...
...
@@ -37,7 +37,7 @@ class Experiment(object):
Contains the simulators to be run and experiment-wide parameters.
"""
def
__init__
(
self
,
name
:
str
):
def
__init__
(
self
,
name
:
str
)
->
None
:
self
.
name
=
name
"""
This experiment's name. Can be used to run only a selection of
...
...
@@ -64,13 +64,13 @@ class Experiment(object):
"""The network memory simulators to run."""
self
.
networks
:
tp
.
List
[
NetSim
]
=
[]
"""The network simulators to run."""
self
.
metadata
=
{}
self
.
metadata
:
tp
.
Dict
[
str
,
tp
.
Any
]
=
{}
@
property
def
nics
(
self
):
return
filter
(
lambda
pcidev
:
pcidev
.
is_nic
(),
self
.
pcidevs
)
def
add_host
(
self
,
sim
:
HostSim
):
def
add_host
(
self
,
sim
:
HostSim
)
->
None
:
"""Add a host simulator to the experiment."""
for
h
in
self
.
hosts
:
if
h
.
name
==
sim
.
name
:
...
...
@@ -81,7 +81,7 @@ class Experiment(object):
"""Add a NIC simulator to the experiment."""
self
.
add_pcidev
(
sim
)
def
add_pcidev
(
self
,
sim
:
PCIDevSim
):
def
add_pcidev
(
self
,
sim
:
PCIDevSim
)
->
None
:
"""Add a PCIe device simulator to the experiment."""
for
d
in
self
.
pcidevs
:
if
d
.
name
==
sim
.
name
:
...
...
@@ -100,27 +100,27 @@ class Experiment(object):
raise
ValueError
(
'Duplicate netmems name'
)
self
.
netmems
.
append
(
sim
)
def
add_network
(
self
,
sim
:
NetSim
):
def
add_network
(
self
,
sim
:
NetSim
)
->
None
:
"""Add a network simulator to the experiment."""
for
n
in
self
.
networks
:
if
n
.
name
==
sim
.
name
:
raise
ValueError
(
'Duplicate net name'
)
self
.
networks
.
append
(
sim
)
def
all_simulators
(
self
):
def
all_simulators
(
self
)
->
tp
.
Iterable
[
Simulator
]
:
"""Returns all simulators defined to run in this experiment."""
return
itertools
.
chain
(
self
.
hosts
,
self
.
pcidevs
,
self
.
memdevs
,
self
.
netmems
,
self
.
networks
)
def
resreq_mem
(
self
):
def
resreq_mem
(
self
)
->
int
:
"""Memory required to run all simulators in this experiment."""
mem
=
0
for
s
in
self
.
all_simulators
():
mem
+=
s
.
resreq_mem
()
return
mem
def
resreq_cores
(
self
):
def
resreq_cores
(
self
)
->
int
:
"""Number of Cores required to run all simulators in this experiment."""
cores
=
0
for
s
in
self
.
all_simulators
():
...
...
@@ -131,7 +131,7 @@ class Experiment(object):
class
DistributedExperiment
(
Experiment
):
"""Describes a distributed simulation experiment."""
def
__init__
(
self
,
name
:
str
,
num_hosts
:
int
):
def
__init__
(
self
,
name
:
str
,
num_hosts
:
int
)
->
None
:
super
().
__init__
(
name
)
self
.
num_hosts
=
num_hosts
"""Number of hosts to use."""
...
...
@@ -146,17 +146,17 @@ class DistributedExperiment(Experiment):
else
:
self
.
proxies_connect
.
append
(
tp
.
cast
(
NetProxyConnecter
,
proxy
))
def
all_simulators
(
self
):
def
all_simulators
(
self
)
->
tp
.
Iterable
[
Simulator
]
:
return
itertools
.
chain
(
super
().
all_simulators
(),
self
.
proxies_listen
,
self
.
proxies_connect
)
def
assign_sim_host
(
self
,
sim
:
Simulator
,
host
:
int
):
def
assign_sim_host
(
self
,
sim
:
Simulator
,
host
:
int
)
->
None
:
"""Assign host ID (< self.num_hosts) for a simulator."""
assert
0
<=
host
<
self
.
num_hosts
self
.
host_mapping
[
sim
]
=
host
def
all_sims_assigned
(
self
):
def
all_sims_assigned
(
self
)
->
bool
:
"""Check if all simulators are assigned to a host."""
for
s
in
self
.
all_simulators
():
if
s
not
in
self
.
host_mapping
:
...
...
experiments/simbricks/orchestration/nodeconfig.py
View file @
05bfbefb
...
...
@@ -54,13 +54,13 @@ class AppConfig():
"""
return
{}
def
strfile
(
self
,
s
:
str
):
def
strfile
(
self
,
s
:
str
)
->
io
.
BytesIO
:
"""
Helper function to convert a string to an IO handle for usage in
`config_files()`.
Using this, you can create a file with the string as its content on the
node.
simulated
node.
"""
return
io
.
BytesIO
(
bytes
(
s
,
encoding
=
'UTF-8'
))
...
...
@@ -68,7 +68,7 @@ class AppConfig():
class
NodeConfig
():
"""Defines the configuration of a node or host."""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
self
.
sim
=
'qemu'
"""The concrete simulator that runs this node config. This is used to
use execute different commands depending on the concrete simulator,
...
...
@@ -115,7 +115,7 @@ class NodeConfig():
self
.
run_cmds
()
+
self
.
cleanup_cmds
()
+
exit_es
return
'
\n
'
.
join
(
es
)
def
make_tar
(
self
,
path
)
:
def
make_tar
(
self
,
path
:
str
)
->
None
:
with
tarfile
.
open
(
path
,
'w:'
)
as
tar
:
# add main run script
cfg_i
=
tarfile
.
TarInfo
(
'guest/run.sh'
)
...
...
@@ -169,26 +169,26 @@ class NodeConfig():
"""
return
self
.
app
.
config_files
()
def
strfile
(
self
,
s
:
str
):
def
strfile
(
self
,
s
:
str
)
->
io
.
BytesIO
:
"""
Helper function to convert a string to an IO handle for usage in
`config_files()`.
Using this, you can create a file with the string as its content on the
node.
simulated
node.
"""
return
io
.
BytesIO
(
bytes
(
s
,
encoding
=
'UTF-8'
))
class
LinuxNode
(
NodeConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
ifname
=
'eth0'
self
.
drivers
=
[]
self
.
force_mac_addr
=
None
self
.
drivers
:
tp
.
List
[
str
]
=
[]
self
.
force_mac_addr
:
tp
.
Optional
[
str
]
=
None
def
prepare_post_cp
(
self
):
def
prepare_post_cp
(
self
)
->
tp
.
List
[
str
]
:
l
=
[]
for
d
in
self
.
drivers
:
if
d
[
0
]
==
'/'
:
...
...
@@ -207,40 +207,40 @@ class LinuxNode(NodeConfig):
class
I40eLinuxNode
(
LinuxNode
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
drivers
.
append
(
'i40e'
)
class
CorundumLinuxNode
(
LinuxNode
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
drivers
.
append
(
'/tmp/guest/mqnic.ko'
)
# pylint: disable=consider-using-with
def
config_files
(
self
):
def
config_files
(
self
)
->
tp
.
Dict
[
str
,
tp
.
IO
]
:
m
=
{
'mqnic.ko'
:
open
(
'../images/mqnic/mqnic.ko'
,
'rb'
)}
return
{
**
m
,
**
super
().
config_files
()}
class
E1000LinuxNode
(
LinuxNode
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
drivers
.
append
(
'e1000'
)
class
MtcpNode
(
NodeConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
disk_image
=
'mtcp'
self
.
pci_dev
=
'0000:00:02.0'
self
.
memory
=
16
*
1024
self
.
num_hugepages
=
4096
def
prepare_pre_cp
(
self
):
def
prepare_pre_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_pre_cp
()
+
[
'mount -t proc proc /proc'
,
'mount -t sysfs sysfs /sys'
,
...
...
@@ -252,7 +252,7 @@ class MtcpNode(NodeConfig):
'node/node0/hugepages/hugepages-2048kB/nr_hugepages'
,
]
def
prepare_post_cp
(
self
):
def
prepare_post_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_post_cp
()
+
[
'insmod /root/mtcp/dpdk/x86_64-native-linuxapp-gcc/kmod/igb_uio.ko'
,
'/root/mtcp/dpdk/usertools/dpdk-devbind.py -b igb_uio '
+
...
...
@@ -263,7 +263,7 @@ class MtcpNode(NodeConfig):
f
'ip addr add
{
self
.
ip
}
/
{
self
.
prefix
}
dev dpdk0'
]
def
config_files
(
self
):
def
config_files
(
self
)
->
tp
.
Dict
[
str
,
tp
.
IO
]
:
m
=
{
'mtcp.conf'
:
self
.
strfile
(
...
...
@@ -286,7 +286,7 @@ class MtcpNode(NodeConfig):
class
TASNode
(
NodeConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
disk_image
=
'tas'
self
.
pci_dev
=
'0000:00:02.0'
...
...
@@ -295,7 +295,7 @@ class TASNode(NodeConfig):
self
.
fp_cores
=
1
self
.
preload
=
True
def
prepare_pre_cp
(
self
):
def
prepare_pre_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_pre_cp
()
+
[
'mount -t proc proc /proc'
,
'mount -t sysfs sysfs /sys'
,
...
...
@@ -307,7 +307,7 @@ class TASNode(NodeConfig):
'node/node0/hugepages/hugepages-2048kB/nr_hugepages'
,
]
def
prepare_post_cp
(
self
):
def
prepare_post_cp
(
self
)
->
tp
.
List
[
str
]
:
cmds
=
super
().
prepare_post_cp
()
+
[
'insmod /root/dpdk/lib/modules/5.4.46/extra/dpdk/igb_uio.ko'
,
'/root/dpdk/sbin/dpdk-devbind -b igb_uio '
+
self
.
pci_dev
,
...
...
@@ -326,7 +326,7 @@ class TASNode(NodeConfig):
class
I40eDCTCPNode
(
NodeConfig
):
def
prepare_pre_cp
(
self
):
def
prepare_pre_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_pre_cp
()
+
[
'mount -t proc proc /proc'
,
'mount -t sysfs sysfs /sys'
,
...
...
@@ -342,7 +342,7 @@ class I40eDCTCPNode(NodeConfig):
'sysctl -w net.ipv4.tcp_ecn=1'
]
def
prepare_post_cp
(
self
):
def
prepare_post_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_post_cp
()
+
[
'modprobe i40e'
,
'ethtool -G eth0 rx 4096 tx 4096'
,
...
...
@@ -355,7 +355,7 @@ class I40eDCTCPNode(NodeConfig):
class
CorundumDCTCPNode
(
NodeConfig
):
def
prepare_pre_cp
(
self
):
def
prepare_pre_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_pre_cp
()
+
[
'mount -t proc proc /proc'
,
'mount -t sysfs sysfs /sys'
,
...
...
@@ -371,7 +371,7 @@ class CorundumDCTCPNode(NodeConfig):
'sysctl -w net.ipv4.tcp_ecn=1'
]
def
prepare_post_cp
(
self
):
def
prepare_post_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_post_cp
()
+
[
'insmod mqnic.ko'
,
'ip link set dev eth0 up'
,
...
...
@@ -381,11 +381,11 @@ class CorundumDCTCPNode(NodeConfig):
class
LinuxFEMUNode
(
NodeConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
drivers
=
[
'nvme'
]
def
prepare_post_cp
(
self
):
def
prepare_post_cp
(
self
)
->
tp
.
List
[
str
]
:
l
=
[
'lspci -vvvv'
]
for
d
in
self
.
drivers
:
if
d
[
0
]
==
'/'
:
...
...
@@ -397,13 +397,13 @@ class LinuxFEMUNode(NodeConfig):
class
IdleHost
(
AppConfig
):
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'sleep infinity'
]
class
NVMEFsTest
(
AppConfig
):
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'mount -t proc proc /proc'
,
'mkfs.ext3 /dev/nvme0n1'
,
...
...
@@ -414,18 +414,18 @@ class NVMEFsTest(AppConfig):
class
DctcpServer
(
AppConfig
):
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'iperf -s -w 1M -Z dctcp'
]
class
DctcpClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
'192.168.64.1'
self
.
is_last
=
False
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
if
self
.
is_last
:
return
[
'sleep 1'
,
...
...
@@ -442,35 +442,35 @@ class DctcpClient(AppConfig):
class
PingClient
(
AppConfig
):
def
__init__
(
self
,
server_ip
:
str
=
'192.168.64.1'
):
def
__init__
(
self
,
server_ip
:
str
=
'192.168.64.1'
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
server_ip
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
f
'ping
{
self
.
server_ip
}
-c 10'
]
class
IperfTCPServer
(
AppConfig
):
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'iperf -s -l 32M -w 32M'
]
class
IperfUDPServer
(
AppConfig
):
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'iperf -s -u'
]
class
IperfTCPClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
'10.0.0.1'
self
.
procs
=
1
self
.
is_last
=
False
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
cmds
=
[
'sleep 1'
,
...
...
@@ -486,13 +486,13 @@ class IperfTCPClient(AppConfig):
class
IperfUDPClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
'10.0.0.1'
self
.
rate
=
'150m'
self
.
is_last
=
False
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
cmds
=
[
'sleep 1'
,
'iperf -c '
+
self
.
server_ip
+
' -i 1 -u -b '
+
self
.
rate
...
...
@@ -508,13 +508,13 @@ class IperfUDPClient(AppConfig):
class
IperfUDPShortClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
'10.0.0.1'
self
.
rate
=
'150m'
self
.
is_last
=
False
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
cmds
=
[
'sleep 1'
,
'iperf -c '
+
self
.
server_ip
+
' -u -n 1 '
]
return
cmds
...
...
@@ -522,23 +522,23 @@ class IperfUDPShortClient(AppConfig):
class
IperfUDPClientSleep
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
'10.0.0.1'
self
.
rate
=
'150m'
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'sleep 1'
,
'sleep 10'
]
class
NoTraffic
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
is_sleep
=
1
self
.
is_server
=
0
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
cmds
=
[]
if
self
.
is_server
:
cmds
.
append
(
'sleep infinity'
)
...
...
@@ -554,19 +554,19 @@ class NoTraffic(AppConfig):
class
NetperfServer
(
AppConfig
):
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'netserver'
,
'sleep infinity'
]
class
NetperfClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
'10.0.0.1'
self
.
duration_tp
=
10
self
.
duration_lat
=
10
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'netserver'
,
'sleep 0.5'
,
...
...
@@ -580,11 +580,11 @@ class NetperfClient(AppConfig):
class
VRReplica
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
index
=
0
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'/root/nopaxos/bench/replica -c /root/nopaxos.config -i '
+
str
(
self
.
index
)
+
' -m vr'
...
...
@@ -593,11 +593,11 @@ class VRReplica(AppConfig):
class
VRClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ips
=
[]
self
.
server_ips
:
tp
.
List
[
str
]
=
[]
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
cmds
=
[]
for
ip
in
self
.
server_ips
:
cmds
.
append
(
'ping -c 2 '
+
ip
)
...
...
@@ -610,11 +610,11 @@ class VRClient(AppConfig):
class
NOPaxosReplica
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
index
=
0
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'/root/nopaxos/bench/replica -c /root/nopaxos.config -i '
+
str
(
self
.
index
)
+
' -m nopaxos'
...
...
@@ -623,13 +623,13 @@ class NOPaxosReplica(AppConfig):
class
NOPaxosClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ips
=
[]
self
.
server_ips
:
tp
.
List
[
str
]
=
[]
self
.
is_last
=
False
self
.
use_ehseq
=
False
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
cmds
=
[]
for
ip
in
self
.
server_ips
:
cmds
.
append
(
'ping -c 2 '
+
ip
)
...
...
@@ -647,7 +647,7 @@ class NOPaxosClient(AppConfig):
class
NOPaxosSequencer
(
AppConfig
):
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[(
'/root/nopaxos/sequencer/sequencer -c /root/nopaxos.config'
' -m nopaxos'
...
...
@@ -656,14 +656,14 @@ class NOPaxosSequencer(AppConfig):
class
RPCServer
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
port
=
1234
self
.
threads
=
1
self
.
max_flows
=
1234
self
.
max_bytes
=
1024
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
exe
=
'echoserver_linux'
if
not
isinstance
(
node
,
MtcpNode
)
else
\
'echoserver_mtcp'
return
[
...
...
@@ -677,7 +677,7 @@ class RPCServer(AppConfig):
class
RPCClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
'10.0.0.1'
self
.
port
=
1234
...
...
@@ -690,7 +690,7 @@ class RPCClient(AppConfig):
self
.
max_pend_conns
=
8
self
.
time
=
25
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
exe
=
'testclient_linux'
if
not
isinstance
(
node
,
MtcpNode
)
else
\
'testclient_mtcp'
return
[
...
...
@@ -710,14 +710,14 @@ class RPCClient(AppConfig):
class
HTTPD
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
threads
=
1
self
.
file_size
=
64
self
.
mtcp_config
=
'lighttpd.conf'
self
.
httpd_dir
=
''
# TODO added because doesn't originally exist
def
prepare_pre_cp
(
self
):
def
prepare_pre_cp
(
self
)
->
tp
.
List
[
str
]
:
return
[
'mkdir -p /srv/www/htdocs/ /tmp/lighttpd/'
,
(
...
...
@@ -726,7 +726,7 @@ class HTTPD(AppConfig):
)
]
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
f
'cd
{
self
.
httpd_dir
}
/src/'
,
(
...
...
@@ -738,26 +738,26 @@ class HTTPD(AppConfig):
class
HTTPDLinux
(
HTTPD
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
httpd_dir
=
'/root/mtcp/apps/lighttpd-mtlinux'
class
HTTPDLinuxRPO
(
HTTPD
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
httpd_dir
=
'/root/mtcp/apps/lighttpd-mtlinux-rop'
class
HTTPDMtcp
(
HTTPD
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
httpd_dir
=
'/root/mtcp/apps/lighttpd-mtcp'
self
.
mtcp_config
=
'm-lighttpd.conf'
def
prepare_pre_cp
(
self
):
def
prepare_pre_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_pre_cp
()
+
[
f
'cp /tmp/guest/mtcp.conf
{
self
.
httpd_dir
}
/src/mtcp.conf'
,
(
...
...
@@ -770,7 +770,7 @@ class HTTPDMtcp(HTTPD):
class
HTTPC
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ip
=
'10.0.0.1'
self
.
conns
=
1000
...
...
@@ -780,7 +780,7 @@ class HTTPC(AppConfig):
self
.
url
=
'/file'
self
.
ab_dir
=
''
# TODO added because doesn't originally exist
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
f
'cd
{
self
.
ab_dir
}
/support/'
,
(
...
...
@@ -792,18 +792,18 @@ class HTTPC(AppConfig):
class
HTTPCLinux
(
HTTPC
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
ab_dir
=
'/root/mtcp/apps/ab-linux'
class
HTTPCMtcp
(
HTTPC
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
ab_dir
=
'/root/mtcp/apps/ab-mtcp'
def
prepare_pre_cp
(
self
):
def
prepare_pre_cp
(
self
)
->
tp
.
List
[
str
]
:
return
super
().
prepare_pre_cp
()
+
[
f
'cp /tmp/guest/mtcp.conf
{
self
.
ab_dir
}
/support/config/mtcp.conf'
,
f
'rm -f
{
self
.
ab_dir
}
/support/config/arp.conf'
...
...
@@ -812,20 +812,20 @@ class HTTPCMtcp(HTTPC):
class
MemcachedServer
(
AppConfig
):
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
return
[
'memcached -u root -t 1 -c 4096'
]
class
MemcachedClient
(
AppConfig
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
server_ips
=
[
'10.0.0.1'
]
self
.
threads
=
1
self
.
concurrency
=
1
self
.
throughput
=
'1k'
def
run_cmds
(
self
,
node
)
:
def
run_cmds
(
self
,
node
:
NodeConfig
)
->
tp
.
List
[
str
]
:
servers
=
[
ip
+
':11211'
for
ip
in
self
.
server_ips
]
servers
=
','
.
join
(
servers
)
return
[(
...
...
experiments/simbricks/orchestration/proxy.py
View file @
05bfbefb
...
...
@@ -24,60 +24,61 @@ import typing as tp
from
simbricks.orchestration.simulators
import
NICSim
,
Simulator
if
tp
.
TYPE_CHECKING
:
from
simbricks.orchestration.experiment
import
experiment_environment
class
SimProxy
(
Simulator
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
name
=
''
# set by the experiment runner
self
.
ip
=
''
self
.
listen
=
False
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
'proxy.'
+
self
.
name
class
NetProxy
(
SimProxy
):
"""Proxy for connections between NICs and networks."""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
nics
:
tp
.
List
[
tp
.
Tuple
[
NICSim
,
bool
]]
=
[]
"""List of tuples (nic, with_listener)"""
self
.
n2ns
:
tp
.
List
[
tp
.
Tuple
[
tp
.
Tuple
[
NetProxyListener
,
NetProxyConnecter
],
bool
]]
=
[]
self
.
n2ns
:
tp
.
List
[
tp
.
Tuple
[
tp
.
Tuple
[
Simulator
,
Simulator
],
bool
]]
=
[]
"""List of tuples ((netL,netC), with_listener)"""
self
.
shm_size
=
2048
"""Shared memory size in GB"""
def
start_delay
(
self
):
def
start_delay
(
self
)
->
int
:
return
10
class
NetProxyListener
(
NetProxy
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
port
=
12345
self
.
connecter
=
None
self
.
connecter
:
NetProxyConnecter
self
.
listen
=
True
def
add_nic
(
self
,
nic
)
:
def
add_nic
(
self
,
nic
:
NICSim
)
->
None
:
self
.
nics
.
append
((
nic
,
True
))
# the network this nic connects to now also depends on the peer
nic
.
network
.
extra_deps
.
append
(
self
.
connecter
)
# add net2net connection with listening network on the listener side
def
add_n2n
(
self
,
net_c
,
net_l
)
:
def
add_n2n
(
self
,
net_c
:
Simulator
,
net_l
:
Simulator
)
->
None
:
self
.
n2ns
.
append
(((
net_c
,
net_l
),
True
))
# the connecting network depends on our peer
net_c
.
extra_deps
.
append
(
self
.
connecter
)
def
dependencies
(
self
):
def
dependencies
(
self
)
->
tp
.
List
[
Simulator
]
:
deps
=
[]
for
(
nic
,
local
)
in
self
.
nics
:
if
local
:
...
...
@@ -87,7 +88,8 @@ class NetProxyListener(NetProxy):
deps
.
append
(
net_l
)
return
deps
def
sockets_cleanup
(
self
,
env
):
def
sockets_cleanup
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
tp
.
List
[
str
]:
socks
=
[]
for
(
nic
,
local
)
in
self
.
nics
:
if
not
local
:
...
...
@@ -98,7 +100,8 @@ class NetProxyListener(NetProxy):
return
[]
# sockets to wait for indicating the simulator is ready
def
sockets_wait
(
self
,
env
):
def
sockets_wait
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
tp
.
List
[
str
]:
socks
=
[]
for
(
nic
,
local
)
in
self
.
nics
:
if
not
local
:
...
...
@@ -108,7 +111,7 @@ class NetProxyListener(NetProxy):
socks
.
append
(
env
.
n2n_eth_path
(
net_l
,
net_c
))
return
socks
def
run_cmd_base
(
self
,
env
)
:
def
run_cmd_base
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
str
:
cmd
=
(
f
'-s
{
env
.
proxy_shm_path
(
self
)
}
'
f
'-S
{
self
.
shm_size
}
'
)
for
(
nic
,
local
)
in
self
.
nics
:
...
...
@@ -125,26 +128,26 @@ class NetProxyListener(NetProxy):
class
NetProxyConnecter
(
NetProxy
):
def
__init__
(
self
,
listener
:
NetProxyListener
):
def
__init__
(
self
,
listener
:
NetProxyListener
)
->
None
:
super
().
__init__
()
self
.
listener
=
listener
listener
.
connecter
=
self
self
.
nics
=
listener
.
nics
self
.
n2ns
=
listener
.
n2ns
def
add_nic
(
self
,
nic
)
:
def
add_nic
(
self
,
nic
:
NICSim
)
->
None
:
self
.
nics
.
append
((
nic
,
False
))
# the network this nic connects to now also depends on the proxy
nic
.
network
.
extra_deps
.
append
(
self
.
listener
)
# add net2net connection with listening network on the connection side
def
add_n2n
(
self
,
net_c
,
net_l
)
:
def
add_n2n
(
self
,
net_c
:
Simulator
,
net_l
:
Simulator
)
->
None
:
self
.
n2ns
.
append
(((
net_c
,
net_l
),
False
))
# the connecting network depends on our peer
net_c
.
extra_deps
.
append
(
self
.
listener
)
def
dependencies
(
self
):
def
dependencies
(
self
)
->
tp
.
List
[
Simulator
]
:
deps
=
[
self
.
listener
]
for
(
nic
,
local
)
in
self
.
nics
:
if
not
local
:
...
...
@@ -154,7 +157,8 @@ class NetProxyConnecter(NetProxy):
deps
.
append
(
net_l
)
return
deps
def
sockets_cleanup
(
self
,
env
):
def
sockets_cleanup
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
tp
.
List
[
str
]:
socks
=
[]
for
(
nic
,
local
)
in
self
.
nics
:
if
local
:
...
...
@@ -165,7 +169,8 @@ class NetProxyConnecter(NetProxy):
return
[]
# sockets to wait for indicating the simulator is ready
def
sockets_wait
(
self
,
env
):
def
sockets_wait
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
tp
.
List
[
str
]:
socks
=
[]
for
(
nic
,
local
)
in
self
.
nics
:
if
local
:
...
...
@@ -175,7 +180,7 @@ class NetProxyConnecter(NetProxy):
socks
.
append
(
env
.
n2n_eth_path
(
net_l
,
net_c
))
return
socks
def
run_cmd_base
(
self
,
env
)
:
def
run_cmd_base
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
str
:
cmd
=
(
f
'-s
{
env
.
proxy_shm_path
(
self
)
}
'
f
'-S
{
self
.
shm_size
}
'
)
for
(
nic
,
local
)
in
self
.
nics
:
...
...
@@ -192,7 +197,7 @@ class NetProxyConnecter(NetProxy):
class
RDMANetProxyListener
(
NetProxyListener
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
str
:
cmd
=
f
'
{
env
.
repodir
}
/dist/rdma/net_rdma -l '
cmd
+=
super
().
run_cmd_base
(
env
)
return
cmd
...
...
@@ -200,7 +205,7 @@ class RDMANetProxyListener(NetProxyListener):
class
RDMANetProxyConnecter
(
NetProxyConnecter
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
str
:
cmd
=
f
'
{
env
.
repodir
}
/dist/rdma/net_rdma '
cmd
+=
super
().
run_cmd_base
(
env
)
return
cmd
...
...
@@ -208,7 +213,7 @@ class RDMANetProxyConnecter(NetProxyConnecter):
class
SocketsNetProxyListener
(
NetProxyListener
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
str
:
cmd
=
f
'
{
env
.
repodir
}
/dist/sockets/net_sockets -l '
cmd
+=
super
().
run_cmd_base
(
env
)
return
cmd
...
...
@@ -216,7 +221,7 @@ class SocketsNetProxyListener(NetProxyListener):
class
SocketsNetProxyConnecter
(
NetProxyConnecter
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
'experiment_environment.ExpEnv'
)
->
str
:
cmd
=
f
'
{
env
.
repodir
}
/dist/sockets/net_sockets '
cmd
+=
super
().
run_cmd_base
(
env
)
return
cmd
experiments/simbricks/orchestration/runners.py
View file @
05bfbefb
...
...
@@ -21,6 +21,7 @@
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import
asyncio
import
collections
import
itertools
import
shlex
import
traceback
...
...
@@ -41,30 +42,29 @@ from simbricks.orchestration.utils import graphlib
class
ExperimentBaseRunner
(
ABC
):
def
__init__
(
self
,
exp
:
Experiment
,
env
:
ExpEnv
,
verbose
:
bool
):
def
__init__
(
self
,
exp
:
Experiment
,
env
:
ExpEnv
,
verbose
:
bool
)
->
None
:
self
.
exp
=
exp
self
.
env
=
env
self
.
verbose
=
verbose
self
.
out
=
ExpOutput
(
exp
)
self
.
running
:
tp
.
List
[
tp
.
Tuple
[
Simulator
,
SimpleComponent
]]
=
[]
self
.
sockets
=
[]
self
.
sockets
:
tp
.
List
[
tp
.
Tuple
[
Executor
,
str
]]
=
[]
self
.
wait_sims
:
tp
.
List
[
Component
]
=
[]
@
abstractmethod
def
sim_executor
(
self
,
sim
:
Simulator
)
->
Executor
:
pass
def
sim_graph
(
self
):
def
sim_graph
(
self
)
->
tp
.
Dict
[
Simulator
,
tp
.
Set
[
Simulator
]]
:
sims
=
self
.
exp
.
all_simulators
()
graph
=
{}
graph
=
collections
.
defaultdict
(
set
)
for
sim
in
sims
:
deps
=
sim
.
dependencies
()
+
sim
.
extra_deps
graph
[
sim
]
=
set
()
for
d
in
deps
:
graph
[
sim
].
add
(
d
)
return
graph
async
def
start_sim
(
self
,
sim
:
Simulator
):
async
def
start_sim
(
self
,
sim
:
Simulator
)
->
None
:
"""Start a simulator and wait for it to be ready."""
name
=
sim
.
full_name
()
...
...
@@ -108,16 +108,16 @@ class ExperimentBaseRunner(ABC):
if
self
.
verbose
:
print
(
f
'
{
self
.
exp
.
name
}
: started
{
name
}
'
)
async
def
before_wait
(
self
):
async
def
before_wait
(
self
)
->
None
:
pass
async
def
before_cleanup
(
self
):
async
def
before_cleanup
(
self
)
->
None
:
pass
async
def
after_cleanup
(
self
):
async
def
after_cleanup
(
self
)
->
None
:
pass
async
def
prepare
(
self
):
async
def
prepare
(
self
)
->
None
:
# generate config tars
copies
=
[]
for
host
in
self
.
exp
.
hosts
:
...
...
@@ -143,14 +143,14 @@ class ExperimentBaseRunner(ABC):
sims
.
append
(
task
)
await
asyncio
.
wait
(
sims
)
async
def
wait_for_sims
(
self
):
async
def
wait_for_sims
(
self
)
->
None
:
"""Wait for simulators to terminate (the ones marked to wait on)."""
if
self
.
verbose
:
print
(
f
'
{
self
.
exp
.
name
}
: waiting for hosts to terminate'
)
for
sc
in
self
.
wait_sims
:
await
sc
.
wait
()
async
def
run
(
self
):
async
def
run
(
self
)
->
ExpOutput
:
try
:
self
.
out
.
set_start
()
...
...
@@ -218,28 +218,30 @@ class ExperimentBaseRunner(ABC):
class
ExperimentSimpleRunner
(
ExperimentBaseRunner
):
"""Simple experiment runner with just one executor."""
def
__init__
(
self
,
executor
:
Executor
,
*
args
,
**
kwargs
):
def
__init__
(
self
,
executor
:
Executor
,
*
args
,
**
kwargs
)
->
None
:
self
.
executor
=
executor
super
().
__init__
(
*
args
,
**
kwargs
)
def
sim_executor
(
self
,
sim
:
Simulator
):
def
sim_executor
(
self
,
sim
:
Simulator
)
->
Executor
:
return
self
.
executor
class
ExperimentDistributedRunner
(
ExperimentBaseRunner
):
"""Simple experiment runner with just one executor."""
def
__init__
(
self
,
execs
,
exp
:
DistributedExperiment
,
*
args
,
**
kwargs
):
def
__init__
(
self
,
execs
,
exp
:
DistributedExperiment
,
*
args
,
**
kwargs
)
->
None
:
self
.
execs
=
execs
super
().
__init__
(
exp
,
*
args
,
**
kwargs
)
self
.
exp
=
exp
# overrides the type in the base class
assert
self
.
exp
.
num_hosts
<=
len
(
execs
)
def
sim_executor
(
self
,
sim
):
def
sim_executor
(
self
,
sim
)
->
Executor
:
h_id
=
self
.
exp
.
host_mapping
[
sim
]
return
self
.
execs
[
h_id
]
async
def
prepare
(
self
):
async
def
prepare
(
self
)
->
None
:
# make sure all simulators are assigned to an executor
assert
self
.
exp
.
all_sims_assigned
()
...
...
experiments/simbricks/orchestration/runtime/common.py
View file @
05bfbefb
...
...
@@ -54,10 +54,10 @@ class Run(object):
self
.
job_id
:
tp
.
Optional
[
int
]
=
None
"""Slurm job id."""
def
name
(
self
):
def
name
(
self
)
->
str
:
return
self
.
experiment
.
name
+
'.'
+
str
(
self
.
index
)
async
def
prep_dirs
(
self
,
executor
=
LocalExecutor
()):
async
def
prep_dirs
(
self
,
executor
=
LocalExecutor
())
->
None
:
shutil
.
rmtree
(
self
.
env
.
workdir
,
ignore_errors
=
True
)
await
executor
.
rmtree
(
self
.
env
.
workdir
)
shutil
.
rmtree
(
self
.
env
.
shm_base
,
ignore_errors
=
True
)
...
...
@@ -83,11 +83,11 @@ class Runtime(metaclass=ABCMeta):
"""Indicates whether interrupt has been signaled."""
@
abstractmethod
def
add_run
(
self
,
run
:
Run
):
def
add_run
(
self
,
run
:
Run
)
->
None
:
pass
@
abstractmethod
async
def
start
(
self
):
async
def
start
(
self
)
->
None
:
pass
@
abstractmethod
...
...
experiments/simbricks/orchestration/runtime/distributed.py
View file @
05bfbefb
...
...
@@ -34,7 +34,7 @@ from simbricks.orchestration.runtime.common import Run, Runtime
class
DistributedSimpleRuntime
(
Runtime
):
def
__init__
(
self
,
executors
,
verbose
=
False
):
def
__init__
(
self
,
executors
,
verbose
=
False
)
->
None
:
super
().
__init__
()
self
.
runnable
:
tp
.
List
[
Run
]
=
[]
self
.
complete
:
tp
.
List
[
Run
]
=
[]
...
...
@@ -43,13 +43,13 @@ class DistributedSimpleRuntime(Runtime):
self
.
_running
:
asyncio
.
Task
def
add_run
(
self
,
run
:
Run
):
def
add_run
(
self
,
run
:
Run
)
->
None
:
if
not
isinstance
(
run
.
experiment
,
DistributedExperiment
):
raise
RuntimeError
(
'Only distributed experiments supported'
)
self
.
runnable
.
append
(
run
)
async
def
do_run
(
self
,
run
:
Run
):
async
def
do_run
(
self
,
run
:
Run
)
->
None
:
runner
=
ExperimentDistributedRunner
(
self
.
executors
,
# we ensure the correct type in add_run()
...
...
@@ -91,8 +91,10 @@ class DistributedSimpleRuntime(Runtime):
def
auto_dist
(
e
:
Experiment
,
execs
:
tp
.
List
[
Executor
],
proxy_type
:
str
=
'sockets'
):
e
:
Experiment
,
execs
:
tp
.
List
[
Executor
],
proxy_type
:
str
=
'sockets'
)
->
DistributedExperiment
:
"""
Converts an Experiment into a DistributedExperiment.
...
...
experiments/simbricks/orchestration/runtime/local.py
View file @
05bfbefb
...
...
@@ -43,10 +43,10 @@ class LocalSimpleRuntime(Runtime):
self
.
executor
=
executor
self
.
_running
:
tp
.
Optional
[
asyncio
.
Task
]
=
None
def
add_run
(
self
,
run
:
Run
):
def
add_run
(
self
,
run
:
Run
)
->
None
:
self
.
runnable
.
append
(
run
)
async
def
do_run
(
self
,
run
:
Run
):
async
def
do_run
(
self
,
run
:
Run
)
->
None
:
"""Actually executes `run`."""
try
:
runner
=
ExperimentSimpleRunner
(
...
...
@@ -69,7 +69,7 @@ class LocalSimpleRuntime(Runtime):
)
run
.
output
.
dump
(
run
.
outpath
)
async
def
start
(
self
):
async
def
start
(
self
)
->
None
:
"""Execute the runs defined in `self.runnable`."""
for
run
in
self
.
runnable
:
if
self
.
_interrupted
:
...
...
@@ -78,7 +78,7 @@ class LocalSimpleRuntime(Runtime):
self
.
_running
=
asyncio
.
create_task
(
self
.
do_run
(
run
))
await
self
.
_running
def
interrupt_handler
(
self
):
def
interrupt_handler
(
self
)
->
None
:
if
self
.
_running
:
self
.
_running
.
cancel
()
...
...
@@ -107,7 +107,7 @@ class LocalParallelRuntime(Runtime):
self
.
_pending_jobs
:
tp
.
Set
[
asyncio
.
Task
]
=
set
()
self
.
_starter_task
:
asyncio
.
Task
def
add_run
(
self
,
run
:
Run
):
def
add_run
(
self
,
run
:
Run
)
->
None
:
if
run
.
experiment
.
resreq_cores
()
>
self
.
cores
:
raise
RuntimeError
(
'Not enough cores available for run'
)
...
...
@@ -119,7 +119,7 @@ class LocalParallelRuntime(Runtime):
else
:
self
.
runs_prereq
.
append
(
run
)
async
def
do_run
(
self
,
run
:
Run
):
async
def
do_run
(
self
,
run
:
Run
)
->
tp
.
Optional
[
Run
]
:
"""Actually executes `run`."""
try
:
runner
=
ExperimentSimpleRunner
(
...
...
@@ -130,7 +130,7 @@ class LocalParallelRuntime(Runtime):
except
asyncio
.
CancelledError
:
# it is safe to just exit here because we are not running any
# simulators yet
return
return
None
print
(
'starting run '
,
run
.
name
())
run
.
output
=
await
runner
.
run
()
# already handles CancelledError
...
...
@@ -144,7 +144,7 @@ class LocalParallelRuntime(Runtime):
print
(
'finished run '
,
run
.
name
())
return
run
async
def
wait_completion
(
self
):
async
def
wait_completion
(
self
)
->
None
:
"""Wait for any run to terminate and return."""
assert
self
.
_pending_jobs
...
...
@@ -158,7 +158,7 @@ class LocalParallelRuntime(Runtime):
self
.
cores_used
-=
run
.
experiment
.
resreq_cores
()
self
.
mem_used
-=
run
.
experiment
.
resreq_mem
()
def
enough_resources
(
self
,
run
:
Run
):
def
enough_resources
(
self
,
run
:
Run
)
->
bool
:
"""Check if enough cores and mem are available for the run."""
exp
=
run
.
experiment
# pylint: disable=redefined-outer-name
...
...
@@ -174,14 +174,14 @@ class LocalParallelRuntime(Runtime):
return
enough_cores
and
enough_mem
def
prereq_ready
(
self
,
run
:
Run
):
def
prereq_ready
(
self
,
run
:
Run
)
->
bool
:
"""Check if the prerequesite run for `run` has completed."""
if
run
.
prereq
is
None
:
return
True
return
run
.
prereq
in
self
.
complete
async
def
do_start
(
self
):
async
def
do_start
(
self
)
->
None
:
"""Asynchronously execute the runs defined in `self.runs_noprereq +
self.runs_prereq."""
#self.completions = asyncio.Queue()
...
...
experiments/simbricks/orchestration/runtime/slurm.py
View file @
05bfbefb
...
...
@@ -32,7 +32,7 @@ from simbricks.orchestration.runtime.common import Run, Runtime
class
SlurmRuntime
(
Runtime
):
def
__init__
(
self
,
slurmdir
,
args
,
verbose
=
False
,
cleanup
=
True
):
def
__init__
(
self
,
slurmdir
,
args
,
verbose
=
False
,
cleanup
=
True
)
->
None
:
super
().
__init__
()
self
.
runnable
:
tp
.
List
[
Run
]
=
[]
self
.
slurmdir
=
slurmdir
...
...
@@ -42,10 +42,10 @@ class SlurmRuntime(Runtime):
self
.
_start_task
:
asyncio
.
Task
def
add_run
(
self
,
run
:
Run
):
def
add_run
(
self
,
run
:
Run
)
->
None
:
self
.
runnable
.
append
(
run
)
def
prep_run
(
self
,
run
:
Run
):
def
prep_run
(
self
,
run
:
Run
)
->
str
:
exp
=
run
.
experiment
e_idx
=
exp
.
name
+
f
'-
{
run
.
index
}
'
+
'.exp'
exp_path
=
os
.
path
.
join
(
self
.
slurmdir
,
e_idx
)
...
...
@@ -92,7 +92,7 @@ class SlurmRuntime(Runtime):
return
exp_script
async
def
_do_start
(
self
):
async
def
_do_start
(
self
)
->
None
:
pathlib
.
Path
(
self
.
slurmdir
).
mkdir
(
parents
=
True
,
exist_ok
=
True
)
jid_re
=
re
.
compile
(
r
'Submitted batch job ([0-9]+)'
)
...
...
experiments/simbricks/orchestration/simulator_utils.py
View file @
05bfbefb
...
...
@@ -41,7 +41,7 @@ def create_basic_hosts(
app_class
:
tp
.
Type
[
AppConfig
],
ip_start
:
int
=
1
,
ip_prefix
:
int
=
24
):
)
->
tp
.
List
[
HostSim
]
:
"""
Creates and configures multiple hosts to be simulated using the given
parameters.
...
...
@@ -84,7 +84,7 @@ def create_multinic_hosts(
app_class
:
tp
.
Type
[
AppConfig
],
ip_start
:
int
=
1
,
ip_prefix
:
int
=
24
):
)
->
tp
.
List
[
HostSim
]
:
"""
Creates and configures multiple hosts to be simulated using the given
parameters. These hosts use multiple NICs.
...
...
@@ -133,7 +133,7 @@ def create_dctcp_hosts(
cpu_freq
:
str
,
mtu
:
int
,
ip_start
:
int
=
1
):
)
->
tp
.
List
[
HostSim
]
:
"""
Creates and configures multiple hosts to be simulated in a DCTCP experiment
using the given parameters.
...
...
experiments/simbricks/orchestration/simulators.py
View file @
05bfbefb
...
...
@@ -33,11 +33,11 @@ from simbricks.orchestration.nodeconfig import NodeConfig
class
Simulator
(
object
):
"""Base class for all simulators."""
def
__init__
(
self
):
self
.
extra_deps
=
[]
def
__init__
(
self
)
->
None
:
self
.
extra_deps
:
tp
.
List
[
Simulator
]
=
[]
self
.
name
=
''
def
resreq_cores
(
self
):
def
resreq_cores
(
self
)
->
int
:
"""
Number of cores this simulator requires during execution.
...
...
@@ -45,7 +45,7 @@ class Simulator(object):
"""
return
1
def
resreq_mem
(
self
):
def
resreq_mem
(
self
)
->
int
:
"""
Number of memory in MB this simulator requires during execution.
...
...
@@ -53,7 +53,7 @@ class Simulator(object):
"""
return
64
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
"""Full name of the simulator."""
return
''
...
...
@@ -73,25 +73,25 @@ class Simulator(object):
# Sockets to be cleaned up
# pylint: disable=unused-argument
def
sockets_cleanup
(
self
,
env
:
ExpEnv
):
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[]
# sockets to wait for indicating the simulator is ready
# pylint: disable=unused-argument
def
sockets_wait
(
self
,
env
:
ExpEnv
):
def
sockets_wait
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[]
def
start_delay
(
self
):
def
start_delay
(
self
)
->
int
:
return
5
def
wait_terminate
(
self
):
def
wait_terminate
(
self
)
->
bool
:
return
False
class
PCIDevSim
(
Simulator
):
"""Base class for PCIe device simulators."""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
sync_mode
=
0
...
...
@@ -110,23 +110,23 @@ class PCIDevSim(Simulator):
"""Latency in nanoseconds for sending messages to components connected
via PCI."""
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
'dev.'
+
self
.
name
def
is_nic
(
self
):
def
is_nic
(
self
)
->
bool
:
return
False
def
sockets_cleanup
(
self
,
env
)
:
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
env
.
dev_pci_path
(
self
),
env
.
dev_shm_path
(
self
)]
def
sockets_wait
(
self
,
env
)
:
def
sockets_wait
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
env
.
dev_pci_path
(
self
)]
class
NICSim
(
PCIDevSim
):
"""Base class for NIC simulators."""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
network
:
tp
.
Optional
[
NetSim
]
=
None
...
...
@@ -135,12 +135,12 @@ class NICSim(PCIDevSim):
"""Ethernet latency in nanoseconds from this NIC to the network
component."""
def
set_network
(
self
,
net
:
NetSim
):
def
set_network
(
self
,
net
:
NetSim
)
->
None
:
"""Connect this NIC to a network simulator."""
self
.
network
=
net
net
.
nics
.
append
(
self
)
def
basic_args
(
self
,
env
,
extra
=
None
)
:
def
basic_args
(
self
,
env
:
ExpEnv
,
extra
:
tp
.
Optional
[
str
]
=
None
)
->
str
:
cmd
=
(
f
'
{
env
.
dev_pci_path
(
self
)
}
{
env
.
nic_eth_path
(
self
)
}
'
f
'
{
env
.
dev_shm_path
(
self
)
}
{
self
.
sync_mode
}
{
self
.
start_tick
}
'
...
...
@@ -153,27 +153,29 @@ class NICSim(PCIDevSim):
cmd
+=
' '
+
extra
return
cmd
def
basic_run_cmd
(
self
,
env
,
name
,
extra
=
None
):
def
basic_run_cmd
(
self
,
env
:
ExpEnv
,
name
:
str
,
extra
:
tp
.
Optional
[
str
]
=
None
)
->
str
:
cmd
=
f
'
{
env
.
repodir
}
/sims/nic/
{
name
}
{
self
.
basic_args
(
env
,
extra
)
}
'
return
cmd
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
'nic.'
+
self
.
name
def
is_nic
(
self
):
def
is_nic
(
self
)
->
bool
:
return
True
def
sockets_cleanup
(
self
,
env
)
:
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
super
().
sockets_cleanup
(
env
)
+
[
env
.
nic_eth_path
(
self
)]
def
sockets_wait
(
self
,
env
)
:
def
sockets_wait
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
super
().
sockets_wait
(
env
)
+
[
env
.
nic_eth_path
(
self
)]
class
NetSim
(
Simulator
):
"""Base class for network simulators."""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
opt
=
''
...
...
@@ -191,10 +193,10 @@ class NetSim(Simulator):
self
.
net_listen
:
list
[
NetSim
]
=
[]
self
.
net_connect
:
list
[
NetSim
]
=
[]
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
'net.'
+
self
.
name
def
connect_network
(
self
,
net
:
NetSim
):
def
connect_network
(
self
,
net
:
NetSim
)
->
None
:
"""Connect this network to the listening peer `net`"""
net
.
net_listen
.
append
(
self
)
self
.
net_connect
.
append
(
net
)
...
...
@@ -209,19 +211,19 @@ class NetSim(Simulator):
sockets
.
append
((
h
,
env
.
net2host_eth_path
(
self
,
h
)))
return
sockets
def
listen_sockets
(
self
,
env
:
ExpEnv
):
def
listen_sockets
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
tp
.
Tuple
[
NetSim
,
str
]]
:
listens
=
[]
for
net
in
self
.
net_listen
:
listens
.
append
((
net
,
env
.
n2n_eth_path
(
self
,
net
)))
return
listens
def
dependencies
(
self
):
def
dependencies
(
self
)
->
tp
.
List
[
Simulator
]
:
return
self
.
nics
+
self
.
net_connect
+
self
.
hosts_direct
def
sockets_cleanup
(
self
,
env
:
ExpEnv
):
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
s
for
(
_
,
s
)
in
self
.
listen_sockets
(
env
)]
def
sockets_wait
(
self
,
env
:
ExpEnv
):
def
sockets_wait
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
s
for
(
_
,
s
)
in
self
.
listen_sockets
(
env
)]
...
...
@@ -229,7 +231,7 @@ class NetSim(Simulator):
class
MemDevSim
(
NICSim
):
"""Base class for memory device simulators."""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
mem_latency
=
500
...
...
@@ -237,40 +239,40 @@ class MemDevSim(NICSim):
self
.
size
=
1024
*
1024
*
1024
# 1GB
self
.
as_id
=
0
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
'mem.'
+
self
.
name
def
sockets_cleanup
(
self
,
env
)
:
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
env
.
dev_mem_path
(
self
),
env
.
dev_shm_path
(
self
)]
def
sockets_wait
(
self
,
env
)
:
def
sockets_wait
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
env
.
dev_mem_path
(
self
)]
class
NetMemSim
(
NICSim
):
"""Base class for netork memory simulators."""
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
addr
=
0xe000000000000000
self
.
size
=
1024
*
1024
*
1024
# 1GB
self
.
as_id
=
0
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
'netmem.'
+
self
.
name
def
sockets_cleanup
(
self
,
env
)
:
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
env
.
nic_eth_path
(
self
),
env
.
dev_shm_path
(
self
)]
def
sockets_wait
(
self
,
env
)
:
def
sockets_wait
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
env
.
nic_eth_path
(
self
)]
class
HostSim
(
Simulator
):
"""Base class for host simulators."""
def
__init__
(
self
,
node_config
:
NodeConfig
):
def
__init__
(
self
,
node_config
:
NodeConfig
)
->
None
:
super
().
__init__
()
self
.
node_config
=
node_config
"""System configuration for this simulated host. """
...
...
@@ -301,31 +303,35 @@ class HostSim(Simulator):
self
.
memdevs
:
tp
.
List
[
MemDevSim
]
=
[]
@
property
def
nics
(
self
):
return
filter
(
lambda
pcidev
:
pcidev
.
is_nic
(),
self
.
pcidevs
)
def
nics
(
self
)
->
tp
.
List
[
NICSim
]:
return
[
tp
.
cast
(
NICSim
,
pcidev
)
for
pcidev
in
self
.
pcidevs
if
pcidev
.
is_nic
()
]
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
'host.'
+
self
.
name
def
add_nic
(
self
,
dev
:
NICSim
):
def
add_nic
(
self
,
dev
:
NICSim
)
->
None
:
"""Add a NIC to this host."""
self
.
add_pcidev
(
dev
)
def
add_pcidev
(
self
,
dev
:
PCIDevSim
):
def
add_pcidev
(
self
,
dev
:
PCIDevSim
)
->
None
:
"""Add a PCIe device to this host."""
dev
.
name
=
self
.
name
+
'.'
+
dev
.
name
self
.
pcidevs
.
append
(
dev
)
def
add_memdev
(
self
,
dev
:
MemDevSim
):
def
add_memdev
(
self
,
dev
:
MemDevSim
)
->
None
:
dev
.
name
=
self
.
name
+
'.'
+
dev
.
name
self
.
memdevs
.
append
(
dev
)
def
add_netdirect
(
self
,
net
:
NetSim
):
def
add_netdirect
(
self
,
net
:
NetSim
)
->
None
:
"""Add a direct connection to a network to this host."""
net
.
hosts_direct
.
append
(
self
)
self
.
net_directs
.
append
(
net
)
def
dependencies
(
self
):
def
dependencies
(
self
)
->
tp
.
List
[
PCIDevSim
]
:
deps
=
[]
for
dev
in
self
.
pcidevs
:
deps
.
append
(
dev
)
...
...
@@ -335,36 +341,36 @@ class HostSim(Simulator):
deps
.
append
(
dev
)
return
deps
def
wait_terminate
(
self
):
def
wait_terminate
(
self
)
->
bool
:
return
self
.
wait
class
QemuHost
(
HostSim
):
"""Qemu host simulator."""
def
__init__
(
self
,
node_config
:
NodeConfig
):
def
__init__
(
self
,
node_config
:
NodeConfig
)
->
None
:
super
().
__init__
(
node_config
)
self
.
sync
=
False
""""Whether to synchronize with attached simulators."""
def
resreq_cores
(
self
):
def
resreq_cores
(
self
)
->
int
:
if
self
.
sync
:
return
1
else
:
return
self
.
node_config
.
cores
+
1
def
resreq_mem
(
self
):
def
resreq_mem
(
self
)
->
int
:
return
8192
def
prep_cmds
(
self
,
env
)
:
def
prep_cmds
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
f
'
{
env
.
qemu_img_path
}
create -f qcow2 -o '
f
'backing_file="
{
env
.
hd_path
(
self
.
node_config
.
disk_image
)
}
" '
f
'
{
env
.
hdcopy_path
(
self
)
}
'
]
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
accel
=
',accel=kvm:tcg'
if
not
self
.
sync
else
''
if
self
.
node_config
.
kcmd_append
:
kcmd_append
=
' '
+
self
.
node_config
.
kcmd_append
...
...
@@ -416,7 +422,7 @@ class QemuHost(HostSim):
class
Gem5Host
(
HostSim
):
"""Gem5 host simulator."""
def
__init__
(
self
,
node_config
:
NodeConfig
):
def
__init__
(
self
,
node_config
:
NodeConfig
)
->
None
:
node_config
.
sim
=
'gem5'
super
().
__init__
(
node_config
)
self
.
cpu_type_cp
=
'X86KvmCPU'
...
...
@@ -426,16 +432,16 @@ class Gem5Host(HostSim):
self
.
extra_config_args
=
[]
self
.
variant
=
'fast'
def
resreq_cores
(
self
):
def
resreq_cores
(
self
)
->
int
:
return
1
def
resreq_mem
(
self
):
def
resreq_mem
(
self
)
->
int
:
return
4096
def
prep_cmds
(
self
,
env
)
:
def
prep_cmds
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
[
f
'mkdir -p
{
env
.
gem5_cpdir
(
self
)
}
'
]
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cpu_type
=
self
.
cpu_type
if
env
.
create_cp
:
cpu_type
=
self
.
cpu_type_cp
...
...
@@ -506,7 +512,7 @@ class Gem5Host(HostSim):
class
SimicsHost
(
HostSim
):
"""Simics host simulator."""
def
__init__
(
self
,
node_config
:
NodeConfig
):
def
__init__
(
self
,
node_config
:
NodeConfig
)
->
None
:
super
().
__init__
(
node_config
)
node_config
.
sim
=
'simics'
...
...
@@ -527,13 +533,13 @@ class SimicsHost(HostSim):
self
.
debug_messages
=
False
"""Whether to enable debug messages of SimBricks adapter devices."""
def
resreq_cores
(
self
):
def
resreq_cores
(
self
)
->
int
:
return
2
def
resreq_mem
(
self
):
def
resreq_mem
(
self
)
->
int
:
return
self
.
node_config
.
memory
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
if
self
.
node_config
.
kcmd_append
:
raise
RuntimeError
(
'Appending kernel command-line not yet implemented.'
...
...
@@ -670,15 +676,15 @@ class SimicsHost(HostSim):
class
CorundumVerilatorNIC
(
NICSim
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
clock_freq
=
250
# MHz
def
resreq_mem
(
self
):
def
resreq_mem
(
self
)
->
int
:
# this is a guess
return
512
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
return
self
.
basic_run_cmd
(
env
,
'/corundum/corundum_verilator'
,
str
(
self
.
clock_freq
)
)
...
...
@@ -686,23 +692,23 @@ class CorundumVerilatorNIC(NICSim):
class
CorundumBMNIC
(
NICSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
return
self
.
basic_run_cmd
(
env
,
'/corundum_bm/corundum_bm'
)
class
I40eNIC
(
NICSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
return
self
.
basic_run_cmd
(
env
,
'/i40e_bm/i40e_bm'
)
class
E1000NIC
(
NICSim
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
debug
=
False
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cmd
=
self
.
basic_run_cmd
(
env
,
'/e1000_gem5/e1000_gem5'
)
if
self
.
debug
:
cmd
=
'env E1000_DEBUG=1 '
+
cmd
...
...
@@ -711,35 +717,35 @@ class E1000NIC(NICSim):
class
MultiSubNIC
(
NICSim
):
def
__init__
(
self
,
mn
)
:
def
__init__
(
self
,
mn
:
Simulator
)
->
None
:
super
().
__init__
()
self
.
multinic
=
mn
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
self
.
multinic
.
full_name
()
+
'.'
+
self
.
name
def
dependencies
(
self
):
def
dependencies
(
self
)
->
tp
.
List
[
Simulator
]
:
return
super
().
dependencies
()
+
[
self
.
multinic
]
def
start_delay
(
self
):
def
start_delay
(
self
)
->
int
:
return
0
class
I40eMultiNIC
(
Simulator
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
subnics
=
[]
self
.
subnics
:
tp
.
List
[
NICSim
]
=
[]
def
create_subnic
(
self
):
def
create_subnic
(
self
)
->
MultiSubNIC
:
sn
=
MultiSubNIC
(
self
)
self
.
subnics
.
append
(
sn
)
return
sn
def
full_name
(
self
):
def
full_name
(
self
)
->
str
:
return
'multinic.'
+
self
.
name
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
args
=
''
first
=
True
for
sn
in
self
.
subnics
:
...
...
@@ -749,13 +755,13 @@ class I40eMultiNIC(Simulator):
args
+=
sn
.
basic_args
(
env
)
return
f
'
{
env
.
repodir
}
/sims/nic/i40e_bm/i40e_bm
{
args
}
'
def
sockets_cleanup
(
self
,
env
)
:
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
ss
=
[]
for
sn
in
self
.
subnics
:
ss
+=
sn
.
sockets_cleanup
(
env
)
return
ss
def
sockets_wait
(
self
,
env
)
:
def
sockets_wait
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
ss
=
[]
for
sn
in
self
.
subnics
:
ss
+=
sn
.
sockets_wait
(
env
)
...
...
@@ -764,7 +770,7 @@ class I40eMultiNIC(Simulator):
class
WireNet
(
NetSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
connects
=
self
.
connect_sockets
(
env
)
assert
len
(
connects
)
==
2
cmd
=
(
...
...
@@ -779,12 +785,12 @@ class WireNet(NetSim):
class
SwitchNet
(
NetSim
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
sync
=
True
"""Whether to synchronize with attached simulators."""
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cmd
=
env
.
repodir
+
'/sims/net/switch/net_switch'
cmd
+=
f
' -S
{
self
.
sync_period
}
-E
{
self
.
eth_latency
}
'
...
...
@@ -799,7 +805,7 @@ class SwitchNet(NetSim):
cmd
+=
' -h '
+
n
return
cmd
def
sockets_cleanup
(
self
,
env
)
:
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
# cleanup here will just have listening eth sockets, switch also creates
# shm regions for each with a "-shm" suffix
cleanup
=
[]
...
...
@@ -811,13 +817,13 @@ class SwitchNet(NetSim):
class
MemSwitchNet
(
NetSim
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
sync
=
True
""" AS_ID,VADDR_START,VADDR_END,MEMNODE_MAC,PHYS_START """
self
.
mem_map
=
[]
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cmd
=
env
.
repodir
+
'/sims/mem/memswitch/memswitch'
cmd
+=
f
' -S
{
self
.
sync_period
}
-E
{
self
.
eth_latency
}
'
...
...
@@ -836,7 +842,7 @@ class MemSwitchNet(NetSim):
cmd
+=
f
',
{
m
[
4
]
}
'
return
cmd
def
sockets_cleanup
(
self
,
env
)
:
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
# cleanup here will just have listening eth sockets, switch also creates
# shm regions for each with a "-shm" suffix
cleanup
=
[]
...
...
@@ -848,12 +854,12 @@ class MemSwitchNet(NetSim):
class
TofinoNet
(
NetSim
):
def
__init__
(
self
):
def
__init__
(
self
)
->
None
:
super
().
__init__
()
self
.
tofino_log_path
=
'/tmp/model.ldjson'
self
.
sync
=
True
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cmd
=
f
'
{
env
.
repodir
}
/sims/net/tofino/tofino'
cmd
+=
(
f
' -S
{
self
.
sync_period
}
-E
{
self
.
eth_latency
}
'
...
...
@@ -868,7 +874,7 @@ class TofinoNet(NetSim):
class
NS3DumbbellNet
(
NetSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
ports
=
''
for
(
n
,
s
)
in
self
.
connect_sockets
(
env
):
if
'server'
in
n
.
name
:
...
...
@@ -887,7 +893,7 @@ class NS3DumbbellNet(NetSim):
class
NS3BridgeNet
(
NetSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
ports
=
''
for
(
_
,
n
)
in
self
.
connect_sockets
(
env
):
ports
+=
'--CosimPort='
+
n
+
' '
...
...
@@ -903,7 +909,7 @@ class NS3BridgeNet(NetSim):
class
NS3SequencerNet
(
NetSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
ports
=
''
for
(
n
,
s
)
in
self
.
connect_sockets
(
env
):
if
'client'
in
n
.
name
:
...
...
@@ -924,7 +930,7 @@ class NS3SequencerNet(NetSim):
class
FEMUDev
(
PCIDevSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cmd
=
(
f
'
{
env
.
repodir
}
/sims/external/femu/femu-simbricks'
f
'
{
env
.
dev_pci_path
(
self
)
}
{
env
.
dev_shm_path
(
self
)
}
'
...
...
@@ -934,7 +940,7 @@ class FEMUDev(PCIDevSim):
class
BasicMemDev
(
MemDevSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cmd
=
(
f
'
{
env
.
repodir
}
/sims/mem/basicmem/basicmem'
f
'
{
self
.
size
}
{
self
.
addr
}
{
self
.
as_id
}
'
...
...
@@ -947,7 +953,7 @@ class BasicMemDev(MemDevSim):
class
MemNIC
(
MemDevSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cmd
=
(
f
'
{
env
.
repodir
}
/sims/mem/memnic/memnic'
f
'
{
env
.
dev_mem_path
(
self
)
}
{
env
.
nic_eth_path
(
self
)
}
'
...
...
@@ -962,16 +968,16 @@ class MemNIC(MemDevSim):
return
cmd
def
sockets_cleanup
(
self
,
env
)
:
def
sockets_cleanup
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
super
().
sockets_cleanup
(
env
)
+
[
env
.
nic_eth_path
(
self
)]
def
sockets_wait
(
self
,
env
)
:
def
sockets_wait
(
self
,
env
:
ExpEnv
)
->
tp
.
List
[
str
]
:
return
super
().
sockets_wait
(
env
)
+
[
env
.
nic_eth_path
(
self
)]
class
NetMem
(
NetMemSim
):
def
run_cmd
(
self
,
env
)
:
def
run_cmd
(
self
,
env
:
ExpEnv
)
->
str
:
cmd
=
(
f
'
{
env
.
repodir
}
/sims/mem/netmem/netmem'
f
'
{
self
.
size
}
{
self
.
addr
}
{
self
.
as_id
}
'
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment