25 #if !defined(RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP)
26 #define RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP
28 #include "../rx-includes.hpp"
// Empty tag type: it has no members and exists only to be named as a template
// argument (see its use with rxo::operator_base and observable in
// buffer_count_invalid).
37 struct buffer_count_invalid_arguments {};
// Operator type used for the invalid-argument case.
// It derives from rxo::operator_base instantiated on the
// buffer_count_invalid_arguments<AN...> tag.
40 struct buffer_count_invalid :
public rxo::operator_base<buffer_count_invalid_arguments<AN...>> {
// `type` is an observable parameterised on the same tag and on this struct
// itself as the second template argument.
41 using type = observable<buffer_count_invalid_arguments<
AN...>, buffer_count_invalid<
AN...>>;
// Shorthand alias for the nested `type` of buffer_count_invalid<AN...>.
44 using buffer_count_invalid_t =
typename buffer_count_invalid<
AN...>::type;
// The source element type T after passing through rxu::decay_t.
49 typedef rxu::decay_t<T> source_value_type;
// A buffer is a std::vector of source_value_type elements.
50 typedef std::vector<source_value_type> value_type;
// Settings object for the operator, constructed from two ints (c, s).
// NOTE(review): presumably c is the buffer size and s the skip stride; the
// constructor body and member declarations are not visible here — confirm.
52 struct buffer_count_values
54 buffer_count_values(
int c,
int s)
// The settings instance that operator() hands to buffer_count_observer::make.
63 buffer_count_values initial;
// Observer parameterised on the downstream Subscriber type. It inherits from
// buffer_count_values, so the settings are reachable through `this->`.
70 template<
class Subscriber>
71 struct buffer_count_observer :
public buffer_count_values
73 typedef buffer_count_observer<Subscriber> this_type;
// One chunk is a std::vector<T>.
74 typedef std::vector<T> value_type;
// Destination subscriber type, passed through rxu::decay_t.
75 typedef rxu::decay_t<Subscriber> dest_type;
76 typedef observer<value_type, this_type> observer_type;
// Queue of chunks currently being filled. Declared mutable because the
// const-qualified on_next/on_completed members modify it.
79 mutable std::deque<value_type> chunks;
// Takes the destination and a settings object; the settings are copied into
// the buffer_count_values base. The remaining initializers are not visible here.
81 buffer_count_observer(dest_type d, buffer_count_values v)
82 : buffer_count_values(v)
// Handles one incoming value v.
87 void on_next(T v)
const {
// cursor is post-incremented on every call; a new empty chunk is appended
// whenever the pre-increment value is a multiple of skip.
// NOTE(review): if skip is 0 this modulo is undefined behaviour; no guard is
// visible in this excerpt — confirm that skip is validated elsewhere.
88 if (cursor++ % this->skip == 0) {
89 chunks.emplace_back();
// Iterates over every chunk currently in the queue by reference.
// NOTE(review): the loop body is not visible here; presumably it appends v
// to each chunk — confirm.
91 for(
auto& chunk : chunks) {
// While the front chunk's size (converted to int) equals count, the front
// chunk is moved into dest.on_next.
// NOTE(review): removal of the moved-from front chunk is not visible here;
// without it this loop would not advance — confirm.
94 while (!chunks.empty() &&
int(chunks.front().size()) == this->count) {
95 dest.on_next(std::move(chunks.front()));
// Completion handler: forwards whatever chunks remain in the queue.
102 void on_completed()
const {
// The loop condition only checks for a non-empty queue, so chunks are moved
// to dest.on_next regardless of how many elements they hold.
// NOTE(review): the rest of the loop body is not visible here — confirm the
// front chunk is popped after being moved.
105 while (!chunks.empty()) {
106 dest.on_next(std::move(chunks.front()));
// Factory: builds a subscriber<T, observer<T, this_type>> from destination d
// and settings v, using make_subscriber<T>.
118 static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) {
// The subscription is read from d on its own line, before d is moved into the
// new observer on the next line.
119 auto cs = d.get_subscription();
120 return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v)));
// Call operator: given a destination subscriber, returns the result of
// buffer_count_observer<Subscriber>::make(dest, initial).
// The trailing return type repeats the same expression inside decltype, so the
// declared type always matches what the body returns.
124 template<
class Subscriber>
125 auto operator()(Subscriber dest)
const
126 -> decltype(buffer_count_observer<Subscriber>::make(std::move(dest), initial)) {
127 return buffer_count_observer<Subscriber>::make(std::move(dest), initial);
135 template<
class...
AN>
// Overload taking both count and skip (the signature line itself is not
// visible in this excerpt).
146 template<
class Observable,
// BufferCount defaults to the detail operator instantiated on SourceValue.
150 class BufferCount = rxo::detail::buffer_count<SourceValue>,
// Return type and body are the same expression: o.lift<Value>() applied to a
// BufferCount constructed from (count, skip).
153 -> decltype(o.template lift<Value>(BufferCount(
count,
skip))) {
154 return o.template lift<Value>(BufferCount(
count,
skip));
// Overload taking only count (the signature line itself is not visible in
// this excerpt).
157 template<
class Observable,
// BufferCount defaults to the detail operator instantiated on SourceValue.
161 class BufferCount = rxo::detail::buffer_count<SourceValue>,
// count is passed for both BufferCount constructor arguments, so the second
// argument takes the same value as the first.
164 -> decltype(o.template lift<Value>(BufferCount(
count,
count))) {
165 return o.template lift<Value>(BufferCount(
count,
count));
// Variadic overload that accepts any argument pack and returns the
// buffer_count_invalid_t<AN...> type.
168 template<
class...
AN>
169 static operators::detail::buffer_count_invalid_t<
AN...>
member(
AN...) {
// The condition depends on AN and holds only for a pack of exactly 10000
// arguments; for any other instantiation the assertion fails with the
// usage message below.
172 static_assert(
sizeof...(
AN) == 10000,
"buffer takes (Count, optional Skip)");