RxCpp
The Reactive Extensions for Native (RxCpp) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in both C and C++.
Public Types | Public Member Functions | Public Attributes | Friends | List of all members
rxcpp::observable< T, SourceOperator > Class Template Reference

A source of values. Subscribe to it, or use one of the operator methods that return a new observable which uses this observable as its source. More...

#include <rx-observable.hpp>

Inheritance diagram for rxcpp::observable< T, SourceOperator >:
Inheritance graph
[legend]
Collaboration diagram for rxcpp::observable< T, SourceOperator >:
Collaboration graph
[legend]

Public Types

typedef rxu::decay_t< SourceOperator > source_operator_type
 
typedef T value_type
 
- Public Types inherited from rxcpp::observable_base< T >
typedef tag_observable observable_tag
 
typedef T value_type
 

Public Member Functions

 ~observable ()
 
 observable ()
 
 observable (const source_operator_type &o)
 
 observable (source_operator_type &&o)
 
template<class SO >
 observable (const observable< T, SO > &o)
 implicit conversion between observables of the same value_type More...
 
template<class SO >
 observable (observable< T, SO > &&o)
 implicit conversion between observables of the same value_type More...
 
template<class... AN>
observable< T > as_dynamic (AN **...) const
 
template<class... AN>
blocking_observable< T, this_type > as_blocking (AN **...) const
 
template<class... ArgN>
auto subscribe (ArgN &&... an) const -> composite_subscription
 
template<class... AN>
auto all (AN &&... an) const
 
template<class... AN>
auto is_empty (AN &&... an) const
 Returns an Observable that emits true if the source Observable is empty, otherwise false. More...
 
template<class... AN>
auto any (AN &&... an) const
 
template<class... AN>
auto exists (AN &&... an) const
 Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto contains (AN &&... an) const
 Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false. Emits false if the source Observable terminates without emitting any item. More...
 
template<class... AN>
auto filter (AN &&... an) const
 
template<class... AN>
auto switch_if_empty (AN &&... an) const
 
template<class... AN>
auto default_if_empty (AN &&... an) const
 If the source Observable terminates without emitting any items, emits a default item and completes. More...
 
template<class... AN>
auto sequence_equal (AN... an) const
 
template<class... AN>
auto tap (AN &&... an) const
 
template<class... AN>
auto time_interval (AN &&... an) const
 
template<class... AN>
auto timeout (AN &&... an) const
 
template<class... AN>
auto timestamp (AN &&... an) const
 
template<class... AN>
auto finally (AN &&... an) const
 
template<class... AN>
auto on_error_resume_next (AN &&... an) const
 
template<class... AN>
auto switch_on_error (AN &&... an) const
 
template<class... AN>
auto map (AN &&... an) const
 
template<class... AN>
auto transform (AN &&... an) const
 
template<class... AN>
auto debounce (AN &&... an) const
 
template<class... AN>
auto delay (AN &&... an) const
 
template<class... AN>
auto distinct (AN &&... an) const
 
template<class... AN>
auto distinct_until_changed (AN &&... an) const
 
template<class... AN>
auto element_at (AN &&... an) const
 
template<class... AN>
auto window (AN &&... an) const
 
template<class... AN>
auto window_with_time (AN &&... an) const
 
template<class... AN>
auto window_with_time_or_count (AN &&... an) const
 
template<class... AN>
auto window_toggle (AN &&... an) const
 
template<class... AN>
auto buffer (AN &&... an) const
 
template<class... AN>
auto buffer_with_time (AN &&... an) const
 
template<class... AN>
auto buffer_with_time_or_count (AN &&... an) const
 
template<class... AN>
auto switch_on_next (AN &&... an) const
 
template<class... AN>
auto merge (AN... an) const
 
template<class... AN>
auto merge_delay_error (AN... an) const
 
template<class... AN>
auto amb (AN... an) const
 
template<class... AN>
auto flat_map (AN &&... an) const
 
template<class... AN>
auto merge_transform (AN &&... an) const
 
template<class... AN>
auto concat (AN... an) const
 
template<class... AN>
auto concat_map (AN &&... an) const
 
template<class... AN>
auto concat_transform (AN &&... an) const
 
template<class... AN>
auto with_latest_from (AN... an) const
 
template<class... AN>
auto combine_latest (AN... an) const
 
template<class... AN>
auto zip (AN &&... an) const
 
template<class... AN>
auto group_by (AN &&... an) const
 
template<class... AN>
auto ignore_elements (AN &&... an) const
 
template<class... AN>
auto multicast (AN &&... an) const
 
template<class... AN>
auto publish (AN &&... an) const
 
template<class... AN>
auto publish_synchronized (AN &&... an) const
 Turn a cold observable hot and allow connections to the source to be independent of subscriptions. More...
 
template<class... AN>
auto replay (AN &&... an) const
 
template<class... AN>
auto subscribe_on (AN &&... an) const
 
template<class... AN>
auto observe_on (AN &&... an) const
 
template<class... AN>
auto reduce (AN &&... an) const
 
template<class... AN>
auto accumulate (AN &&... an) const
 
template<class... AN>
auto first (AN **...) const
 For each item from this observable reduce it by sending only the first item. More...
 
template<class... AN>
auto last (AN **...) const
 For each item from this observable reduce it by sending only the last item. More...
 
template<class... AN>
auto count (AN **...) const
 For each item from this observable reduce it by incrementing a count. More...
 
template<class... AN>
auto sum (AN **...) const
 For each item from this observable reduce it by adding to the previous items. More...
 
template<class... AN>
auto average (AN **...) const
 For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. More...
 
template<class... AN>
auto max (AN **...) const
 For each item from this observable reduce it by taking the max value of the previous items. More...
 
template<class... AN>
auto min (AN **...) const
 For each item from this observable reduce it by taking the min value of the previous items. More...
 
template<class... AN>
auto scan (AN... an) const
 
template<class... AN>
auto sample_with_time (AN &&... an) const
 
template<class... AN>
auto skip (AN... an) const
 
template<class... AN>
auto skip_while (AN... an) const
 
template<class... AN>
auto skip_last (AN... an) const
 
template<class... AN>
auto skip_until (AN... an) const
 
template<class... AN>
auto take (AN... an) const
 
template<class... AN>
auto take_last (AN &&... an) const
 
template<class... AN>
auto take_until (AN &&... an) const
 
template<class... AN>
auto take_while (AN &&... an) const
 
template<class... AN>
auto repeat (AN... an) const
 
template<class... AN>
auto retry (AN... an) const
 
template<class... AN>
auto start_with (AN... an) const
 
template<class... AN>
auto pairwise (AN... an) const
 

Public Attributes

source_operator_type source_operator
 

Friends

template<class U , class SO >
class observable
 
template<class U , class SO >
bool operator== (const observable< U, SO > &, const observable< U, SO > &)
 

Detailed Description

template<class T, class SourceOperator>
class rxcpp::observable< T, SourceOperator >

A source of values. Subscribe to it, or use one of the operator methods that return a new observable which uses this observable as its source.

Some code
This sample will observable::subscribe() to values from an observable<void, void>::range().
Sample Code
auto values1 = rxcpp::observable<>::range(1, 5);
values1.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
OnCompleted

Member Typedef Documentation

◆ source_operator_type

template<class T , class SourceOperator >
typedef rxu::decay_t<SourceOperator> rxcpp::observable< T, SourceOperator >::source_operator_type

◆ value_type

template<class T , class SourceOperator >
typedef T rxcpp::observable< T, SourceOperator >::value_type

Constructor & Destructor Documentation

◆ ~observable()

template<class T , class SourceOperator >
rxcpp::observable< T, SourceOperator >::~observable ( )
inline

◆ observable() [1/5]

template<class T , class SourceOperator >
rxcpp::observable< T, SourceOperator >::observable ( )
inline

◆ observable() [2/5]

template<class T , class SourceOperator >
rxcpp::observable< T, SourceOperator >::observable ( const source_operator_type &  o)
inline explicit

◆ observable() [3/5]

template<class T , class SourceOperator >
rxcpp::observable< T, SourceOperator >::observable ( source_operator_type &&  o)
inline explicit

◆ observable() [4/5]

template<class T , class SourceOperator >
template<class SO >
rxcpp::observable< T, SourceOperator >::observable ( const observable< T, SO > &  o)
inline

implicit conversion between observables of the same value_type

◆ observable() [5/5]

template<class T , class SourceOperator >
template<class SO >
rxcpp::observable< T, SourceOperator >::observable ( observable< T, SO > &&  o)
inline

implicit conversion between observables of the same value_type

Member Function Documentation

◆ accumulate()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::accumulate ( AN &&...  an) const
inline

◆ all()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::all ( AN &&...  an) const
inline

◆ amb()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::amb ( AN...  an) const
inline

◆ any()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::any ( AN &&...  an) const
inline

◆ as_blocking()

template<class T , class SourceOperator >
template<class... AN>
blocking_observable<T, this_type> rxcpp::observable< T, SourceOperator >::as_blocking ( AN **  ...) const
inline

Return a new observable that contains the blocking methods for this observable.

Returns
An observable that contains the blocking methods for this observable.
Sample Code
printf("[thread %s] Start task\n", get_pid().c_str());
auto values = rxcpp::observable<>::from(rxcpp::observe_on_new_thread(), 1, 2, 3).map([](int v){
printf("[thread %s] Emit value: %d\n", get_pid().c_str(), v);
return v;
});
values.
as_blocking().
subscribe(
[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
printf("[thread %s] Finish task\n", get_pid().c_str());
[thread 3070107664] Start task
[thread 3015701568] Emit value: 1
[thread 3015701568] OnNext: 1
[thread 3015701568] Emit value: 2
[thread 3015701568] OnNext: 2
[thread 3015701568] Emit value: 3
[thread 3015701568] OnNext: 3
[thread 3015701568] OnCompleted
[thread 3070107664] Finish task

◆ as_dynamic()

template<class T , class SourceOperator >
template<class... AN>
observable<T> rxcpp::observable< T, SourceOperator >::as_dynamic ( AN **  ...) const
inline

Return a new observable that performs type-forgetting conversion of this observable.

Returns
The source observable converted to observable<T>.
Note
This operator can be useful to work around a lambda deduction bug in MSVC 2013.
Sample Code
auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::just(4);
auto o3 = rxcpp::observable<>::empty<int>();
auto values = o1.concat(o2, o3);
printf("type of o1: %s\n", typeid(o1).name());
printf("type of o1.as_dynamic(): %s\n", typeid(o1.as_dynamic()).name());
printf("type of o2: %s\n", typeid(o2).name());
printf("type of o2.as_dynamic(): %s\n", typeid(o2.as_dynamic()).name());
printf("type of o3: %s\n", typeid(o3).name());
printf("type of o3.as_dynamic(): %s\n", typeid(o3.as_dynamic()).name());
printf("type of values: %s\n", typeid(values).name());
printf("type of values.as_dynamic(): %s\n", typeid(values.as_dynamic()).name());
type of o1: N5rxcpp10observableIiNS_7sources6detail5rangeIiNS_19identity_one_workerEEEEE
type of o1.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of o2: N5rxcpp10observableIiNS_7sources6detail7iterateISt5arrayIiLj1EENS_19identity_one_workerEEEEE
type of o2.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of o3: N5rxcpp10observableIiNS_7sources6detail7iterateISt16initializer_listIiENS_19identity_one_workerEEEEE
type of o3.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE
type of values: N5rxcpp10observableIiNS_9operators6detail6concatINS0_IiNS_18dynamic_observableIiEEEENS0_IS6_NS4_IS6_EEEENS_19identity_one_workerEEEEE
type of values.as_dynamic(): N5rxcpp10observableIiNS_18dynamic_observableIiEEEE

◆ average()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::average ( AN **  ...) const
inline

For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.

Returns
An observable that emits a single item: the average of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).average();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 2.500000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().average();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: average() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
average();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ buffer()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::buffer ( AN &&...  an) const
inline

◆ buffer_with_time()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::buffer_with_time ( AN &&...  an) const
inline

◆ buffer_with_time_or_count()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::buffer_with_time_or_count ( AN &&...  an) const
inline

◆ combine_latest()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::combine_latest ( AN...  an) const
inline

◆ concat()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::concat ( AN...  an) const
inline

◆ concat_map()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::concat_map ( AN &&...  an) const
inline

◆ concat_transform()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::concat_transform ( AN &&...  an) const
inline

◆ contains()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::contains ( AN &&...  an) const
inline

Returns an Observable that emits true if the source Observable emitted a specified item, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
T: the type of the item to search for.
Parameters
value: the item to search for.
Returns
An observable that emits true if the source Observable emitted a specified item, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).contains(3);
values.
subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted

◆ count()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::count ( AN **  ...) const
inline

For each item from this observable reduce it by incrementing a count.

Returns
An observable that emits a single item: the number of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).count();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
count();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ debounce()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::debounce ( AN &&...  an) const
inline

◆ default_if_empty()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::default_if_empty ( AN &&...  an) const
inline

If the source Observable terminates without emitting any items, emits a default item and completes.

Template Parameters
Value: the type of the value to emit.
Parameters
v: the default value to emit.
Returns
Observable that emits the specified default item if the source observable is empty.
Sample Code
auto values = rxcpp::observable<>::empty<int>().default_if_empty(42);
values.subscribe(
[](int v) { printf("OnNext: %d\n", v); },
[]() { printf("OnCompleted\n"); } );
OnNext: 42
OnCompleted

◆ delay()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::delay ( AN &&...  an) const
inline

◆ distinct()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::distinct ( AN &&...  an) const
inline

◆ distinct_until_changed()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::distinct_until_changed ( AN &&...  an) const
inline

◆ element_at()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::element_at ( AN &&...  an) const
inline

◆ exists()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::exists ( AN &&...  an) const
inline

Returns an Observable that emits true if any item emitted by the source Observable satisfies a specified condition, otherwise false. Emits false if the source Observable terminates without emitting any item.

Template Parameters
Predicate: the type of the test function.
Parameters
p: the test function to test items emitted by the source Observable.
Returns
An observable that emits true if any item emitted by the source observable satisfies a specified condition, otherwise false.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).exists([](int n) { return n > 3; });
values.
subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });
OnNext: true
OnCompleted

◆ filter()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::filter ( AN &&...  an) const
inline

◆ finally()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::finally ( AN &&...  an) const
inline

◆ first()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::first ( AN **  ...) const
inline

For each item from this observable reduce it by sending only the first item.

Returns
An observable that emits only the very first item emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).first();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1
OnCompleted
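When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().first();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});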
OnError: first() requires a stream with at least one value

◆ flat_map()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::flat_map ( AN &&...  an) const
inline

◆ group_by()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::group_by ( AN &&...  an) const
inline

◆ ignore_elements()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::ignore_elements ( AN &&...  an) const
inline

◆ is_empty()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::is_empty ( AN &&...  an) const
inline

Returns an Observable that emits true if the source Observable is empty, otherwise false.

Returns
An observable that emits a boolean value.
Sample Code
auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).is_empty();
values.
subscribe(
[](bool v) { printf("OnNext: %s\n", v ? "true" : "false"); },
[]() { printf("OnCompleted\n"); });

◆ last()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::last ( AN **  ...) const
inline

For each item from this observable reduce it by sending only the last item.

Returns
An observable that emits only the very last item emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).last();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 3
OnCompleted
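When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().last();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});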
OnError: last() requires a stream with at least one value

◆ map()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::map ( AN &&...  an) const
inline

◆ max()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::max ( AN **  ...) const
inline

For each item from this observable reduce it by taking the max value of the previous items.

Returns
An observable that emits a single item: the max of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).max();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 4.000000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().max();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: max() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
max();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ merge()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::merge ( AN...  an) const
inline

◆ merge_delay_error()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::merge_delay_error ( AN...  an) const
inline

◆ merge_transform()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::merge_transform ( AN &&...  an) const
inline

◆ min()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::min ( AN **  ...) const
inline

For each item from this observable reduce it by taking the min value of the previous items.

Returns
An observable that emits a single item: the min of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 4).min();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 1.000000
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().min();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: min() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
min();
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ multicast()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::multicast ( AN &&...  an) const
inline

◆ observe_on()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::observe_on ( AN &&...  an) const
inline

◆ on_error_resume_next()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::on_error_resume_next ( AN &&...  an) const
inline

◆ pairwise()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::pairwise ( AN...  an) const
inline

◆ publish()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::publish ( AN &&...  an) const
inline

◆ publish_synchronized()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::publish_synchronized ( AN &&...  an) const
inline

Turn a cold observable hot and allow connections to the source to be independent of subscriptions.

Template Parameters
Coordination: the type of the scheduler.
Parameters
cn: a scheduler all values are queued and delivered on.
cs: the subscription to control lifetime (optional).
Returns
rxcpp::connectable_observable that upon connection causes the source observable to emit items to its observers, on the specified scheduler.
Sample Code
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
publish_synchronized(rxcpp::observe_on_new_thread());
// Subscribe from the beginning
values.subscribe(
[](long v){printf("[1] OnNext: %ld\n", v);},
[](){printf("[1] OnCompleted\n");});
// Another subscription from the beginning
values.subscribe(
[](long v){printf("[2] OnNext: %ld\n", v);},
[](){printf("[2] OnCompleted\n");});
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){
values.subscribe(
[](long v){printf("[3] OnNext: %ld\n", v);},
[](){printf("[3] OnCompleted\n");});
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
[1] OnNext: 1
[2] OnNext: 1
[1] OnNext: 2
[2] OnNext: 2
[1] OnNext: 3
[2] OnNext: 3
[1] OnNext: 4
[2] OnNext: 4
[1] OnNext: 5
[2] OnNext: 5
[1] OnCompleted
[2] OnCompleted
[3] OnCompleted

◆ reduce()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::reduce ( AN &&...  an) const
inline

◆ repeat()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::repeat ( AN...  an) const
inline

◆ replay()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::replay ( AN &&...  an) const
inline

◆ retry()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::retry ( AN...  an) const
inline

◆ sample_with_time()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::sample_with_time ( AN &&...  an) const
inline

◆ scan()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::scan ( AN...  an) const
inline

◆ sequence_equal()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::sequence_equal ( AN...  an) const
inline

◆ skip()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::skip ( AN...  an) const
inline

◆ skip_last()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::skip_last ( AN...  an) const
inline

◆ skip_until()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::skip_until ( AN...  an) const
inline

◆ skip_while()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::skip_while ( AN...  an) const
inline

◆ start_with()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::start_with ( AN...  an) const
inline

◆ subscribe()

template<class T , class SourceOperator >
template<class... ArgN>
auto rxcpp::observable< T, SourceOperator >::subscribe ( ArgN &&...  an) const -> composite_subscription
inline

◆ subscribe_on()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::subscribe_on ( AN &&...  an) const
inline

◆ sum()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::sum ( AN **  ...) const
inline

For each item from this observable reduce it by adding to the previous items.

Returns
An observable that emits a single item: the sum of elements emitted by the source observable.
Sample Code
auto values = rxcpp::observable<>::range(1, 3).sum();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
OnNext: 6
OnCompleted
When the source observable completes without emitting any items:
auto values = rxcpp::observable<>::empty<int>().sum();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const rxcpp::empty_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: sum() requires a stream with at least one value
When the source observable calls on_error:
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
sum();
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](std::exception_ptr ep){
try {std::rethrow_exception(ep);}
catch (const std::runtime_error& ex) {
printf("OnError: %s\n", ex.what());
}
},
[](){printf("OnCompleted\n");});
OnError: Error from source

◆ switch_if_empty()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::switch_if_empty ( AN &&...  an) const
inline

◆ switch_on_error()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::switch_on_error ( AN &&...  an) const
inline

◆ switch_on_next()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::switch_on_next ( AN &&...  an) const
inline

◆ take()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::take ( AN...  an) const
inline

◆ take_last()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::take_last ( AN &&...  an) const
inline

◆ take_until()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::take_until ( AN &&...  an) const
inline

◆ take_while()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::take_while ( AN &&...  an) const
inline

◆ tap()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::tap ( AN &&...  an) const
inline

◆ time_interval()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::time_interval ( AN &&...  an) const
inline

◆ timeout()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::timeout ( AN &&...  an) const
inline

◆ timestamp()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::timestamp ( AN &&...  an) const
inline

◆ transform()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::transform ( AN &&...  an) const
inline

◆ window()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::window ( AN &&...  an) const
inline

◆ window_toggle()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::window_toggle ( AN &&...  an) const
inline

◆ window_with_time()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::window_with_time ( AN &&...  an) const
inline

◆ window_with_time_or_count()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::window_with_time_or_count ( AN &&...  an) const
inline

◆ with_latest_from()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::with_latest_from ( AN...  an) const
inline

◆ zip()

template<class T , class SourceOperator >
template<class... AN>
auto rxcpp::observable< T, SourceOperator >::zip ( AN &&...  an) const
inline

Friends And Related Function Documentation

◆ observable

template<class T , class SourceOperator >
template<class U , class SO >
friend class observable
friend

◆ operator==

template<class T , class SourceOperator >
template<class U , class SO >
bool operator== ( const observable< U, SO > &  ,
const observable< U, SO > &   
)
friend

Member Data Documentation

◆ source_operator

template<class T , class SourceOperator >
source_operator_type rxcpp::observable< T, SourceOperator >::source_operator
mutable

The documentation for this class was generated from the following file: rx-observable.hpp
rxcpp::sources::timer
auto timer(TimePointOrDuration when) -> typename std::enable_if< detail::defer_timer< TimePointOrDuration, identity_one_worker >::value, typename detail::defer_timer< TimePointOrDuration, identity_one_worker >::observable_type >::type
Definition: rx-timer.hpp:114
rxcpp::empty_error
Definition: rx-operators.hpp:289
rxcpp::observable::as_blocking
blocking_observable< T, this_type > as_blocking(AN **...) const
Definition: rx-observable.hpp:580
rxcpp::sources::range
auto range(T first=0, T last=std::numeric_limits< T >::max(), std::ptrdiff_t step=1) -> observable< T, detail::range< T, identity_one_worker >>
Definition: rx-range.hpp:119
rxcpp::observable::take
auto take(AN... an) const
Definition: rx-observable.hpp:1368
rxcpp::observe_on_new_thread
observe_on_one_worker observe_on_new_thread()
Definition: rx-observe_on.hpp:328
rxcpp::observable::sum
auto sum(AN **...) const
For each item from this observable reduce it by adding to the previous items.
Definition: rx-observable.hpp:1254
cpplinq::from
linq_driver< iter_cursor< typename util::container_traits< TContainer >::iterator > > from(TContainer &c)
Definition: linq.hpp:556
rxcpp::observable::count
auto count(AN **...) const
For each item from this observable reduce it by incrementing a count.
Definition: rx-observable.hpp:1242
rxcpp::observable::last
auto last(AN **...) const
For each item from this observable reduce it by sending only the last item.
Definition: rx-observable.hpp:1230
rxcpp::util::rethrow_exception
RXCPP_NORETURN void rethrow_exception(error_ptr e)
Definition: rx-util.hpp:902
rxcpp::sources::interval
auto interval(Duration period) -> typename std::enable_if< detail::defer_interval< Duration, identity_one_worker >::value, typename detail::defer_interval< Duration, identity_one_worker >::observable_type >::type
Definition: rx-interval.hpp:113
rxcpp::observable::max
auto max(AN **...) const
For each item from this observable reduce it by taking the max value of the previous items.
Definition: rx-observable.hpp:1278
rxcpp::observable::subscribe
auto subscribe(ArgN &&... an) const -> composite_subscription
Definition: rx-observable.hpp:637
rxcpp::observable::default_if_empty
auto default_if_empty(AN &&... an) const
If the source Observable terminates without emitting any items, emits a default item and completes.
Definition: rx-observable.hpp:722
rxcpp::observable::first
auto first(AN **...) const
For each item from this observable reduce it by sending only the first item.
Definition: rx-observable.hpp:1218
rxcpp::observable::concat
auto concat(AN... an) const
Definition: rx-observable.hpp:1041
rxcpp::observable
a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
Definition: rx-observable.hpp:478
rxcpp::observable::average
auto average(AN **...) const
For each item from this observable reduce it by adding to the previous values and then dividing by th...
Definition: rx-observable.hpp:1266
rxcpp::sources::just
auto just(Value0 v0) -> typename std::enable_if<!is_coordination< Value0 >::value, decltype(iterate(*(std::array< Value0, 1 > *) nullptr, identity_immediate()))>::type
Definition: rx-iterate.hpp:267
rxcpp::observable::min
auto min(AN **...) const
For each item from this observable reduce it by taking the min value of the previous items.
Definition: rx-observable.hpp:1290
rxcpp::observable::publish_synchronized
auto publish_synchronized(AN &&... an) const
Turn a cold observable hot and allow connections to the source to be independent of subscriptions.
Definition: rx-observable.hpp:1152