Skip to content

Commit

Permalink
Adds support for CHANGE REPLICATION SOURCE TO statement (#636)
Browse files Browse the repository at this point in the history
* feat: adds support for 'change replication source to' statement
  • Loading branch information
dennisurtubia authored May 21, 2024
1 parent 6ce2f49 commit a80b805
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
minor_changes:
- mysql_replication - Adds support for `CHANGE REPLICATION SOURCE TO` statement (https://github.com/ansible-collections/community.mysql/issues/635).
73 changes: 72 additions & 1 deletion plugins/modules/mysql_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
author:
- Balazs Pocze (@banyek)
- Andrew Klychkov (@Andersson007)
- Dennis Urtubia (@dennisurtubia)
options:
mode:
description:
- Module operating mode. Could be
C(changeprimary) (CHANGE MASTER TO),
C(changereplication) (CHANGE REPLICATION SOURCE TO) - only supported in MySQL 8.0.23 and later,
C(getprimary) (SHOW MASTER STATUS),
C(getreplica) (SHOW REPLICA STATUS),
C(startreplica) (START REPLICA),
Expand All @@ -34,6 +36,7 @@
type: str
choices:
- changeprimary
- changereplication
- getprimary
- getreplica
- startreplica
Expand Down Expand Up @@ -229,6 +232,13 @@
primary_log_file: mysql-bin.000009
primary_log_pos: 4578
- name: Change replication source to replica server 192.0.2.1 and use binary log 'mysql-bin.000009' with position 4578
community.mysql.mysql_replication:
mode: changereplication
primary_host: 192.0.2.1
primary_log_file: mysql-bin.000009
primary_log_pos: 4578
- name: Check replica status using port 3308
community.mysql.mysql_replication:
mode: getreplica
Expand Down Expand Up @@ -438,6 +448,16 @@ def changeprimary(cursor, chm, connection_name='', channel=''):
cursor.execute(query)


def changereplication(cursor, chm, channel=''):
query = 'CHANGE REPLICATION SOURCE TO %s' % ','.join(chm)

if channel:
query += " FOR CHANNEL '%s'" % channel

executed_queries.append(query)
cursor.execute(query)


def main():
argument_spec = mysql_common_argument_spec()
argument_spec.update(
Expand All @@ -449,7 +469,8 @@ def main():
'startreplica',
'resetprimary',
'resetreplica',
'resetreplicaall']),
'resetreplicaall',
'changereplication']),
primary_auto_position=dict(type='bool', default=False, aliases=['master_auto_position']),
primary_host=dict(type='str', aliases=['master_host']),
primary_user=dict(type='str', aliases=['master_user']),
Expand Down Expand Up @@ -655,6 +676,56 @@ def main():
module.exit_json(msg="Replica reset", changed=True, queries=executed_queries)
else:
module.exit_json(msg="Replica already reset", changed=False, queries=executed_queries)
elif mode == 'changereplication':
chm = []
result = {}
if primary_host is not None:
chm.append("SOURCE_HOST='%s'" % primary_host)
if primary_user is not None:
chm.append("SOURCE_USER='%s'" % primary_user)
if primary_password is not None:
chm.append("SOURCE_PASSWORD='%s'" % primary_password)
if primary_port is not None:
chm.append("SOURCE_PORT=%s" % primary_port)
if primary_connect_retry is not None:
chm.append("SOURCE_CONNECT_RETRY=%s" % primary_connect_retry)
if primary_log_file is not None:
chm.append("SOURCE_LOG_FILE='%s'" % primary_log_file)
if primary_log_pos is not None:
chm.append("SOURCE_LOG_POS=%s" % primary_log_pos)
if primary_delay is not None:
chm.append("SOURCE_DELAY=%s" % primary_delay)
if relay_log_file is not None:
chm.append("RELAY_LOG_FILE='%s'" % relay_log_file)
if relay_log_pos is not None:
chm.append("RELAY_LOG_POS=%s" % relay_log_pos)
if primary_ssl is not None:
if primary_ssl:
chm.append("SOURCE_SSL=1")
else:
chm.append("SOURCE_SSL=0")
if primary_ssl_ca is not None:
chm.append("SOURCE_SSL_CA='%s'" % primary_ssl_ca)
if primary_ssl_capath is not None:
chm.append("SOURCE_SSL_CAPATH='%s'" % primary_ssl_capath)
if primary_ssl_cert is not None:
chm.append("SOURCE_SSL_CERT='%s'" % primary_ssl_cert)
if primary_ssl_key is not None:
chm.append("SOURCE_SSL_KEY='%s'" % primary_ssl_key)
if primary_ssl_cipher is not None:
chm.append("SOURCE_SSL_CIPHER='%s'" % primary_ssl_cipher)
if primary_ssl_verify_server_cert:
chm.append("SOURCE_SSL_VERIFY_SERVER_CERT=1")
if primary_auto_position:
chm.append("SOURCE_AUTO_POSITION=1")
try:
changereplication(cursor, chm, channel)
except mysql_driver.Warning as e:
result['warning'] = to_native(e)
except Exception as e:
module.fail_json(msg='%s. Query == CHANGE REPLICATION SOURCE TO %s' % (to_native(e), chm))
result['changed'] = True
module.exit_json(queries=executed_queries, **result)

warnings.simplefilter("ignore")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@
- import_tasks: mysql_replication_resetprimary_mode.yml

- include_tasks: issue-28.yml

# Tests of changereplication mode:
- import_tasks: mysql_replication_changereplication_mode.yml
when:
- db_engine == 'mysql'
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
---

- vars:
mysql_params: &mysql_params
login_user: '{{ mysql_user }}'
login_password: '{{ mysql_password }}'
login_host: '{{ mysql_host }}'

block:
# Get primary log file and log pos:
- name: Get primary status
mysql_replication:
<<: *mysql_params
login_port: '{{ mysql_primary_port }}'
mode: getprimary
register: mysql_primary_status

# Test changereplication mode:
- name: Run replication
mysql_replication:
<<: *mysql_params
login_port: '{{ mysql_replica1_port }}'
mode: changereplication
primary_host: '{{ mysql_host }}'
primary_port: '{{ mysql_primary_port }}'
primary_user: '{{ replication_user }}'
primary_password: '{{ replication_pass }}'
primary_log_file: '{{ mysql_primary_status.File }}'
primary_log_pos: '{{ mysql_primary_status.Position }}'
primary_ssl_ca: ''
primary_ssl: no
register: result

- name: Assert that changereplication is changed and return expected query
assert:
that:
- result is changed
- result.queries == expected_queries
vars:
expected_queries: ["CHANGE REPLICATION SOURCE TO SOURCE_HOST='{{ mysql_host }}',\
SOURCE_USER='{{ replication_user }}',SOURCE_PASSWORD='********',\
SOURCE_PORT={{ mysql_primary_port }},SOURCE_LOG_FILE=\
'{{ mysql_primary_status.File }}',SOURCE_LOG_POS=\
{{ mysql_primary_status.Position }},SOURCE_SSL=0,SOURCE_SSL_CA=''"]

# Test changereplication mode with channel:
- name: Run replication
mysql_replication:
<<: *mysql_params
login_port: '{{ mysql_replica1_port }}'
mode: changereplication
primary_user: '{{ replication_user }}'
primary_password: '{{ replication_pass }}'
channel: '{{ test_channel }}'

register: with_channel_result_queries

- name: Assert that changereplication is changed and is called correctly with channel
assert:
that:
- with_channel_result_queries is changed
- with_channel_result_queries.queries == expected_queries
vars:
expected_queries: ["CHANGE REPLICATION SOURCE TO SOURCE_USER='{{ replication_user }}',\
SOURCE_PASSWORD='********' FOR CHANNEL '{{ test_channel }}'"]
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,5 @@
- name: Assert that stopslave returns expected error message
assert:
that:
- result.msg == "value of mode must be one of{{ ":" }} getprimary, getreplica, changeprimary, stopreplica, startreplica, resetprimary, resetreplica, resetreplicaall, got{{ ":" }} stopslave"
- result.msg == "value of mode must be one of{{ ":" }} getprimary, getreplica, changeprimary, stopreplica, startreplica, resetprimary, resetreplica, resetreplicaall, changereplication, got{{ ":" }} stopslave"
- result is failed

0 comments on commit a80b805

Please sign in to comment.