27 #if !defined(RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP)
28 #define RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP
30 #include "../rx-includes.hpp"
// Empty tag type naming an argument combination that skip_until does not accept.
// It is used below as the template argument of rxo::operator_base and as the
// first argument of observable<> inside skip_until_invalid.
// NOTE(review): it is referenced as skip_until_invalid_arguments<AN...>, so a
// variadic template header presumably precedes this line - not visible here.
39 struct skip_until_invalid_arguments {};
// Placeholder operator type for argument lists that match no valid skip_until
// overload. It derives from rxo::operator_base over the
// skip_until_invalid_arguments<AN...> tag.
42 struct skip_until_invalid :
public rxo::operator_base<skip_until_invalid_arguments<AN...>> {
// `type` is the observable built from the tag and this placeholder operator;
// it is what the catch-all member overload names as its return type.
43 using type = observable<skip_until_invalid_arguments<
AN...>, skip_until_invalid<
AN...>>;
// Convenience alias for skip_until_invalid<AN...>::type.
46 using skip_until_invalid_t =
typename skip_until_invalid<
AN...>::type;
// Template parameters of the skip_until operator:
//   T                 - type of the values the source handler forwards with on_next
//                       (the member overloads pass SourceValue here)
//   Observable        - type of the source observable
//   TriggerObservable - type of the observable whose first value ends the skipping
//   Coordination      - type that supplies coordinator_type and create_coordinator()
// NOTE(review): the struct declaration that follows this header is not visible
// in this excerpt.
48 template<
class T,
class Observable,
class TriggerObservable,
class Coordination>
// Decayed forms of the template arguments, used for storage and for the
// constructor signatures below.
51 typedef rxu::decay_t<Observable> source_type;
52 typedef rxu::decay_t<TriggerObservable> trigger_source_type;
53 typedef rxu::decay_t<Coordination> coordination_type;
// Coordinator type produced by the coordination.
54 typedef typename coordination_type::coordinator_type coordinator_type;
// Constructor of the `values` bundle: takes the source, the trigger and the
// coordination by value and moves each one into the member of the same role.
57 values(source_type s, trigger_source_type t, coordination_type sf)
58 : source(std::move(s))
59 , trigger(std::move(t))
60 , coordination(std::move(sf))
// Stored trigger observable and coordination.
// NOTE(review): the declaration of the `source` member initialised above is
// not visible in this excerpt.
64 trigger_source_type trigger;
65 coordination_type coordination;
// Constructs the operator: all three by-value arguments are moved into the
// `initial` member.
69 skip_until(source_type s, trigger_source_type t, coordination_type sf)
70 : initial(std::move(s), std::move(t), std::move(sf))
// Subscribes `s` to this operator: builds shared per-subscription state, wires
// up the trigger first and the source second.
// NOTE(review): several original lines (braces, lambda headers, the bodies of
// the guard `if`s, the on_exception calls) are not visible in this excerpt;
// the comments below describe only what the visible lines show.
85 template<
class Subscriber>
86 void on_subscribe(Subscriber s)
const {
88 typedef Subscriber output_type;
// Per-subscription state; it derives from std::enable_shared_from_this.
90 :
public std::enable_shared_from_this<state_type>
// The state starts in mode::skipping and takes ownership of the coordinator.
93 state_type(
const values& i, coordinator_type coor,
const output_type& oarg)
95 , mode_value(mode::skipping)
96 , coordinator(std::move(coor))
// Register both lifetimes with the output subscriber.
// NOTE(review): presumably so that unsubscribing the output also ends the
// trigger and source subscriptions - confirm against subscriber::add.
99 out.add(trigger_lifetime);
100 out.add(source_lifetime);
// Current mode plus one subscription lifetime each for trigger and source.
102 typename mode::type mode_value;
103 composite_subscription trigger_lifetime;
104 composite_subscription source_lifetime;
105 coordinator_type coordinator;
// A fresh coordinator is created for every subscription.
109 auto coordinator = initial.coordination.create_coordinator();
// The state is built from `initial`, the moved coordinator and the moved subscriber.
112 auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(s));
// Route the trigger observable through the coordinator's in().
115 [&](){
return state->coordinator.in(state->trigger);},
// NOTE(review): the body of this check is not visible; presumably it returns
// when the coordinated trigger could not be produced.
117 if (trigger.empty()) {
// Same treatment for the source observable.
122 [&](){
return state->coordinator.in(state->source);},
124 if (source.empty()) {
// Subscriber that observes the trigger; it is bound to trigger_lifetime.
128 auto sinkTrigger = make_subscriber<typename trigger_source_type::value_type>(
132 state->trigger_lifetime,
// Trigger value handler: the value itself is ignored (unnamed parameter).
134 [state](
const typename trigger_source_type::value_type&) {
// NOTE(review): guard body not visible - presumably an early return unless
// the state is still mode::skipping.
135 if (state->mode_value != mode::skipping) {
// Switch to mode::triggered and stop listening to the trigger.
138 state->mode_value = mode::triggered;
139 state->trigger_lifetime.unsubscribe();
// Trigger error handler: marks the state errored and forwards the error to
// the output (guard body not visible, as above).
143 if (state->mode_value != mode::skipping) {
146 state->mode_value = mode::errored;
147 state->out.on_error(e);
// NOTE(review): presumably the trigger's completion handler (lambda header
// not visible): sets mode::clear and releases the trigger subscription.
151 if (state->mode_value != mode::skipping) {
154 state->mode_value = mode::clear;
155 state->trigger_lifetime.unsubscribe();
// Route the trigger subscriber through the coordinator's out() before use.
159 [&](){
return state->coordinator.out(sinkTrigger);},
161 if (selectedSinkTrigger.empty()) {
// The trigger is subscribed before the source.
164 trigger->subscribe(std::move(selectedSinkTrigger.get()));
// Subscribe to the source under its own lifetime.
166 source.get().subscribe(
168 state->source_lifetime,
// Source value handler: forwards `t` to the output.
// NOTE(review): guard body not visible - presumably values are dropped
// unless the state is mode::triggered.
171 if (state->mode_value != mode::triggered) {
174 state->out.on_next(t);
// Source error handler: marks the state errored and forwards the error.
// NOTE(review): the guard relies on the ordering of the mode enumerators,
// which are not visible in this excerpt.
178 if (state->mode_value > mode::triggered) {
181 state->mode_value = mode::errored;
182 state->out.on_error(e);
// Source completion handler: marks the state stopped and completes the
// output (guard body not visible; the guard tests for mode::triggered).
186 if (state->mode_value != mode::triggered) {
189 state->mode_value = mode::stopped;
190 state->out.on_completed();
200 template<
class...
AN>
// Overload for skip_until(time point): the trigger is a timer observable.
// NOTE(review): only part of the template parameter list is visible; the
// is_convertible line presumably belongs to an enable-if constraint requiring
// TimePoint to convert to the scheduler clock's time_point.
211 template<
class Observable,
class TimePoint,
214 std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
// The operator type is instantiated with identity_one_worker as its coordination.
219 class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable,
identity_one_worker>,
222 static Result
member(Observable&& o, TimePoint&& when) {
// Builds the trigger with rxs::timer(when, cn) and passes the same `cn` to
// the operator.
// NOTE(review): the declaration of `cn` is not visible in this excerpt.
224 return Result(SkipUntil(std::forward<Observable>(o),
rxs::timer(std::forward<TimePoint>(when), cn), cn));
// Overload for skip_until(time point, coordination): like the time-point
// overload, but the caller supplies the coordination.
// NOTE(review): only part of the template parameter list is visible.
227 template<
class Observable,
class TimePoint,
class Coordination,
231 std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
// The operator type is instantiated with the decayed caller-supplied coordination.
236 class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable,
rxu::decay_t<Coordination>>,
// `cn` is taken by value and used twice: for the timer and for the operator.
239 static Result
member(Observable&& o, TimePoint&& when, Coordination cn) {
240 return Result(SkipUntil(std::forward<Observable>(o),
rxs::timer(std::forward<TimePoint>(when), cn), cn));
// Overload for skip_until(trigger observable): forwards the source and the
// trigger into the operator and uses identity_current_thread() as the
// coordination.
// NOTE(review): the rest of the template parameter list (including the
// definitions of SkipUntil and Result) is not visible in this excerpt.
243 template<
class Observable,
class TriggerObservable,
250 static Result
member(Observable&& o, TriggerObservable&& t) {
251 return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t),
identity_current_thread()));
// Overload for skip_until(trigger observable, coordination): perfectly
// forwards the source, the trigger and the coordination into the operator.
// NOTE(review): the rest of the template parameter list (including the
// definitions of SkipUntil and Result) is not visible in this excerpt.
254 template<
class Observable,
class TriggerObservable,
class Coordination,
262 static Result
member(Observable&& o, TriggerObservable&& t, Coordination&& cn) {
263 return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), std::forward<Coordination>(cn)));
// Catch-all overload accepting any argument list; its return type is the
// skip_until_invalid placeholder observable.
// NOTE(review): presumably selected only when none of the constrained
// overloads match - the constraints are not fully visible here.
266 template<
class...
AN>
267 static operators::detail::skip_until_invalid_t<
AN...>
member(
AN...) {
// The condition depends on the parameter pack, so the assertion is evaluated
// only when this overload is instantiated. A pack of exactly 10000 arguments
// is the only way to satisfy it, so in practice it reports the accepted
// signatures as a compile-time error.
270 static_assert(
sizeof...(
AN) == 10000,
"skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)");