Improve LogProcessingWorker._fetch_events() performance #94

Open
cbeaujoin-stellar opened this issue Feb 29, 2024 · 2 comments

Comments

@cbeaujoin-stellar

cbeaujoin-stellar commented Feb 29, 2024

I'm using python-logstash-async in a very particular context:

  • High volume of logs every 2 seconds (~90 events)
  • Unreliable/bad network (cellular, wireless, ...)
  • Small/low resources (embedded systems)

First, let's summarize what LogProcessingWorker._fetch_events() does.

It sequentially processes the following steps (sketched in simplified form below):

  • Retrieve an event from self._queue = PriorityQueue()
  • Write the event to the in-memory or SQLite DB using self._fetch_queued_events_for_flush()
  • Loop over the previous two steps; then, when the queue is empty and the conditions are met:
    - it retrieves the events from the DB: self._fetch_queued_events_for_flush() => self._database.get_queued_events()
    - it sends the events using self._send_events(events)
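
In simplified form, the loop described above looks roughly like this (a sketch based on this description, not the library's actual code):

        while True:
            try:
                self._fetch_event()      # get the next event from self._queue (PriorityQueue); raises Empty when drained
                self._process_event()    # write the event to the in-memory/SQLite DB
            except Empty:
                # queue drained: if the flush conditions are met, read the stored
                # events back and send them
                events = self._fetch_queued_events_for_flush()
                if events:
                    self._send_events(events)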

So now, what are the issues?

  • If the log rate is high, the queue is never emptied and messages are never sent.
    That can easily be solved by adding:
        while True:
            try:
                self._fetch_event()
                self._process_event()
            except Empty:
                ...
            else:
                # do the same flush handling here as in the "except Empty" branch

But this solution is not good enough because, while events are being stored in the DB and sent over the network, new ones keep accumulating.

  • In fact the queue is never emptied because writing each event to the database takes too long.
    One option would be to retrieve events from the queue in batches (not possible with the PriorityQueue() implementation; a rough batching sketch follows this list).
    Another would be to write the events to the DB from one or more additional async threads.
    Writing to the SQLite DB via import sqlite3 does not allow concurrent access from multiple threads; the code would have to be rewritten using import aiosqlite, and maybe the queue should be of type multiprocessing.Queue().

  • Similarly, the events should be sent from another async thread, one batch at a time, in order not to delete not-yet-sent events from the database via the pending_delete=1 flag.
    So self._database.delete_queued_events() should not be called as soon as a send succeeds; it should instead be a prerequisite before a new send, or happen when the app shuts down.
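
A rough sketch of what queue batching could look like; PriorityQueue has no atomic multi-item get, so this simply loops over get_nowait() (drain_batch and max_batch are made-up names for illustration):

        from queue import Empty, PriorityQueue

        def drain_batch(work_queue: PriorityQueue, max_batch: int = 50) -> list:
            # Collect up to max_batch items so one DB transaction can cover
            # many events instead of doing one write per event.
            batch = []
            try:
                batch.append(work_queue.get(timeout=0.1))   # wait briefly for the first item
                while len(batch) < max_batch:
                    batch.append(work_queue.get_nowait())   # grab whatever else is ready
            except Empty:
                pass
            return batch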

To summarize, basically my proposal is to have:

  • one thread that handles the queue
  • one or more threads that handle the DB
  • one thread that handles the sending

This should improve performance and make python-logstash-async work better in the field, roughly as sketched below.
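
A minimal sketch of such a split into three cooperating threads; all helper names (persist_event, get_unsent_events, send_batch, delete_events) are placeholders, not existing python-logstash-async APIs:

        import queue
        import threading

        log_queue = queue.PriorityQueue()   # filled by the logging handler
        db_queue = queue.Queue()            # events waiting to be persisted
        flush_signal = threading.Event()    # tells the sender that new events were stored

        def queue_worker():
            # Only moves events off the priority queue so it never fills up.
            while True:
                db_queue.put(log_queue.get())

        def db_worker(persist_event):
            # Persists events and signals the sender; could be run in several threads.
            while True:
                persist_event(db_queue.get())
                flush_signal.set()

        def sender_worker(get_unsent_events, send_batch, delete_events):
            # Sends one batch at a time and deletes only after a confirmed send.
            while True:
                flush_signal.wait()
                flush_signal.clear()
                events = get_unsent_events()
                if events and send_batch(events):
                    delete_events(events)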

Of course, any other ideas are welcome!

@eht16
Owner

eht16 commented Mar 3, 2024

Thanks for the detailed analysis and ideas.
Some comments:

  • aiosqlite might only solve the problem partially. It seems to bundle open connections in a separate thread so as not to open multiple connections to the database. This helps but might not be enough under high load, and especially with poor IO or multiprocessing. In the end, SQLite is not ideal for high load, but for most use cases of this library it is a good trade-off between performance, ease of use/availability and resilience to errors.
    Is using the in-memory database an option for your use case?
  • using separate threads for processing, storing and transmitting events might help but probably requires quite some effort to implement properly and might still suffer from Python's Global Interpreter Lock (all threads running on the same CPU core)
  • PriorityQueue was introduced in Memory leak eradication #73 to work around high memory usage of CPython's Queue implementation. Though if there is another similar queue implementation which supports getting items in batches, that would probably help
  • to address the problem that the queue never raises the Empty exception under high load and so the events from the database won't get transmitted, we could set a maximum size on the queue, use .put(block=False) and then handle the Full exception to force flushing pending events (roughly as sketched below).
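
A minimal sketch of that last idea, assuming the handler gets a way to force a flush (force_flush here is a placeholder, not an existing method of the library):

        from queue import Full, PriorityQueue

        event_queue = PriorityQueue(maxsize=1000)   # bounded instead of unlimited

        def enqueue(item, force_flush):
            try:
                event_queue.put(item, block=False)
            except Full:
                # queue is saturated: flush pending events instead of letting
                # them pile up indefinitely, then enqueue (blocking) as usual
                force_flush()
                event_queue.put(item)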

I think I will work soon on the last item to give the queue a maxsize and handle a full queue accordingly as this might improve the overall behavior.

Though for the other options it's unlikely I will be able to spend much time on this, as it would probably also require refactoring lots of code for what may be only a special use case.
I'd suggest thinking about using the in-memory database, or at least testing whether it improves the overall performance. If you like, feel free to try porting the code to aiosqlite; this sounds relatively trivial and might already help a bit.
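
For reference, a minimal aiosqlite sketch of the call pattern such a port would use (the table and column names are made up, not the library's actual schema):

        import asyncio
        import aiosqlite

        async def store_event(db_path: str, event: str) -> None:
            async with aiosqlite.connect(db_path) as db:
                await db.execute("CREATE TABLE IF NOT EXISTS event (text TEXT)")
                await db.execute("INSERT INTO event (text) VALUES (?)", (event,))
                await db.commit()

        asyncio.run(store_event("queue.db", '{"message": "hello"}'))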

@cbeaujoin-stellar
Author

Hi,
Thank you for your documented answer, it was very instructive.
I will take the time ASAP to port the code to aiosqlite.
On the other hand, I have already implemented an async thread for transmitting and it works quite well.
I will keep you updated.
