Skip to content

Commit

Permalink
feat: adds support for 'change replication source to' statement
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisurtubia committed May 17, 2024
1 parent 6ce2f49 commit a8b7337
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 1 deletion.
72 changes: 71 additions & 1 deletion plugins/modules/mysql_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
description:
- Module operating mode. Could be
C(changeprimary) (CHANGE MASTER TO),
C(changereplication) (CHANGE REPLICATION SOURCE TO),
C(getprimary) (SHOW MASTER STATUS),
C(getreplica) (SHOW REPLICA STATUS),
C(startreplica) (START REPLICA),
Expand All @@ -34,6 +35,7 @@
type: str
choices:
- changeprimary
- changereplication
- getprimary
- getreplica
- startreplica
Expand Down Expand Up @@ -229,6 +231,13 @@
primary_log_file: mysql-bin.000009
primary_log_pos: 4578
- name: Change replication source to replica server 192.0.2.1 and use binary log 'mysql-bin.000009' with position 4578
community.mysql.mysql_replication:
mode: changereplication
primary_host: 192.0.2.1
primary_log_file: mysql-bin.000009
primary_log_pos: 4578
- name: Check replica status using port 3308
community.mysql.mysql_replication:
mode: getreplica
Expand Down Expand Up @@ -438,6 +447,16 @@ def changeprimary(cursor, chm, connection_name='', channel=''):
cursor.execute(query)


def changereplication(cursor, chm, channel=''):
query = 'CHANGE REPLICATION SOURCE TO %s' % ','.join(chm)

if channel:
query += " FOR CHANNEL '%s'" % channel

executed_queries.append(query)
cursor.execute(query)


def main():
argument_spec = mysql_common_argument_spec()
argument_spec.update(
Expand All @@ -449,7 +468,8 @@ def main():
'startreplica',
'resetprimary',
'resetreplica',
'resetreplicaall']),
'resetreplicaall',
'changereplication']),
primary_auto_position=dict(type='bool', default=False, aliases=['master_auto_position']),
primary_host=dict(type='str', aliases=['master_host']),
primary_user=dict(type='str', aliases=['master_user']),
Expand Down Expand Up @@ -655,6 +675,56 @@ def main():
module.exit_json(msg="Replica reset", changed=True, queries=executed_queries)
else:
module.exit_json(msg="Replica already reset", changed=False, queries=executed_queries)
elif mode == 'changereplication':
chm = []
result = {}
if primary_host is not None:
chm.append("SOURCE_HOST='%s'" % primary_host)
if primary_user is not None:
chm.append("SOURCE_USER='%s'" % primary_user)
if primary_password is not None:
chm.append("SOURCE_PASSWORD='%s'" % primary_password)
if primary_port is not None:
chm.append("SOURCE_PORT=%s" % primary_port)
if primary_connect_retry is not None:
chm.append("SOURCE_CONNECT_RETRY=%s" % primary_connect_retry)
if primary_log_file is not None:
chm.append("SOURCE_LOG_FILE='%s'" % primary_log_file)
if primary_log_pos is not None:
chm.append("SOURCE_LOG_POS=%s" % primary_log_pos)
if primary_delay is not None:
chm.append("SOURCE_DELAY=%s" % primary_delay)
if relay_log_file is not None:
chm.append("RELAY_LOG_FILE='%s'" % relay_log_file)
if relay_log_pos is not None:
chm.append("RELAY_LOG_POS=%s" % relay_log_pos)
if primary_ssl is not None:
if primary_ssl:
chm.append("SOURCE_SSL=1")
else:
chm.append("SOURCE_SSL=0")
if primary_ssl_ca is not None:
chm.append("SOURCE_SSL_CA='%s'" % primary_ssl_ca)
if primary_ssl_capath is not None:
chm.append("SOURCE_SSL_CAPATH='%s'" % primary_ssl_capath)
if primary_ssl_cert is not None:
chm.append("SOURCE_SSL_CERT='%s'" % primary_ssl_cert)
if primary_ssl_key is not None:
chm.append("SOURCE_SSL_KEY='%s'" % primary_ssl_key)
if primary_ssl_cipher is not None:
chm.append("SOURCE_SSL_CIPHER='%s'" % primary_ssl_cipher)
if primary_ssl_verify_server_cert:
chm.append("SOURCE_SSL_VERIFY_SERVER_CERT=1")
if primary_auto_position:
chm.append("SOURCE_AUTO_POSITION=1")
try:
changereplication(cursor, chm, channel)
except mysql_driver.Warning as e:
result['warning'] = to_native(e)
except Exception as e:
module.fail_json(msg='%s. Query == CHANGE REPLICATION SOURCE TO %s' % (to_native(e), chm))
result['changed'] = True
module.exit_json(queries=executed_queries, **result)

warnings.simplefilter("ignore")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@
- import_tasks: mysql_replication_resetprimary_mode.yml

- include_tasks: issue-28.yml

# Tests of changereplication mode:
- import_tasks: mysql_replication_changereplication_mode.yml
when:
- db_engine == 'mysql'
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
---

- vars:
mysql_params: &mysql_params
login_user: '{{ mysql_user }}'
login_password: '{{ mysql_password }}'
login_host: '{{ mysql_host }}'

block:
# Get primary log file and log pos:
- name: Get primary status
mysql_replication:
<<: *mysql_params
login_port: '{{ mysql_primary_port }}'
mode: getprimary
register: mysql_primary_status

# Test changereplication mode:
- name: Run replication
mysql_replication:
<<: *mysql_params
login_port: '{{ mysql_replica1_port }}'
mode: changereplication
primary_host: '{{ mysql_host }}'
primary_port: '{{ mysql_primary_port }}'
primary_user: '{{ replication_user }}'
primary_password: '{{ replication_pass }}'
primary_log_file: '{{ mysql_primary_status.File }}'
primary_log_pos: '{{ mysql_primary_status.Position }}'
primary_ssl_ca: ''
primary_ssl: no
register: result

- name: Assert that changereplication is changed and return expected query
assert:
that:
- result is changed
- result.queries == expected_queries
vars:
expected_queries: ["CHANGE REPLICATION SOURCE TO SOURCE_HOST='{{ mysql_host }}',\
SOURCE_USER='{{ replication_user }}',SOURCE_PASSWORD='********',\
SOURCE_PORT={{ mysql_primary_port }},SOURCE_LOG_FILE=\
'{{ mysql_primary_status.File }}',SOURCE_LOG_POS=\
{{ mysql_primary_status.Position }},SOURCE_SSL=0,SOURCE_SSL_CA=''"]

# Test changereplication mode with channel:
- name: Run replication
mysql_replication:
<<: *mysql_params
login_port: '{{ mysql_replica1_port }}'
mode: changereplication
primary_user: '{{ replication_user }}'
primary_password: '{{ replication_pass }}'
channel: '{{ test_channel }}'

register: with_channel_result_queries

- name: Assert that changereplication is changed and is called correctly with channel
assert:
that:
- with_channel_result_queries is changed
- with_channel_result_queries.queries == expected_queries
vars:
expected_queries: ["CHANGE REPLICATION SOURCE TO SOURCE_USER='{{ replication_user }}',\
SOURCE_PASSWORD='********' FOR CHANNEL '{{ test_channel }}'"]

0 comments on commit a8b7337

Please sign in to comment.