Unverified commit 381421b7, authored by Rhett Ying, committed by GitHub
Browse files

[TensorpipeDeprecation] remove long live server from launch (#5920)

parent b51fb6f6
...@@ -100,7 +100,6 @@ class TestConstructDglServerEnvVars(unittest.TestCase): ...@@ -100,7 +100,6 @@ class TestConstructDglServerEnvVars(unittest.TestCase):
ip_config="path/to/ip.config", ip_config="path/to/ip.config",
num_servers=5, num_servers=5,
graph_format="csc", graph_format="csc",
keep_alive=False,
), ),
( (
"DGL_ROLE=server " "DGL_ROLE=server "
...@@ -111,7 +110,6 @@ class TestConstructDglServerEnvVars(unittest.TestCase): ...@@ -111,7 +110,6 @@ class TestConstructDglServerEnvVars(unittest.TestCase):
"DGL_IP_CONFIG=path/to/ip.config " "DGL_IP_CONFIG=path/to/ip.config "
"DGL_NUM_SERVER=5 " "DGL_NUM_SERVER=5 "
"DGL_GRAPH_FORMAT=csc " "DGL_GRAPH_FORMAT=csc "
"DGL_KEEP_ALIVE=0 "
), ),
) )
...@@ -195,8 +193,6 @@ def test_submit_jobs(): ...@@ -195,8 +193,6 @@ def test_submit_jobs():
args.workspace = test_dir args.workspace = test_dir
args.part_config = "ogb-products.json" args.part_config = "ogb-products.json"
args.ip_config = "ip_config.txt" args.ip_config = "ip_config.txt"
args.server_name = "ogb-products"
args.keep_alive = False
args.num_server_threads = 1 args.num_server_threads = 1
args.graph_format = "csc" args.graph_format = "csc"
args.extra_envs = ["NCCL_DEBUG=INFO"] args.extra_envs = ["NCCL_DEBUG=INFO"]
...@@ -233,7 +229,6 @@ def test_submit_jobs(): ...@@ -233,7 +229,6 @@ def test_submit_jobs():
for cmd in servers_cmd: for cmd in servers_cmd:
common_checks() common_checks()
assert "DGL_ROLE=server" in cmd assert "DGL_ROLE=server" in cmd
assert "DGL_KEEP_ALIVE=0" in cmd
assert "DGL_SERVER_ID=" in cmd assert "DGL_SERVER_ID=" in cmd
......
...@@ -7,7 +7,6 @@ import os ...@@ -7,7 +7,6 @@ import os
import queue import queue
import re import re
import signal import signal
import stat
import subprocess import subprocess
import sys import sys
import time import time
...@@ -329,7 +328,6 @@ def construct_dgl_server_env_vars( ...@@ -329,7 +328,6 @@ def construct_dgl_server_env_vars(
ip_config: str, ip_config: str,
num_servers: int, num_servers: int,
graph_format: str, graph_format: str,
keep_alive: bool,
pythonpath: Optional[str] = "", pythonpath: Optional[str] = "",
) -> str: ) -> str:
"""Constructs the DGL server-specific env vars string that are required for DGL code to behave in the correct """Constructs the DGL server-specific env vars string that are required for DGL code to behave in the correct
...@@ -346,8 +344,6 @@ def construct_dgl_server_env_vars( ...@@ -346,8 +344,6 @@ def construct_dgl_server_env_vars(
Relative path to workspace. Relative path to workspace.
num_servers: num_servers:
graph_format: graph_format:
keep_alive:
Whether to keep server alive when clients exit
pythonpath: Optional. If given, this will pass this as PYTHONPATH. pythonpath: Optional. If given, this will pass this as PYTHONPATH.
Returns: Returns:
...@@ -363,7 +359,6 @@ def construct_dgl_server_env_vars( ...@@ -363,7 +359,6 @@ def construct_dgl_server_env_vars(
"DGL_IP_CONFIG={DGL_IP_CONFIG} " "DGL_IP_CONFIG={DGL_IP_CONFIG} "
"DGL_NUM_SERVER={DGL_NUM_SERVER} " "DGL_NUM_SERVER={DGL_NUM_SERVER} "
"DGL_GRAPH_FORMAT={DGL_GRAPH_FORMAT} " "DGL_GRAPH_FORMAT={DGL_GRAPH_FORMAT} "
"DGL_KEEP_ALIVE={DGL_KEEP_ALIVE} "
"{suffix_optional_envvars}" "{suffix_optional_envvars}"
) )
suffix_optional_envvars = "" suffix_optional_envvars = ""
...@@ -378,7 +373,6 @@ def construct_dgl_server_env_vars( ...@@ -378,7 +373,6 @@ def construct_dgl_server_env_vars(
DGL_IP_CONFIG=ip_config, DGL_IP_CONFIG=ip_config,
DGL_NUM_SERVER=num_servers, DGL_NUM_SERVER=num_servers,
DGL_GRAPH_FORMAT=graph_format, DGL_GRAPH_FORMAT=graph_format,
DGL_KEEP_ALIVE=int(keep_alive),
suffix_optional_envvars=suffix_optional_envvars, suffix_optional_envvars=suffix_optional_envvars,
) )
...@@ -494,79 +488,6 @@ def wrap_cmd_with_extra_envvars(cmd: str, env_vars: list) -> str: ...@@ -494,79 +488,6 @@ def wrap_cmd_with_extra_envvars(cmd: str, env_vars: list) -> str:
return wrap_cmd_with_local_envvars(cmd, env_vars) return wrap_cmd_with_local_envvars(cmd, env_vars)
# Path of the monitor file recorded by has_alive_servers(); when it is not
# None, clean_alive_servers() removes that file and its ".lock" companion.
g_monitor_file = None
# Group ID parsed from the first line of an existing monitor file in
# has_alive_servers(); keeps its initial value 0 when no monitor file is read.
g_group_id = 0
def has_alive_servers(args):
    """Check whether there exist alive servers.

    For each group of long live servers, a monitor file named
    'dgl_dist_monitor_{args.server_name}' is created under '/tmp/' directory.
    We check the existence of this monitor file to determine whether to
    launch new servers or utilize the existing alive ones. If there
    exist alive servers, we obtain the available group ID from the monitor
    file which could be used in current client groups.

    Parameters
    ----------
    args
        Parsed launcher arguments. Only ``args.server_name`` and
        ``args.keep_alive`` are read here.

    Returns
    -------
    bool
        indicates whether there exists alive servers.
    """
    global g_monitor_file
    global g_group_id

    # Without a server name there is no monitor file to look for.
    if args.server_name is None:
        return False

    monitor_file = "/tmp/dgl_dist_monitor_" + args.server_name
    # Imported lazily so that `filelock` is only needed when a server name
    # is actually given.
    from filelock import FileLock

    lock = FileLock(monitor_file + ".lock")
    # The read-increment-write of the group ID below is done under the lock.
    with lock:
        next_group_id = None
        ret = os.path.exists(monitor_file)
        if ret:
            print(
                "Monitor file for alive servers already exist: {}.".format(
                    monitor_file
                )
            )
            # Close the file deterministically instead of leaking the handle.
            with open(monitor_file) as f:
                lines = [line.rstrip("\n") for line in f]
            # First line holds the group ID for this client group; the
            # incremented value is written back further down.
            g_group_id = int(lines[0])
            next_group_id = g_group_id + 1
        if not ret and args.keep_alive:
            next_group_id = 1
            print(
                "Monitor file for alive servers is created: {}.".format(
                    monitor_file
                )
            )
            # Record the path so clean_alive_servers() can remove it later.
            g_monitor_file = monitor_file
        if next_group_id is not None:
            with open(monitor_file, "w") as f:
                f.write(str(next_group_id))
    return ret
def clean_alive_servers():
    """Remove keep alive related files.

    Deletes the monitor file recorded in the module-level ``g_monitor_file``
    together with its ``.lock`` companion. Does nothing when no monitor file
    has been recorded. Removal is best effort: a filesystem error is reported
    on stdout and not propagated.
    """
    global g_monitor_file
    if g_monitor_file is None:
        return
    try:
        os.remove(g_monitor_file)
        os.remove(g_monitor_file + ".lock")
    except OSError:
        # Only filesystem failures are expected here; anything else
        # (KeyboardInterrupt, SystemExit, programming errors) must not be
        # silently swallowed.
        print(
            "Failed to delete monitor file for alive servers: {}.".format(
                g_monitor_file
            )
        )
    else:
        print(
            "Monitor file for alive servers is removed: {}.".format(
                g_monitor_file
            )
        )
def get_available_port(ip): def get_available_port(ip):
"""Get available port with specified ip.""" """Get available port with specified ip."""
import socket import socket
...@@ -621,41 +542,37 @@ def submit_jobs(args, udf_command, dry_run=False): ...@@ -621,41 +542,37 @@ def submit_jobs(args, udf_command, dry_run=False):
state_q = queue.Queue() state_q = queue.Queue()
tot_num_clients = args.num_trainers * (1 + args.num_samplers) * len(hosts) tot_num_clients = args.num_trainers * (1 + args.num_samplers) * len(hosts)
# launch server tasks # launch server tasks
if not has_alive_servers(args): server_env_vars = construct_dgl_server_env_vars(
server_env_vars = construct_dgl_server_env_vars( num_samplers=args.num_samplers,
num_samplers=args.num_samplers, num_server_threads=args.num_server_threads,
num_server_threads=args.num_server_threads, tot_num_clients=tot_num_clients,
tot_num_clients=tot_num_clients, part_config=args.part_config,
part_config=args.part_config, ip_config=args.ip_config,
ip_config=args.ip_config, num_servers=args.num_servers,
num_servers=args.num_servers, graph_format=args.graph_format,
graph_format=args.graph_format, pythonpath=os.environ.get("PYTHONPATH", ""),
keep_alive=args.keep_alive, )
pythonpath=os.environ.get("PYTHONPATH", ""), for i in range(len(hosts) * server_count_per_machine):
ip, _ = hosts[int(i / server_count_per_machine)]
server_env_vars_cur = f"{server_env_vars} DGL_SERVER_ID={i}"
cmd = wrap_cmd_with_local_envvars(udf_command, server_env_vars_cur)
cmd = (
wrap_cmd_with_extra_envvars(cmd, args.extra_envs)
if len(args.extra_envs) > 0
else cmd
) )
for i in range(len(hosts) * server_count_per_machine): cmd = "cd " + str(args.workspace) + "; " + cmd
ip, _ = hosts[int(i / server_count_per_machine)] servers_cmd.append(cmd)
server_env_vars_cur = f"{server_env_vars} DGL_SERVER_ID={i}" if not dry_run:
cmd = wrap_cmd_with_local_envvars(udf_command, server_env_vars_cur) thread_list.append(
cmd = ( execute_remote(
wrap_cmd_with_extra_envvars(cmd, args.extra_envs) cmd,
if len(args.extra_envs) > 0 state_q,
else cmd ip,
) args.ssh_port,
cmd = "cd " + str(args.workspace) + "; " + cmd username=args.ssh_username,
servers_cmd.append(cmd)
if not dry_run:
thread_list.append(
execute_remote(
cmd,
state_q,
ip,
args.ssh_port,
username=args.ssh_username,
)
) )
else: )
print(f"Use running server {args.server_name}.")
# launch client tasks # launch client tasks
client_env_vars = construct_dgl_client_env_vars( client_env_vars = construct_dgl_client_env_vars(
...@@ -668,7 +585,7 @@ def submit_jobs(args, udf_command, dry_run=False): ...@@ -668,7 +585,7 @@ def submit_jobs(args, udf_command, dry_run=False):
num_omp_threads=os.environ.get( num_omp_threads=os.environ.get(
"OMP_NUM_THREADS", str(args.num_omp_threads) "OMP_NUM_THREADS", str(args.num_omp_threads)
), ),
group_id=g_group_id, group_id=0,
pythonpath=os.environ.get("PYTHONPATH", ""), pythonpath=os.environ.get("PYTHONPATH", ""),
) )
...@@ -716,7 +633,6 @@ def submit_jobs(args, udf_command, dry_run=False): ...@@ -716,7 +633,6 @@ def submit_jobs(args, udf_command, dry_run=False):
logging.info("Stop launcher") logging.info("Stop launcher")
# We need to tell the cleanup process to kill remote training jobs. # We need to tell the cleanup process to kill remote training jobs.
conn2.send("cleanup") conn2.send("cleanup")
clean_alive_servers()
sys.exit(0) sys.exit(0)
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
...@@ -811,22 +727,7 @@ def main(): ...@@ -811,22 +727,7 @@ def main():
you can set the LD_LIBRARY_PATH and NCCL_DEBUG by adding: \ you can set the LD_LIBRARY_PATH and NCCL_DEBUG by adding: \
--extra_envs LD_LIBRARY_PATH=/usr/local/cuda/lib64:$LD_LIBRARY_PATH NCCL_DEBUG=INFO ", --extra_envs LD_LIBRARY_PATH=/usr/local/cuda/lib64:$LD_LIBRARY_PATH NCCL_DEBUG=INFO ",
) )
parser.add_argument(
"--keep_alive",
action="store_true",
help="Servers keep alive when clients exit",
)
parser.add_argument(
"--server_name",
type=str,
help="Used to check whether there exist alive servers",
)
args, udf_command = parser.parse_known_args() args, udf_command = parser.parse_known_args()
if args.keep_alive:
assert (
args.server_name is not None
), "Server name is required if '--keep_alive' is enabled."
print("Servers will keep alive even clients exit...")
assert len(udf_command) == 1, "Please provide user command line." assert len(udf_command) == 1, "Please provide user command line."
assert ( assert (
args.num_trainers is not None and args.num_trainers > 0 args.num_trainers is not None and args.num_trainers > 0
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment