Unverified Commit 23867991 authored by Jakob Görgen's avatar Jakob Görgen
Browse files

new symphony folder with new folder structure and packages + removed old orchestration framework

parent a14a0365
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.12 port 36760 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 558 MBytes 4.68 Gbits/sec
[ 3] 1.0- 2.0 sec 577 MBytes 4.84 Gbits/sec
[ 3] 2.0- 3.0 sec 538 MBytes 4.51 Gbits/sec
[ 3] 3.0- 4.0 sec 545 MBytes 4.57 Gbits/sec
[ 3] 4.0- 5.0 sec 578 MBytes 4.84 Gbits/sec
[ 3] 5.0- 6.0 sec 583 MBytes 4.89 Gbits/sec
[ 3] 6.0- 7.0 sec 580 MBytes 4.86 Gbits/sec
[ 3] 7.0- 8.0 sec 572 MBytes 4.80 Gbits/sec
[ 3] 8.0- 9.0 sec 572 MBytes 4.80 Gbits/sec
[ 3] 9.0-10.0 sec 561 MBytes 4.70 Gbits/sec
[ 3] 10.0-11.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 11.0-12.0 sec 579 MBytes 4.86 Gbits/sec
[ 3] 12.0-13.0 sec 580 MBytes 4.86 Gbits/sec
[ 3] 13.0-14.0 sec 594 MBytes 4.98 Gbits/sec
[ 3] 14.0-15.0 sec 584 MBytes 4.90 Gbits/sec
[ 3] 15.0-16.0 sec 589 MBytes 4.94 Gbits/sec
[ 3] 16.0-17.0 sec 582 MBytes 4.88 Gbits/sec
[ 3] 17.0-18.0 sec 572 MBytes 4.79 Gbits/sec
[ 3] 18.0-19.0 sec 579 MBytes 4.86 Gbits/sec
[ 3] 19.0-20.0 sec 576 MBytes 4.83 Gbits/sec
[ 3] 20.0-21.0 sec 574 MBytes 4.82 Gbits/sec
[ 3] 21.0-22.0 sec 570 MBytes 4.78 Gbits/sec
[ 3] 22.0-23.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 23.0-24.0 sec 582 MBytes 4.88 Gbits/sec
[ 3] 24.0-25.0 sec 604 MBytes 5.07 Gbits/sec
[ 3] 25.0-26.0 sec 568 MBytes 4.76 Gbits/sec
[ 3] 26.0-27.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 27.0-28.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 28.0-29.0 sec 578 MBytes 4.84 Gbits/sec
[ 3] 29.0-30.0 sec 763 MBytes 6.40 Gbits/sec
[ 3] 0.0-30.0 sec 17.0 GBytes 4.87 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.13 port 60480 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 766 MBytes 6.43 Gbits/sec
[ 3] 1.0- 2.0 sec 563 MBytes 4.72 Gbits/sec
[ 3] 2.0- 3.0 sec 587 MBytes 4.92 Gbits/sec
[ 3] 3.0- 4.0 sec 596 MBytes 5.00 Gbits/sec
[ 3] 4.0- 5.0 sec 562 MBytes 4.72 Gbits/sec
[ 3] 5.0- 6.0 sec 553 MBytes 4.64 Gbits/sec
[ 3] 6.0- 7.0 sec 558 MBytes 4.68 Gbits/sec
[ 3] 7.0- 8.0 sec 558 MBytes 4.68 Gbits/sec
[ 3] 8.0- 9.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 9.0-10.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 10.0-11.0 sec 559 MBytes 4.69 Gbits/sec
[ 3] 11.0-12.0 sec 564 MBytes 4.74 Gbits/sec
[ 3] 12.0-13.0 sec 544 MBytes 4.56 Gbits/sec
[ 3] 13.0-14.0 sec 568 MBytes 4.76 Gbits/sec
[ 3] 14.0-15.0 sec 553 MBytes 4.64 Gbits/sec
[ 3] 15.0-16.0 sec 557 MBytes 4.67 Gbits/sec
[ 3] 16.0-17.0 sec 551 MBytes 4.62 Gbits/sec
[ 3] 17.0-18.0 sec 556 MBytes 4.66 Gbits/sec
[ 3] 18.0-19.0 sec 561 MBytes 4.70 Gbits/sec
[ 3] 19.0-20.0 sec 559 MBytes 4.69 Gbits/sec
[ 3] 20.0-21.0 sec 567 MBytes 4.76 Gbits/sec
[ 3] 21.0-22.0 sec 556 MBytes 4.67 Gbits/sec
[ 3] 22.0-23.0 sec 570 MBytes 4.79 Gbits/sec
[ 3] 23.0-24.0 sec 573 MBytes 4.81 Gbits/sec
[ 3] 24.0-25.0 sec 539 MBytes 4.52 Gbits/sec
[ 3] 25.0-26.0 sec 553 MBytes 4.64 Gbits/sec
[ 3] 26.0-27.0 sec 582 MBytes 4.88 Gbits/sec
[ 3] 27.0-28.0 sec 573 MBytes 4.81 Gbits/sec
[ 3] 28.0-29.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 29.0-30.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 0.0-30.0 sec 16.7 GBytes 4.78 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.12 port 36764 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 582 MBytes 4.88 Gbits/sec
[ 3] 1.0- 2.0 sec 582 MBytes 4.89 Gbits/sec
[ 3] 2.0- 3.0 sec 587 MBytes 4.93 Gbits/sec
[ 3] 3.0- 4.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 4.0- 5.0 sec 565 MBytes 4.74 Gbits/sec
[ 3] 5.0- 6.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 6.0- 7.0 sec 555 MBytes 4.66 Gbits/sec
[ 3] 7.0- 8.0 sec 569 MBytes 4.78 Gbits/sec
[ 3] 8.0- 9.0 sec 576 MBytes 4.83 Gbits/sec
[ 3] 9.0-10.0 sec 584 MBytes 4.90 Gbits/sec
[ 3] 10.0-11.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 11.0-12.0 sec 566 MBytes 4.74 Gbits/sec
[ 3] 12.0-13.0 sec 581 MBytes 4.87 Gbits/sec
[ 3] 13.0-14.0 sec 571 MBytes 4.79 Gbits/sec
[ 3] 14.0-15.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 15.0-16.0 sec 565 MBytes 4.74 Gbits/sec
[ 3] 16.0-17.0 sec 568 MBytes 4.76 Gbits/sec
[ 3] 17.0-18.0 sec 563 MBytes 4.72 Gbits/sec
[ 3] 18.0-19.0 sec 567 MBytes 4.75 Gbits/sec
[ 3] 19.0-20.0 sec 546 MBytes 4.58 Gbits/sec
[ 3] 20.0-21.0 sec 577 MBytes 4.84 Gbits/sec
[ 3] 21.0-22.0 sec 563 MBytes 4.73 Gbits/sec
[ 3] 22.0-23.0 sec 584 MBytes 4.89 Gbits/sec
[ 3] 23.0-24.0 sec 592 MBytes 4.97 Gbits/sec
[ 3] 24.0-25.0 sec 575 MBytes 4.82 Gbits/sec
[ 3] 25.0-26.0 sec 569 MBytes 4.78 Gbits/sec
[ 3] 26.0-27.0 sec 563 MBytes 4.72 Gbits/sec
[ 3] 27.0-28.0 sec 552 MBytes 4.63 Gbits/sec
[ 3] 28.0-29.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 29.0-30.0 sec 741 MBytes 6.22 Gbits/sec
[ 3] 0.0-30.0 sec 16.9 GBytes 4.83 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.13 port 60492 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 728 MBytes 6.11 Gbits/sec
[ 3] 1.0- 2.0 sec 544 MBytes 4.56 Gbits/sec
[ 3] 2.0- 3.0 sec 551 MBytes 4.62 Gbits/sec
[ 3] 3.0- 4.0 sec 557 MBytes 4.67 Gbits/sec
[ 3] 4.0- 5.0 sec 577 MBytes 4.84 Gbits/sec
[ 3] 5.0- 6.0 sec 574 MBytes 4.82 Gbits/sec
[ 3] 6.0- 7.0 sec 576 MBytes 4.83 Gbits/sec
[ 3] 7.0- 8.0 sec 569 MBytes 4.78 Gbits/sec
[ 3] 8.0- 9.0 sec 567 MBytes 4.76 Gbits/sec
[ 3] 9.0-10.0 sec 561 MBytes 4.70 Gbits/sec
[ 3] 10.0-11.0 sec 556 MBytes 4.67 Gbits/sec
[ 3] 11.0-12.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 12.0-13.0 sec 553 MBytes 4.64 Gbits/sec
[ 3] 13.0-14.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 14.0-15.0 sec 565 MBytes 4.74 Gbits/sec
[ 3] 15.0-16.0 sec 563 MBytes 4.72 Gbits/sec
[ 3] 16.0-17.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 17.0-18.0 sec 565 MBytes 4.74 Gbits/sec
[ 3] 18.0-19.0 sec 572 MBytes 4.80 Gbits/sec
[ 3] 19.0-20.0 sec 586 MBytes 4.92 Gbits/sec
[ 3] 20.0-21.0 sec 561 MBytes 4.70 Gbits/sec
[ 3] 21.0-22.0 sec 569 MBytes 4.77 Gbits/sec
[ 3] 22.0-23.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 23.0-24.0 sec 557 MBytes 4.67 Gbits/sec
[ 3] 24.0-25.0 sec 565 MBytes 4.74 Gbits/sec
[ 3] 25.0-26.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 26.0-27.0 sec 580 MBytes 4.87 Gbits/sec
[ 3] 27.0-28.0 sec 572 MBytes 4.79 Gbits/sec
[ 3] 28.0-29.0 sec 571 MBytes 4.79 Gbits/sec
[ 3] 29.0-30.0 sec 554 MBytes 4.65 Gbits/sec
[ 3] 0.0-30.0 sec 16.7 GBytes 4.78 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.12 port 36774 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 575 MBytes 4.82 Gbits/sec
[ 3] 1.0- 2.0 sec 566 MBytes 4.74 Gbits/sec
[ 3] 2.0- 3.0 sec 557 MBytes 4.67 Gbits/sec
[ 3] 3.0- 4.0 sec 596 MBytes 5.00 Gbits/sec
[ 3] 4.0- 5.0 sec 569 MBytes 4.77 Gbits/sec
[ 3] 5.0- 6.0 sec 572 MBytes 4.80 Gbits/sec
[ 3] 6.0- 7.0 sec 574 MBytes 4.82 Gbits/sec
[ 3] 7.0- 8.0 sec 591 MBytes 4.96 Gbits/sec
[ 3] 8.0- 9.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 9.0-10.0 sec 575 MBytes 4.82 Gbits/sec
[ 3] 10.0-11.0 sec 542 MBytes 4.55 Gbits/sec
[ 3] 11.0-12.0 sec 549 MBytes 4.61 Gbits/sec
[ 3] 12.0-13.0 sec 567 MBytes 4.76 Gbits/sec
[ 3] 13.0-14.0 sec 551 MBytes 4.63 Gbits/sec
[ 3] 14.0-15.0 sec 568 MBytes 4.76 Gbits/sec
[ 3] 15.0-16.0 sec 561 MBytes 4.71 Gbits/sec
[ 3] 16.0-17.0 sec 559 MBytes 4.69 Gbits/sec
[ 3] 17.0-18.0 sec 559 MBytes 4.69 Gbits/sec
[ 3] 18.0-19.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 19.0-20.0 sec 579 MBytes 4.85 Gbits/sec
[ 3] 20.0-21.0 sec 567 MBytes 4.76 Gbits/sec
[ 3] 21.0-22.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 22.0-23.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 23.0-24.0 sec 567 MBytes 4.75 Gbits/sec
[ 3] 24.0-25.0 sec 567 MBytes 4.76 Gbits/sec
[ 3] 25.0-26.0 sec 579 MBytes 4.86 Gbits/sec
[ 3] 26.0-27.0 sec 585 MBytes 4.91 Gbits/sec
[ 3] 27.0-28.0 sec 565 MBytes 4.74 Gbits/sec
[ 3] 28.0-29.0 sec 548 MBytes 4.60 Gbits/sec
[ 3] 29.0-30.0 sec 748 MBytes 6.27 Gbits/sec
[ 3] 0.0-30.0 sec 16.8 GBytes 4.81 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.13 port 60504 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 740 MBytes 6.20 Gbits/sec
[ 3] 1.0- 2.0 sec 560 MBytes 4.69 Gbits/sec
[ 3] 2.0- 3.0 sec 571 MBytes 4.79 Gbits/sec
[ 3] 3.0- 4.0 sec 558 MBytes 4.68 Gbits/sec
[ 3] 4.0- 5.0 sec 559 MBytes 4.69 Gbits/sec
[ 3] 5.0- 6.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 6.0- 7.0 sec 565 MBytes 4.74 Gbits/sec
[ 3] 7.0- 8.0 sec 555 MBytes 4.66 Gbits/sec
[ 3] 8.0- 9.0 sec 560 MBytes 4.69 Gbits/sec
[ 3] 9.0-10.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 10.0-11.0 sec 577 MBytes 4.84 Gbits/sec
[ 3] 11.0-12.0 sec 602 MBytes 5.05 Gbits/sec
[ 3] 12.0-13.0 sec 571 MBytes 4.79 Gbits/sec
[ 3] 13.0-14.0 sec 579 MBytes 4.86 Gbits/sec
[ 3] 14.0-15.0 sec 570 MBytes 4.78 Gbits/sec
[ 3] 15.0-16.0 sec 570 MBytes 4.78 Gbits/sec
[ 3] 16.0-17.0 sec 574 MBytes 4.82 Gbits/sec
[ 3] 17.0-18.0 sec 573 MBytes 4.81 Gbits/sec
[ 3] 18.0-19.0 sec 569 MBytes 4.78 Gbits/sec
[ 3] 19.0-20.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 20.0-21.0 sec 555 MBytes 4.65 Gbits/sec
[ 3] 21.0-22.0 sec 584 MBytes 4.90 Gbits/sec
[ 3] 22.0-23.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 23.0-24.0 sec 576 MBytes 4.83 Gbits/sec
[ 3] 24.0-25.0 sec 575 MBytes 4.82 Gbits/sec
[ 3] 25.0-26.0 sec 574 MBytes 4.82 Gbits/sec
[ 3] 26.0-27.0 sec 557 MBytes 4.67 Gbits/sec
[ 3] 27.0-28.0 sec 570 MBytes 4.78 Gbits/sec
[ 3] 28.0-29.0 sec 587 MBytes 4.92 Gbits/sec
[ 3] 29.0-30.0 sec 567 MBytes 4.75 Gbits/sec
[ 3] 0.0-30.0 sec 16.9 GBytes 4.83 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.12 port 40318 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 1.0- 2.0 sec 540 MBytes 4.53 Gbits/sec
[ 3] 2.0- 3.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 3.0- 4.0 sec 567 MBytes 4.76 Gbits/sec
[ 3] 4.0- 5.0 sec 586 MBytes 4.92 Gbits/sec
[ 3] 5.0- 6.0 sec 586 MBytes 4.91 Gbits/sec
[ 3] 6.0- 7.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 7.0- 8.0 sec 555 MBytes 4.66 Gbits/sec
[ 3] 8.0- 9.0 sec 568 MBytes 4.76 Gbits/sec
[ 3] 9.0-10.0 sec 571 MBytes 4.79 Gbits/sec
[ 3] 10.0-11.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 11.0-12.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 12.0-13.0 sec 569 MBytes 4.78 Gbits/sec
[ 3] 13.0-14.0 sec 567 MBytes 4.75 Gbits/sec
[ 3] 14.0-15.0 sec 557 MBytes 4.67 Gbits/sec
[ 3] 15.0-16.0 sec 556 MBytes 4.67 Gbits/sec
[ 3] 16.0-17.0 sec 554 MBytes 4.64 Gbits/sec
[ 3] 17.0-18.0 sec 562 MBytes 4.71 Gbits/sec
[ 3] 18.0-19.0 sec 572 MBytes 4.80 Gbits/sec
[ 3] 19.0-20.0 sec 565 MBytes 4.74 Gbits/sec
[ 3] 20.0-21.0 sec 571 MBytes 4.79 Gbits/sec
[ 3] 21.0-22.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 22.0-23.0 sec 562 MBytes 4.71 Gbits/sec
[ 3] 23.0-24.0 sec 558 MBytes 4.68 Gbits/sec
[ 3] 24.0-25.0 sec 569 MBytes 4.77 Gbits/sec
[ 3] 25.0-26.0 sec 577 MBytes 4.84 Gbits/sec
[ 3] 26.0-27.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 27.0-28.0 sec 562 MBytes 4.72 Gbits/sec
[ 3] 28.0-29.0 sec 573 MBytes 4.81 Gbits/sec
[ 3] 29.0-30.0 sec 611 MBytes 5.12 Gbits/sec
[ 3] 0.0-30.0 sec 16.6 GBytes 4.76 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.13 port 36942 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 632 MBytes 5.30 Gbits/sec
[ 3] 1.0- 2.0 sec 594 MBytes 4.98 Gbits/sec
[ 3] 2.0- 3.0 sec 555 MBytes 4.65 Gbits/sec
[ 3] 3.0- 4.0 sec 567 MBytes 4.76 Gbits/sec
[ 3] 4.0- 5.0 sec 554 MBytes 4.65 Gbits/sec
[ 3] 5.0- 6.0 sec 557 MBytes 4.68 Gbits/sec
[ 3] 6.0- 7.0 sec 573 MBytes 4.81 Gbits/sec
[ 3] 7.0- 8.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 8.0- 9.0 sec 562 MBytes 4.71 Gbits/sec
[ 3] 9.0-10.0 sec 571 MBytes 4.79 Gbits/sec
[ 3] 10.0-11.0 sec 576 MBytes 4.84 Gbits/sec
[ 3] 11.0-12.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 12.0-13.0 sec 570 MBytes 4.78 Gbits/sec
[ 3] 13.0-14.0 sec 574 MBytes 4.81 Gbits/sec
[ 3] 14.0-15.0 sec 575 MBytes 4.83 Gbits/sec
[ 3] 15.0-16.0 sec 581 MBytes 4.87 Gbits/sec
[ 3] 16.0-17.0 sec 587 MBytes 4.93 Gbits/sec
[ 3] 17.0-18.0 sec 573 MBytes 4.81 Gbits/sec
[ 3] 18.0-19.0 sec 572 MBytes 4.80 Gbits/sec
[ 3] 19.0-20.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 20.0-21.0 sec 566 MBytes 4.74 Gbits/sec
[ 3] 21.0-22.0 sec 568 MBytes 4.76 Gbits/sec
[ 3] 22.0-23.0 sec 574 MBytes 4.82 Gbits/sec
[ 3] 23.0-24.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 24.0-25.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 25.0-26.0 sec 570 MBytes 4.78 Gbits/sec
[ 3] 26.0-27.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 27.0-28.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 28.0-29.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 29.0-30.0 sec 576 MBytes 4.83 Gbits/sec
[ 3] 0.0-30.0 sec 16.8 GBytes 4.81 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.12 port 40324 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 602 MBytes 5.05 Gbits/sec
[ 3] 1.0- 2.0 sec 574 MBytes 4.81 Gbits/sec
[ 3] 2.0- 3.0 sec 575 MBytes 4.82 Gbits/sec
[ 3] 3.0- 4.0 sec 586 MBytes 4.92 Gbits/sec
[ 3] 4.0- 5.0 sec 592 MBytes 4.97 Gbits/sec
[ 3] 5.0- 6.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 6.0- 7.0 sec 581 MBytes 4.87 Gbits/sec
[ 3] 7.0- 8.0 sec 574 MBytes 4.82 Gbits/sec
[ 3] 8.0- 9.0 sec 560 MBytes 4.70 Gbits/sec
[ 3] 9.0-10.0 sec 581 MBytes 4.87 Gbits/sec
[ 3] 10.0-11.0 sec 577 MBytes 4.84 Gbits/sec
[ 3] 11.0-12.0 sec 563 MBytes 4.72 Gbits/sec
[ 3] 12.0-13.0 sec 562 MBytes 4.72 Gbits/sec
[ 3] 13.0-14.0 sec 554 MBytes 4.64 Gbits/sec
[ 3] 14.0-15.0 sec 573 MBytes 4.81 Gbits/sec
[ 3] 15.0-16.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 16.0-17.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 17.0-18.0 sec 556 MBytes 4.67 Gbits/sec
[ 3] 18.0-19.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 19.0-20.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 20.0-21.0 sec 569 MBytes 4.78 Gbits/sec
[ 3] 21.0-22.0 sec 589 MBytes 4.94 Gbits/sec
[ 3] 22.0-23.0 sec 604 MBytes 5.07 Gbits/sec
[ 3] 23.0-24.0 sec 570 MBytes 4.78 Gbits/sec
[ 3] 24.0-25.0 sec 572 MBytes 4.80 Gbits/sec
[ 3] 25.0-26.0 sec 557 MBytes 4.68 Gbits/sec
[ 3] 26.0-27.0 sec 548 MBytes 4.59 Gbits/sec
[ 3] 27.0-28.0 sec 563 MBytes 4.72 Gbits/sec
[ 3] 28.0-29.0 sec 566 MBytes 4.75 Gbits/sec
[ 3] 29.0-30.0 sec 834 MBytes 6.99 Gbits/sec
[ 3] 0.0-30.0 sec 17.0 GBytes 4.87 Gbits/sec
------------------------------------------------------------
Client connecting to 10.9.9.11, TCP port 5001
TCP window size: 416 KByte (WARNING: requested 400 KByte)
------------------------------------------------------------
[ 3] local 10.9.9.13 port 36950 connected with 10.9.9.11 port 5001
[ ID] Interval Transfer Bandwidth
[ 3] 0.0- 1.0 sec 815 MBytes 6.83 Gbits/sec
[ 3] 1.0- 2.0 sec 559 MBytes 4.69 Gbits/sec
[ 3] 2.0- 3.0 sec 558 MBytes 4.68 Gbits/sec
[ 3] 3.0- 4.0 sec 554 MBytes 4.65 Gbits/sec
[ 3] 4.0- 5.0 sec 554 MBytes 4.65 Gbits/sec
[ 3] 5.0- 6.0 sec 582 MBytes 4.88 Gbits/sec
[ 3] 6.0- 7.0 sec 562 MBytes 4.72 Gbits/sec
[ 3] 7.0- 8.0 sec 579 MBytes 4.86 Gbits/sec
[ 3] 8.0- 9.0 sec 572 MBytes 4.80 Gbits/sec
[ 3] 9.0-10.0 sec 568 MBytes 4.77 Gbits/sec
[ 3] 10.0-11.0 sec 564 MBytes 4.73 Gbits/sec
[ 3] 11.0-12.0 sec 574 MBytes 4.82 Gbits/sec
[ 3] 12.0-13.0 sec 568 MBytes 4.76 Gbits/sec
[ 3] 13.0-14.0 sec 581 MBytes 4.88 Gbits/sec
[ 3] 14.0-15.0 sec 579 MBytes 4.86 Gbits/sec
[ 3] 15.0-16.0 sec 582 MBytes 4.88 Gbits/sec
[ 3] 16.0-17.0 sec 557 MBytes 4.67 Gbits/sec
[ 3] 17.0-18.0 sec 583 MBytes 4.89 Gbits/sec
[ 3] 18.0-19.0 sec 586 MBytes 4.91 Gbits/sec
[ 3] 19.0-20.0 sec 556 MBytes 4.66 Gbits/sec
[ 3] 20.0-21.0 sec 561 MBytes 4.71 Gbits/sec
[ 3] 21.0-22.0 sec 573 MBytes 4.81 Gbits/sec
[ 3] 22.0-23.0 sec 537 MBytes 4.51 Gbits/sec
[ 3] 23.0-24.0 sec 556 MBytes 4.66 Gbits/sec
[ 3] 24.0-25.0 sec 569 MBytes 4.77 Gbits/sec
[ 3] 25.0-26.0 sec 578 MBytes 4.85 Gbits/sec
[ 3] 26.0-27.0 sec 590 MBytes 4.95 Gbits/sec
[ 3] 27.0-28.0 sec 568 MBytes 4.76 Gbits/sec
[ 3] 28.0-29.0 sec 574 MBytes 4.81 Gbits/sec
[ 3] 29.0-30.0 sec 579 MBytes 4.85 Gbits/sec
[ 3] 0.0-30.0 sec 16.9 GBytes 4.84 Gbits/sec
#!/bin/bash
# Sweep the DCTCP marking threshold K and measure TCP throughput with iperf
# from two client hosts (honey2, honey3) to the server at 10.9.9.11.
DUR=30              # duration of each iperf run in seconds
# K is DCTCP threshold in the unit of cells (208 bytes for BCMXXXXX)
K_START=0
K_END=32
K_INTERVAL=32
MTU=1500
set -x              # echo commands as they run, for a self-documenting log
for ((K = K_START; K <= K_END; K += K_INTERVAL))
do
    # Apply the new threshold on the switch-controlling host.
    ssh honey1.kaist.ac.kr -p 2222 ~/change_k.sh $K > /dev/null
    EXP="$MTU-$K"   # experiment tag used to name the output files
    sleep 1
    # Run both clients concurrently (first in background), then wait for both.
    ssh honey2.kaist.ac.kr -p 2222 sudo taskset 0x02 iperf -c 10.9.9.11 -i 1 -Z dctcp -w 400K -t "$DUR" > "$EXP-1.txt" &
    ssh honey3.kaist.ac.kr -p 2222 sudo taskset 0x02 iperf -c 10.9.9.11 -i 1 -Z dctcp -w 400K -t "$DUR" > "$EXP-2.txt"
    wait
    cat "$EXP"-*.txt
    sleep 3
done
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""This is the top-level module of the SimBricks orchestration framework that
users interact with."""
import argparse
import asyncio
import fnmatch
import importlib
import importlib.util
import json
import os
import pickle
import signal
import sys
import typing as tp
from simbricks.orchestration import exectools
from simbricks.orchestration import experiments as exps
from simbricks.orchestration import runtime
from simbricks.orchestration.experiment import experiment_environment
def parse_args() -> argparse.Namespace:
    """Build the orchestrator's command-line parser and parse sys.argv.

    Returns the populated argparse.Namespace. The --parallel/--slurm/--dist
    switches all store into the shared 'runtime' destination, which defaults
    to 'sequential'.
    """
    parser = argparse.ArgumentParser()

    def add_flag(target, name, help_text):
        # Boolean switch: False unless the flag appears on the command line.
        target.add_argument(
            name,
            action='store_const',
            const=True,
            default=False,
            help=help_text
        )

    def add_runtime_switch(group, name, const_value, help_text):
        # Runtime selectors all write the same 'runtime' destination, so the
        # last one given on the command line wins.
        group.add_argument(
            name,
            dest='runtime',
            action='store_const',
            const=const_value,
            default='sequential',
            help=help_text
        )

    # general arguments for experiments
    parser.add_argument(
        'experiments',
        metavar='EXP',
        type=str,
        nargs='+',
        help='Python modules to load the experiments from'
    )
    add_flag(parser, '--list', 'List available experiment names')
    parser.add_argument(
        '--filter',
        metavar='PATTERN',
        type=str,
        nargs='+',
        help='Only run experiments matching the given Unix shell style patterns'
    )
    add_flag(
        parser,
        '--pickled',
        'Interpret experiment modules as pickled runs instead of .py files'
    )
    parser.add_argument(
        '--runs',
        metavar='N',
        type=int,
        default=1,
        help='Number of repetition of each experiment'
    )
    parser.add_argument(
        '--firstrun', metavar='N', type=int, default=1, help='ID for first run'
    )
    add_flag(
        parser,
        '--force',
        'Run experiments even if output already exists (overwrites output)'
    )
    add_flag(
        parser,
        '--verbose',
        'Verbose output, for example, print component simulators\' output'
    )
    add_flag(
        parser, '--pcap', 'Dump pcap file (if supported by component simulator)'
    )
    parser.add_argument(
        '--profile-int',
        metavar='S',
        type=int,
        default=None,
        help='Enable periodic sigusr1 to each simulator every S seconds.'
    )

    # arguments for the experiment environment
    g_env = parser.add_argument_group('Environment')
    g_env.add_argument(
        '--repo',
        metavar='DIR',
        type=str,
        default=os.path.dirname(__file__) + '/..',
        help='SimBricks repository directory'
    )
    # The three directory bases share the same shape and default.
    for opt_name, opt_help in (
        ('--workdir', 'Work directory base'),
        ('--outdir', 'Output directory base'),
        ('--cpdir', 'Checkpoint directory base'),
    ):
        g_env.add_argument(
            opt_name, metavar='DIR', type=str, default='./out/', help=opt_help
        )
    g_env.add_argument(
        '--hosts',
        metavar='JSON_FILE',
        type=str,
        default=None,
        help='List of hosts to use (json)'
    )
    g_env.add_argument(
        '--shmdir',
        metavar='DIR',
        type=str,
        default=None,
        help='Shared memory directory base (workdir if not set)'
    )

    # arguments for the parallel runtime
    g_par = parser.add_argument_group('Parallel Runtime')
    add_runtime_switch(
        g_par,
        '--parallel',
        'parallel',
        'Use parallel instead of sequential runtime'
    )
    g_par.add_argument(
        '--cores',
        metavar='N',
        type=int,
        default=len(os.sched_getaffinity(0)),
        help='Number of cores to use for parallel runs'
    )
    g_par.add_argument(
        '--mem',
        metavar='N',
        type=int,
        default=None,
        help='Memory limit for parallel runs (in MB)'
    )

    # arguments for the slurm runtime
    g_slurm = parser.add_argument_group('Slurm Runtime')
    add_runtime_switch(
        g_slurm, '--slurm', 'slurm', 'Use slurm instead of sequential runtime'
    )
    g_slurm.add_argument(
        '--slurmdir',
        metavar='DIR',
        type=str,
        default='./slurm/',
        help='Slurm communication directory'
    )

    # arguments for the distributed runtime
    g_dist = parser.add_argument_group('Distributed Runtime')
    add_runtime_switch(
        g_dist,
        '--dist',
        'dist',
        'Use sequential distributed runtime instead of local'
    )
    add_flag(
        g_dist,
        '--auto-dist',
        'Automatically distribute non-distributed experiments'
    )
    g_dist.add_argument(
        '--proxy-type',
        metavar='TYPE',
        type=str,
        default='sockets',
        help='Proxy type to use (sockets,rdma) for auto distribution'
    )
    return parser.parse_args()
def load_executors(path: str) -> tp.List[exectools.Executor]:
    """Read a JSON hosts file and build one executor per host entry.

    Each entry must carry 'type' ('local' or 'remote') and 'ip'; remote
    entries additionally provide 'host' and 'workdir' and may pass extra
    'ssh_args'/'scp_args' lists through to the executor. Raises
    RuntimeError for an unknown host type.
    """
    with open(path, 'r', encoding='utf-8') as hosts_file:
        host_entries = json.load(hosts_file)

    executors = []
    for entry in host_entries:
        kind = entry['type']
        if kind == 'local':
            executor = exectools.LocalExecutor()
        elif kind == 'remote':
            executor = exectools.RemoteExecutor(entry['host'], entry['workdir'])
            # optional extra ssh/scp command-line arguments
            if 'ssh_args' in entry:
                executor.ssh_extra_args += entry['ssh_args']
            if 'scp_args' in entry:
                executor.scp_extra_args += entry['scp_args']
        else:
            raise RuntimeError('invalid host type "' + kind + '"')
        executor.ip = entry['ip']
        executors.append(executor)
    return executors
def warn_multi_exec(executors: tp.List[exectools.Executor]):
    """Warn on stderr when more than one executor is configured, since the
    local runtimes only ever use the first one."""
    if len(executors) <= 1:
        return
    print(
        'Warning: multiple hosts specified, only using first one for now',
        file=sys.stderr
    )
def add_exp(
    e: exps.Experiment,
    rt: runtime.Runtime,
    run: int,
    prereq: tp.Optional[runtime.Run],
    create_cp: bool,
    restore_cp: bool,
    no_simbricks: bool,
    args: argparse.Namespace
):
    """Register one run of experiment `e` with runtime `rt`.

    Returns None (skipping) when this run's output already exists and
    --force was not given; otherwise builds the run's environment,
    schedules it, and returns the runtime.Run object (usable as `prereq`
    for dependent runs, e.g. checkpoint creation).
    """
    outpath = f'{args.outdir}/{e.name}-{run}.json'
    # Do not redo finished runs unless the user explicitly forces it.
    if not args.force and os.path.exists(outpath):
        print(f'skip {e.name} run {run}')
        return None

    workdir = f'{args.workdir}/{e.name}/{run}'
    # Checkpoints are shared across repetitions, hence the fixed /0 suffix.
    cpdir = f'{args.cpdir}/{e.name}/0'

    env = experiment_environment.ExpEnv(args.repo, workdir, cpdir)
    env.create_cp = create_cp
    env.restore_cp = restore_cp
    env.no_simbricks = no_simbricks
    env.pcap_file = workdir + '/pcap' if args.pcap else ''
    if args.shmdir is not None:
        env.shm_base = os.path.abspath(f'{args.shmdir}/{e.name}/{run}')

    scheduled = runtime.Run(e, run, env, outpath, prereq)
    rt.add_run(scheduled)
    return scheduled
def main():
    """Entry point: parse arguments, build the selected runtime, load the
    experiments, and execute all scheduled runs.

    Experiments are loaded either from Python modules (default; each module
    must expose a module-level `experiments` list) or from pickled run
    objects when --pickled is given.
    """
    args = parse_args()

    # Build the executor list: local-only unless a hosts JSON file was given.
    if args.hosts is None:
        executors = [exectools.LocalExecutor()]
    else:
        executors = load_executors(args.hosts)

    # initialize runtime
    if args.runtime == 'parallel':
        warn_multi_exec(executors)
        rt = runtime.LocalParallelRuntime(
            cores=args.cores,
            mem=args.mem,
            verbose=args.verbose,
            executor=executors[0]
        )
    elif args.runtime == 'slurm':
        rt = runtime.SlurmRuntime(args.slurmdir, args, verbose=args.verbose)
    elif args.runtime == 'dist':
        rt = runtime.DistributedSimpleRuntime(executors, verbose=args.verbose)
    else:
        warn_multi_exec(executors)
        rt = runtime.LocalSimpleRuntime(
            verbose=args.verbose, executor=executors[0]
        )

    # NOTE(review): truthiness check — a value of 0 disables profiling
    # just like None; presumably intended, but confirm.
    if args.profile_int:
        rt.enable_profiler(args.profile_int)

    # load experiments
    if not args.pickled:
        # default: load python modules with experiments
        experiments = []
        for path in args.experiments:
            modname, _ = os.path.splitext(os.path.basename(path))

            class ExperimentModuleLoadError(Exception):
                pass

            # Import the module directly from its file path.
            spec = importlib.util.spec_from_file_location(modname, path)
            if spec is None:
                raise ExperimentModuleLoadError('spec is None')
            mod = importlib.util.module_from_spec(spec)
            if spec.loader is None:
                raise ExperimentModuleLoadError('spec.loader is None')
            spec.loader.exec_module(mod)
            # each loaded module contributes its `experiments` list
            experiments += mod.experiments

        # --list: print experiment names and exit without running anything
        if args.list:
            for e in experiments:
                print(e.name)
            sys.exit(0)

        for e in experiments:
            if args.auto_dist and not isinstance(e, exps.DistributedExperiment):
                e = runtime.auto_dist(e, executors, args.proxy_type)
            # apply filter if any specified
            if (args.filter) and (len(args.filter) > 0):
                match = False
                for f in args.filter:
                    match = fnmatch.fnmatch(e.name, f)
                    if match:
                        break
                if not match:
                    continue

            # if this is an experiment with a checkpoint we might have to
            # create it; the checkpoint run (id 0) becomes a prerequisite
            # of every actual run
            no_simbricks = e.no_simbricks
            if e.checkpoint:
                prereq = add_exp(
                    e, rt, 0, None, True, False, no_simbricks, args
                )
            else:
                prereq = None

            for run in range(args.firstrun, args.firstrun + args.runs):
                add_exp(
                    e, rt, run, prereq, False, e.checkpoint, no_simbricks, args
                )
    else:
        # otherwise load pickled run object
        for path in args.experiments:
            with open(path, 'rb') as f:
                rt.add_run(pickle.load(f))

    # register interrupt handler so Ctrl-C stops the runtime cleanly
    signal.signal(signal.SIGINT, lambda *_: rt.interrupt())

    # invoke runtime to run experiments
    asyncio.run(rt.start())
# Allow this module to be invoked directly as a script.
if __name__ == '__main__':
    main()
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import abc
import asyncio
import os
import pathlib
import re
import shlex
import shutil
import signal
import typing as tp
from asyncio.subprocess import Process
class Component(object):
def __init__(self, cmd_parts: tp.List[str], with_stdin=False):
self.is_ready = False
self.stdout: tp.List[str] = []
self.stdout_buf = bytearray()
self.stderr: tp.List[str] = []
self.stderr_buf = bytearray()
self.cmd_parts = cmd_parts
#print(cmd_parts)
self.with_stdin = with_stdin
self._proc: Process
self._terminate_future: asyncio.Task
def _parse_buf(self, buf: bytearray, data: bytes) -> tp.List[str]:
if data is not None:
buf.extend(data)
lines = []
start = 0
for i in range(0, len(buf)):
if buf[i] == ord('\n'):
l = buf[start:i].decode('utf-8')
lines.append(l)
start = i + 1
del buf[0:start]
if len(data) == 0 and len(buf) > 0:
lines.append(buf.decode('utf-8'))
return lines
async def _consume_out(self, data: bytes) -> None:
eof = len(data) == 0
ls = self._parse_buf(self.stdout_buf, data)
if len(ls) > 0 or eof:
await self.process_out(ls, eof=eof)
self.stdout.extend(ls)
async def _consume_err(self, data: bytes) -> None:
eof = len(data) == 0
ls = self._parse_buf(self.stderr_buf, data)
if len(ls) > 0 or eof:
await self.process_err(ls, eof=eof)
self.stderr.extend(ls)
async def _read_stream(self, stream: asyncio.StreamReader, fn):
while True:
bs = await stream.readline()
if bs:
await fn(bs)
else:
await fn(bs)
return
async def _waiter(self) -> None:
stdout_handler = asyncio.create_task(
self._read_stream(self._proc.stdout, self._consume_out)
)
stderr_handler = asyncio.create_task(
self._read_stream(self._proc.stderr, self._consume_err)
)
rc = await self._proc.wait()
await asyncio.gather(stdout_handler, stderr_handler)
await self.terminated(rc)
async def send_input(self, bs: bytes, eof=False) -> None:
self._proc.stdin.write(bs)
if eof:
self._proc.stdin.close()
async def start(self) -> None:
if self.with_stdin:
stdin = asyncio.subprocess.PIPE
else:
stdin = asyncio.subprocess.DEVNULL
self._proc = await asyncio.create_subprocess_exec(
*self.cmd_parts,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=stdin,
)
self._terminate_future = asyncio.create_task(self._waiter())
await self.started()
async def wait(self) -> None:
"""
Wait for running process to finish and output to be collected.
On cancellation, the `CancelledError` is propagated but this component
keeps running.
"""
await asyncio.shield(self._terminate_future)
async def interrupt(self) -> None:
"""Sends an interrupt signal."""
if self._proc.returncode is None:
self._proc.send_signal(signal.SIGINT)
async def terminate(self) -> None:
"""Sends a terminate signal."""
if self._proc.returncode is None:
self._proc.terminate()
async def kill(self) -> None:
"""Sends a kill signal."""
if self._proc.returncode is None:
self._proc.kill()
async def int_term_kill(self, delay: int = 5) -> None:
"""Attempts to stop this component by sending signals in the following
order: interrupt, terminate, kill."""
await self.interrupt()
try:
await asyncio.wait_for(self._proc.wait(), delay)
return
# before Python 3.11, asyncio.wait_for() throws asyncio.TimeoutError -_-
except (TimeoutError, asyncio.TimeoutError):
print(
f'terminating component {self.cmd_parts[0]} '
f'pid {self._proc.pid}',
flush=True
)
await self.terminate()
try:
await asyncio.wait_for(self._proc.wait(), delay)
return
except (TimeoutError, asyncio.TimeoutError):
print(
f'killing component {self.cmd_parts[0]} '
f'pid {self._proc.pid}',
flush=True
)
await self.kill()
await self._proc.wait()
async def sigusr1(self) -> None:
"""Sends an interrupt signal."""
if self._proc.returncode is None:
self._proc.send_signal(signal.SIGUSR1)
async def started(self) -> None:
pass
async def terminated(self, rc) -> None:
pass
async def process_out(self, lines: tp.List[str], eof: bool) -> None:
    """Hook for consuming stdout lines; `eof` marks end of the stream.

    Default implementation discards the output.
    """
    pass
async def process_err(self, lines: tp.List[str], eof: bool) -> None:
    """Hook for consuming stderr lines; `eof` marks end of the stream.

    Default implementation discards the output.
    """
    pass
class SimpleComponent(Component):
    """Component that echoes its output to stdout, prefixed with a label.

    Raises `RuntimeError` on non-zero exit unless `canfail` is set.
    """

    def __init__(
        self,
        label: str,
        cmd_parts: tp.List[str],
        *args,
        verbose=True,
        canfail=False,
        **kwargs
    ) -> None:
        self.label = label
        self.verbose = verbose
        self.canfail = canfail
        self.cmd_parts = cmd_parts
        super().__init__(cmd_parts, *args, **kwargs)

    async def process_out(self, lines: tp.List[str], eof: bool) -> None:
        if self.verbose:
            # Fix: previously printed the whole `lines` list once per
            # line (`for _ in lines: print(..., lines, ...)`).
            for line in lines:
                print(self.label, 'OUT:', line, flush=True)

    async def process_err(self, lines: tp.List[str], eof: bool) -> None:
        if self.verbose:
            # Fix: same list-instead-of-line bug as process_out.
            for line in lines:
                print(self.label, 'ERR:', line, flush=True)

    async def terminated(self, rc: int) -> None:
        if self.verbose:
            print(self.label, 'TERMINATED:', rc, flush=True)
        if not self.canfail and rc != 0:
            raise RuntimeError('Command Failed: ' + str(self.cmd_parts))
class SimpleRemoteComponent(SimpleComponent):
    """`SimpleComponent` that runs its command on a remote host via ssh.

    The remote command is wrapped so it first prints its own PID; that PID
    is captured from stdout and later used to deliver signals through
    separate ssh `kill` invocations.
    """

    def __init__(
        self,
        host_name: str,
        label: str,
        cmd_parts: tp.List[str],
        *args,
        cwd: tp.Optional[str] = None,
        ssh_extra_args: tp.Optional[tp.List[str]] = None,
        **kwargs
    ) -> None:
        if ssh_extra_args is None:
            ssh_extra_args = []
        self.host_name = host_name
        self.extra_flags = ssh_extra_args

        # add a wrapper to print the PID
        remote_parts = ['echo', 'PID', '$$', '&&']
        if cwd is not None:
            # if necessary add a CD command
            remote_parts += ['cd', cwd, '&&']

        # escape actual command parts
        cmd_parts = list(map(shlex.quote, cmd_parts))

        # use exec to make sure the command we run keeps the PIDS
        remote_parts += ['exec'] + cmd_parts

        # wrap up command in ssh invocation
        parts = self._ssh_cmd(remote_parts)
        super().__init__(label, parts, *args, **kwargs)
        # Resolved to the remote process's PID once the `PID <n>` marker
        # line appears on stdout; created in start().
        self._pid_fut: tp.Optional[asyncio.Future] = None

    def _ssh_cmd(self, parts: tp.List[str]) -> tp.List[str]:
        """SSH invocation of command for this host."""
        # Host-key checking is disabled: hosts are assumed to be
        # short-lived/trusted cluster machines.
        return [
            'ssh',
            '-o',
            'UserKnownHostsFile=/dev/null',
            '-o',
            'StrictHostKeyChecking=no'
        ] + self.extra_flags + [self.host_name, '--'] + parts

    async def start(self) -> None:
        """Start this command (includes waiting for its pid)."""
        self._pid_fut = asyncio.get_running_loop().create_future()
        await super().start()
        await self._pid_fut

    async def process_out(self, lines: tp.List[str], eof: bool) -> None:
        """Scans output and set PID future once PID line found."""
        if not self._pid_fut.done():
            # Filter the `PID <n>` marker out of the lines forwarded to
            # the parent's output handling.
            newlines = []
            pid_re = re.compile(r'^PID\s+(\d+)\s*$')
            for l in lines:
                m = pid_re.match(l)
                if m:
                    pid = int(m.group(1))
                    self._pid_fut.set_result(pid)
                else:
                    newlines.append(l)
            lines = newlines

        if eof and not self._pid_fut.done():
            # cancel PID future if it's not going to happen
            print('PID not found but EOF already found:', self.label)
            self._pid_fut.cancel()
        await super().process_out(lines, eof)

    async def _kill_cmd(self, sig: str) -> None:
        """Send signal to command by running ssh kill -$sig $PID."""
        # NOTE(review): raises if the PID future was cancelled/unset —
        # assumes start() completed successfully; confirm callers.
        cmd_parts = self._ssh_cmd([
            'kill', '-' + sig, str(self._pid_fut.result())
        ])
        proc = await asyncio.create_subprocess_exec(*cmd_parts)
        await proc.wait()

    async def interrupt(self) -> None:
        """Sends SIGINT to the remote process."""
        await self._kill_cmd('INT')

    async def terminate(self) -> None:
        """Sends SIGTERM to the remote process."""
        await self._kill_cmd('TERM')

    async def kill(self) -> None:
        """Sends SIGKILL to the remote process."""
        await self._kill_cmd('KILL')
class Executor(abc.ABC):
    """Abstract interface for running commands and file operations,
    either locally or on another machine."""

    def __init__(self) -> None:
        # IP address of the machine this executor operates on (set by
        # users of the executor; None when unknown).
        self.ip = None

    @abc.abstractmethod
    def create_component(
        self, label: str, parts: tp.List[str], **kwargs
    ) -> SimpleComponent:
        """Create a component that runs the command `parts`."""
        pass

    @abc.abstractmethod
    async def await_file(self, path: str, delay=0.05, verbose=False) -> None:
        """Wait until `path` exists, polling every `delay` seconds."""
        pass

    @abc.abstractmethod
    async def send_file(self, path: str, verbose=False) -> None:
        """Make the local file `path` available on the executor's machine."""
        pass

    @abc.abstractmethod
    async def mkdir(self, path: str, verbose=False) -> None:
        """Create directory `path` including missing parents."""
        pass

    @abc.abstractmethod
    async def rmtree(self, path: str, verbose=False) -> None:
        """Recursively delete `path`."""
        pass

    async def run_cmdlist(
        self, label: str, cmds: tp.List[str], verbose=True
    ) -> None:
        """Runs the list of command strings sequentially.

        Components are labeled `label.0`, `label.1`, ... in order.
        """
        # Fix: the index was initialized to 0 but never incremented, so
        # every command was labeled `label.0`.
        for i, cmd in enumerate(cmds):
            cmd_c = self.create_component(
                label + '.' + str(i), shlex.split(cmd), verbose=verbose
            )
            await cmd_c.start()
            await cmd_c.wait()

    async def await_files(self, paths: tp.List[str], *args, **kwargs) -> None:
        """Wait for all of `paths` concurrently."""
        waiters = [
            asyncio.create_task(self.await_file(p, *args, **kwargs))
            for p in paths
        ]
        await asyncio.gather(*waiters)
class LocalExecutor(Executor):
    """Executor that performs every operation on the local machine."""

    def create_component(
        self, label: str, parts: tp.List[str], **kwargs
    ) -> SimpleComponent:
        return SimpleComponent(label, parts, **kwargs)

    async def await_file(
        self, path: str, delay=0.05, verbose=False, timeout=30
    ) -> None:
        """Poll until `path` exists; raise TimeoutError after `timeout` s."""
        if verbose:
            print(f'await_file({path})')
        waited = 0
        while not os.path.exists(path):
            if waited >= timeout:
                raise TimeoutError()
            await asyncio.sleep(delay)
            waited += delay

    async def send_file(self, path: str, verbose=False) -> None:
        """No transfer needed: the file is already on this machine."""

    async def mkdir(self, path: str, verbose=False) -> None:
        """Create `path` (and parents) if it does not exist yet."""
        pathlib.Path(path).mkdir(parents=True, exist_ok=True)

    async def rmtree(self, path: str, verbose=False) -> None:
        """Delete `path`, whether it is a directory tree or a single file."""
        if os.path.isdir(path):
            shutil.rmtree(path, ignore_errors=True)
        elif os.path.exists(path):
            os.unlink(path)
class RemoteExecutor(Executor):
    """Executor running commands on a remote host through ssh/scp."""

    def __init__(self, host_name: str, workdir: str) -> None:
        super().__init__()
        self.host_name = host_name
        self.cwd = workdir
        # Extra flags appended to every ssh / scp invocation.
        self.ssh_extra_args = []
        self.scp_extra_args = []

    def create_component(
        self, label: str, parts: tp.List[str], **kwargs
    ) -> SimpleRemoteComponent:
        return SimpleRemoteComponent(
            self.host_name,
            label,
            parts,
            cwd=self.cwd,
            ssh_extra_args=self.ssh_extra_args,
            **kwargs
        )

    async def await_file(
        self, path: str, delay=0.05, verbose=False, timeout=30
    ) -> None:
        """Wait for `path` to appear on the remote host.

        Runs a small polling shell loop remotely; the loop exits with
        status 1 after `timeout` seconds, which surfaces as a failure
        because the component is created with `canfail=False`.
        """
        if verbose:
            print(f'{self.host_name}.await_file({path}) started')
        # Number of poll iterations before the remote loop gives up.
        to_its = int(timeout / delay)
        # Fix: the previous f-string used the invalid format spec
        # `{to_its:u}` (ValueError) and additionally applied %-formatting
        # with `% (path, to_its, delay)` to a %-free string (TypeError).
        loop_cmd = (
            f'i=0 ; while [ ! -e {path} ] ; do '
            f'if [ $i -ge {to_its} ] ; then exit 1 ; fi ; '
            f'sleep {delay} ; '
            'i=$(($i+1)) ; done; exit 0'
        )
        parts = ['/bin/sh', '-c', loop_cmd]
        sc = self.create_component(
            f"{self.host_name}.await_file('{path}')",
            parts,
            canfail=False,
            verbose=verbose
        )
        await sc.start()
        await sc.wait()

    # TODO: Implement optimized await_files()

    async def send_file(self, path: str, verbose=False) -> None:
        """Copy the local file `path` to the same path on the remote host."""
        parts = [
            'scp',
            '-o',
            'UserKnownHostsFile=/dev/null',
            '-o',
            'StrictHostKeyChecking=no'
        ] + self.scp_extra_args + [path, f'{self.host_name}:{path}']
        # Plain SimpleComponent: scp itself runs locally.
        sc = SimpleComponent(
            f'{self.host_name}.send_file("{path}")',
            parts,
            canfail=False,
            verbose=verbose
        )
        await sc.start()
        await sc.wait()

    async def mkdir(self, path: str, verbose=False) -> None:
        """Create directory `path` (with parents) on the remote host."""
        sc = self.create_component(
            f"{self.host_name}.mkdir('{path}')", ['mkdir', '-p', path],
            canfail=False,
            verbose=verbose
        )
        await sc.start()
        await sc.wait()

    async def rmtree(self, path: str, verbose=False) -> None:
        """Recursively delete `path` on the remote host."""
        sc = self.create_component(
            f'{self.host_name}.rmtree("{path}")', ['rm', '-rf', path],
            canfail=False,
            verbose=verbose
        )
        await sc.start()
        await sc.wait()
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import typing as tp
if tp.TYPE_CHECKING: # prevent cyclic import
from simbricks.orchestration import simulators
class ExpEnv(object):
    """Manages the experiment environment.

    Centralizes filesystem locations: simulator binaries inside the
    repository checkout, guest images, and per-run sockets and files in
    the working directory.
    """

    def __init__(self, repo_path: str, workdir: str, cpdir: str) -> None:
        # Checkpoint control flags, toggled by the experiment runner.
        self.create_cp = False
        self.restore_cp = False
        self.pcap_file = ''
        # Normalize all base directories to absolute paths.
        repo = os.path.abspath(repo_path)
        self.repodir = repo
        self.workdir = os.path.abspath(workdir)
        self.cpdir = os.path.abspath(cpdir)
        # Shared-memory files default to living in the working directory.
        self.shm_base = self.workdir
        # QEMU binaries and guest kernel image.
        self.qemu_img_path = f'{repo}/sims/external/qemu/build/qemu-img'
        self.qemu_path = (
            f'{repo}/sims/external/qemu/build/'
            'x86_64-softmmu/qemu-system-x86_64'
        )
        self.qemu_kernel_path = f'{repo}/images/bzImage'
        # gem5 config script and kernel.
        self.gem5_py_path = (
            f'{repo}/sims/external/gem5/configs/simbricks/simbricks.py'
        )
        self.gem5_kernel_path = f'{repo}/images/vmlinux'
        # Simics project binaries and target script.
        simics_project = f'{repo}/sims/external/simics/project'
        self.simics_path = f'{simics_project}/simics'
        self.simics_gui_path = f'{simics_project}/simics-gui'
        self.simics_qsp_modern_core_path = (
            f'{simics_project}/targets/qsp-x86/qsp-modern-core.simics'
        )

    def gem5_path(self, variant: str) -> str:
        """Path to the gem5 binary for build `variant` (e.g. `opt`)."""
        return f'{self.repodir}/sims/external/gem5/build/X86/gem5.{variant}'

    def hdcopy_path(self, sim: 'simulators.Simulator') -> str:
        """Per-simulator private copy of the hard-disk image."""
        return f'{self.workdir}/hdcopy.{sim.name}'

    @staticmethod
    def is_absolute_exists(path: str) -> bool:
        """True iff `path` is absolute and refers to an existing file."""
        return os.path.isabs(path) and os.path.isfile(path)

    def hd_path(self, hd_name_or_path: str) -> str:
        """Resolve a disk image: absolute paths of existing files are used
        verbatim, otherwise the argument names an image built in-repo."""
        if ExpEnv.is_absolute_exists(hd_name_or_path):
            return hd_name_or_path
        return (
            f'{self.repodir}/images/output-{hd_name_or_path}/'
            f'{hd_name_or_path}'
        )

    def hd_raw_path(self, hd_name_or_path: str) -> str:
        """Like `hd_path`, but for the raw-format variant of the image."""
        if ExpEnv.is_absolute_exists(hd_name_or_path):
            return f'{hd_name_or_path}.raw'
        return (
            f'{self.repodir}/images/output-{hd_name_or_path}/'
            f'{hd_name_or_path}.raw'
        )

    def cfgtar_path(self, sim: 'simulators.Simulator') -> str:
        """Tarball holding the guest configuration for `sim`."""
        return f'{self.workdir}/cfg.{sim.name}.tar'

    def dev_pci_path(self, sim) -> str:
        """PCI device socket for `sim`."""
        return f'{self.workdir}/dev.pci.{sim.name}'

    def dev_mem_path(self, sim: 'simulators.Simulator') -> str:
        """Memory device socket for `sim`."""
        return f'{self.workdir}/dev.mem.{sim.name}'

    def nic_eth_path(self, sim: 'simulators.Simulator') -> str:
        """NIC Ethernet socket for `sim`."""
        return f'{self.workdir}/nic.eth.{sim.name}'

    def dev_shm_path(self, sim: 'simulators.Simulator') -> str:
        """Device shared-memory file for `sim`."""
        return f'{self.shm_base}/dev.shm.{sim.name}'

    def n2n_eth_path(
        self,
        sim_l: 'simulators.Simulator',
        sim_c: 'simulators.Simulator',
        suffix=''
    ) -> str:
        """Network-to-network Ethernet socket between two simulators."""
        return f'{self.workdir}/n2n.eth.{sim_l.name}.{sim_c.name}.{suffix}'

    def net2host_eth_path(self, sim_n, sim_h) -> str:
        """Network-to-host Ethernet socket."""
        return f'{self.workdir}/n2h.eth.{sim_n.name}.{sim_h.name}'

    def net2host_shm_path(
        self, sim_n: 'simulators.Simulator', sim_h: 'simulators.Simulator'
    ) -> str:
        """Network-to-host shared-memory file."""
        return f'{self.workdir}/n2h.shm.{sim_n.name}.{sim_h.name}'

    def proxy_shm_path(self, sim: 'simulators.Simulator') -> str:
        """Proxy shared-memory file for `sim`."""
        return f'{self.shm_base}/proxy.shm.{sim.name}'

    def gem5_outdir(self, sim: 'simulators.Simulator') -> str:
        """gem5 output directory for `sim`."""
        return f'{self.workdir}/gem5-out.{sim.name}'

    def gem5_cpdir(self, sim: 'simulators.Simulator') -> str:
        """gem5 checkpoint directory for `sim`."""
        return f'{self.cpdir}/gem5-cp.{sim.name}'

    def simics_cpfile(self, sim: 'simulators.Simulator') -> str:
        """Simics checkpoint file for `sim`."""
        return f'{self.cpdir}/simics-cp.{sim.name}'

    def ns3_e2e_params_file(self, sim: 'simulators.NS3E2ENet') -> str:
        """Parameter file for an ns-3 end-to-end network simulator."""
        return f'{self.workdir}/ns3_e2e_params.{sim.name}'
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
import typing as tp
if tp.TYPE_CHECKING: # prevent cyclic import
# from simbricks.orchestration import simulators
from simbricks.splitsim import impl as simulators
class ExpEnv(object):
    """Manages the experiment environment.

    splitsim variant of the class of the same name above; unlike that
    version, `hd_path`/`hd_raw_path` here only accept image names (no
    absolute paths) and a `utilsdir` attribute is added.
    """

    def __init__(self, repo_path: str, workdir: str, cpdir: str) -> None:
        self.create_cp = False
        """Whether a checkpoint should be created."""
        self.restore_cp = False
        """Whether to restore from a checkpoint."""
        self.pcap_file = ''
        # All base directories are normalized to absolute paths.
        self.repodir = os.path.abspath(repo_path)
        self.workdir = os.path.abspath(workdir)
        self.cpdir = os.path.abspath(cpdir)
        # Shared-memory files default to the working directory.
        self.shm_base = self.workdir
        self.utilsdir = f'{self.repodir}/experiments/simbricks/utils'
        """Directory containing some utility scripts/binaries."""
        # QEMU binaries and guest kernel image.
        self.qemu_img_path = f'{self.repodir}/sims/external/qemu/build/qemu-img'
        self.qemu_path = (
            f'{self.repodir}/sims/external/qemu/build/'
            'x86_64-softmmu/qemu-system-x86_64'
        )
        self.qemu_kernel_path = f'{self.repodir}/images/bzImage'
        # gem5 config script and kernel.
        self.gem5_py_path = (
            f'{self.repodir}/sims/external/gem5/configs/simbricks/simbricks.py'
        )
        self.gem5_kernel_path = f'{self.repodir}/images/vmlinux'
        # Simics project binaries and target script.
        simics_project_base = f'{self.repodir}/sims/external/simics/project'
        self.simics_path = f'{simics_project_base}/simics'
        self.simics_gui_path = f'{simics_project_base}/simics-gui'
        self.simics_qsp_modern_core_path = (
            f'{simics_project_base}/targets/qsp-x86/qsp-modern-core.simics'
        )

    def gem5_path(self, variant: str) -> str:
        """Path to the gem5 binary for build `variant` (e.g. `opt`)."""
        return f'{self.repodir}/sims/external/gem5/build/X86/gem5.{variant}'

    def hdcopy_path(self, sim: 'simulators.Simulator') -> str:
        """Per-simulator private copy of the hard-disk image."""
        return f'{self.workdir}/hdcopy.{sim.name}'

    def hd_path(self, hd_name: str) -> str:
        """Disk image built in-repo under `images/output-<name>/`."""
        return f'{self.repodir}/images/output-{hd_name}/{hd_name}'

    def hd_raw_path(self, hd_name: str) -> str:
        """Raw-format variant of the in-repo disk image."""
        return f'{self.repodir}/images/output-{hd_name}/{hd_name}.raw'

    def cfgtar_path(self, sim: 'simulators.Simulator') -> str:
        """Tarball holding the guest configuration for `sim`."""
        return f'{self.workdir}/cfg.{sim.name}.tar'

    def dev_pci_path(self, sim) -> str:
        """PCI device socket for `sim`."""
        return f'{self.workdir}/dev.pci.{sim.name}'

    def dev_mem_path(self, sim: 'simulators.Simulator') -> str:
        """Memory device socket for `sim`."""
        return f'{self.workdir}/dev.mem.{sim.name}'

    def nic_eth_path(self, sim: 'simulators.Simulator') -> str:
        """NIC Ethernet socket for `sim`."""
        return f'{self.workdir}/nic.eth.{sim.name}'

    def dev_shm_path(self, sim: 'simulators.Simulator') -> str:
        """Device shared-memory file for `sim`."""
        return f'{self.shm_base}/dev.shm.{sim.name}'

    def n2n_eth_path(
        self, sim_l: 'simulators.Simulator', sim_c: 'simulators.Simulator',
        suffix=''
    ) -> str:
        """Network-to-network Ethernet socket between two simulators."""
        return f'{self.workdir}/n2n.eth.{sim_l.name}.{sim_c.name}.{suffix}'

    def net2host_eth_path(self, sim_n, sim_h) -> str:
        """Network-to-host Ethernet socket."""
        return f'{self.workdir}/n2h.eth.{sim_n.name}.{sim_h.name}'

    def net2host_shm_path(
        self, sim_n: 'simulators.Simulator', sim_h: 'simulators.Simulator'
    ) -> str:
        """Network-to-host shared-memory file."""
        return f'{self.workdir}/n2h.shm.{sim_n.name}.{sim_h.name}'

    def proxy_shm_path(self, sim: 'simulators.Simulator') -> str:
        """Proxy shared-memory file for `sim`."""
        return f'{self.shm_base}/proxy.shm.{sim.name}'

    def gem5_outdir(self, sim: 'simulators.Simulator') -> str:
        """gem5 output directory for `sim`."""
        return f'{self.workdir}/gem5-out.{sim.name}'

    def gem5_cpdir(self, sim: 'simulators.Simulator') -> str:
        """gem5 checkpoint directory for `sim`."""
        return f'{self.cpdir}/gem5-cp.{sim.name}'

    def simics_cpfile(self, sim: 'simulators.Simulator') -> str:
        """Simics checkpoint file for `sim`."""
        return f'{self.cpdir}/simics-cp.{sim.name}'

    def ns3_e2e_params_file(self, sim: 'simulators.NS3E2ENet') -> str:
        """Parameter file for an ns-3 end-to-end network simulator."""
        return f'{self.workdir}/ns3_e2e_params.{sim.name}'
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import itertools
import typing as tp
from simbricks.orchestration import simulators
import simbricks.orchestration.simulation.base as sim_base
import simbricks.orchestration.simulation.net as sim_net
import simbricks.orchestration.simulation.host as sim_host
import simbricks.orchestration.simulation.channel as sim_channel
import simbricks.orchestration.simulation.pcidev as sim_pcidev
import simbricks.orchestration.system.base as system_base
import simbricks.orchestration.system.host.base as system_host_base
import simbricks.orchestration.system.eth as system_eth
import simbricks.orchestration.system.mem as system_mem
import simbricks.orchestration.system.nic as system_nic
import simbricks.orchestration.system.pcie as system_pcie
from simbricks.orchestration.proxy import NetProxyConnecter, NetProxyListener
from simbricks.orchestration.simulators import (
HostSim,
I40eMultiNIC,
NetSim,
NICSim,
PCIDevSim,
Simulator,
)
class Experiment(object):
    """
    Base class for all simulation experiments.

    Contains the simulators to be run and experiment-wide parameters.
    """

    def __init__(self, name: str) -> None:
        self.name = name
        """
        This experiment's name.

        Can be used to run only a selection of experiments.
        """
        self.timeout: int | None = None
        """Timeout for experiment in seconds."""
        self.checkpoint = False
        """
        Whether to use checkpoint and restore for simulators.

        The most common use-case for this is accelerating host simulator
        startup by first running in a less accurate mode, then checkpointing
        the system state after boot and running simulations from there.
        """
        self.no_simbricks = False
        """If `true`, no simbricks adapters are used in any of the
        simulators."""
        self.hosts: list[HostSim] = []
        """The host simulators to run."""
        self.pcidevs: list[PCIDevSim] = []
        """The PCIe device simulators to run."""
        self.memdevs: list[simulators.MemDevSim] = []
        """The memory device simulators to run."""
        self.netmems: list[simulators.NetMemSim] = []
        """The network memory simulators to run."""
        self.networks: list[NetSim] = []
        """The network simulators to run."""
        self.metadata: dict[str, tp.Any] = {}
        # Fix: these annotations previously referenced the unimported
        # modules `system` and `simulation`; annotations on attribute
        # targets are evaluated at runtime, so __init__ raised NameError.
        self.sys_sim_map: dict[system_base.Component, sim_base.Simulator] = {}
        """System component and its simulator pairs"""
        self._chan_map: dict[system_base.Channel, sim_channel.Channel] = {}
        """Channel spec and its instantiation"""

    def add_spec_sim_map(
        self, sys: system_base.Component, sim: sim_base.Simulator
    ) -> None:
        """Add a mapping from specification to simulation instance.

        Raises if the component is already mapped.
        """
        # Fix: previously tested the undefined bare name `sys_sim_map`
        # (missing `self.`), which raised NameError.
        if sys in self.sys_sim_map:
            raise Exception("system component is already mapped by simulator")
        self.sys_sim_map[sys] = sim

    def is_channel_instantiated(self, chan: system_base.Channel) -> bool:
        """Whether a simulation channel was already created for `chan`."""
        return chan in self._chan_map

    def retrieve_or_create_channel(
        self, chan: system_base.Channel
    ) -> sim_channel.Channel:
        """Return the simulation channel for `chan`, creating it on first
        use."""
        if self.is_channel_instantiated(chan):
            return self._chan_map[chan]

        channel = sim_channel.Channel(self, chan)
        self._chan_map[chan] = channel
        return channel

    @property
    def nics(self):
        """All PCIe device simulators that are NICs."""
        return filter(lambda pcidev: pcidev.is_nic(), self.pcidevs)

    def add_host(self, sim: HostSim) -> None:
        """Add a host simulator to the experiment."""
        for h in self.hosts:
            if h.name == sim.name:
                raise ValueError("Duplicate host name")
        self.hosts.append(sim)

    def add_nic(self, sim: NICSim | I40eMultiNIC) -> None:
        """Add a NIC simulator to the experiment."""
        self.add_pcidev(sim)

    def add_pcidev(self, sim: PCIDevSim) -> None:
        """Add a PCIe device simulator to the experiment."""
        for d in self.pcidevs:
            if d.name == sim.name:
                raise ValueError("Duplicate pcidev name")
        self.pcidevs.append(sim)

    def add_memdev(self, sim: simulators.MemDevSim) -> None:
        """Add a memory device simulator to the experiment."""
        for d in self.memdevs:
            if d.name == sim.name:
                raise ValueError("Duplicate memdev name")
        self.memdevs.append(sim)

    def add_netmem(self, sim: simulators.NetMemSim) -> None:
        """Add a network memory simulator to the experiment."""
        for d in self.netmems:
            if d.name == sim.name:
                raise ValueError("Duplicate netmems name")
        self.netmems.append(sim)

    def add_network(self, sim: NetSim) -> None:
        """Add a network simulator to the experiment."""
        for n in self.networks:
            if n.name == sim.name:
                raise ValueError("Duplicate net name")
        self.networks.append(sim)

    def all_simulators(self) -> tp.Iterable[Simulator]:
        """Returns all simulators defined to run in this experiment."""
        return itertools.chain(
            self.hosts, self.pcidevs, self.memdevs, self.netmems, self.networks
        )

    def resreq_mem(self) -> int:
        """Memory required to run all simulators in this experiment."""
        return sum(s.resreq_mem() for s in self.all_simulators())

    def resreq_cores(self) -> int:
        """Number of cores required to run all simulators in this
        experiment."""
        return sum(s.resreq_cores() for s in self.all_simulators())

    def find_sim(self, comp: system_base.Component) -> sim_base.Simulator:
        """Returns the used simulator object for the system component."""
        # Linear scan on purpose: matches by `==` rather than by hash.
        for c, sim in self.sys_sim_map.items():
            if c == comp:
                return sim
        raise Exception("Simulator Not Found")
class DistributedExperiment(Experiment):
    """Describes a simulation experiment distributed over multiple hosts."""

    def __init__(self, name: str, num_hosts: int) -> None:
        super().__init__(name)
        # Number of hosts to use.
        self.num_hosts = num_hosts
        # Mapping from simulator to host ID.
        self.host_mapping: dict[Simulator, int] = {}
        # Network proxies, split by whether they listen or connect.
        self.proxies_listen: list[NetProxyListener] = []
        self.proxies_connect: list[NetProxyConnecter] = []

    def add_proxy(self, proxy: NetProxyListener | NetProxyConnecter):
        """Register a network proxy in the listener or connecter list."""
        if not proxy.listen:
            self.proxies_connect.append(tp.cast(NetProxyConnecter, proxy))
        else:
            self.proxies_listen.append(tp.cast(NetProxyListener, proxy))

    def all_simulators(self) -> tp.Iterable[Simulator]:
        """All simulators, including the network proxies."""
        yield from super().all_simulators()
        yield from self.proxies_listen
        yield from self.proxies_connect

    def assign_sim_host(self, sim: Simulator, host: int) -> None:
        """Assign host ID (< self.num_hosts) for a simulator."""
        assert 0 <= host < self.num_hosts
        self.host_mapping[sim] = host

    def all_sims_assigned(self) -> bool:
        """Check if all simulators are assigned to a host."""
        return all(s in self.host_mapping for s in self.all_simulators())
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import annotations
import io
import tarfile
import typing as tp
from simbricks.orchestration.experiment.experiment_environment import ExpEnv
class AppConfig():
    """Defines the application to run on a node or host.

    Subclasses override the hooks below; the defaults describe an
    application that does nothing.
    """

    # pylint: disable=unused-argument
    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        """Commands that execute this application on `node`."""
        return []

    def prepare_pre_cp(self) -> tp.List[str]:
        """Preparation commands run before a checkpoint is taken."""
        return []

    def prepare_post_cp(self) -> tp.List[str]:
        """Preparation commands run after the checkpoint is restored."""
        return []

    def config_files(self) -> tp.Dict[str, tp.IO]:
        """Extra files placed inside the node under `/tmp/guest/`.

        Maps the filename inside the node to an IO handle holding the
        file's contents.
        """
        return {}

    def strfile(self, s: str) -> io.BytesIO:
        """Wrap string `s` in an IO handle for use in `config_files()`.

        This creates a file on the simulated node whose content is `s`.
        """
        return io.BytesIO(bytes(s, encoding='UTF-8'))
class NodeConfig():
    """Defines the configuration of a node or host."""

    def __init__(self) -> None:
        self.sim = 'qemu'
        """
        The concrete simulator that runs this node config. This is used to use
        execute different commands depending on the concrete simulator, e.g.,
        which command to use to end the simulation.

        TODO(Kaufi-Jonas): This is ugly. Refactor necessary commands to be
        provided by the simulator's class directly.
        """
        self.ip = '10.0.0.1'
        """IP address."""
        self.prefix = 24
        """IP prefix."""
        self.cores = 1
        """Number of CPU cores."""
        self.threads = 1
        """Number of threads per CPU core."""
        self.memory = 512
        """Amount of system memory in MB."""
        self.disk_image = 'base'
        """Name of disk image to use or absolute path to image."""
        self.mtu = 1500
        """Networking MTU."""
        self.tcp_congestion_control = 'bic'
        """TCP Congestion Control algorithm to use."""
        self.nockp = 0
        """
        Do not create a checkpoint in Gem5.

        TODO(Kaufi-Jonas): Seems we don't need this anymore since we specify
        whether to take a checkpoint experiment-wide. Otherwise, refactor this
        into simulator-specific class.
        """
        self.app: tp.Optional[AppConfig] = None
        """Application to run on simulated host."""
        self.kcmd_append = ''
        """String to be appended to kernel command line."""

    def config_str(self) -> str:
        """Assemble the guest run script: prepare, checkpoint marker, app
        commands, cleanup, and simulator-specific exit command.

        NOTE(review): assumes `self.app` has been set; calling this while
        `app` is still None raises AttributeError — confirm callers.
        """
        if self.sim == 'gem5':
            # gem5 guests checkpoint/exit via the m5 pseudo-instructions.
            cp_es = [] if self.nockp else ['m5 checkpoint']
            exit_es = ['m5 exit']
        else:
            # Other simulators checkpoint on the "ready" marker line and
            # shut the guest down with poweroff.
            cp_es = ['echo ready to checkpoint']
            exit_es = ['poweroff -f']

        es = self.prepare_pre_cp() + self.app.prepare_pre_cp() + cp_es + \
            self.prepare_post_cp() + self.app.prepare_post_cp() + \
            self.run_cmds() + self.cleanup_cmds() + exit_es
        return '\n'.join(es)

    def make_tar(self, env: ExpEnv, path: str) -> None:
        """Pack the run script and all config files into an uncompressed
        tar archive at `path`."""
        with tarfile.open(path, 'w:') as tar:
            # add main run script
            cfg_i = tarfile.TarInfo('guest/run.sh')
            cfg_i.mode = 0o777
            cfg_f = self.strfile(self.config_str())
            # determine the payload size by seeking to the buffer's end
            cfg_f.seek(0, io.SEEK_END)
            cfg_i.size = cfg_f.tell()
            cfg_f.seek(0, io.SEEK_SET)
            tar.addfile(tarinfo=cfg_i, fileobj=cfg_f)
            cfg_f.close()

            # add additional config files
            for (n, f) in self.config_files(env).items():
                f_i = tarfile.TarInfo('guest/' + n)
                f_i.mode = 0o777
                f.seek(0, io.SEEK_END)
                f_i.size = f.tell()
                f.seek(0, io.SEEK_SET)
                tar.addfile(tarinfo=f_i, fileobj=f)
                f.close()

    def prepare_pre_cp(self) -> tp.List[str]:
        """Commands to run to prepare node before checkpointing."""
        return [
            'set -x',
            'export HOME=/root',
            'export LANG=en_US',
            'export PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:' + \
            '/usr/bin:/sbin:/bin:/usr/games:/usr/local/games"'
        ]

    def prepare_post_cp(self) -> tp.List[str]:
        """Commands to run to prepare node after checkpoint restore."""
        return []

    def run_cmds(self) -> tp.List[str]:
        """Commands to run on node.

        NOTE(review): assumes `self.app` is set — confirm callers.
        """
        return self.app.run_cmds(self)

    def cleanup_cmds(self) -> tp.List[str]:
        """Commands to run to cleanup node."""
        return []

    # pylint: disable=unused-argument
    def config_files(self, env: ExpEnv) -> tp.Dict[str, tp.IO]:
        """
        Additional files to put inside the node, which are mounted under
        `/tmp/guest/`.

        Specified in the following format: `filename_inside_node`:
        `IO_handle_of_file`
        """
        return self.app.config_files()

    def strfile(self, s: str) -> io.BytesIO:
        """
        Helper function to convert a string to an IO handle for usage in
        `config_files()`.

        Using this, you can create a file with the string as its content on
        the simulated node.
        """
        return io.BytesIO(bytes(s, encoding='UTF-8'))
class LinuxNode(NodeConfig):
    """Node config for Linux guests with one network interface and a set
    of kernel drivers to load after checkpoint restore."""

    def __init__(self) -> None:
        super().__init__()
        self.ifname = 'eth0'
        self.drivers: tp.List[str] = []
        self.force_mac_addr: tp.Optional[str] = None

    def prepare_post_cp(self) -> tp.List[str]:
        """Load drivers and bring the interface up with the node's IP."""
        cmds = []
        for drv in self.drivers:
            # Absolute paths are loaded via insmod, bare names via modprobe.
            if drv[0] == '/':
                cmds.append('insmod ' + drv)
            else:
                cmds.append('modprobe ' + drv)
        if self.force_mac_addr:
            cmds.append(
                'ip link set dev ' + self.ifname + ' address ' +
                self.force_mac_addr
            )
        cmds.append('ip link set dev ' + self.ifname + ' up')
        cmds.append(f'ip addr add {self.ip}/{self.prefix} dev {self.ifname}')
        return super().prepare_post_cp() + cmds
class I40eLinuxNode(LinuxNode):
    """Linux node that loads the Intel i40e NIC driver via modprobe."""

    def __init__(self) -> None:
        super().__init__()
        self.drivers.append('i40e')
class CorundumLinuxNode(LinuxNode):
    """Linux node using the Corundum mqnic NIC driver, loaded via insmod
    from a module file shipped into the guest."""

    def __init__(self) -> None:
        super().__init__()
        self.drivers.append('/tmp/guest/mqnic.ko')

    # pylint: disable=consider-using-with
    def config_files(self, env: ExpEnv) -> tp.Dict[str, tp.IO]:
        """Ship the mqnic kernel module into the guest.

        The open handle is closed by `NodeConfig.make_tar` after packing.
        """
        m = {'mqnic.ko': open(f'{env.repodir}/images/mqnic/mqnic.ko', 'rb')}
        return {**m, **super().config_files(env)}
class E1000LinuxNode(LinuxNode):
    """Linux node that loads the Intel e1000 NIC driver via modprobe."""

    def __init__(self) -> None:
        super().__init__()
        self.drivers.append('e1000')
class MtcpNode(NodeConfig):
    """Node running the mTCP user-space TCP stack on DPDK."""

    def __init__(self) -> None:
        super().__init__()
        self.disk_image = 'mtcp'
        # PCI address of the NIC to bind to DPDK's igb_uio driver.
        self.pci_dev = '0000:00:02.0'
        self.memory = 16 * 1024
        self.num_hugepages = 4096

    def prepare_pre_cp(self) -> tp.List[str]:
        """Mount pseudo filesystems and reserve hugepages for DPDK."""
        return super().prepare_pre_cp() + [
            'mount -t proc proc /proc',
            'mount -t sysfs sysfs /sys',
            'mkdir -p /dev/hugepages',
            'mount -t hugetlbfs nodev /dev/hugepages',
            'mkdir -p /dev/shm',
            'mount -t tmpfs tmpfs /dev/shm',
            'echo ' + str(self.num_hugepages) + ' > /sys/devices/system/' + \
            'node/node0/hugepages/hugepages-2048kB/nr_hugepages',
        ]

    def prepare_post_cp(self) -> tp.List[str]:
        """Bind the NIC to igb_uio and bring up the mTCP dpdk0 interface."""
        return super().prepare_post_cp() + [
            'insmod /root/mtcp/dpdk/x86_64-native-linuxapp-gcc/kmod/igb_uio.ko',
            '/root/mtcp/dpdk/usertools/dpdk-devbind.py -b igb_uio ' +
            self.pci_dev,
            'insmod /root/mtcp/dpdk-iface-kmod/dpdk_iface.ko',
            '/root/mtcp/dpdk-iface-kmod/dpdk_iface_main',
            'ip link set dev dpdk0 up',
            f'ip addr add {self.ip}/{self.prefix} dev dpdk0'
        ]

    def config_files(self, env: ExpEnv) -> tp.Dict[str, tp.IO]:
        """Generate the mtcp.conf stack configuration inside the guest."""
        m = {
            'mtcp.conf':
                self.strfile(
                    'io = dpdk\n'
                    'num_cores = ' + str(self.cores) + '\n'
                    'num_mem_ch = 4\n'
                    'port = dpdk0\n'
                    'max_concurrency = 4096\n'
                    'max_num_buffers = 4096\n'
                    'rcvbuf = 8192\n'
                    'sndbuf = 8192\n'
                    'tcp_timeout = 10\n'
                    'tcp_timewait = 0\n'
                    '#stat_print = dpdk0\n'
                )
        }
        return {**m, **super().config_files(env)}
class TASNode(NodeConfig):
    """Node running the TAS user-space TCP acceleration service on DPDK."""

    def __init__(self) -> None:
        super().__init__()
        self.disk_image = 'tas'
        # PCI address of the NIC to bind to DPDK's igb_uio driver.
        self.pci_dev = '0000:00:02.0'
        self.memory = 16 * 1024
        self.num_hugepages = 4096
        # Number of fast-path cores for TAS.
        self.fp_cores = 1
        # Whether applications are transparently redirected to TAS via
        # LD_PRELOAD of the interpose library.
        self.preload = True

    def prepare_pre_cp(self) -> tp.List[str]:
        """Mount pseudo filesystems and reserve hugepages for DPDK."""
        return super().prepare_pre_cp() + [
            'mount -t proc proc /proc',
            'mount -t sysfs sysfs /sys',
            'mkdir -p /dev/hugepages',
            'mount -t hugetlbfs nodev /dev/hugepages',
            'mkdir -p /dev/shm',
            'mount -t tmpfs tmpfs /dev/shm',
            'echo ' + str(self.num_hugepages) + ' > /sys/devices/system/' + \
            'node/node0/hugepages/hugepages-2048kB/nr_hugepages',
        ]

    def prepare_post_cp(self) -> tp.List[str]:
        """Bind the NIC to igb_uio and start TAS in the background."""
        cmds = super().prepare_post_cp() + [
            'insmod /root/dpdk/lib/modules/5.4.46/extra/dpdk/igb_uio.ko',
            '/root/dpdk/sbin/dpdk-devbind -b igb_uio ' + self.pci_dev,
            'cd /root/tas',
            (
                f'tas/tas --ip-addr={self.ip}/{self.prefix}'
                f' --fp-cores-max={self.fp_cores} --fp-no-ints &'
            ),
            # give the TAS service a moment to come up
            'sleep 1'
        ]
        if self.preload:
            cmds += ['export LD_PRELOAD=/root/tas/lib/libtas_interpose.so']
        return cmds
class I40eDCTCPNode(NodeConfig):
    """Node using the Linux i40e NIC driver tuned for DCTCP (ECN enabled)."""

    def prepare_pre_cp(self) -> tp.List[str]:
        """Mount /proc and /sys and tune kernel buffers for DCTCP."""
        sysctls = [
            'mount -t proc proc /proc',
            'mount -t sysfs sysfs /sys',
            'sysctl -w net.core.rmem_default=31457280',
            'sysctl -w net.core.rmem_max=31457280',
            'sysctl -w net.core.wmem_default=31457280',
            'sysctl -w net.core.wmem_max=31457280',
            'sysctl -w net.core.optmem_max=25165824',
            'sysctl -w net.ipv4.tcp_mem="786432 1048576 26777216"',
            'sysctl -w net.ipv4.tcp_rmem="8192 87380 33554432"',
            'sysctl -w net.ipv4.tcp_wmem="8192 87380 33554432"',
            'sysctl -w net.ipv4.tcp_congestion_control=dctcp',
            'sysctl -w net.ipv4.tcp_ecn=1'
        ]
        return super().prepare_pre_cp() + sysctls

    def prepare_post_cp(self) -> tp.List[str]:
        """Load the i40e driver and bring up eth0 with this node's address."""
        nic_setup = [
            'modprobe i40e',
            'ethtool -G eth0 rx 4096 tx 4096',
            'ethtool -K eth0 tso off',
            'ip link set eth0 txqueuelen 13888',
            f'ip link set dev eth0 mtu {self.mtu} up',
            f'ip addr add {self.ip}/{self.prefix} dev eth0',
        ]
        return super().prepare_post_cp() + nic_setup
class I40eTCPCongNode(NodeConfig):
    """Node using the Linux i40e NIC driver with a configurable TCP
    congestion-control algorithm (``self.tcp_congestion_control``) and ECN
    disabled."""

    # Type annotations added for consistency with the sibling I40eDCTCPNode;
    # the previously commented-out buffer-tuning sysctls were removed as dead
    # code (see I40eDCTCPNode.prepare_pre_cp for the tuned variant).

    def prepare_pre_cp(self) -> tp.List[str]:
        """Mount /proc and /sys and select the congestion-control algorithm."""
        return super().prepare_pre_cp() + [
            'mount -t proc proc /proc',
            'mount -t sysfs sysfs /sys',
            'sysctl -w net.ipv4.tcp_congestion_control=' +
            f'{self.tcp_congestion_control}',
            'sysctl -w net.ipv4.tcp_ecn=0'
        ]

    def prepare_post_cp(self) -> tp.List[str]:
        """Load the i40e driver and bring up eth0 with this node's address."""
        return super().prepare_post_cp() + [
            'modprobe i40e',
            'ethtool -G eth0 rx 4096 tx 4096',
            'ethtool -K eth0 tso off',  # 'ip link set eth0 txqueuelen 13888',
            f'ip link set dev eth0 mtu {self.mtu} up',
            f'ip addr add {self.ip}/{self.prefix} dev eth0',
        ]
class CorundumDCTCPNode(NodeConfig):
    """Node using the Corundum (mqnic) NIC driver tuned for DCTCP."""

    def prepare_pre_cp(self) -> tp.List[str]:
        """Mount /proc and /sys and tune kernel buffers for DCTCP."""
        sysctls = [
            'mount -t proc proc /proc',
            'mount -t sysfs sysfs /sys',
            'sysctl -w net.core.rmem_default=31457280',
            'sysctl -w net.core.rmem_max=31457280',
            'sysctl -w net.core.wmem_default=31457280',
            'sysctl -w net.core.wmem_max=31457280',
            'sysctl -w net.core.optmem_max=25165824',
            'sysctl -w net.ipv4.tcp_mem="786432 1048576 26777216"',
            'sysctl -w net.ipv4.tcp_rmem="8192 87380 33554432"',
            'sysctl -w net.ipv4.tcp_wmem="8192 87380 33554432"',
            'sysctl -w net.ipv4.tcp_congestion_control=dctcp',
            'sysctl -w net.ipv4.tcp_ecn=1'
        ]
        return super().prepare_pre_cp() + sysctls

    def prepare_post_cp(self) -> tp.List[str]:
        """Load the mqnic driver and bring up eth0."""
        nic_setup = [
            'insmod mqnic.ko',
            'ip link set dev eth0 up',
            f'ip addr add {self.ip}/{self.prefix} dev eth0',
        ]
        return super().prepare_post_cp() + nic_setup
class LinuxFEMUNode(NodeConfig):
    """Node with an emulated NVMe device; loads the listed drivers."""

    def __init__(self) -> None:
        super().__init__()
        self.drivers = ['nvme']

    def prepare_post_cp(self) -> tp.List[str]:
        """List PCI devices, then load each driver: absolute paths are
        inserted with insmod, plain names loaded via modprobe."""
        cmds = ['lspci -vvvv']
        for drv in self.drivers:
            loader = 'insmod ' if drv[0] == '/' else 'modprobe '
            cmds.append(loader + drv)
        return super().prepare_post_cp() + cmds
class IdleHost(AppConfig):
    """Application that keeps the host alive but otherwise idle."""

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        """Sleep forever so the host never exits on its own."""
        return ['sleep infinity']
class NVMEFsTest(AppConfig):
    """Smoke test for an NVMe device: format it, mount it, write 1 MiB."""

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        return [
            'mount -t proc proc /proc',
            # create an ext3 filesystem on the first NVMe namespace
            'mkfs.ext3 /dev/nvme0n1',
            'mount /dev/nvme0n1 /mnt',
            # write 1024 blocks of 1 KiB of random data
            'dd if=/dev/urandom of=/mnt/foo bs=1024 count=1024'
        ]
class DctcpServer(AppConfig):
    """iperf server using the DCTCP congestion-control algorithm."""

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        """Start iperf in server mode with a 1M window and DCTCP."""
        return ['iperf -s -w 1M -Z dctcp']
class DctcpClient(AppConfig):
    """iperf DCTCP client; the last client sleeps only briefly afterwards so
    the experiment can end, the others keep their host up longer."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ip = '192.168.64.1'
        self.is_last = False

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        trailing_sleep = 2 if self.is_last else 20
        return [
            'sleep 1',
            f'iperf -w 1M -c {self.server_ip} -Z dctcp -i 1',
            f'sleep {trailing_sleep}'
        ]
class TcpCongServer(AppConfig):
    """iperf server for the TCP congestion-control experiments."""

    # Type annotations added for consistency with the sibling app configs
    # (e.g. DctcpServer).
    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        """Start iperf in server mode with a 1M window."""
        return ['iperf -s -w 1M']
class TcpCongClient(AppConfig):
    """iperf client for the TCP congestion-control experiments; the last
    client exits quickly so the experiment can end."""

    # Type annotations added for consistency with DctcpClient; the two
    # branches differed only in the trailing sleep and were merged.
    def __init__(self) -> None:
        super().__init__()
        self.server_ip = '192.168.64.1'
        self.is_last = False

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        """Run iperf against the server, then keep the host alive briefly
        (last client) or for the remainder of the run (other clients)."""
        trailing_sleep = 2 if self.is_last else 20
        return [
            'sleep 1',
            f'iperf -w 1M -c {self.server_ip} -i 1',
            f'sleep {trailing_sleep}',
        ]
class PingClient(AppConfig):
    """Application that pings a server ten times and exits."""

    def __init__(self, server_ip: str = '192.168.64.1') -> None:
        super().__init__()
        self.server_ip = server_ip

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        """Send ten ICMP echo requests to the configured server."""
        return [f'ping {self.server_ip} -c 10']
class IperfTCPServer(AppConfig):
    """iperf TCP server with large (32M) buffers and window."""

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        return ['iperf -s -l 32M -w 32M']
class IperfUDPServer(AppConfig):
    """iperf UDP server."""

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        return ['iperf -s -u']
class IperfTCPClient(AppConfig):
    """iperf TCP client with large buffers and a configurable number of
    parallel streams."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ip = '10.0.0.1'
        # number of parallel iperf streams (-P)
        self.procs = 1
        self.is_last = False

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        cmds = [
            'sleep 1',
            f'iperf -l 32M -w 32M -c {self.server_ip} -i 1 -P {self.procs}'
        ]
        # last client exits quickly; others wait for the rest of the run
        cmds.append('sleep 0.5' if self.is_last else 'sleep 10')
        return cmds
class IperfUDPClient(AppConfig):
    """iperf UDP client sending at a fixed rate."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ip = '10.0.0.1'
        # target bandwidth passed to iperf -b
        self.rate = '150m'
        self.is_last = False

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        cmds = [
            'sleep 1',
            f'iperf -c {self.server_ip} -i 1 -u -b {self.rate}'
        ]
        # last client exits quickly; others wait for the rest of the run
        cmds.append('sleep 0.5' if self.is_last else 'sleep 10')
        return cmds
class IperfUDPShortClient(AppConfig):
    """iperf UDP client that sends just a single datagram (-n 1)."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ip = '10.0.0.1'
        # kept for interface compatibility; not used by run_cmds
        self.rate = '150m'
        self.is_last = False

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        # note: the trailing space in the command is preserved verbatim
        return ['sleep 1', f'iperf -c {self.server_ip} -u -n 1 ']
class IperfUDPClientSleep(AppConfig):
    """Placeholder client that only sleeps; generates no traffic."""

    def __init__(self) -> None:
        super().__init__()
        # kept for interface compatibility; not used by run_cmds
        self.server_ip = '10.0.0.1'
        self.rate = '150m'

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        return ['sleep 1', 'sleep 10']
class NoTraffic(AppConfig):
    """App generating no network traffic: a server sleeps forever, a client
    either sleeps or burns CPU by reading /dev/urandom."""

    def __init__(self) -> None:
        super().__init__()
        # truthy flags (kept as ints to match existing callers)
        self.is_sleep = 1
        self.is_server = 0

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        if self.is_server:
            return ['sleep infinity']
        if self.is_sleep:
            return ['sleep 10']
        # CPU-bound busy work instead of sleeping
        return ['dd if=/dev/urandom of=/dev/null count=500000']
class NetperfServer(AppConfig):
    """netperf server; netserver daemonizes, so sleep keeps the host up."""

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        return ['netserver', 'sleep infinity']
class NetperfClient(AppConfig):
    """netperf client running a throughput test followed by a TCP_RR latency
    test with percentile output."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ip = '10.0.0.1'
        # durations in seconds for the two test phases
        self.duration_tp = 10
        self.duration_lat = 10

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        throughput = f'netperf -H {self.server_ip} -l {self.duration_tp}'
        latency = (
            f'netperf -H {self.server_ip} -l {self.duration_lat} -t TCP_RR'
            ' -- -o mean_latency,p50_latency,p90_latency,p99_latency'
        )
        return ['netserver', 'sleep 0.5', throughput, latency]
class VRReplica(AppConfig):
    """Viewstamped-Replication replica from the NOPaxos benchmark suite."""

    def __init__(self) -> None:
        super().__init__()
        # replica index within the nopaxos.config replica list
        self.index = 0

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        return [
            f'/root/nopaxos/bench/replica -c /root/nopaxos.config -i '
            f'{self.index} -m vr'
        ]
class VRClient(AppConfig):
    """Viewstamped-Replication benchmark client; pings each replica first to
    warm up ARP/routing before starting the benchmark."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ips: tp.List[str] = []

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        cmds = [f'ping -c 2 {ip}' for ip in self.server_ips]
        cmds.append(
            f'/root/nopaxos/bench/client -c /root/nopaxos.config '
            f'-m vr -u 2 -h {node.ip}'
        )
        return cmds
class NOPaxosReplica(AppConfig):
    """NOPaxos replica from the NOPaxos benchmark suite."""

    def __init__(self) -> None:
        super().__init__()
        # replica index within the nopaxos.config replica list
        self.index = 0

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        return [
            f'/root/nopaxos/bench/replica -c /root/nopaxos.config -i '
            f'{self.index} -m nopaxos'
        ]
class NOPaxosClient(AppConfig):
    """NOPaxos benchmark client; pings all replicas first, optionally uses
    the endhost sequencer (-e)."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ips: tp.List[str] = []
        self.is_last = False
        # pass -e to use the endhost sequencer
        self.use_ehseq = False

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        cmds = [f'ping -c 2 {ip}' for ip in self.server_ips]
        client = (
            f'/root/nopaxos/bench/client -c /root/nopaxos.config '
            f'-m nopaxos -u 2 -h {node.ip}'
        )
        if self.use_ehseq:
            client += ' -e'
        cmds.append(client)
        # last client ends the run; others stay alive until terminated
        cmds.append('sleep 1' if self.is_last else 'sleep infinity')
        return cmds
class NOPaxosSequencer(AppConfig):
    """Dedicated NOPaxos sequencer process."""

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        sequencer = (
            '/root/nopaxos/sequencer/sequencer -c /root/nopaxos.config'
            ' -m nopaxos'
        )
        return [sequencer]
class RPCServer(AppConfig):
    """Echo RPC server from tasbench; picks the mTCP or Linux binary
    depending on the node's network stack."""

    def __init__(self) -> None:
        super().__init__()
        self.port = 1234
        self.threads = 1
        self.max_flows = 1234
        self.max_bytes = 1024

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        if isinstance(node, MtcpNode):
            exe = 'echoserver_mtcp'
        else:
            exe = 'echoserver_linux'
        launch = (
            f'./{exe} {self.port} {self.threads} /tmp/guest/mtcp.conf'
            f' {self.max_flows} {self.max_bytes}'
        )
        return ['cd /root/tasbench/micro_rpc', launch]
class RPCClient(AppConfig):
    """RPC test client from tasbench; picks the mTCP or Linux binary
    depending on the node's network stack and runs for ``self.time`` s."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ip = '10.0.0.1'
        self.port = 1234
        self.threads = 1
        self.max_flows = 128
        self.max_bytes = 1024
        self.max_pending = 1
        self.openall_delay = 2
        self.max_msgs_conn = 0
        self.max_pend_conns = 8
        # benchmark duration in seconds (client runs in the background)
        self.time = 25

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        if isinstance(node, MtcpNode):
            exe = 'testclient_mtcp'
        else:
            exe = 'testclient_linux'
        launch = (
            f'./{exe} {self.server_ip} {self.port} {self.threads}'
            f' /tmp/guest/mtcp.conf {self.max_bytes} {self.max_pending}'
            f' {self.max_flows} {self.openall_delay} {self.max_msgs_conn}'
            f' {self.max_pend_conns} &'
        )
        return ['cd /root/tasbench/micro_rpc', launch, f'sleep {self.time}']
################################################################################
class HTTPD(AppConfig):
    """Base lighttpd web server app; subclasses set ``httpd_dir`` to select
    the Linux or mTCP build."""

    def __init__(self) -> None:
        super().__init__()
        self.threads = 1
        # size of the static file served (bytes)
        self.file_size = 64
        self.mtcp_config = 'lighttpd.conf'
        # TODO added because doesn't originally exist
        self.httpd_dir = ''

    def prepare_pre_cp(self) -> tp.List[str]:
        """Create the document root and the static file to serve."""
        create_file = (
            f'dd if=/dev/zero of=/srv/www/htdocs/file bs={self.file_size}'
            ' count=1'
        )
        return ['mkdir -p /srv/www/htdocs/ /tmp/lighttpd/', create_file]

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        launch = (
            f'./lighttpd -D -f ../doc/config/{self.mtcp_config}'
            f' -n {self.threads} -m ./.libs/'
        )
        return [f'cd {self.httpd_dir}/src/', launch]
class HTTPDLinux(HTTPD):
    """lighttpd built against the regular Linux network stack."""

    def __init__(self) -> None:
        super().__init__()
        self.httpd_dir = '/root/mtcp/apps/lighttpd-mtlinux'
class HTTPDLinuxRPO(HTTPD):
    """lighttpd (Linux stack) from the -rop build directory.

    NOTE(review): class name says "RPO" while the directory says "-rop";
    the name is kept since callers may reference it.
    """

    def __init__(self) -> None:
        super().__init__()
        self.httpd_dir = '/root/mtcp/apps/lighttpd-mtlinux-rop'
class HTTPDMtcp(HTTPD):
    """lighttpd built against mTCP; copies the generated mtcp.conf into the
    build tree and patches the document root in the lighttpd config."""

    def __init__(self) -> None:
        super().__init__()
        self.httpd_dir = '/root/mtcp/apps/lighttpd-mtcp'
        self.mtcp_config = 'm-lighttpd.conf'

    def prepare_pre_cp(self) -> tp.List[str]:
        copy_conf = f'cp /tmp/guest/mtcp.conf {self.httpd_dir}/src/mtcp.conf'
        patch_docroot = (
            'sed -i "s:^server.document-root =.*:server.document-root = '
            'server_root + \\"/htdocs\\":" '
            f'{self.httpd_dir}/doc/config/{self.mtcp_config}'
        )
        return super().prepare_pre_cp() + [copy_conf, patch_docroot]
class HTTPC(AppConfig):
    """Base ApacheBench HTTP client; subclasses set ``ab_dir`` to select the
    Linux or mTCP build."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ip = '10.0.0.1'
        self.conns = 1000
        #self.requests = 10000000
        self.requests = 10000
        self.threads = 1
        self.url = '/file'
        # TODO added because doesn't originally exist
        self.ab_dir = ''

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        benchmark = (
            f'./ab -N {self.threads} -c {self.conns} -n {self.requests}'
            f' {self.server_ip}{self.url}'
        )
        return [f'cd {self.ab_dir}/support/', benchmark]
class HTTPCLinux(HTTPC):
    """ApacheBench built against the regular Linux network stack."""

    def __init__(self) -> None:
        super().__init__()
        self.ab_dir = '/root/mtcp/apps/ab-linux'
class HTTPCMtcp(HTTPC):
    """ApacheBench built against mTCP; installs the generated mtcp.conf and
    removes any stale ARP configuration."""

    def __init__(self) -> None:
        super().__init__()
        self.ab_dir = '/root/mtcp/apps/ab-mtcp'

    def prepare_pre_cp(self) -> tp.List[str]:
        extra = [
            f'cp /tmp/guest/mtcp.conf {self.ab_dir}/support/config/mtcp.conf',
            f'rm -f {self.ab_dir}/support/config/arp.conf'
        ]
        return super().prepare_pre_cp() + extra
class MemcachedServer(AppConfig):
    """memcached server (1 thread, up to 4096 connections)."""

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        return ['memcached -u root -t 1 -c 4096']
class MemcachedClient(AppConfig):
    """memaslap load generator targeting one or more memcached servers."""

    def __init__(self) -> None:
        super().__init__()
        self.server_ips = ['10.0.0.1']
        self.threads = 1
        self.concurrency = 1
        # target throughput passed to --tps
        self.throughput = '1k'

    def run_cmds(self, node: NodeConfig) -> tp.List[str]:
        # memaslap expects a comma-separated host:port list
        server_list = ','.join(f'{ip}:11211' for ip in self.server_ips)
        benchmark = (
            f'memaslap --binary --time 10s --server={server_list}'
            f' --thread={self.threads} --concurrency={self.concurrency}'
            f' --tps={self.throughput} --verbose'
        )
        return [benchmark]
# Copyright 2022 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import asyncio
import itertools
import shlex
import traceback
import typing as tp
from abc import ABC, abstractmethod
from simbricks.orchestration.exectools import (
Component, Executor, SimpleComponent
)
from simbricks.orchestration.experiment.experiment_environment import ExpEnv
from simbricks.orchestration.experiment.experiment_output import ExpOutput
from simbricks.orchestration.experiments import (
DistributedExperiment, Experiment
)
from simbricks.orchestration.simulators import Simulator
from simbricks.orchestration.utils import graphlib
class ExperimentBaseRunner(ABC):
    """Coordinates a single experiment execution: starts all simulators in
    dependency order, waits for the designated ones to finish, and always
    terminates everything and collects output, even on failure/interrupt."""

    def __init__(self, exp: Experiment, env: ExpEnv, verbose: bool) -> None:
        self.exp = exp
        self.env = env
        self.verbose = verbose
        # profiling interval in seconds; None disables the profiler task
        self.profile_int: tp.Optional[int] = None
        self.out = ExpOutput(exp)
        # (simulator, component) pairs for every started simulator process
        self.running: tp.List[tp.Tuple[Simulator, SimpleComponent]] = []
        # (executor, socket path) pairs to be removed during cleanup
        self.sockets: tp.List[tp.Tuple[Executor, str]] = []
        # components whose termination marks the end of the experiment
        self.wait_sims: tp.List[Component] = []

    @abstractmethod
    def sim_executor(self, sim: Simulator) -> Executor:
        """Return the executor responsible for running `sim`."""
        pass

    def sim_graph(self) -> tp.Dict[Simulator, tp.Set[Simulator]]:
        """Build the dependency graph over all simulators.

        Maps each simulator to the set of simulators it depends on
        (declared dependencies plus `extra_deps`).
        """
        sims = self.exp.all_simulators()
        graph = {}
        for sim in sims:
            deps = sim.dependencies() + sim.extra_deps
            print(f'deps of {sim}: {sim.dependencies()}')
            graph[sim] = set()
            for d in deps:
                graph[sim].add(d)
        return graph

    async def start_sim(self, sim: Simulator) -> None:
        """Start a simulator and wait for it to be ready."""
        name = sim.full_name()
        if self.verbose:
            print(f'{self.exp.name}: starting {name}')
        run_cmd = sim.run_cmd(self.env)
        if run_cmd is None:
            # simulator has no process of its own; nothing to launch
            if self.verbose:
                print(f'{self.exp.name}: started dummy {name}')
            return

        # run simulator
        executor = self.sim_executor(sim)
        sc = executor.create_component(
            name, shlex.split(run_cmd), verbose=self.verbose, canfail=True
        )
        await sc.start()
        self.running.append((sim, sc))

        # add sockets for cleanup
        for s in sim.sockets_cleanup(self.env):
            self.sockets.append((executor, s))

        # Wait till sockets exist
        wait_socks = sim.sockets_wait(self.env)
        if wait_socks:
            if self.verbose:
                print(f'{self.exp.name}: waiting for sockets {name}')
            await executor.await_files(wait_socks, verbose=self.verbose)

        # add time delay if required
        delay = sim.start_delay()
        if delay > 0:
            await asyncio.sleep(delay)

        if sim.wait_terminate(self.env):
            self.wait_sims.append(sc)

        if self.verbose:
            print(f'{self.exp.name}: started {name}')

    async def before_wait(self) -> None:
        """Hook invoked after all simulators have started, before waiting."""
        pass

    async def before_cleanup(self) -> None:
        """Hook invoked right before simulators are terminated."""
        pass

    async def after_cleanup(self) -> None:
        """Hook invoked after all simulators have been cleaned up."""
        pass

    async def prepare(self) -> None:
        """Ship config tars to the executors and run all simulator prep
        commands; both phases run their per-simulator tasks in parallel."""
        # generate config tars
        copies = []
        for host in self.exp.hosts:
            path = self.env.cfgtar_path(host)
            if self.verbose:
                print('preparing config tar:', path)
            host.node_config.make_tar(self.env, path)
            executor = self.sim_executor(host)
            task = asyncio.create_task(executor.send_file(path, self.verbose))
            copies.append(task)
        await asyncio.gather(*copies)

        # prepare all simulators in parallel
        sims = []
        for sim in self.exp.all_simulators():
            prep_cmds = list(sim.prep_cmds(self.env))
            executor = self.sim_executor(sim)
            task = asyncio.create_task(
                executor.run_cmdlist(
                    'prepare_' + self.exp.name, prep_cmds, verbose=self.verbose
                )
            )
            sims.append(task)
        await asyncio.gather(*sims)

    async def wait_for_sims(self) -> None:
        """Wait for simulators to terminate (the ones marked to wait on)."""
        if self.verbose:
            print(f'{self.exp.name}: waiting for hosts to terminate')
        for sc in self.wait_sims:
            await sc.wait()

    async def terminate_collect_sims(self) -> ExpOutput:
        """Terminates all simulators and collects output."""
        self.out.set_end()
        if self.verbose:
            print(f'{self.exp.name}: cleaning up')

        await self.before_cleanup()

        # "interrupt, terminate, kill" all processes
        scs = []
        for _, sc in self.running:
            scs.append(asyncio.create_task(sc.int_term_kill()))
        await asyncio.gather(*scs)

        # wait for all processes to terminate
        for _, sc in self.running:
            await sc.wait()

        # remove all sockets
        scs = []
        for (executor, sock) in self.sockets:
            scs.append(asyncio.create_task(executor.rmtree(sock)))
        if scs:
            await asyncio.gather(*scs)

        # add all simulator components to the output
        for sim, sc in self.running:
            self.out.add_sim(sim, sc)

        await self.after_cleanup()
        return self.out

    async def profiler(self):
        """Periodically signal SIGUSR1 to all running simulator processes."""
        assert self.profile_int
        while True:
            await asyncio.sleep(self.profile_int)
            for (_, sc) in self.running:
                await sc.sigusr1()

    async def run(self) -> ExpOutput:
        """Execute the experiment and return its collected output.

        Simulators are started wave by wave in topological order of their
        dependency graph. Cleanup always runs, regardless of errors or
        cancellation.
        """
        profiler_task = None

        try:
            self.out.set_start()

            graph = self.sim_graph()
            print(graph)
            ts = graphlib.TopologicalSorter(graph)
            ts.prepare()
            while ts.is_active():
                # start ready simulators in parallel
                starting = []
                sims = []
                for sim in ts.get_ready():
                    starting.append(asyncio.create_task(self.start_sim(sim)))
                    sims.append(sim)

                # wait for starts to complete
                await asyncio.gather(*starting)

                for sim in sims:
                    ts.done(sim)

            if self.profile_int:
                profiler_task = asyncio.create_task(self.profiler())
            await self.before_wait()
            await self.wait_for_sims()
        except asyncio.CancelledError:
            if self.verbose:
                print(f'{self.exp.name}: interrupted')
            self.out.set_interrupted()
        except:  # pylint: disable=bare-except
            self.out.set_failed()
            traceback.print_exc()

        if profiler_task:
            try:
                profiler_task.cancel()
            except asyncio.CancelledError:
                pass

        # The bare except above guarantees that we always execute the following
        # code, which terminates all simulators and produces a proper output
        # file.
        terminate_collect_task = asyncio.create_task(
            self.terminate_collect_sims()
        )
        # prevent terminate_collect_task from being cancelled
        while True:
            try:
                return await asyncio.shield(terminate_collect_task)
            except asyncio.CancelledError as e:
                print(e)
                pass
class ExperimentSimpleRunner(ExperimentBaseRunner):
    """Experiment runner that executes every simulator on one executor."""

    def __init__(self, executor: Executor, *args, **kwargs) -> None:
        self.executor = executor
        super().__init__(*args, **kwargs)

    def sim_executor(self, sim: Simulator) -> Executor:
        # all simulators share the single configured executor
        return self.executor
class ExperimentDistributedRunner(ExperimentBaseRunner):
    """Experiment runner that distributes simulators across multiple
    executors according to the experiment's host mapping.

    (The previous docstring was a copy-paste of ExperimentSimpleRunner's
    "just one executor" text, which was wrong for this class.)
    """

    def __init__(
        self, execs, exp: DistributedExperiment, *args, **kwargs
    ) -> None:
        self.execs = execs
        super().__init__(exp, *args, **kwargs)
        self.exp = exp  # overrides the type in the base class
        # need at least one executor per host of the distributed experiment
        assert self.exp.num_hosts <= len(execs)

    def sim_executor(self, sim) -> Executor:
        """Return the executor for the host that `sim` is mapped to."""
        h_id = self.exp.host_mapping[sim]
        return self.execs[h_id]

    async def prepare(self) -> None:
        # make sure all simulators are assigned to an executor
        assert self.exp.all_sims_assigned()

        # set IP addresses for proxies based on assigned executors
        for p in itertools.chain(
            self.exp.proxies_listen, self.exp.proxies_connect
        ):
            executor = self.sim_executor(p)
            p.ip = executor.ip

        await super().prepare()
# Copyright 2021 Max Planck Institute for Software Systems, and
# National University of Singapore
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# Allow own class to be used as type for a method's argument
from __future__ import annotations
import pathlib
import shutil
import typing as tp
from abc import ABCMeta, abstractmethod
from simbricks.orchestration.exectools import LocalExecutor
from simbricks.orchestration.experiment.experiment_environment import ExpEnv
from simbricks.orchestration.experiment.experiment_output import ExpOutput
from simbricks.orchestration.experiments import Experiment
class Run(object):
    """Defines a single execution run for an experiment."""

    def __init__(
        self,
        experiment: Experiment,
        index: int,
        env: ExpEnv,
        outpath: str,
        prereq: tp.Optional[Run] = None
    ):
        self.experiment = experiment
        self.index = index
        self.env = env
        self.outpath = outpath
        self.output: tp.Optional[ExpOutput] = None
        self.prereq = prereq
        self.job_id: tp.Optional[int] = None
        """Slurm job id."""

    def name(self) -> str:
        """Return the unique name of this run: experiment name + index."""
        return self.experiment.name + '.' + str(self.index)

    async def prep_dirs(self, executor=None) -> None:
        """Remove and recreate the run's working, checkpoint, and shared
        memory directories, both locally and through `executor`.

        Fix: the default executor used to be `LocalExecutor()` evaluated once
        at class-definition time, so a single instance was silently shared by
        every call across all runs (mutable-default-argument pitfall). A
        fresh LocalExecutor is now created per call when none is given.
        """
        if executor is None:
            executor = LocalExecutor()
        shutil.rmtree(self.env.workdir, ignore_errors=True)
        await executor.rmtree(self.env.workdir)
        shutil.rmtree(self.env.shm_base, ignore_errors=True)
        await executor.rmtree(self.env.shm_base)
        # only wipe the checkpoint dir when a new checkpoint will be created
        if self.env.create_cp:
            shutil.rmtree(self.env.cpdir, ignore_errors=True)
            await executor.rmtree(self.env.cpdir)

        pathlib.Path(self.env.workdir).mkdir(parents=True, exist_ok=True)
        await executor.mkdir(self.env.workdir)
        pathlib.Path(self.env.cpdir).mkdir(parents=True, exist_ok=True)
        await executor.mkdir(self.env.cpdir)
        pathlib.Path(self.env.shm_base).mkdir(parents=True, exist_ok=True)
        await executor.mkdir(self.env.shm_base)
class Runtime(metaclass=ABCMeta):
    """Base class for managing the execution of multiple runs."""

    def __init__(self) -> None:
        # whether an interrupt has already been signaled
        self._interrupted = False
        self.profile_int: tp.Optional[int] = None

    @abstractmethod
    def add_run(self, run: Run) -> None:
        """Queue `run` for execution."""
        pass

    @abstractmethod
    async def start(self) -> None:
        """Execute all queued runs."""
        pass

    @abstractmethod
    def interrupt_handler(self) -> None:
        """Interrupt signal handler.

        All currently running simulators should be stopped cleanly and their
        output collected.
        """
        pass

    def interrupt(self) -> None:
        """Signal an interrupt to the runtime, at most once."""
        if self._interrupted:
            # already interrupted; invoking the handler again would trigger
            # repeated CancelledError
            return
        self._interrupted = True
        self.interrupt_handler()

    def enable_profiler(self, profile_int: int) -> None:
        """Enable periodic profiling with the given interval in seconds."""
        self.profile_int = profile_int
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment