Skip to content

Commit

Permalink
Merge pull request #714 from SignalK/stream_producers
Browse files Browse the repository at this point in the history
Stream producers
  • Loading branch information
mairas authored Aug 2, 2024
2 parents c6bb92e + f675988 commit ef03b90
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
67 changes: 67 additions & 0 deletions src/sensesp/system/stream_producer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#ifndef SENSESP_SRC_SENSESP_SYSTEM_STREAM_PRODUCER_H_
#define SENSESP_SRC_SENSESP_SYSTEM_STREAM_PRODUCER_H_

#include "sensesp.h"

#include "ReactESP.h"
#include "sensesp/system/valueproducer.h"

namespace sensesp {

/**
* @brief ValueProducer that reads from a Stream and produces each character.
*/
class StreamCharProducer : public ValueProducer<char> {
public:
StreamCharProducer(Stream* stream) : stream_{stream} {
read_reaction_ = ReactESP::app->onAvailable(*stream_, [this]() {
while (stream_->available()) {
char c = stream_->read();
this->emit(c);
}
});
}

protected:
Stream* stream_;
StreamReaction* read_reaction_;
};

/**
* @brief ValueProducer that reads from a Stream and produces a full line.
*/
class StreamLineProducer : public ValueProducer<String> {
public:
StreamLineProducer(Stream* stream, int max_line_length = 256)
: stream_{stream}, max_line_length_{max_line_length} {
static int buf_pos = 0;
buf_ = new char[max_line_length_ + 1];
read_reaction_ = ReactESP::app->onAvailable(*stream_, [this]() {
while (stream_->available()) {
char c = stream_->read();
if (c == '\n') {
// Include the newline character in the output
buf_[buf_pos++] = c;
buf_[buf_pos] = '\0';
this->emit(buf_);
buf_pos = 0;
} else {
buf_[buf_pos++] = c;
if (buf_pos >= max_line_length_ - 1) {
buf_pos = 0;
}
}
}
});
}

protected:
const int max_line_length_;
char *buf_;
Stream* stream_;
StreamReaction* read_reaction_;
};

} // namespace sensesp

#endif // SENSESP_SRC_SENSESP_SYSTEM_STREAM_PRODUCER_H_
6 changes: 6 additions & 0 deletions src/sensesp/system/task_queue_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,18 @@ class TaskQueueProducer : public ObservableValue<T> {
: TaskQueueProducer(value, ReactESP::app, queue_size, poll_rate) {}

virtual void set(const T& value) override {
// WARNING: This does not check if the queue is full.
xQueueSend(queue_, &value, 0);
}

int push(const T& value) {
int retval;
if (queue_size_ == 1) {
retval = xQueueOverwrite(queue_, &value);
} else {
retval = xQueueSend(queue_, &value, 0);
}
return retval;
}

private:
Expand Down

0 comments on commit ef03b90

Please sign in to comment.