15 #if !defined(RXCPP_RX_COROUTINE_HPP)
16 #define RXCPP_RX_COROUTINE_HPP
20 #ifdef _RESUMABLE_FUNCTIONS_SUPPORTED
24 #include <experimental/resumable>
30 using namespace std::chrono;
31 using namespace std::experimental;
// Forward declaration: co_observable_inc_awaiter<Source>::await_resume() is
// declared to return co_observable_iterator<Source> before the iterator
// template itself is defined further down.
33 template<
typename Source>
34 struct co_observable_iterator;
// State object for co_observable_iterator<Source>. It derives from
// enable_shared_from_this and is held through shared_ptr / weak_ptr by the
// iterator and awaiter types in this file.
// NOTE(review): the leading numbers in this excerpt skip values (38, 40, 43,
// 45, 50-52), so braces and possibly members are not visible here; e.g. the
// constructor initializes `o`, whose declaration does not appear below.
36 template<
typename Source>
37 struct co_observable_iterator_state : std::enable_shared_from_this<co_observable_iterator_state<Source>>
// Alias for Source::value_type.
39 using value_type =
typename Source::value_type;
// Unsubscribes `lifetime` when the state is destroyed.
41 ~co_observable_iterator_state() {
42 lifetime.unsubscribe();
// Initializes the `o` member from the given source.
44 explicit co_observable_iterator_state(
const Source& o) : o(o) {}
// Handle of the suspended coroutine; assigned in the await_suspend functions
// and reset to nullptr by the subscription callbacks.
46 coroutine_handle<> caller{};
// Subscription whose is_subscribed() is consulted by the iterator's
// operator== and by co_observable_inc_awaiter::await_suspend.
47 composite_subscription lifetime{};
// Non-owning pointer to the current value. The value callback stores
// addressof(v) of its by-reference argument here, and the iterator's
// operator* dereferences it.
48 const value_type* value{
nullptr};
// Starts out null. NOTE(review): no assignment to `error` is visible in this
// excerpt — confirm where it is set and rethrown in the full file.
49 exception_ptr
error{
nullptr};
// Awaiter returned by co_observable_iterator<Source>::operator++(); awaiting
// it yields a co_observable_iterator<Source> from await_resume().
// NOTE(review): the leading numbers skip 55-59 and 63-65, so the opening of
// the struct and the tail of await_suspend are not visible in this excerpt.
53 template<
typename Source>
54 struct co_observable_inc_awaiter
// Returns false without storing the handle when the state's lifetime is no
// longer subscribed (for a bool-returning await_suspend, false means the
// coroutine is resumed immediately). Otherwise records `handle` in
// state->caller. What is returned after storing the handle is not visible
// here.
60 bool await_suspend(coroutine_handle<> handle) {
61 if (!state->lifetime.is_subscribed()) {
return false;}
62 state->caller = handle;
// Declared here and defined out of line after co_observable_iterator, because
// the iterator is only forward-declared at this point.
66 co_observable_iterator<Source> await_resume();
// Shared iteration state; this is a copy of the iterator's shared_ptr.
68 shared_ptr<co_observable_iterator_state<Source>> state;
// Input iterator over the values of a Source, backed by a shared
// co_observable_iterator_state<Source>. A default-constructed instance has no
// state and is what operator== expects on its right-hand side.
// NOTE(review): if the unqualified `iterator` base resolves to std::iterator,
// that class template is deprecated since C++17 — confirm before raising the
// language level.
// NOTE(review): the leading numbers in this excerpt skip values, so the
// braces of the struct and of the member function bodies are not visible.
71 template<
typename Source>
72 struct co_observable_iterator :
public iterator<input_iterator_tag, typename Source::value_type>
// Alias for Source::value_type.
74 using value_type =
typename Source::value_type;
// Leaves `state` empty (null shared_ptr).
76 co_observable_iterator() {}
// Creates a fresh shared state constructed from the source `o`.
78 explicit co_observable_iterator(
const Source& o) : state(make_shared<co_observable_iterator_state<Source>>(o)) {}
// Shares an already existing state object.
79 explicit co_observable_iterator(
const shared_ptr<co_observable_iterator_state<Source>>& o) : state(o) {}
// Only move construction and move assignment are declared in the visible
// code; both are defaulted.
81 co_observable_iterator(co_observable_iterator&&)=
default;
82 co_observable_iterator& operator=(co_observable_iterator&&)=
default;
// Pre-increment does not advance anything by itself: it returns an awaiter
// that shares this iterator's state.
84 co_observable_inc_awaiter<Source> operator++()
86 return co_observable_inc_awaiter<Source>{state};
// Post-increment is explicitly deleted.
89 co_observable_iterator& operator++(
int) =
delete;
// True only when this iterator has a state, `rhs` has none, and this
// iterator's lifetime is no longer subscribed. The comparison is therefore
// asymmetric: an iterator without state on the left-hand side never compares
// equal.
92 bool operator==(co_observable_iterator
const &rhs)
const
94 return !!state && !rhs.state && !state->lifetime.is_subscribed();
// Negation of operator==.
97 bool operator!=(co_observable_iterator
const &rhs)
const
99 return !(*
this == rhs);
// Dereferences state->value without checking `state` or `value` for null.
102 value_type
const &operator*()
const
104 return *(state->value);
// Address of the object that operator* refers to.
107 value_type
const *operator->()
const
109 return std::addressof(
operator*());
// Shared iteration state; empty for a default-constructed iterator.
112 shared_ptr<co_observable_iterator_state<Source>> state;
// Out-of-line definition of co_observable_inc_awaiter<Source>::await_resume():
// returns a new iterator that shares this awaiter's state.
// NOTE(review): the leading numbers skip 117, so a statement before the
// return may be missing from this excerpt.
115 template<
typename Source>
116 co_observable_iterator<Source> co_observable_inc_awaiter<Source>::await_resume() {
118 return co_observable_iterator<Source>{state};
// Awaiter that owns a co_observable_iterator<Source> (`it`) and hands it out
// from await_resume().
// NOTE(review): the leading numbers in this excerpt skip many values
// (e.g. 128-133, 137-138, 143-145, 147-148, 155-157, 161, 164-167, 169), so
// braces, lambda headers, the declaration of `it`, and whatever is done with
// the local `caller` copies are not visible here.
121 template<
typename Source>
122 struct co_observable_iterator_awaiter
124 using iterator=co_observable_iterator<Source>;
125 using value_type=
typename iterator::value_type;
// Constructs the owned iterator from the source, which creates a fresh shared
// state.
127 explicit co_observable_iterator_awaiter(
const Source& o) : it(o) {
// Stores the suspended coroutine's handle in the shared state. The callbacks
// reach the state through a weak_ptr (`wst`) rather than an owning pointer.
134 void await_suspend(coroutine_handle<> handle) {
135 weak_ptr<co_observable_iterator_state<Source>> wst=it.state;
136 it.state->caller = handle;
// Fragment of a callback whose header is not visible: if the state is still
// alive and a caller is pending, take a copy of the handle and clear
// st->caller.
139 auto st = wst.lock();
140 if (st && !!st->caller) {
141 auto caller = st->caller;
142 st->caller = nullptr;
146 rxo::subscribe<value_type>(
// Value callback: terminates if the state is gone or no caller is pending;
// otherwise publishes the address of `v` (not a copy) through st->value, then
// copies the handle and clears st->caller.
149 [wst](
const value_type& v){
150 auto st = wst.lock();
151 if (!st || !st->caller) {terminate();}
152 st->value = addressof(v);
153 auto caller = st->caller;
154 st->caller =
nullptr;
// Exception callback: same liveness check as the value callback, then copies
// the handle and clears st->caller. What happens to `e` is not visible here.
158 [wst](exception_ptr e){
159 auto st = wst.lock();
160 if (!st || !st->caller) {terminate();}
162 auto caller = st->caller;
163 st->caller =
nullptr;
// Moves the owned iterator out to the awaiting coroutine. `it` is initialized
// in the constructor's mem-initializer list rather than being a local, so the
// explicit std::move is what makes this a move.
168 iterator await_resume() {
170 return std::move(it);
// Returns a co_observable_iterator_awaiter for
// rxcpp::observable<T, SourceOperator>, constructed from `o`.
// NOTE(review): the declarator line (function name and parameter list, between
// the lines numbered 182 and 184) is not visible in this excerpt — presumably
// the `begin` overload for rxcpp::observable; confirm against the full file.
182 template<
typename T,
typename SourceOperator>
184 -> rxcpp::coroutine::co_observable_iterator_awaiter<rxcpp::observable<T, SourceOperator>> {
185 return rxcpp::coroutine::co_observable_iterator_awaiter<rxcpp::observable<T, SourceOperator>>{o};
// Returns a default-constructed co_observable_iterator for
// rxcpp::observable<T, SourceOperator>, i.e. an iterator with no state, which
// is the form co_observable_iterator::operator== expects on its right-hand
// side.
// NOTE(review): the declarator line (function name and parameter list, between
// the lines numbered 188 and 190) is not visible in this excerpt — presumably
// the `end` overload for rxcpp::observable; confirm against the full file.
188 template<
typename T,
typename SourceOperator>
190 -> rxcpp::coroutine::co_observable_iterator<rxcpp::observable<T, SourceOperator>> {
191 return rxcpp::coroutine::co_observable_iterator<rxcpp::observable<T, SourceOperator>>{};