5 #if !defined(RXCPP_RX_SCHEDULER_RUN_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_RUN_LOOP_HPP
8 #include "../rx-includes.hpp"
12 namespace schedulers {
// State object for a run loop. Elsewhere in this file it is referenced through
// std::weak_ptr (worker) and std::shared_ptr (run loop) handles.
// NOTE(review): this view of the file has lines elided (braces and possibly
// members), so the comments below describe only what is visible.
16 struct run_loop_state :
public std::enable_shared_from_this<run_loop_state>
// Queue of pending work whose entries carry a clock_type::time_point.
// Callers in this file read top().when on it; the ordering itself is defined by
// detail::schedulable_queue -- presumably earliest-first, TODO confirm.
20 typedef detail::schedulable_queue<
21 clock_type::time_point> queue_item_time;
// Convenience aliases for the queue's element type and its const reference.
23 typedef queue_item_time::item_type item_type;
24 typedef queue_item_time::const_reference const_reference_item_type;
26 virtual ~run_loop_state()
// Subscription that other code in this file adds child subscriptions to and
// unsubscribes during teardown.
34 composite_subscription lifetime;
// Mutex taken by the scheduling, dispatch and callback-setter paths before they
// touch q or notify_earlier_wakeup.
35 mutable std::mutex lock;
// The pending-work queue itself.
36 mutable queue_item_time q;
// Optional callback (may be empty). The timed schedule path calls it with the
// new item's time when that item is queued onto an empty queue or is due
// earlier than the current top of the queue.
38 std::function<void(clock_type::time_point)> notify_earlier_wakeup;
// Non-owning handle to the run loop state; the timed schedule overload calls
// lock() on it each time it runs.
58 std::weak_ptr<detail::run_loop_state> state;
// Virtual destructor (body not visible in this view).
60 virtual ~run_loop_worker()
// Constructs a worker from a weak reference to the run loop state.
// `explicit` prevents implicit conversion from a weak_ptr to a worker.
64 explicit run_loop_worker(std::weak_ptr<detail::run_loop_state> ws)
// Returns the current time as reported by clock_type::now().
69 virtual clock_type::time_point
now()
const {
70 return clock_type::now();
// Schedules scbl to run as soon as possible by forwarding to the timed
// overload with now() as the due time.
73 virtual void schedule(
const schedulable& scbl)
const {
74 schedule(
now(), scbl);
// Queues scbl on the run loop state with due time `when`.
// If a notify_earlier_wakeup callback is installed and the new item becomes the
// earliest work (queue was empty, or `when` precedes the current top), the
// callback is invoked with `when` after the push.
// NOTE(review): lines are elided from this view of the function (original
// numbering jumps 77->79 and 83->85), so guards or statements may be missing.
77 virtual void schedule(clock_type::time_point when,
const schedulable& scbl)
const {
// NOTE(review): `state` is a std::weak_ptr, so lock() yields an empty
// shared_ptr once the state has been destroyed. `st` is dereferenced below
// with no visible null check -- confirm the lifetime guarantee or add one.
79 auto st = state.lock();
80 std::unique_lock<std::mutex> guard(st->lock);
// Decide before pushing: after the push, top() could already be the new item.
81 const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup &&
82 (st->q.empty() || when < st->q.top().when);
83 st->q.push(detail::run_loop_state::item_type(when, scbl));
// NOTE(review): no unlock is visible before this call, so the callback appears
// to run with st->lock held -- confirm the callback never re-enters this lock.
85 if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when);
// Non-owning handle to the run loop state (the enclosing class header is not
// visible in this view).
91 std::weak_ptr<detail::run_loop_state> state;
// Returns the current time as reported by clock_type::now().
102 virtual clock_type::time_point
now()
const {
103 return clock_type::now();
// Body fragment; the function header is not visible (`cs` is presumably a
// composite_subscription parameter -- TODO confirm).
// Links cs into the run loop's lifetime subscription and returns a new
// run_loop_worker that shares this object's state handle.
// NOTE(review): if `state` is the std::weak_ptr declared above, state.lock()
// may return an empty shared_ptr and the -> below would dereference null.
107 auto lifetime = state.lock()->lifetime;
// Keep the token returned by add() so the registration can be undone.
108 auto token = lifetime.
add(cs);
// Register a callback on cs that removes the token from `lifetime`
// (presumably run when cs is unsubscribed -- TODO confirm). The lambda captures
// `lifetime` and `token` by copy.
109 cs.
add([=](){lifetime.remove(token);});
114 return std::make_shared<run_loop_worker>(state);
// Type aliases and data members of the run loop (the class header is not
// visible in this view).
// queue_type: the action queue whose ensure()/destroy() are called during
// construction and teardown below.
127 typedef detail::action_queue queue_type;
// Re-exported element types of the run loop state's queue.
129 typedef detail::run_loop_state::item_type item_type;
130 typedef detail::run_loop_state::const_reference_item_type const_reference_item_type;
// Owning handle to the state.
132 std::shared_ptr<detail::run_loop_state> state;
// Scheduler used at construction to obtain a worker interface.
133 std::shared_ptr<run_loop_scheduler> sc;
// Constructor fragment (signature and other initializers are not visible).
// Allocates a fresh run_loop_state owned by this object.
138 : state(std::make_shared<detail::run_loop_state>())
// Hands the scheduler's worker interface to the action queue -- presumably
// installing it for the calling thread; verify against detail::action_queue.
143 queue_type::ensure(sc->create_worker_interface());
// Teardown fragment (function header not visible; presumably the destructor).
// Unsubscribe the state's lifetime subscription first.
147 state->lifetime.unsubscribe();
// Hold the state lock for the remaining cleanup.
149 std::unique_lock<std::mutex> guard(state->lock);
// Counterpart of the queue_type::ensure() call made at construction.
152 queue_type::destroy();
// Move the pending items into a local so they are destroyed with this scope.
154 auto expired = std::move(state->q);
// Abort if the queue is not empty after being moved from.
// NOTE(review): this relies on the moved-from queue reporting empty() -- confirm
// that detail::schedulable_queue guarantees it.
155 if (!state->q.empty()) std::terminate();
// Returns the current time as reported by clock_type::now().
158 clock_type::time_point
now()
const {
159 return clock_type::now();
// Bodies of two accessors whose headers are not visible in this view.
// First: returns the state's lifetime subscription.
163 return state->lifetime;
// Second: reports whether the pending-work queue is empty.
// NOTE(review): no lock on state->lock is visible here, while other paths
// mutate q under that lock -- confirm the intended threading contract.
167 return state->q.empty();
// Returns a const reference to the item at the top of the pending-work queue.
// NOTE(review): no emptiness check or lock is visible here; callers presumably
// must check emptiness first, and the reference is only as stable as the queue
// contents -- confirm.
170 const_reference_item_type
peek()
const {
171 return state->q.top();
// Dispatch fragment (function header and the bodies of the three `if`
// statements are not visible in this view). The visible steps are: take the
// state lock, inspect the top item, then invoke `what`.
175 std::unique_lock<std::mutex> guard(state->lock);
// Nothing queued (branch body elided).
176 if (state->q.empty()) {
// Local reference to the top item. It uses the same name as the peek()
// accessor above.
179 auto&
peek = state->q.top();
// The top item's work is no longer subscribed (branch body elided).
180 if (!
peek.what.is_subscribed()) {
// The top item is not due yet (branch body elided).
184 if (clock_type::now() <
peek.when) {
// Reset the recursion helper `r` using the queue's current emptiness.
// `r` is not declared in the visible part of run_loop_state -- TODO confirm
// its type and the meaning of the flag.
189 state->r.reset(state->q.empty());
// Run the work item. `what` is declared in an elided line, presumably taken
// from the top item -- verify.
191 what(state->r.get_recurse());
// Setter fragment (function header not visible; `f` is presumably its
// parameter). Installs f as the state's notify_earlier_wakeup callback while
// holding the state lock, the same lock the timed schedule path holds when it
// reads the callback.
199 std::unique_lock<std::mutex> guard(state->lock);
200 state->notify_earlier_wakeup = f;