4.3. Slave promotion in Python
You have now seen two techniques for promoting a slave: a traditional
technique that suffers from a loss of transactions on some slaves, and
a more complex technique that recovers all available transactions. The
traditional technique is straightforward to implement in Python, so
let’s concentrate on the more complicated one. To handle slave
promotion this way, it is necessary to:
Configure all the slaves correctly
Add the tables Global_Trans_ID and
Last_Exec_Trans to the master
Provide the application code to commit a transaction
correctly
Write code to automate slave promotion
You can use the Promotable class
(shown in Example 10)
to handle the new kind of server. As you can see, this example reuses
the previously introduced _enable_binlog
helper method, and adds a method to set the log-slave-updates
option. Since a promotable slave requires the special tables we showed
earlier for the master, adding a promotable slave also requires
adding those tables to the master. To do this, we
write a function named _add_global_id_tables. The function assumes
that if the tables already exist, they have the correct definition, so
no attempt is made to re-create them. However, the
Last_Exec_Trans table needs to start with one row
for the update to work correctly, so if no warning was produced to
indicate that the table already existed, the table was just created,
and we add a row with NULL values.
Example 10. The definition of a promotable slave role
_GLOBAL_TRANS_ID_DEF = """
CREATE TABLE IF NOT EXISTS Global_Trans_ID (
    number INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (number)
) ENGINE=MyISAM
"""

_LAST_EXEC_TRANS_DEF = """
CREATE TABLE IF NOT EXISTS Last_Exec_Trans (
    server_id INT UNSIGNED DEFAULT NULL,
    trans_id INT UNSIGNED DEFAULT NULL
) ENGINE=InnoDB
"""

class Promotable(Role):
    def __init__(self, repl_user, master):
        self.__master = master
        self.__user = repl_user

    def _add_global_id_tables(self, master):
        master.sql(_GLOBAL_TRANS_ID_DEF)
        master.sql(_LAST_EXEC_TRANS_DEF)
        if not master.sql("SELECT @@warning_count"):
            master.sql("INSERT INTO Last_Exec_Trans() VALUES ()")

    def _relay_events(self, server, config):
        config.set('mysqld', 'log-slave-updates')

    def imbue(self, server):
        # Fetch and update the configuration
        config = server.get_config()
        self._set_server_id(server, config)
        self._enable_binlog(server, config)
        self._relay_events(server, config)

        # Put the new configuration in place
        server.stop()
        server.put_config(config)
        server.start()

        # Add the global transaction ID tables to the master
        self._add_global_id_tables(self.__master)

        server.repl_user = self.__master.repl_user
This routine configures the slaves and the master correctly for
using global transaction IDs. You still have to update the
Last_Exec_Trans table when committing each
transaction. In Example 11 you can see an
example implementation for committing transactions. The code is
written in PHP, since this is part of the application code and not
part of the code for managing the deployment.
Example 11. Code for starting, committing, and aborting
transactions
function start_trans($link) {
    mysql_query("START TRANSACTION", $link);
}

function commit_trans($link) {
    mysql_select_db("common", $link);
    mysql_query("SET SQL_LOG_BIN = 0", $link);
    mysql_query("INSERT INTO Global_Trans_ID() VALUES ()", $link);
    $trans_id = mysql_insert_id($link);
    $result = mysql_query("SELECT @@server_id as server_id", $link);
    $row = mysql_fetch_row($result);
    $server_id = $row[0];

    $delete_query = "DELETE FROM Global_Trans_ID WHERE number = %d";
    mysql_query(sprintf($delete_query, $trans_id), $link);
    mysql_query("SET SQL_LOG_BIN = 1", $link);

    $update_query = "UPDATE Last_Exec_Trans SET server_id = %d, trans_id = %d";
    mysql_query(sprintf($update_query, $server_id, $trans_id), $link);
    mysql_query("COMMIT", $link);
}

function rollback_trans($link) {
    mysql_query("ROLLBACK", $link);
}
We can then use this code to commit transactions by calling the
functions instead of the usual COMMIT and ROLLBACK. For example, we could write a PHP
function to add a message to a database and update a message counter
for the user:
function add_message($email, $message, $link) {
start_trans($link);
mysql_select_db("common", $link);
$query = sprintf("SELECT user_id FROM user WHERE email = '%s'", $email);
$result = mysql_query($query, $link);
$row = mysql_fetch_row($result);
$user_id = $row[0];
$update_user = "UPDATE user SET messages = messages + 1 WHERE user_id = %d";
mysql_query(sprintf($update_user, $user_id), $link);
$insert_message = "INSERT INTO message VALUES (%d,'%s')";
mysql_query(sprintf($insert_message, $user_id, $message), $link);
commit_trans($link);
}
$conn = mysql_connect(":/var/run/mysqld/mysqld1.sock", "root");
add_message('mats@example.com', "MySQL Python Replicant rules!", $conn);
What remains is the job of handling the actual slave promotion
in the event of a failure. The procedure was already outlined above,
but the implementation is more involved.
In this case, we need to
fetch the entire binlog file, since we do not know where to start
reading. The fetch_remote_binlog
function in Example 12 returns an
iterator to the lines of the binary log.
Example 12. Fetching a remote binary log
import subprocess

def fetch_remote_binlog(server, binlog_file):
    command = ["mysqlbinlog",
               "--read-from-remote-server",
               "--force",
               "--host=%s" % (server.host),
               "--user=%s" % (server.sql_user.name)]
    if server.sql_user.passwd:
        command.append("--password=%s" % (server.sql_user.passwd))
    command.append(binlog_file)
    return iter(subprocess.Popen(command, stdout=subprocess.PIPE).stdout)
The iterator returns the lines of the binary log one by one, so
the lines have to be further separated into transactions and events to
make the binary log easier to work with. In Example 13 you can see a
function named group_by_event that
groups lines belonging to the same event into a single string, and a
function named group_by_trans that
groups a stream of events (as returned by group_by_event) into lists, where each list
represents a transaction.
Example 13. Parsing the mysqlbinlog output to extract
transactions
delimiter = "/*!*/;"
def group_by_event(lines):
    event_lines = []
    for line in lines:
        if line.startswith('#'):
            if line.startswith("# End of log file"):
                del event_lines[-1]
                yield ''.join(event_lines)
                return
            if line.startswith("# at"):
                yield ''.join(event_lines)
                event_lines = []
        event_lines.append(line)

def group_by_trans(lines):
    group = []
    in_transaction = False
    for event in group_by_event(lines):
        group.append(event)
        if event.find(delimiter + "\nBEGIN\n" + delimiter) >= 0:
            in_transaction = True
        elif not in_transaction:
            yield group
            group = []
        else:
            p = event.find("\nCOMMIT")
            if p >= 0 and (event.startswith(delimiter, p+7) or
                           event.startswith(delimiter, p+8)):
                yield group
                group = []
                in_transaction = False
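To see how the two generators compose, here is a minimal usage sketch
(the server object master and the file name master-bin.000001 are
assumed for illustration and are not part of the code above):
lines = fetch_remote_binlog(master, "master-bin.000001")
for trans in group_by_trans(lines):
    # Each yielded transaction is a list of event strings; for a
    # committed InnoDB transaction, trans[-2] holds the
    # Last_Exec_Trans update and trans[-1] the terminating Xid event.
    print(len(trans))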
Example 14
shows a function named scan_logfile that
scans the mysqlbinlog output for
the global transaction IDs that were introduced. The function accepts
a master from which to fetch the binlog file, the name of a binlog
file to scan (the filename is the name of the binary log on the
master), and a callback function on_gid that will be called whenever a global
transaction ID is seen. The on_gid function will
be called with the global transaction ID (consisting of a server_id and a trans_id) and the binlog position of the end
of the transaction.
Example 14. Scanning a binlog file for global transaction IDs
import re

_GIDCRE = re.compile(r"^UPDATE Last_Exec_Trans SET\s+"
                     r"server_id = (?P<server_id>\d+),\s+"
                     r"trans_id = (?P<trans_id>\d+)$",
                     re.MULTILINE)

_HEADCRE = re.compile(r"#\d{6}\s+\d?\d:\d\d:\d\d\s+"
                      r"server id\s+(?P<sid>\d+)\s+"
                      r"end_log_pos\s+(?P<end_pos>\d+)\s+"
                      r"(?P<type>\w+)")

def scan_logfile(master, logfile, on_gid):
    from mysqlrep import Position
    lines = fetch_remote_binlog(master, logfile)
    # Scan the output to find global transaction ID update statements
    for trans in group_by_trans(lines):
        if len(trans) < 3:
            continue
        # Check for an update of the Last_Exec_Trans table
        m = _GIDCRE.search(trans[-2])
        if m:
            server_id = int(m.group("server_id"))
            trans_id = int(m.group("trans_id"))
            # Check for an information comment with end_log_pos. We
            # assume InnoDB tables only, so we can rely on the
            # transactions to end in an Xid event.
            m = _HEADCRE.search(trans[-1])
            if m and m.group("type") == "Xid":
                pos = Position(server_id, logfile, int(m.group("end_pos")))
                on_gid(server_id, trans_id, pos)
The code for the last step is given in Example 15. The promote_slave
function takes a list of slaves that lost their master and executes
the promotion by identifying the new master from the slaves. Finally,
it reconnects all the other slaves to the promoted slave by scanning
the binary logs. The code uses the support function
fetch_global_trans_id to fetch the global
transaction ID from the table that we introduced.
Example 15. Identifying the new master and reconnecting all slaves to
it
def fetch_global_trans_id(slave):
    result = slave.sql("SELECT server_id, trans_id FROM Last_Exec_Trans")
    return (int(result["server_id"]), int(result["trans_id"]))

def promote_slave(slaves):
    slave_info = {}

    # Collect the global transaction ID of each slave
    for slave in slaves:
        slave.connect()
        server_id, trans_id = fetch_global_trans_id(slave)
        slave_info.setdefault(trans_id, []).append((server_id, trans_id, slave))
        slave.disconnect()

    # Pick the slave to promote by taking the slave with the highest
    # global transaction id.
    new_master = slave_info[max(slave_info)].pop()[2]

    def maybe_change_master(server_id, trans_id, position):
        from mysqlrep.utility import change_master
        try:
            for sid, tid, slave in slave_info[trans_id]:
                if slave is not new_master:
                    change_master(slave, new_master, position)
        except KeyError:
            pass

    # Read the master logfiles of the new master.
    new_master.connect()
    logs = [row["Log_name"] for row in new_master.sql("SHOW MASTER LOGS")]
    new_master.disconnect()

    # Read the master logfiles one by one in reverse order, the
    # latest binlog file first.
    logs.reverse()
    for log in logs:
        scan_logfile(new_master, log, maybe_change_master)
Worth noting in the code is that the slaves are collected into a
dictionary keyed on the trans_id part of the global
transaction ID.
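For illustration, after the collection loop the dictionary might look
as follows (a sketch with made-up values):
# Hypothetical contents of slave_info for three slaves:
#
#   slave_info = {
#       13: [(2, 13, slave_a)],
#       15: [(3, 15, slave_b), (4, 15, slave_c)],
#   }
#
# max(slave_info) is 15, so one of slave_b and slave_c is promoted.
# The slave stored under key 13 is reconnected by maybe_change_master
# when the binlog scan encounters the end of transaction 13 on the
# new master.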
Note:
With the code in Example 15, there is no
guarantee that the transaction IDs will be in order, so if that is
important, you will have to take additional measures. The
transaction IDs could be in a nonsequential order if one transaction
fetches a global transaction ID but is interrupted before it can
commit, while another transaction fetches a global transaction ID
and commits first. To make sure the transaction IDs reflect the order in
which transactions start, add a SELECT ...
FOR UPDATE just before fetching a global transaction ID by
changing the code as follows:
function commit_trans($link) {
    mysql_query("SELECT * FROM Last_Exec_Trans FOR UPDATE", $link);
    mysql_query("SET SQL_LOG_BIN = 0", $link);
    mysql_query("INSERT INTO Global_Trans_ID() VALUES ()", $link);
    .
    .
    mysql_query("COMMIT", $link);
}
This will lock the row until the transaction is committed, but
will also slow down the system some, which is wasteful if the
ordering is not required.
The main approach shown in this article to sync servers is to
implement the transaction commit procedure in the application,
meaning that the application code needs to know table names and the
intricacies of how to produce and manipulate the global transaction
ID. Once you understand them, the complexities are not as much of a
barrier as they might seem at first. Often, you can handle them with
relative ease by creating functions in the application code that the
application writer can call without having to know the
details.
Another approach is to put the transaction commit logic in the
database server by using stored procedures. Depending on the
situation, this can sometimes be a better alternative. For example,
the commit procedure can be changed without having to change the
application code.
For this technique to work, it is necessary to put the
transaction ID from the Global_Trans_ID table
and the server ID into either a user-defined variable or a local
variable in the stored routine. Depending on which approach you
select, the query in the binary log will look a little
different.
Using local variables is less likely to interfere with
surrounding code since user-defined variables “leak” out from the
stored procedure.
The procedure for committing a transaction will then
be:
CREATE PROCEDURE commit_trans ()
SQL SECURITY DEFINER
BEGIN
    DECLARE trans_id, server_id INT UNSIGNED;
    SET SQL_LOG_BIN = 0;
    INSERT INTO Global_Trans_ID() VALUES ();
    SELECT LAST_INSERT_ID(), @@server_id INTO trans_id, server_id;
    SET SQL_LOG_BIN = 1;
    INSERT INTO Last_Exec_Trans(server_id, trans_id)
        VALUES (server_id, trans_id);
    COMMIT;
END
Committing a transaction from the application code is then
simple:
CALL commit_trans();
The task that remains is to change the procedure for scanning
the binary log for the global transaction ID. So, how will a call to
this function appear in the binary log? Well, a quick call to
mysqlbinlog shows:
# at 1724
#091129 18:35:11 server id 1  end_log_pos 1899  Query  thread_id=75  exec_time=0  error_code=0
SET TIMESTAMP=1259516111/*!*/;
INSERT INTO Last_Exec_Trans(server_id, trans_id)
VALUES ( NAME_CONST('server_id',1), NAME_CONST('trans_id',13))
/*!*/;
# at 1899
#091129 18:35:11 server id 1  end_log_pos 1926  Xid = 1444
COMMIT/*!*/;
As you can see, both the server ID and the transaction ID are
clearly visible in the output. How to match this statement using a
regular expression is left as an exercise for the reader.
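As a starting point, a pattern along the following lines should match
the INSERT shown above (a sketch only, analogous to _GIDCRE in
Example 14; the exact spacing in mysqlbinlog output can vary, so
verify it against your own logs):
import re

# A sketch of a pattern matching the stored procedure's INSERT as it
# appears in the binary log. The group names mirror _GIDCRE above.
_PROC_GIDCRE = re.compile(
    r"INSERT INTO Last_Exec_Trans\(server_id, trans_id\)\s+"
    r"VALUES \(\s*NAME_CONST\('server_id',(?P<server_id>\d+)\),\s*"
    r"NAME_CONST\('trans_id',(?P<trans_id>\d+)\)\)")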