Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMP] Multi-node high-availability jobrunner #607

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ Configuration
.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.

* Deploying in high availability mode or odoo.sh:

When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration
parameters mentioned above you need to also set the env variable
ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such:

.. code-block:: ini

(...)
[queue_job]
high_availability = 1


> :warning: **Warning:** Failure to enable the high_availability flag on odoo.sh could
constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh
deployment

Usage
=====

Expand Down Expand Up @@ -571,6 +588,12 @@ Known issues / Roadmap
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:

* When deployed in high_availability mode the allocated databases for the
jobrunners must be identical. If the databases are different and overlap
i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either
DB1 or DB3 will not proccess jobs because there can be only one leader per
sets of databases.

.. code-block:: sql

update queue_job set state='pending' where state in ('started', 'enqueued')
Expand Down Expand Up @@ -631,6 +654,8 @@ Contributors
* Souheil Bejaoui <[email protected]>
* Eric Antones <[email protected]>
* Simone Orsi <[email protected]>
* Paul Catinean <[email protected]>
* Ruchir Shukla <[email protected]>

Maintainers
~~~~~~~~~~~
Expand Down
92 changes: 82 additions & 10 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import selectors
import threading
import time
import uuid
from contextlib import closing, contextmanager

import psycopg2
Expand All @@ -159,6 +160,7 @@

SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
LEADER_CHECK_DELAY = 10

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -192,7 +194,7 @@ def _odoo_now():
return _datetime_to_epoch(dt)


def _connection_info_for(db_name):
def _connection_info_for(db_name, jobrunner_ha_uuid=None):
db_or_uri, connection_info = odoo.sql_db.connection_info_for(db_name)

for p in ("host", "port", "user", "password"):
Expand All @@ -202,6 +204,8 @@ def _connection_info_for(db_name):

if cfg:
connection_info[p] = cfg
if jobrunner_ha_uuid:
connection_info["application_name"] = "jobrunner_%s" % jobrunner_ha_uuid

return connection_info

Expand Down Expand Up @@ -260,14 +264,13 @@ def urlopen():


class Database(object):
def __init__(self, db_name):
def __init__(self, db_name, jobrunner_ha_uuid=None):
self.db_name = db_name
connection_info = _connection_info_for(db_name)
self.jobrunner_ha_uuid = jobrunner_ha_uuid
connection_info = _connection_info_for(db_name, self.jobrunner_ha_uuid)
self.conn = psycopg2.connect(**connection_info)
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.has_queue_job = self._has_queue_job()
if self.has_queue_job:
self._initialize()

def close(self):
# pylint: disable=except-pass
Expand All @@ -280,6 +283,41 @@ def close(self):
pass
self.conn = None

def _check_leader(self, jobrunner_db_names):
"""Check if the linked jobrunner is the leader of all jobrunner_db_names"""
if not self.jobrunner_ha_uuid:
return False

with closing(self.conn.cursor()) as cr:
cr.execute(
"""
SELECT substring(application_name FROM 'jobrunner_(.*)')
FROM pg_stat_activity
WHERE application_name LIKE 'jobrunner_%%' AND
datname IN %s
ORDER BY backend_start, datname
LIMIT 1;""",
(jobrunner_db_names,),
)
res = cr.fetchone()
leader_uuid = res[0] if res else ""
if leader_uuid != self.jobrunner_ha_uuid:
_logger.debug(
"jobrunner %s: not leader of db(s) [ %s ]. leader: %s. sleeping %s sec.",
self.jobrunner_ha_uuid,
", ".join(jobrunner_db_names),
leader_uuid,
LEADER_CHECK_DELAY,
)
return False

_logger.info(
"jobrunner %s is now the leader of db(s) [ %s ]",
self.jobrunner_ha_uuid,
", ".join(jobrunner_db_names),
)
return True

def _has_queue_job(self):
with closing(self.conn.cursor()) as cr:
cr.execute(
Expand Down Expand Up @@ -353,6 +391,7 @@ def __init__(
user=None,
password=None,
channel_config_string=None,
high_availability=None,
):
self.scheme = scheme
self.host = host
Expand All @@ -363,6 +402,10 @@ def __init__(
if channel_config_string is None:
channel_config_string = _channels()
self.channel_manager.simple_configure(channel_config_string)
self.uuid = False
if high_availability:
self.uuid = str(uuid.uuid4())
_logger.info("jobrunner %s initialized in HA mode", self.uuid)
self.db_by_name = {}
self._stop = False
self._stop_pipe = os.pipe()
Expand All @@ -388,12 +431,18 @@ def from_environ_or_config(cls):
password = os.environ.get(
"ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD"
) or queue_job_config.get("http_auth_password")
if "ODOO_QUEUE_JOB_HIGH_AVAILABILITY" in os.environ:
high_availability = str(os.environ["ODOO_QUEUE_JOB_HIGH_AVAILABILITY"])
else:
high_availability = str(queue_job_config.get("high_availability"))
high_availability = high_availability.lower() in ("true", "1", "t")
runner = cls(
scheme=scheme or "http",
host=host or "localhost",
port=port or 8069,
user=user,
password=password,
high_availability=high_availability,
)
return runner

Expand Down Expand Up @@ -421,15 +470,20 @@ def close_databases(self, remove_jobs=True):
_logger.warning("error closing database %s", db_name, exc_info=True)
self.db_by_name = {}

def initialize_runner(self):
"""Listen for db notifications and load existing jobs into memory"""
for db in self.db_by_name.values():
db._initialize()
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db.db_name, *job_data)
_logger.info("queue job runner ready for db %s", db.db_name)

def initialize_databases(self):
for db_name in self.get_db_names():
db = Database(db_name)
db = Database(db_name, self.uuid)
if db.has_queue_job:
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)

def run_jobs(self):
now = _odoo_now()
Expand Down Expand Up @@ -511,6 +565,17 @@ def stop(self):
# wakeup the select() in wait_notification
os.write(self._stop_pipe[1], b".")

def check_db_leader(self):
"""Check if the current jobrunner is the leader for all configured databases"""
jobrunner_db_names = tuple(self.db_by_name.keys())

if not jobrunner_db_names:
return False

# Use the first db connection to for leadership check
db_obj = self.db_by_name[jobrunner_db_names[0]]
return db_obj._check_leader(jobrunner_db_names)

def run(self):
_logger.info("starting")
while not self._stop:
Expand All @@ -520,6 +585,13 @@ def run(self):
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
while not self._stop and self.uuid:
leader = self.check_db_leader()
if leader:
break
time.sleep(LEADER_CHECK_DELAY)
continue
self.initialize_runner()
_logger.info("database connections ready")
# inner loop does the normal processing
while not self._stop:
Expand Down
17 changes: 17 additions & 0 deletions queue_job/readme/CONFIGURE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,20 @@

.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.

* Deploying in high availability mode or odoo.sh:

When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration
parameters mentioned above you need to also set the env variable
ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such:

.. code-block:: ini

(...)
[queue_job]
high_availability = 1


> :warning: **Warning:** Failure to enable the high_availability flag on odoo.sh could
constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh
deployment
2 changes: 2 additions & 0 deletions queue_job/readme/CONTRIBUTORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
* Souheil Bejaoui <[email protected]>
* Eric Antones <[email protected]>
* Simone Orsi <[email protected]>
* Paul Catinean <[email protected]>
* Ruchir Shukla <[email protected]>
6 changes: 6 additions & 0 deletions queue_job/readme/ROADMAP.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:

* When deployed in high_availability mode the allocated databases for the
jobrunners must be identical. If the databases are different and overlap
i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either
DB1 or DB3 will not proccess jobs because there can be only one leader per
sets of databases.

.. code-block:: sql

update queue_job set state='pending' where state in ('started', 'enqueued')
21 changes: 21 additions & 0 deletions queue_job/static/description/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,20 @@ <h1><a class="toc-backref" href="#toc-entry-2">Configuration</a></h1>
of running Odoo is obviously not for production purposes.</td></tr>
</tbody>
</table>
<ul class="simple">
<li>Deploying in high availability mode or odoo.sh:</li>
</ul>
<p>When deploying queue_job on multiple nodes or on odoo.sh, on top of the configuration
parameters mentioned above you need to also set the env variable
ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or via config parameter as such:</p>
<pre class="code ini literal-block">
<span class="na">(...)</span><span class="w">
</span><span class="k">[queue_job]</span><span class="w">
</span><span class="na">high_availability</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">1</span>
</pre>
<p>&gt; :warning: <strong>Warning:</strong> Failure to enable the high_availability flag on odoo.sh could
constitute a breach of Acceptable Use Policy. Always enable this flag via the odoo.conf file for odoo.sh
deployment</p>
</div>
<div class="section" id="usage">
<h1><a class="toc-backref" href="#toc-entry-3">Usage</a></h1>
Expand Down Expand Up @@ -871,6 +885,11 @@ <h1><a class="toc-backref" href="#toc-entry-11">Known issues / Roadmap</a></h1>
therefore fill the running queue and prevent other jobs to start.
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement <em>before starting Odoo</em>:</li>
<li>When deployed in high_availability mode the allocated databases for the
jobrunners must be identical. If the databases are different and overlap
i.e jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3 then either
DB1 or DB3 will not proccess jobs because there can be only one leader per
sets of databases.</li>
</ul>
<pre class="code sql literal-block">
<span class="k">update</span><span class="w"> </span><span class="n">queue_job</span><span class="w"> </span><span class="k">set</span><span class="w"> </span><span class="k">state</span><span class="o">=</span><span class="s1">'pending'</span><span class="w"> </span><span class="k">where</span><span class="w"> </span><span class="k">state</span><span class="w"> </span><span class="k">in</span><span class="w"> </span><span class="p">(</span><span class="s1">'started'</span><span class="p">,</span><span class="w"> </span><span class="s1">'enqueued'</span><span class="p">)</span>
Expand Down Expand Up @@ -930,6 +949,8 @@ <h2><a class="toc-backref" href="#toc-entry-17">Contributors</a></h2>
<li>Souheil Bejaoui &lt;<a class="reference external" href="mailto:souheil.bejaoui&#64;acsone.eu">souheil.bejaoui&#64;acsone.eu</a>&gt;</li>
<li>Eric Antones &lt;<a class="reference external" href="mailto:eantones&#64;nuobit.com">eantones&#64;nuobit.com</a>&gt;</li>
<li>Simone Orsi &lt;<a class="reference external" href="mailto:simone.orsi&#64;camptocamp.com">simone.orsi&#64;camptocamp.com</a>&gt;</li>
<li>Paul Catinean &lt;<a class="reference external" href="mailto:pca&#64;pledra.com">pca&#64;pledra.com</a>&gt;</li>
<li>Ruchir Shukla &lt;<a class="reference external" href="mailto:ruchir&#64;bizzappdev.com">ruchir&#64;bizzappdev.com</a>&gt;</li>
</ul>
</div>
<div class="section" id="maintainers">
Expand Down
Loading