bsp.cpp 18.2 KB
Newer Older
Davis King's avatar
Davis King committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright (C) 2012  Davis E. King (davis@dlib.net)
// License: Boost Software License   See LICENSE.txt for the full license.

#include "bsp.h"
#include "../ref.h"

// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------

namespace dlib
{

    namespace impl
    {

        // Describes one remote processing node: the ip and port handed to bsp_con when
        // connecting to it, and the node ID its connection is filed under.
        struct hostinfo
        {
            // Zero the numeric fields so a default constructed hostinfo never carries
            // indeterminate values (for example while it sits in a container waiting
            // to be filled in by deserialize()).
            hostinfo() : port(0), node_id(0) {}

            hostinfo (
                const std::string& ip_,
                unsigned short port_,
                unsigned long node_id_
            ) : 
                ip(ip_),
                port(port_),
                node_id(node_id_)
            {
            }

            std::string ip;         // address of the node
            unsigned short port;    // port the node is reached on
            unsigned long node_id;  // ID used as the key for this node's connection
        };

        // Opens one connection per entry of hosts and stores it in cons.  The
        // connection to hosts[i] is filed under the ID i+1.  Whatever cons held before
        // the call is discarded.
        void connect_all (
            map_id_to_con& cons,
            const std::vector<std::pair<std::string,unsigned short> >& hosts,
            unsigned long node_id
        )
        {
            cons.clear();

            for (unsigned long idx = 0; idx < hosts.size(); ++idx)
            {
                scoped_ptr<bsp_con> link(new bsp_con(hosts[idx]));

                // The first thing written to a fresh connection is our own node_id so
                // the other end knows who it is talking to.
                serialize(node_id, link->stream);

                // IDs handed out here start at 1 and follow the order of hosts.
                unsigned long remote_id = idx+1;
                cons.add(remote_id, link);
            }
        }

        // Connects to every host listed in hosts and files each connection in cons
        // under that host's node_id.  cons is emptied before any connection is made.
        void connect_all_hostinfo (
            map_id_to_con& cons,
            const std::vector<hostinfo>& hosts,
            unsigned long node_id
        )
        {
            cons.clear();

            for (unsigned long idx = 0; idx < hosts.size(); ++idx)
            {
                const hostinfo& target = hosts[idx];
                scoped_ptr<bsp_con> link(new bsp_con(make_pair(target.ip,target.port)));

                // Announce our node_id to the other end and push it out immediately.
                serialize(node_id, link->stream);
                link->stream.flush();

                unsigned long remote_id = target.node_id;
                cons.add(remote_id, link);
            }
        }


        // Writes a hostinfo to out as three consecutive fields: ip, port, node_id.
        // The matching deserialize() reads them back in the same order.
        void serialize (const hostinfo& item, std::ostream& out)
        {
            dlib::serialize(item.ip,      out);
            dlib::serialize(item.port,    out);
            dlib::serialize(item.node_id, out);
        }

        // Fills item from in by reading ip, port and node_id, in that order, which is
        // the order the matching serialize() writes them.
        void deserialize (hostinfo& item, std::istream& in)
        {
            dlib::deserialize(item.ip,      in);
            dlib::deserialize(item.port,    in);
            dlib::deserialize(item.node_id, in);
        }

        // Sends every connected node its setup instructions.  cons is expected to
        // already hold, for each hosts[i], a connection filed under the ID i+1.
        // Each node receives: its node ID, the list of hosts it must connect to, and
        // the number of connections it should expect to receive.
        void send_out_connection_orders (
            map_id_to_con& cons,
            const std::vector<std::pair<std::string,unsigned short> >& hosts
        )
        {
            // Step 1: each node is told its own ID, which is the key of its connection.
            for (cons.reset(); cons.move_next(); )
            {
                dlib::serialize(cons.element().key(), cons.element().value()->stream);
            }

            // Step 2: each node is told who to connect to.
            std::vector<hostinfo> earlier_hosts; 
            for (unsigned long idx = 0; idx < hosts.size(); ++idx)
            {
                hostinfo current(hosts[idx].first, hosts[idx].second, idx+1);
                bsp_con* const link = cons[current.node_id].get();

                // A host connects to every host that precedes it in the list...
                dlib::serialize(earlier_hosts, link->stream);
                earlier_hosts.push_back(current);

                // ...and is told how many of the hosts after it will connect to it.
                const unsigned long num_incoming = hosts.size()-earlier_hosts.size();
                dlib::serialize(num_incoming, link->stream);
                link->stream.flush();
            }
        }

    // ------------------------------------------------------------------------------------

        // These control bytes are sent before each message nodes send to each other.
        // The thread reading a connection switches on this byte to decide what to do.
        const static char MESSAGE_HEADER         = 0;  // a serialized std::string payload follows
        const static char WAITING_ON_RECEIVE     = 1;  // sender is blocked waiting for a message
        const static char NOT_WAITING_ON_RECEIVE = 2;  // sender is no longer blocked
        const static char ALL_NODES_WAITING      = 3;  // node 0 reports all nodes blocked, nothing in flight
        const static char SENT_MESSAGE           = 4;  // sender has sent a message to some node
        const static char GOT_MESSAGE            = 5;  // sender has consumed a message
        const static char NODE_TERMINATE         = 6;  // sender is intentionally dropping the connection
Davis King's avatar
Davis King committed
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203

    // ------------------------------------------------------------------------------------

        void listen_and_connect_all(
            unsigned long& node_id,
            map_id_to_con& cons,
            unsigned short port
        )
        {
            cons.clear();
            scoped_ptr<listener> list;
            const int status = create_listener(list, port);
            if (status == PORTINUSE)
            {
                throw socket_error("Unable to create listening port " + cast_to_string(port) +
                                   ".  The port is already in use");
            }
            else if (status != 0)
            {
                throw socket_error("Unable to create listening port " + cast_to_string(port) );
            }

            scoped_ptr<connection> con;
            if (list->accept(con))
            {
                throw socket_error("Error occurred while accepting new connection");
            }

            scoped_ptr<bsp_con> temp(new bsp_con(con));

            unsigned long remote_node_id;
            dlib::deserialize(remote_node_id, temp->stream);
            dlib::deserialize(node_id, temp->stream);
            std::vector<hostinfo> targets; 
            dlib::deserialize(targets, temp->stream);
            unsigned long num_incoming_connections;
            dlib::deserialize(num_incoming_connections, temp->stream);

            cons.add(remote_node_id,temp);

            // make a thread that will connect to all the targets
            map_id_to_con cons2;
            thread_function thread(impl::connect_all_hostinfo, ref(cons2), ref(targets), node_id);

            // accept any incoming connections
            for (unsigned long i = 0; i < num_incoming_connections; ++i)
            {
                // If it takes more than 10 seconds for the other nodes to connect to us
                // then something has gone horribly wrong and it almost certainly will
                // never connect at all.  So just give up if that happens.
                const unsigned long timeout_milliseconds = 10000;
                if (list->accept(con, timeout_milliseconds))
                {
                    throw socket_error("Error occurred while accepting new connection");
                }

                temp.reset(new bsp_con(con));

                dlib::deserialize(remote_node_id, temp->stream);
                cons.add(remote_node_id,temp);
            }


            // put all the connections created by the thread into cons
            thread.wait();
            while (cons2.size() > 0)
            {
                unsigned long id;
                scoped_ptr<bsp_con> temp;
                cons2.remove_any(id,temp);
                cons.add(id,temp);
            }
        }
    }
    
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
//                          IMPLEMENTATION OF bsp_context OBJECT MEMBERS
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
208
    void bsp_context::
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
    close_all_connections_gracefully(
    )
    {
        if (_node_id == 0)
        {
            // Wait for all the other nodes to terminate before we do anything since
            // we are the controller node.
            receive();
        }

        _cons.reset();
        while (_cons.move_next())
        {
            // tell the other end that we are intentionally dropping the connection
            serialize(impl::NODE_TERMINATE,_cons.element().value()->stream);
            _cons.element().value()->stream.flush();
            _cons.element().value()->con->shutdown();
        }

        check_for_errors();
    }

Davis King's avatar
Davis King committed
231
232
// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
233
234
    // Shuts down every connection and then waits for the read threads to finish.
    // Unlike close_all_connections_gracefully(), nothing is written to the peers
    // before the connections are shut down.
    bsp_context::
    ~bsp_context()
    {
        _cons.reset();
        while (_cons.move_next())
        {
            _cons.element().value()->con->shutdown();
        }


        // this will wait for all the threads to terminate
        threads.clear();
    }

// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
249
250
    // Builds a context for the node with ID node_id_ using the connections in cons_,
    // and starts one read_thread() per connection.
    //
    //   node_id_ - this node's ID.
    //   cons_    - connections to the other nodes, keyed by their node IDs.
    bsp_context::
    bsp_context(
        unsigned long node_id_,
        impl::map_id_to_con& cons_
    ) :
        read_thread_terminated_improperly(false),
        outstanding_messages(0),
        num_waiting_nodes(0),
        buf_not_empty(class_mutex),
        _cons(cons_),
        _node_id(node_id_)
    {
        // spawn a bunch of read threads, one for each connection
        member_function_pointer<impl::bsp_con*, unsigned long>::kernel_1a_c mfp;
        mfp.set(*this, &bsp_context::read_thread);
        _cons.reset();
        while (_cons.move_next())
        {
            // Each thread is handed the connection it reads from and the ID of the
            // node on the other end of that connection.
            scoped_ptr<thread_function> ptr(new thread_function(mfp,
                                                                _cons.element().value().get(),
                                                                _cons.element().key()));
            threads.push_back(ptr);
        }

    }

// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
277
    bool bsp_context::
Davis King's avatar
Davis King committed
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
    receive_data (
        shared_ptr<std::string>& item,
        unsigned long& sending_node_id
    ) 
    {
        using namespace impl;
        // If there aren't any other nodes then you will never receive anything.
        if (_cons.size() == 0)
            return false;

        {
            auto_mutex lock(class_mutex);
            if (msg_buffer.size() == 0)
            {
                send_to_master_node(WAITING_ON_RECEIVE);
293
                while (msg_buffer.size() == 0 && !read_thread_terminated_improperly)
Davis King's avatar
Davis King committed
294
295
296
                {
                    buf_not_empty.wait();
                }
297
                if (read_thread_terminated_improperly)
Davis King's avatar
Davis King committed
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
                {
                    throw dlib::socket_error("A connection between processing nodes has been lost.");
                }
                send_to_master_node(NOT_WAITING_ON_RECEIVE);
            }

            sending_node_id = msg_sender_id.front();
            msg_sender_id.pop_front();
            item = msg_buffer.front();
            msg_buffer.pop_front();
        }

        // if this is a message from another node rather than the
        // "everyone is blocked on receive() message".
        if (item)
        {
            send_to_master_node(GOT_MESSAGE);
            return true;
        }
        else
        {
            return false;
        }
    }

// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
325
    void bsp_context::
Davis King's avatar
Davis King committed
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
    send_to_master_node (
        char msg
    )
    {
        using namespace impl;
        // if we aren't the special controlling node then send the
        // controller a message.
        if (_cons.is_in_domain(0))
        {
            serialize(msg, _cons[0]->stream);
            _cons[0]->stream.flush();
        }
        else if (_node_id == 0) // if this is the master node
        {
            // since we are the master node we will just modify our state directly
            auto_mutex lock(class_mutex);
            switch(msg)
            {
                case WAITING_ON_RECEIVE: {
                    ++num_waiting_nodes;
                    notify_everyone_if_all_blocked();
                } break;

                case NOT_WAITING_ON_RECEIVE: {
                    --num_waiting_nodes;
                } break;

                case SENT_MESSAGE: {
                    ++outstanding_messages;
                } break;

                case GOT_MESSAGE: {
                    --outstanding_messages;
                } break;

                default:
                    DLIB_CASSERT(false,"this should not happen");
            }
        }
    }

// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
369
    void bsp_context::
Davis King's avatar
Davis King committed
370
371
372
373
374
375
376
377
378
379
380
381
382
383
    notify_everyone_if_all_blocked(
    )
    {
        using namespace impl;
        // if all the nodes are blocked on receive() and there aren't any
        // messages in flight.
        if (_node_id == 0 && num_waiting_nodes == number_of_nodes() && outstanding_messages == 0)
        {
            // send notifications
            _cons.reset();
            while (_cons.move_next())
            {
                try
                {
384
385
386
387
388
389
390
391
392
                    // Skip connections to nodes that have already terminated their
                    // execution.
                    if (_cons.element().value()->terminated == false)
                    {
                        serialize(ALL_NODES_WAITING, _cons.element().value()->stream);
                        _cons.element().value()->stream.flush();
                        if (!_cons.element().value()->stream)
                            throw dlib::error("Error writing data to TCP connection");
                    }
Davis King's avatar
Davis King committed
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
                }
                catch (std::exception& e)
                {
                    const connection* const con = _cons.element().value()->con.get();
                    std::ostringstream sout;
                    sout << "An exception occurred in the controlling node while it was trying to communicate with a listening node.\n";
                    sout << "  Listening processing node address:   " << con->get_foreign_ip() << ":" << con->get_foreign_port() << std::endl;
                    sout << "  Controlling processing node address: " << con->get_local_ip() << ":" << con->get_local_port() << std::endl;
                    sout << "  Error message in the exception: " << e.what() << std::endl;
                    error_message = sout.str();
                }
            }

            // unblock the control node itself
            shared_ptr<std::string> msg;
            msg_buffer.push_back(msg);
            msg_sender_id.push_back(0);
            buf_not_empty.signal();
        }
    }

// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
416
    void bsp_context::
Davis King's avatar
Davis King committed
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
    read_thread (
        impl::bsp_con* con,
        unsigned long sender_id
    )
    {
        try
        {
            using namespace impl;
            while (con->stream.peek() != EOF)
            {
                char header;
                deserialize(header, con->stream);
                switch (header)
                {
                    case MESSAGE_HEADER: {
                        shared_ptr<std::string> msg(new std::string);
                        deserialize(*msg, con->stream);

                        auto_mutex lock(class_mutex);
                        msg_buffer.push_back(msg);
                        msg_sender_id.push_back(sender_id);
                        buf_not_empty.signal();
                    } break;

                    case WAITING_ON_RECEIVE: {
                        auto_mutex lock(class_mutex);
                        ++num_waiting_nodes;
                        notify_everyone_if_all_blocked();
                    } break;

                    case NOT_WAITING_ON_RECEIVE: {
                        auto_mutex lock(class_mutex);
                        --num_waiting_nodes;
                    } break;

                    case ALL_NODES_WAITING: {
                        // put something into the message buffer that lets 
                        // receive() know to return false.  We do this using
                        // a null msg pointer.
                        auto_mutex lock(class_mutex);
                        shared_ptr<std::string> msg;
                        msg_buffer.push_back(msg);
                        msg_sender_id.push_back(sender_id);
                        buf_not_empty.signal();
                    } break;

                    case SENT_MESSAGE: {
                        auto_mutex lock(class_mutex);
                        ++outstanding_messages;
                    } break;

                    case GOT_MESSAGE: {
                        auto_mutex lock(class_mutex);
                        --outstanding_messages;
                    } break;
472
473
474
475
476
477
478
479
480
481
482
483

                    case NODE_TERMINATE: {
                        auto_mutex lock(class_mutex);
                        if (_node_id == 0)
                        {
                            // a terminating node is basically the same as a node that waits forever.
                            _cons[sender_id]->terminated = true;
                            ++num_waiting_nodes; 
                            notify_everyone_if_all_blocked();
                        }
                        return;
                    } break;
Davis King's avatar
Davis King committed
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
                }
            }
        }
        catch (std::exception& e)
        {
            std::ostringstream sout;
            sout << "An exception was thrown while attempting to receive a message from processing node " << sender_id << ".\n";
            sout << "  Sending processing node address:   " << con->con->get_foreign_ip() << ":" << con->con->get_foreign_port() << std::endl;
            sout << "  Receiving processing node address: " << con->con->get_local_ip() << ":" << con->con->get_local_port() << std::endl;
            sout << "  Error message in the exception: " << e.what() << std::endl;
            auto_mutex lock(class_mutex);
            error_message = sout.str();
        }

        auto_mutex lock(class_mutex);
499
        read_thread_terminated_improperly = true;
Davis King's avatar
Davis King committed
500
501
502
503
504
        buf_not_empty.signal();
    }

// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
505
    void bsp_context::
Davis King's avatar
Davis King committed
506
507
508
509
510
511
512
513
514
    check_for_errors()
    {
        auto_mutex lock(class_mutex);
        if (error_message.size() != 0)
            throw dlib::socket_error(error_message);
    }

// ----------------------------------------------------------------------------------------

Davis King's avatar
Davis King committed
515
    void bsp_context::
Davis King's avatar
Davis King committed
516
517
518
519
520
521
    send_data(
        const std::string& item,
        unsigned long target_node_id
    ) 
    {
        using namespace impl;
522
523
524
        if (_cons[target_node_id]->terminated)
            throw socket_error("Attempt to send a message to a node that has terminated.");

Davis King's avatar
Davis King committed
525
526
527
528
529
530
531
532
533
534
        serialize(MESSAGE_HEADER, _cons[target_node_id]->stream);
        serialize(item, _cons[target_node_id]->stream);
        _cons[target_node_id]->stream.flush();
        send_to_master_node(SENT_MESSAGE);
    }

// ----------------------------------------------------------------------------------------

}