From f67598808af930840a077a57e845d2bef9548a0f Mon Sep 17 00:00:00 2001 From: Matti Airas Date: Sat, 13 Jul 2024 20:18:58 +0200 Subject: [PATCH] Implement stream producers that emit characters or lines --- src/sensesp/system/stream_producer.h | 67 ++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 src/sensesp/system/stream_producer.h diff --git a/src/sensesp/system/stream_producer.h b/src/sensesp/system/stream_producer.h new file mode 100644 index 000000000..0296e109d --- /dev/null +++ b/src/sensesp/system/stream_producer.h @@ -0,0 +1,67 @@ +#ifndef SENSESP_SRC_SENSESP_SYSTEM_STREAM_PRODUCER_H_ +#define SENSESP_SRC_SENSESP_SYSTEM_STREAM_PRODUCER_H_ + +#include "sensesp.h" + +#include "ReactESP.h" +#include "sensesp/system/valueproducer.h" + +namespace sensesp { + +/** + * @brief ValueProducer that reads from a Stream and produces each character. + */ +class StreamCharProducer : public ValueProducer { + public: + StreamCharProducer(Stream* stream) : stream_{stream} { + read_reaction_ = ReactESP::app->onAvailable(*stream_, [this]() { + while (stream_->available()) { + char c = stream_->read(); + this->emit(c); + } + }); + } + + protected: + Stream* stream_; + StreamReaction* read_reaction_; +}; + +/** + * @brief ValueProducer that reads from a Stream and produces a full line. + */ +class StreamLineProducer : public ValueProducer { + public: + StreamLineProducer(Stream* stream, int max_line_length = 256) + : stream_{stream}, max_line_length_{max_line_length} { + static int buf_pos = 0; + buf_ = new char[max_line_length_ + 1]; + read_reaction_ = ReactESP::app->onAvailable(*stream_, [this]() { + while (stream_->available()) { + char c = stream_->read(); + if (c == '\n') { + // Include the newline character in the output + buf_[buf_pos++] = c; + buf_[buf_pos] = '\0'; + this->emit(buf_); + buf_pos = 0; + } else { + buf_[buf_pos++] = c; + if (buf_pos >= max_line_length_ - 1) { + buf_pos = 0; + } + } + } + }); + } + + protected: + const int max_line_length_; + char *buf_; + Stream* stream_; + StreamReaction* read_reaction_; +}; + +} // namespace sensesp + +#endif // SENSESP_SRC_SENSESP_SYSTEM_STREAM_PRODUCER_H_