39 #if !defined(RXCPP_OPERATORS_RX_AMB_HPP)
40 #define RXCPP_OPERATORS_RX_AMB_HPP
42 #include "../rx-includes.hpp"
51 struct amb_invalid_arguments {};
54 struct amb_invalid :
public rxo::operator_base<amb_invalid_arguments<AN...>> {
55 using type = observable<amb_invalid_arguments<
AN...>, amb_invalid<
AN...>>;
58 using amb_invalid_t =
typename amb_invalid<
AN...>::type;
60 template<
class T,
class Observable,
class Coordination>
62 :
public operator_base<rxu::value_type_t<T>>
67 typedef amb<T, Observable, Coordination> this_type;
69 typedef rxu::decay_t<T> source_value_type;
70 typedef rxu::decay_t<Observable> source_type;
72 typedef typename source_type::source_operator_type source_operator_type;
73 typedef typename source_value_type::value_type value_type;
75 typedef rxu::decay_t<Coordination> coordination_type;
76 typedef typename coordination_type::coordinator_type coordinator_type;
80 values(source_operator_type o, coordination_type sf)
81 : source_operator(std::move(o))
82 , coordination(std::move(sf))
85 source_operator_type source_operator;
86 coordination_type coordination;
90 amb(
const source_type& o, coordination_type sf)
91 : initial(o.source_operator, std::move(sf))
95 template<
class Subscriber>
96 void on_subscribe(Subscriber scbr)
const {
99 typedef Subscriber output_type;
101 struct amb_state_type
102 :
public std::enable_shared_from_this<amb_state_type>
105 amb_state_type(values i, coordinator_type coor, output_type oarg)
107 , source(i.source_operator)
108 , coordinator(std::move(coor))
109 , out(std::move(oarg))
110 , pendingObservables(0)
111 , firstEmitted(
false)
114 observable<source_value_type, source_operator_type> source;
115 coordinator_type coordinator;
117 int pendingObservables;
119 std::vector<composite_subscription> innerSubscriptions;
122 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
125 auto state = std::make_shared<amb_state_type>(initial, std::move(coordinator), std::move(scbr));
127 composite_subscription outercs;
131 state->out.add(outercs);
134 [&](){
return state->coordinator.in(state->source);},
136 if (source.empty()) {
143 auto sink = make_subscriber<source_value_type>(
147 [state](source_value_type st) {
149 if (state->firstEmitted)
152 composite_subscription innercs;
154 state->innerSubscriptions.push_back(innercs);
158 auto innercstoken = state->out.add(innercs);
160 innercs.add(make_subscription([state, innercstoken](){
161 state->out.remove(innercstoken);
164 auto selectedSource = state->coordinator.in(st);
166 auto current_id = state->pendingObservables++;
170 auto sinkInner = make_subscriber<value_type>(
174 [state, st, current_id](value_type ct) {
175 state->out.on_next(std::move(ct));
176 if (!state->firstEmitted) {
177 state->firstEmitted = true;
178 auto do_unsubscribe = [](composite_subscription cs) {
181 std::for_each(state->innerSubscriptions.begin(), state->innerSubscriptions.begin() + current_id, do_unsubscribe);
182 std::for_each(state->innerSubscriptions.begin() + current_id + 1, state->innerSubscriptions.end(), do_unsubscribe);
187 state->out.on_error(e);
191 state->out.on_completed();
195 auto selectedSinkInner = state->coordinator.out(sinkInner);
196 selectedSource.subscribe(std::move(selectedSinkInner));
200 state->out.on_error(e);
204 if (state->pendingObservables == 0) {
205 state->out.on_completed();
210 [&](){
return state->coordinator.out(sink);},
212 if (selectedSink.empty()) {
215 source->subscribe(std::move(selectedSink.get()));
223 template<
class...
AN>
234 template<
class Observable,
246 template<
class Observable,
class Coordination,
255 static Result
member(Observable&& o, Coordination&& cn) {
256 return Result(Amb(std::forward<Observable>(o), std::forward<Coordination>(cn)));
259 template<
class Observable,
class Value0,
class... ValueN,
269 static Result
member(Observable&& o, Value0&& v0, ValueN&&... vn) {
273 template<
class Observable,
class Coordination,
class Value0,
class... ValueN,
284 static Result
member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
285 return Result(Amb(
rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
288 template<
class...
AN>
289 static operators::detail::amb_invalid_t<
AN...>
member(
AN...) {
292 static_assert(
sizeof...(
AN) == 10000,
"amb takes (optional Coordination, optional Value0, optional ValueN...)");