Commit e0b0cd9e authored by Paul

Record and wait events

parent 23fd64f8
@@ -26,12 +26,11 @@ struct schedule_model
/// Get the number of concurrent instructions allowed
std::size_t concurrency() const;
/// Schedule a concurrent instruction
void schedule_instruction(program& p, instruction_ref ins, std::size_t n) const;
void sched(program& p, instruction_ref ins, std::size_t n) const;
// Insert necessary waits before an instruction
void wait(program& p,
instruction_ref ins,
std::size_t wait_on,
const std::vector<std::size_t>& wait_for) const;
void wait(program& p, instruction_ref ins, std::size_t wait_id) const;
// Insert necessary records after an instruction
void record(program& p, instruction_ref ins, std::size_t wait_id) const;
/// Compute weights for an operation
std::size_t weight(const operation& op) const;
};
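The model interface now splits synchronization into separate record and wait hooks keyed by a single event id, replacing the old wait_on/wait_for pair. As a rough sketch (hypothetical, not part of this commit), a minimal type satisfying the new interface could look like the following; a single-stream model needs no synchronization, so every hook is a no-op:

struct single_stream_model
{
    std::size_t concurrency() const { return 1; }
    // A real model inserts stream-selection, event-record and event-wait
    // instructions here; with one stream nothing has to be emitted.
    void sched(program&, instruction_ref, std::size_t) const {}
    void wait(program&, instruction_ref, std::size_t) const {}
    void record(program&, instruction_ref, std::size_t) const {}
    std::size_t weight(const operation&) const { return 1; }
};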
@@ -44,9 +43,10 @@ struct schedule_model
* struct schedule_model
* {
* std::size_t concurrency() const;
* void schedule_instruction(program& p,instruction_ref ins,std::size_t n) const;
* void wait(program& p,instruction_ref ins,std::size_t wait_on,const std::vector<std::size_t>&
* wait_for) const; std::size_t weight(const operation& op) const;
* void sched(program& p,instruction_ref ins,std::size_t n) const;
* void wait(program& p,instruction_ref ins,std::size_t wait_id) const;
* void record(program& p,instruction_ref ins,std::size_t wait_id) const;
* std::size_t weight(const operation& op) const;
* };
*
*/
@@ -114,21 +114,22 @@ struct schedule_model
return (*this).private_detail_te_get_handle().concurrency();
}
void schedule_instruction(program& p, instruction_ref ins, std::size_t n) const
void sched(program& p, instruction_ref ins, std::size_t n) const
{
assert((*this).private_detail_te_handle_mem_var);
(*this).private_detail_te_get_handle().schedule_instruction(
p, std::move(ins), std::move(n));
(*this).private_detail_te_get_handle().sched(p, std::move(ins), std::move(n));
}
void wait(program& p,
instruction_ref ins,
std::size_t wait_on,
const std::vector<std::size_t>& wait_for) const
void wait(program& p, instruction_ref ins, std::size_t wait_id) const
{
assert((*this).private_detail_te_handle_mem_var);
(*this).private_detail_te_get_handle().wait(
p, std::move(ins), std::move(wait_on), wait_for);
(*this).private_detail_te_get_handle().wait(p, std::move(ins), std::move(wait_id));
}
void record(program& p, instruction_ref ins, std::size_t wait_id) const
{
assert((*this).private_detail_te_handle_mem_var);
(*this).private_detail_te_get_handle().record(p, std::move(ins), std::move(wait_id));
}
std::size_t weight(const operation& op) const
@@ -151,13 +152,11 @@ struct schedule_model
virtual std::shared_ptr<private_detail_te_handle_base_type> clone() const = 0;
virtual const std::type_info& type() const = 0;
virtual std::size_t concurrency() const = 0;
virtual void schedule_instruction(program& p, instruction_ref ins, std::size_t n) const = 0;
virtual void wait(program& p,
instruction_ref ins,
std::size_t wait_on,
const std::vector<std::size_t>& wait_for) const = 0;
virtual std::size_t weight(const operation& op) const = 0;
virtual std::size_t concurrency() const = 0;
virtual void sched(program& p, instruction_ref ins, std::size_t n) const = 0;
virtual void wait(program& p, instruction_ref ins, std::size_t wait_id) const = 0;
virtual void record(program& p, instruction_ref ins, std::size_t wait_id) const = 0;
virtual std::size_t weight(const operation& op) const = 0;
};
template <typename PrivateDetailTypeErasedT>
@@ -190,19 +189,22 @@ struct schedule_model
std::size_t concurrency() const override { return private_detail_te_value.concurrency(); }
void schedule_instruction(program& p, instruction_ref ins, std::size_t n) const override
void sched(program& p, instruction_ref ins, std::size_t n) const override
{
private_detail_te_value.sched(p, std::move(ins), std::move(n));
}
void wait(program& p, instruction_ref ins, std::size_t wait_id) const override
{
private_detail_te_value.schedule_instruction(p, std::move(ins), std::move(n));
private_detail_te_value.wait(p, std::move(ins), std::move(wait_id));
}
void wait(program& p,
instruction_ref ins,
std::size_t wait_on,
const std::vector<std::size_t>& wait_for) const override
void record(program& p, instruction_ref ins, std::size_t wait_id) const override
{
private_detail_te_value.wait(p, std::move(ins), std::move(wait_on), wait_for);
private_detail_te_value.record(p, std::move(ins), std::move(wait_id));
}
std::size_t weight(const operation& op) const override
......
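The blocks above are the regenerated type-erasure boilerplate: each public member of schedule_model forwards to a matching pure-virtual member on an internal handle, so any type providing concurrency/sched/wait/record/weight can be stored in it. Stripped to one member, the pattern is roughly (illustrative names, not the generated ones):

struct model_iface
{
    virtual ~model_iface() = default;
    virtual std::size_t concurrency() const = 0;
};

template <class T>
struct model_holder : model_iface
{
    T value;
    explicit model_holder(T v) : value(std::move(v)) {}
    std::size_t concurrency() const override { return value.concurrency(); }
};
// The public wrapper holds a shared_ptr to the interface and forwards each call,
// which is what the sched/wait/record members above do for the full interface.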
@@ -196,6 +196,29 @@ struct stream_info
return different(get_streams(ins, get_outputs()), xs...);
}
std::vector<instruction_ref> get_recorded_instructions(instruction_ref start)
{
std::vector<instruction_ref> result;
std::unordered_map<std::size_t, instruction_ref> m;
fix([&](auto self, auto ins) {
for(auto i : ins->inputs())
{
if(iweights.at(i) == 0)
{
self(i);
continue;
}
auto stream = get_stream(i);
if (m.count(stream) == 0)
m[stream] = i;
else
m[stream] = std::min(m[stream], i, by(std::less<>{}, [&](auto x) { return std::distance(x, start); }));
}
})(start);
std::transform(m.begin(), m.end(), std::back_inserter(result), [](auto&& p) { return p.second; });
return result;
}
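get_recorded_instructions walks the transitive inputs of an instruction, skipping zero-weight inputs, and keeps one entry per upstream stream: the producer on that stream closest to start in program order. Those are the instructions whose completion must later be recorded as events. It leans on two small utilities; assuming fix is the usual fixed-point helper for recursive lambdas and by builds a comparator from a projection, their behaviour is roughly as below (a sketch of the assumed semantics, not the MIGraphX definitions):

// fix(f) lets a lambda recurse by receiving itself as its first argument
template <class F>
auto fix(F f)
{
    return [=](auto&&... xs) { return f(fix(f), std::forward<decltype(xs)>(xs)...); };
}

// by(cmp, proj) compares values through a projection, so
// std::min(a, b, by(std::less<>{}, proj)) picks the element with the smaller proj(x)
template <class Compare, class Projection>
auto by(Compare cmp, Projection proj)
{
    return [=](auto&& x, auto&& y) { return cmp(proj(x), proj(y)); };
}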
std::vector<std::size_t> wait_for(instruction_ref ins) const
{
std::vector<std::size_t> result;
@@ -290,8 +313,8 @@ void schedule::apply(program& p) const
}
// Schedule instructions
std::set<std::size_t> waited_for;
std::size_t waited_on = model.concurrency();
std::unordered_map<instruction_ref, std::size_t> ins2wait;
std::size_t wait_id = 0;
for(auto ins : iterator_for(p))
{
// Only schedule instructions that have a stream
@@ -301,23 +324,25 @@ void schedule::apply(program& p) const
// Schedule instruction on the stream
auto stream = si.get_stream(ins);
assert(stream < model.concurrency());
model.schedule_instruction(p, ins, stream);
// Clear waits when switching streams
if(stream != waited_on)
waited_for.clear();
// Schedule wait instruction
model.sched(p, ins, stream);
// Insert wait instructions
if(si.is_merge_point(ins, stream))
{
auto wait_for = si.wait_for(ins);
// Dont wait for streams that have already been waited for
wait_for.erase(std::remove_if(wait_for.begin(),
wait_for.end(),
[&](auto x) { return waited_for.count(x) > 0; }),
wait_for.end());
if(not wait_for.empty())
model.wait(p, ins, stream, wait_for);
waited_for.insert(wait_for.begin(), wait_for.end());
waited_on = stream;
for(auto i:si.get_recorded_instructions(ins))
{
if (not si.has_stream(i))
continue;
if (stream == si.get_stream(i))
continue;
// Create a new event if it hasn't been recorded
if(ins2wait.count(i) == 0)
{
ins2wait[i] = wait_id;
model.record(p, i, wait_id);
wait_id++;
}
model.wait(p, ins, ins2wait.at(i));
}
}
}
......
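In the loop above, producers on the same stream as the merge point are skipped, and ins2wait memoizes the event id assigned to each recorded producer, so record is emitted at most once per producer no matter how many merge points depend on it; later consumers only add waits. The same idiom in isolation (a generic sketch, not MIGraphX-specific code):

#include <cstddef>
#include <unordered_map>

// Give each producer one id and one record; every consumer adds a wait on that id.
template <class Key, class Record, class Wait>
void sync_with(const Key& producer, std::unordered_map<Key, std::size_t>& ids,
               std::size_t& next_id, Record record, Wait wait)
{
    auto it = ids.find(producer);
    if(it == ids.end())
    {
        it = ids.emplace(producer, next_id++).first;
        record(it->second); // first consumer: record an event after the producer
    }
    wait(it->second); // all consumers wait on the producer's event
}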
@@ -13,6 +13,8 @@ namespace gpu {
MIGRAPHX_DECLARE_ENV_VAR(MIGRAPHX_ENABLE_NULL_STREAM)
using hip_event_ptr = MIGRAPHX_MANAGE_PTR(hipEvent_t, hipEventDestroy);
struct hip_device
{
hip_device() { add_stream(); }
@@ -140,12 +142,33 @@ struct context
void set_stream(std::size_t n) { get_current_device().set_stream(n); }
void create_events(std::size_t num_of_events)
{
for(int i = events.size(); i < num_of_events + 1; ++i)
events.emplace_back(create_event());
}
hipEvent_t get_event(std::size_t i) const
{
return events.at(i).get();
}
std::vector<argument> literals{};
void finish() const { gpu_sync(); }
static hip_event_ptr create_event()
{
hipEvent_t event;
auto status = hipEventCreateWithFlags(&event, hipEventDisableTiming);
if(status != hipSuccess)
MIGRAPHX_THROW("Failed to create event");
return hip_event_ptr{event};
}
private:
// TODO: Make this a vector to support multiple devices
std::shared_ptr<hip_device> current_device;
std::vector<shared<hip_event_ptr>> events;
};
} // namespace gpu
} // namespace MIGRAPHX_INLINE_NS
......
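The context now owns a pool of HIP events indexed by the wait ids the scheduler hands out. create_events only appends, so earlier ids stay valid, and the loop runs to num_of_events + 1 so that the id passed in is itself a usable index (record_event::finalize calls it with its own event id). A rough usage sketch, assuming a gpu::context named ctx and an arbitrary id of 3:

ctx.create_events(3);                       // ensure events with ids 0..3 exist
ctx.get_stream().record(ctx.get_event(3));  // record on the current stream
ctx.get_stream(0).wait(ctx.get_event(3));   // make stream 0 wait for it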
@@ -17,11 +17,9 @@ struct schedule_model
{
std::size_t streams = 0;
std::size_t concurrency() const;
void schedule_instruction(program& p, instruction_ref ins, std::size_t n) const;
void wait(program& p,
instruction_ref ins,
std::size_t,
const std::vector<std::size_t>& wait_for) const;
void sched(program& p, instruction_ref ins, std::size_t n) const;
void wait(program& p, instruction_ref ins, std::size_t wait_id) const;
void record(program& p, instruction_ref ins, std::size_t wait_id) const;
std::size_t weight(const operation& op) const;
};
......
@@ -8,52 +8,44 @@ namespace migraphx {
inline namespace MIGRAPHX_INLINE_NS {
namespace gpu {
using hip_event_ptr = MIGRAPHX_MANAGE_PTR(hipEvent_t, hipEventDestroy);
hip_event_ptr create_event()
{
hipEvent_t event;
// Default is hipEventReleaseToDevice
// auto status = hipEventCreateWithFlags(
// &event, hipEventDisableTiming | hipEventReleaseToSystem | hipEventBlockingSync);
auto status = hipEventCreateWithFlags(&event, hipEventDisableTiming);
if(status != hipSuccess)
MIGRAPHX_THROW("Failed to create event");
return hip_event_ptr{event};
}
struct wait_event
struct record_event
{
std::vector<std::size_t> wait_for;
shared<hip_event_ptr> event = nullptr;
std::size_t event = 0;
template <class Self, class F>
static auto reflect(Self& self, F f)
{
return pack(f(self.wait_for, "wait_for"));
return pack(f(self.event, "event"));
}
std::string name() const { return "gpu::wait_event"; }
std::string name() const { return "gpu::record_event"; }
shape compute_shape(const std::vector<shape>&) const { return {}; }
argument compute(context& ctx, const shape&, const std::vector<argument>&) const
{
assert(event != nullptr);
assert(std::none_of(wait_for.begin(), wait_for.end(), [&](auto i) {
return i == ctx.get_current_device().stream_id();
}));
for(auto n : wait_for)
ctx.get_stream(n).record(event.get());
ctx.get_stream().wait(event.get());
ctx.get_stream().record(ctx.get_event(event));
return {};
}
void finalize(context& ctx, const shape&, std::vector<shape>)
{
assert(std::none_of(wait_for.begin(), wait_for.end(), [&](auto i) {
return i == ctx.get_current_device().stream_id();
}));
(void)ctx;
assert(not wait_for.empty());
event = create_event();
ctx.create_events(event);
}
};
struct wait_event
{
std::size_t event = 0;
template <class Self, class F>
static auto reflect(Self& self, F f)
{
return pack(f(self.event, "event"));
}
std::string name() const { return "gpu::wait_event"; }
shape compute_shape(const std::vector<shape>&) const { return {}; }
argument compute(context& ctx, const shape&, const std::vector<argument>&) const
{
ctx.get_stream().wait(ctx.get_event(event));
return {};
}
};
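At run time record_event records the pooled event with its id on the stream the instruction was scheduled to, and wait_event makes the current stream wait on that event before continuing. Assuming the stream wrapper's record/wait calls map onto the usual HIP primitives, the pair reduces to roughly the following (a sketch of the assumed mapping, not the actual wrapper code):

void record_on(hipStream_t producer, hipEvent_t e)
{
    // mark the point in the producer stream that consumers must reach
    if(hipEventRecord(e, producer) != hipSuccess)
        MIGRAPHX_THROW("Failed to record event");
}

void wait_on(hipStream_t consumer, hipEvent_t e)
{
    // work submitted to the consumer stream after this call will not start
    // until the recorded point completes; the wait does not block the host
    if(hipStreamWaitEvent(consumer, e, 0) != hipSuccess)
        MIGRAPHX_THROW("Failed to wait on event");
}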
@@ -77,7 +69,7 @@ struct set_stream
};
std::size_t schedule_model::concurrency() const { return streams; }
void schedule_model::schedule_instruction(program& p, instruction_ref ins, std::size_t n) const
void schedule_model::sched(program& p, instruction_ref ins, std::size_t n) const
{
auto last_stream = std::find_if(std::make_reverse_iterator(ins),
std::make_reverse_iterator(p.begin()),
@@ -91,12 +83,14 @@ void schedule_model::schedule_instruction(program& p, instruction_ref ins, std::
}
p.insert_instruction(ins, set_stream{n});
}
void schedule_model::wait(program& p,
instruction_ref ins,
std::size_t,
const std::vector<std::size_t>& wait_for) const
void schedule_model::wait(program& p, instruction_ref ins, std::size_t wait_id) const
{
p.insert_instruction(ins, wait_event{wait_id});
}
void schedule_model::record(program& p, instruction_ref ins, std::size_t wait_id) const
{
p.insert_instruction(ins, wait_event{wait_for});
p.insert_instruction(std::next(ins), record_event{wait_id});
}
static std::unordered_map<std::string, std::size_t> create_weight_map()
......
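Putting the three hooks together on the gpu model: sched selects the stream (inserting a set_stream only when the stream actually changes), record places a record_event immediately after the producing instruction, and wait places a wait_event immediately before the consumer. A hypothetical direct use on a program p, with producer and consumer being instruction_refs in p (the schedule pass normally drives these calls):

gpu::schedule_model m{2};
m.sched(p, producer, 1);   // set_stream{1} before producer (if not already on stream 1)
m.record(p, producer, 0);  // record_event{0} right after producer
m.sched(p, consumer, 0);   // set_stream{0} before consumer
m.wait(p, consumer, 0);    // wait_event{0} right before consumer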
@@ -26,12 +26,11 @@ struct schedule_model
/// Get the number of concurrent instructions allowed
std::size_t concurrency() const;
/// Schedule a concurrent instruction
void schedule_instruction(program& p, instruction_ref ins, std::size_t n) const;
void sched(program& p, instruction_ref ins, std::size_t n) const;
// Insert necessary waits before an instruction
void wait(program& p,
instruction_ref ins,
std::size_t wait_on,
const std::vector<std::size_t>& wait_for) const;
void wait(program& p, instruction_ref ins, std::size_t wait_id) const;
// Insert necessary records after an instruction
void record(program& p, instruction_ref ins, std::size_t wait_id) const;
/// Compute weights for an operation
std::size_t weight(const operation& op) const;
};
@@ -41,8 +40,9 @@ struct schedule_model
<%
interface('schedule_model',
virtual('concurrency', returns='std::size_t', const=True),
virtual('schedule_instruction', p='program&', ins='instruction_ref', n='std::size_t', const=True),
virtual('wait', p='program&', ins='instruction_ref', wait_on='std::size_t', wait_for='const std::vector<std::size_t>&', const=True),
virtual('sched', p='program&', ins='instruction_ref', n='std::size_t', const=True),
virtual('wait', p='program&', ins='instruction_ref', wait_id='std::size_t', const=True),
virtual('record', p='program&', ins='instruction_ref', wait_id='std::size_t', const=True),
virtual('weight', returns='std::size_t', op='const operation&', const=True)
)
%>
......