"src/diffusers/models/controlnets/multicontrolnet_union.py" did not exist on "e564abe292750b7d2eef07f2b49ea2056df391ab"
Unverified Commit e4cb4a37 authored by Rhett Ying's avatar Rhett Ying Committed by GitHub
Browse files

[Fix] reduce error msg, refine fetch logic of available ports (#3658)

* [Fix] reduce error msg, refine fetch logic of available ports

* un-initialize client before sending shutdown request

* fix import error

* print connect failure log only in debug mode

* enable DMLC_LOG_DEBUG=1 in CI
parent 9ec9df57
......@@ -269,8 +269,6 @@ def finalize_client():
if os.environ.get('DGL_DIST_MODE', 'standalone') != 'standalone':
rpc.finalize_sender()
rpc.finalize_receiver()
global INITIALIZED
INITIALIZED = False
def _exit():
......
......@@ -163,7 +163,7 @@ def connect_to_server(ip_config, num_servers, max_queue_size=MAX_QUEUE_SIZE, net
server_ip = addr[1]
server_port = addr[2]
while not rpc.connect_receiver(server_ip, server_port, server_id):
time.sleep(1)
time.sleep(3)
# Get local usable IP address and port
ip_addr = get_local_usable_addr(server_ip)
client_ip, client_port = ip_addr.split(':')
......@@ -185,7 +185,7 @@ def connect_to_server(ip_config, num_servers, max_queue_size=MAX_QUEUE_SIZE, net
rpc.set_num_client(res.num_client)
from .dist_context import exit_client, set_initialized
atexit.register(exit_client)
set_initialized()
set_initialized(True)
def shutdown_servers():
"""Issue commands to remote servers to shut them down.
......@@ -194,6 +194,8 @@ def shutdown_servers():
------
ConnectionError : If anything wrong with the connection.
"""
from .dist_context import set_initialized
set_initialized(False)
if rpc.get_rank() == 0: # Only client_0 issue this command
req = rpc.ShutDownRequest(rpc.get_rank())
for server_id in range(rpc.get_num_server()):
......
"""Functions used by server."""
import time
from . import rpc
from .constants import MAX_QUEUE_SIZE
......@@ -78,7 +79,9 @@ def start_server(server_id, ip_config, num_servers, num_clients, server_state, \
client_namebook[client_id] = addr
for client_id, addr in client_namebook.items():
client_ip, client_port = addr.split(':')
assert rpc.connect_receiver(client_ip, client_port, client_id)
# TODO[Rhett]: server should not be blocked endlessly.
while not rpc.connect_receiver(client_ip, client_port, client_id):
time.sleep(1)
if rpc.get_rank() == 0: # server_0 send all the IDs
for client_id, _ in client_namebook.items():
register_res = rpc.ClientRegisterResponse(client_id)
......
......@@ -31,15 +31,10 @@ bool TPSender::ConnectReceiver(const std::string &addr, int recv_id) {
tensorpipe::Message tpmsg;
tpmsg.metadata = "dglconnect";
pipe->write(tpmsg, [done](const tensorpipe::Error &error) {
if (error) {
LOG(WARNING) << "Error occurred when write to pipe: " << error.what();
done->set_value(false);
} else {
done->set_value(true);
}
done->set_value(!error);
});
if (!done->get_future().get()) {
LOG(WARNING) << "Failed to connect to receiver[" << addr << "].";
DLOG(WARNING) << "Failed to connect to receiver[" << addr << "].";
return false;
}
pipes_[recv_id] = pipe;
......@@ -128,7 +123,7 @@ void TPReceiver::OnAccepted(const Error &error, std::shared_ptr<Pipe> pipe) {
// read the handshake message: "dglconnect"
pipe->readDescriptor([pipe, this](const Error &error, Descriptor descriptor) {
if (error) {
LOG(WARNING) << "Unexpected error when reading from accepted pipe: " << error.what();
LOG(ERROR) << "Unexpected error when reading from accepted pipe: " << error.what();
return;
}
Allocation allocation;
......
......@@ -18,37 +18,12 @@ import backend as F
import math
import unittest
import pickle
from utils import reset_envs
from utils import reset_envs, generate_ip_config
if os.name != 'nt':
import fcntl
import struct
def get_local_usable_addr():
"""Get local usable IP and port
Returns
-------
str
IP address, e.g., '192.168.8.12:50051'
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
sock.connect(('10.255.255.255', 1))
ip_addr = sock.getsockname()[0]
except ValueError:
ip_addr = '127.0.0.1'
finally:
sock.close()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("", 0))
sock.listen(1)
port = sock.getsockname()[1]
sock.close()
return ip_addr + ' ' + str(port)
def create_random_graph(n):
arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
return dgl.from_scipy(arr)
......@@ -766,10 +741,7 @@ def test_split_even():
assert np.all(all_edges == F.asnumpy(all_edges2))
def prepare_dist():
ip_config = open("kv_ip_config.txt", "w")
ip_addr = get_local_usable_addr()
ip_config.write('{}\n'.format(ip_addr))
ip_config.close()
generate_ip_config("kv_ip_config.txt", 1, 1)
if __name__ == '__main__':
os.makedirs('/tmp/dist_graph', exist_ok=True)
......
......@@ -10,7 +10,7 @@ import multiprocessing as mp
import numpy as np
import backend as F
import time
from utils import get_local_usable_addr, reset_envs
from utils import generate_ip_config, reset_envs
from pathlib import Path
import pytest
from scipy import sparse as spsp
......@@ -71,10 +71,7 @@ def start_get_degrees_client(rank, tmpdir, disable_shared_mem, nids=None):
return in_deg, out_deg, all_in_deg, all_out_deg
def check_rpc_sampling(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = CitationGraphDataset("cora")[0]
g.readonly()
......@@ -106,10 +103,7 @@ def check_rpc_sampling(tmpdir, num_server):
F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))
def check_rpc_find_edges_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = CitationGraphDataset("cora")[0]
g.readonly()
......@@ -156,10 +150,7 @@ def create_random_hetero(dense=False, empty=False):
return g
def check_rpc_hetero_find_edges_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = create_random_hetero()
num_parts = num_server
......@@ -199,10 +190,7 @@ def test_rpc_find_edges_shuffle(num_server):
check_rpc_find_edges_shuffle(Path(tmpdirname), num_server)
def check_rpc_get_degree_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = CitationGraphDataset("cora")[0]
g.readonly()
......@@ -260,10 +248,7 @@ def test_rpc_sampling():
check_rpc_sampling(Path(tmpdirname), 2)
def check_rpc_sampling_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = CitationGraphDataset("cora")[0]
g.readonly()
......@@ -357,10 +342,7 @@ def start_hetero_etype_sample_client(rank, tmpdir, disable_shared_mem, fanout=3,
return block, gpb
def check_rpc_hetero_sampling_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = create_random_hetero()
num_parts = num_server
......@@ -424,10 +406,7 @@ def get_degrees(g, nids, ntype):
return deg
def check_rpc_hetero_sampling_empty_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = create_random_hetero(empty=True)
num_parts = num_server
......@@ -457,10 +436,8 @@ def check_rpc_hetero_sampling_empty_shuffle(tmpdir, num_server):
assert len(block.etypes) == len(g.etypes)
def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = create_random_hetero(dense=True)
num_parts = num_server
num_hops = 1
......@@ -520,10 +497,8 @@ def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server):
assert np.all(F.asnumpy(orig_dst1) == orig_dst)
def check_rpc_hetero_etype_sampling_empty_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = create_random_hetero(dense=True, empty=True)
num_parts = num_server
num_hops = 1
......@@ -655,10 +630,7 @@ def start_in_subgraph_client(rank, tmpdir, disable_shared_mem, nodes):
def check_rpc_in_subgraph_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config.txt", num_server, num_server)
g = CitationGraphDataset("cora")[0]
g.readonly()
......
......@@ -9,7 +9,7 @@ import sys
import multiprocessing as mp
import numpy as np
import time
from utils import get_local_usable_addr, reset_envs
from utils import generate_ip_config, reset_envs
from pathlib import Path
from dgl.distributed import DistGraphServer, DistGraph, DistDataLoader
import pytest
......@@ -104,10 +104,7 @@ def start_dist_dataloader(rank, tmpdir, num_server, drop_last, orig_nid, orig_ei
@unittest.skipIf(dgl.backend.backend_name != 'pytorch', reason='Only support PyTorch for now')
def test_standalone(tmpdir):
reset_envs()
ip_config = open("mp_ip_config.txt", "w")
for _ in range(1):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("mp_ip_config.txt", 1, 1)
g = CitationGraphDataset("cora")[0]
print(g.idtype)
......@@ -176,10 +173,7 @@ def start_dist_neg_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, g
dgl.distributed.exit_client() # this is needed since there's two test here in one process
def check_neg_dataloader(g, tmpdir, num_server, num_workers):
ip_config = open("mp_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("mp_ip_config.txt", num_server, num_server)
num_parts = num_server
num_hops = 1
......@@ -221,10 +215,7 @@ def check_neg_dataloader(g, tmpdir, num_server, num_workers):
@pytest.mark.parametrize("reshuffle", [True, False])
def test_dist_dataloader(tmpdir, num_server, num_workers, drop_last, reshuffle):
reset_envs()
ip_config = open("mp_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("mp_ip_config.txt", num_server, num_server)
g = CitationGraphDataset("cora")[0]
print(g.idtype)
......@@ -361,10 +352,7 @@ def start_edge_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
dgl.distributed.exit_client() # this is needed since there's two test here in one process
def check_dataloader(g, tmpdir, num_server, num_workers, dataloader_type):
ip_config = open("mp_ip_config.txt", "w")
for _ in range(num_server):
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("mp_ip_config.txt", num_server, num_server)
num_parts = num_server
num_hops = 1
......
......@@ -9,36 +9,12 @@ import unittest
from dgl.graph_index import create_graph_index
import multiprocessing as mp
from numpy.testing import assert_array_equal
from utils import generate_ip_config, reset_envs
if os.name != 'nt':
import fcntl
import struct
def get_local_usable_addr():
"""Get local usable IP and port
Returns
-------
str
IP address, e.g., '192.168.8.12:50051'
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
sock.connect(('10.255.255.255', 1))
ip_addr = sock.getsockname()[0]
except ValueError:
ip_addr = '127.0.0.1'
finally:
sock.close()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("", 0))
sock.listen(1)
port = sock.getsockname()[1]
sock.close()
return ip_addr + ' ' + str(port)
# Create an one-part Graph
node_map = F.tensor([0,0,0,0,0,0], F.int64)
edge_map = F.tensor([0,0,0,0,0,0,0], F.int64)
......@@ -296,12 +272,10 @@ def start_client_mul_role(i):
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_store():
ip_config = open("kv_ip_config.txt", "w")
reset_envs()
num_servers = 2
num_clients = 2
ip_addr = get_local_usable_addr()
ip_config.write('{}\n'.format(ip_addr))
ip_config.close()
generate_ip_config("kv_ip_config.txt", 1, num_servers)
ctx = mp.get_context('spawn')
pserver_list = []
pclient_list = []
......@@ -321,15 +295,13 @@ def test_kv_store():
@unittest.skipIf(os.name == 'nt' or os.getenv('DGLBACKEND') == 'tensorflow', reason='Do not support windows and TF yet')
def test_kv_multi_role():
ip_config = open("kv_ip_mul_config.txt", "w")
reset_envs()
num_servers = 2
num_trainers = 2
num_samplers = 2
generate_ip_config("kv_ip_mul_config.txt", 1, num_servers)
# There are two trainer processes and each trainer process has two sampler processes.
num_clients = num_trainers * (1 + num_samplers)
ip_addr = get_local_usable_addr()
ip_config.write('{}\n'.format(ip_addr))
ip_config.close()
ctx = mp.get_context('spawn')
pserver_list = []
pclient_list = []
......
......@@ -7,7 +7,7 @@ import backend as F
import unittest, pytest
import multiprocessing as mp
from numpy.testing import assert_array_equal
from utils import reset_envs
from utils import reset_envs, generate_ip_config
if os.name != 'nt':
import fcntl
......@@ -18,31 +18,6 @@ STR = 'hello world!'
HELLO_SERVICE_ID = 901231
TENSOR = F.zeros((10, 10), F.int64, F.cpu())
def get_local_usable_addr():
"""Get local usable IP and port
Returns
-------
str
IP address, e.g., '192.168.8.12:50051'
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
sock.connect(('10.255.255.255', 1))
ip_addr = sock.getsockname()[0]
except ValueError:
ip_addr = '127.0.0.1'
finally:
sock.close()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("", 0))
sock.listen(1)
port = sock.getsockname()[1]
sock.close()
return ip_addr + ' ' + str(port)
def foo(x, y):
assert x == 123
assert y == "abc"
......@@ -195,10 +170,7 @@ def test_rpc_msg():
def test_rpc():
reset_envs()
os.environ['DGL_DIST_MODE'] = 'distributed'
ip_config = open("rpc_ip_config.txt", "w")
ip_addr = get_local_usable_addr()
ip_config.write('%s\n' % ip_addr)
ip_config.close()
generate_ip_config("rpc_ip_config.txt", 1, 1)
ctx = mp.get_context('spawn')
pserver = ctx.Process(target=start_server, args=(1, "rpc_ip_config.txt"))
pclient = ctx.Process(target=start_client, args=("rpc_ip_config.txt",))
......@@ -211,10 +183,7 @@ def test_rpc():
def test_multi_client():
reset_envs()
os.environ['DGL_DIST_MODE'] = 'distributed'
ip_config = open("rpc_ip_config_mul_client.txt", "w")
ip_addr = get_local_usable_addr()
ip_config.write('%s\n' % ip_addr)
ip_config.close()
generate_ip_config("rpc_ip_config_mul_client.txt", 1, 1)
ctx = mp.get_context('spawn')
pserver = ctx.Process(target=start_server, args=(10, "rpc_ip_config_mul_client.txt"))
pclient_list = []
......@@ -233,11 +202,8 @@ def test_multi_client():
def test_multi_thread_rpc():
reset_envs()
os.environ['DGL_DIST_MODE'] = 'distributed'
ip_config = open("rpc_ip_config_multithread.txt", "w")
num_servers = 2
for _ in range(num_servers): # 3 servers
ip_config.write('{}\n'.format(get_local_usable_addr()))
ip_config.close()
generate_ip_config("rpc_ip_config_multithread.txt", num_servers, num_servers)
ctx = mp.get_context('spawn')
pserver_list = []
for i in range(num_servers):
......
......@@ -2,30 +2,38 @@
import socket
import os
def get_local_usable_addr():
"""Get local usable IP and port
Returns
-------
str
IP address, e.g., '192.168.8.12:50051'
"""
def generate_ip_config(file_name, num_machines, num_servers):
"""Get local IP and available ports, writes to file."""
# get available IP in localhost
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
sock.connect(('10.255.255.255', 1))
ip_addr = sock.getsockname()[0]
ip = sock.getsockname()[0]
except ValueError:
ip_addr = '127.0.0.1'
ip = '127.0.0.1'
finally:
sock.close()
# scan available PORT
ports = []
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("", 0))
sock.listen(1)
port = sock.getsockname()[1]
for port in range(10000, 65535):
try:
sock.connect((ip, port))
ports = []
except:
ports.append(port)
if len(ports) == num_machines * num_servers:
break
sock.close()
return ip_addr + ' ' + str(port)
if len(ports) < num_machines * num_servers:
raise RuntimeError(
"Failed to get available IP/PORT with required numbers.")
with open(file_name, 'w') as f:
for i in range(num_machines):
f.write('{} {}\n'.format(ip, ports[i*num_servers]))
def reset_envs():
......
......@@ -37,8 +37,7 @@ python3 -m pytest -v --junitxml=pytest_compute.xml tests/compute || fail "comput
python3 -m pytest -v --junitxml=pytest_backend.xml tests/$DGLBACKEND || fail "backend-specific"
export OMP_NUM_THREADS=1
export DMLC_LOG_DEBUG=1
if [ $2 != "gpu" ]; then
python3 -m pytest -v --capture=tee-sys --junitxml=pytest_distributed.xml tests/distributed/*.py || fail "distributed"
# Separate kvstore test to another process, to avoid hangs
python3 -m pytest -v --capture=tee-sys --junitxml=pytest_distributed.xml tests/distributed/kv_store/*.py || fail "distributed kvstore"
fi
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment