Skip to content

Commit

Permalink
Merge pull request #24 from tumblr/0.10.0
Browse files Browse the repository at this point in the history
0.10.0
  • Loading branch information
tbchrist committed Oct 21, 2015
2 parents 67315bc + 77929d7 commit e5bbc19
Show file tree
Hide file tree
Showing 31 changed files with 555 additions and 146 deletions.
59 changes: 43 additions & 16 deletions bin/jetpants
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,13 @@ module Jetpants
end
end

spares_needed.map do |role, needed|
spares_needed.each do |role, needed|
next if needed == 0
available = Jetpants.topology.count_spares(role: "#{role}_slave".to_sym, like: compare)
raise "Not enough spare machines with role of #{role} slave! Requested #{needed} but only have #{available} available." if needed > available
end

spares_needed.map do |role, needed|
spares_needed.each do |role, needed|
next if needed == 0
targets.concat Jetpants.topology.claim_spares(needed, role: "#{role}_slave".to_sym, like: compare)
end
Expand Down Expand Up @@ -407,10 +407,17 @@ module Jetpants
method_option :max_id, :desc => 'Maximum ID of parent shard to split'
method_option :ranges, :desc => 'Optional comma-separated list of ranges per child ie "1000-1999,2000-2499" (default if omitted: split evenly)'
method_option :count, :desc => 'How many child shards to split the parent into (only necessary if the ranges option is omitted)'
method_option :shard_pool, :desc => 'The sharding pool for which to perform the split'
def shard_split
shard_min = options[:min_id] || ask('Please enter min ID of the parent shard: ')
shard_max = options[:max_id] || ask('Please enter max ID of the parent shard: ')
s = Jetpants.topology.shard shard_min, shard_max

shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the split (enter for default pool, #{Jetpants.topology.default_shard_pool}): ")
shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?

output "Using shard pool `#{shard_pool}`"

s = Jetpants.topology.shard(shard_min, shard_max, shard_pool)

raise "Shard not found" unless s
raise "Shard isn't in ready state" unless s.state == :ready
Expand Down Expand Up @@ -503,10 +510,14 @@ module Jetpants

desc 'shard_cutover', 'truncate the current last shard range, and add a new shard after it'
method_option :cutover_id, :desc => 'Minimum ID of new last shard being created'
method_option :shard_pool, :desc => 'The sharding pool for which to perform the cutover'
def shard_cutover
cutover_id = options[:cutover_id] || ask('Please enter min ID of the new shard to be created: ')
cutover_id = cutover_id.to_i
last_shard = Jetpants.topology.shards.select {|s| s.max_id == 'INFINITY' && s.in_config?}.first
shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the split (enter for default pool, #{Jetpants.topology.default_shard_pool}): ")
shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?

last_shard = Jetpants.topology.shards(shard_pool).select {|s| s.max_id == 'INFINITY' && s.in_config?}.first
last_shard_master = last_shard.master

# Simple sanity-check that the cutover ID is greater than the current last shard's MIN id.
Expand All @@ -528,7 +539,7 @@ module Jetpants
# has the same master/slaves but now has a non-infinity max ID.
last_shard.state = :recycle
last_shard.sync_configuration
last_shard_replace = Shard.new(last_shard.min_id, cutover_id - 1, last_shard_master)
last_shard_replace = Shard.new(last_shard.min_id, cutover_id - 1, last_shard_master, :ready, shard_pool)
last_shard_replace.sync_configuration
Jetpants.topology.add_pool last_shard_replace

Expand Down Expand Up @@ -557,12 +568,12 @@ module Jetpants
end
end

new_last_shard = Shard.new(cutover_id, 'INFINITY', new_last_shard_master)
new_last_shard = Shard.new(cutover_id, 'INFINITY', new_last_shard_master, :ready, shard_pool)
new_last_shard.sync_configuration
Jetpants.topology.add_pool new_last_shard

# Create tables on the new shard's master, obtaining schema from previous last shard
tables = Table.from_config 'sharded_tables'
tables = Table.from_config('sharded_tables', shard_pool)
last_shard_master.export_schemata tables
last_shard_master.host.fast_copy_chain(Jetpants.export_location, new_last_shard_master.host, files: ["create_tables_#{last_shard_master.port}.sql"])
new_last_shard_master.import_schemata!
Expand All @@ -581,9 +592,12 @@ module Jetpants
method_option :min_id, :desc => 'Minimum ID of shard involved in master promotion'
method_option :max_id, :desc => 'Maximum ID of shard involved in master promotion'
method_option :new_master, :desc => 'New node to become master of the shard'
method_option :shard_pool, :desc => 'The sharding pool for which to perform the promotion'
def shard_promote_master
shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the master promotion (enter for default, #{Jetpants.topology.default_shard_pool}): ")
shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?
# find the shard we are going to do master promotion on
s = ask_shard_being_promoted(:prep, options[:max_id], options[:max_id])
s = ask_shard_being_promoted(:prep, options[:max_id], options[:max_id], shard_pool)

new_master = ask_node "Please enter the IP of the new master for #{s}: ", options[:new_master]
raise "New master node #{new_master} is not currently a slave in shard #{s}" unless s.slaves && s.slaves.include?(new_master)
Expand All @@ -610,9 +624,12 @@ module Jetpants
desc 'shard_promote_master_reads', 'Lockless shard master promotion (step 2 of 4): move reads to new master'
method_option :min_id, :desc => 'Minimum ID of shard involved in master promotion'
method_option :max_id, :desc => 'Maximum ID of shard involved in master promotion'
method_option :shard_pool, :desc => 'The sharding pool for which to perform the promotion'
def shard_promote_master_reads
shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the master promotion (enter for default pool, #{Jetpants.topology.default_shard_pool}): ")
shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?
# find the shard we are going to do master promotion on
s = ask_shard_being_promoted(:prep, options[:max_id], options[:max_id])
s = ask_shard_being_promoted(:prep, options[:max_id], options[:max_id], shard_pool)

# at this point we only have one slave, which is the new master
new_master = s.master.slaves.last
Expand All @@ -637,8 +654,11 @@ module Jetpants
end

desc 'shard_promote_master_writes', 'Lockless shard master promotion (step 3 of 4): move writes to new master'
method_option :shard_pool, :desc => 'The sharding pool for which to perform the promotion'
def shard_promote_master_writes
s = ask_shard_being_promoted :writes
shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the master promotion (enter for default pool, #{Jetpants.topology.default_shard_pool}): ")
shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?
s = ask_shard_being_promoted(:writes, nil, nil, shard_pool)
if s.state != :child
raise "Shard #{s} is in wrong state to perform this action! Expected :child, found #{s.state}"
end
Expand All @@ -659,8 +679,11 @@ module Jetpants
end

desc 'shard_promote_master_cleanup', 'Lockless shard master promotion (step 4 of 4): clean up shard and eject old master'
method_option :shard_pool, :desc => 'The sharding pool for which to perform the promotion'
def shard_promote_master_cleanup
s = ask_shard_being_promoted :cleanup
shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the master promotion (enter for default pool, #{Jetpants.topology.default_shard_pool}): ")
shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?
s = ask_shard_being_promoted(:cleanup, nil, nil, shard_pool)
if s.state != :needs_cleanup
raise "Shard #{s} is in wrong state to perform this action! Expected :needs_cleanup, found #{s.state}"
end
Expand Down Expand Up @@ -794,13 +817,17 @@ module Jetpants
output 'Which shard would you like to perform this action on?'
shard_min = ask('Please enter min ID of the shard: ')
shard_max = ask('Please enter max ID of the shard: ')
s = Jetpants.topology.shard shard_min, shard_max
shard_pool = ask("Please enter the sharding pool which to perform the action on (enter for default pool, #{Jetpants.topology.default_shard_pool}): ")
shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?
s = Jetpants.topology.shard(shard_min, shard_max, shard_pool)
raise 'Shard not found' unless s
s
end

def ask_shard_being_split
shards_being_split = Jetpants.shards.select {|s| s.children.count > 0}
shard_pool = ask("Enter shard pool to take action on (enter for default pool, #{Jetpants.topology.default_shard_pool}):")
shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?
shards_being_split = Jetpants.shards(shard_pool).select {|s| s.children.count > 0}
if shards_being_split.count == 0
raise 'No shards are currently being split. You can only use this task after running "jetpants shard_split".'
elsif shards_being_split.count == 1
Expand All @@ -814,9 +841,9 @@ module Jetpants
s
end

def ask_shard_being_promoted(stage = :prep, min_id = nil, max_id = nil)
def ask_shard_being_promoted(stage = :prep, min_id = nil, max_id = nil, shard_pool)
if stage == :writes || stage == :cleanup
shards_being_promoted = Jetpants.shards.select do |s|
shards_being_promoted = Jetpants.shards(shard_pool).select do |s|
[:reads, :child, :needs_cleanup].include?(s.state) && !s.parent && s.master.master
end

Expand All @@ -836,7 +863,7 @@ module Jetpants
max_id = ask("Enter max id of shard to perform master promotion: ")
max_id = Integer(max_id) rescue max_id.upcase

s = Jetpants.topology.shard(min_id, max_id)
s = Jetpants.topology.shard(min_id, max_id, shard_pool)
end
raise "Invalid shard selected!" unless s.is_a? Shard

Expand Down
13 changes: 11 additions & 2 deletions lib/jetpants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
module Jetpants; end

$LOAD_PATH.unshift File.join(File.dirname(__FILE__), 'jetpants'), File.join(File.dirname(__FILE__), '..', 'plugins')
%w(output callback table host db pool topology shard monkeypatch commandsuite).each {|g| require g}
%w(output callback table host db pool topology shard shardpool monkeypatch commandsuite).each {|g| require g}

# Since Jetpants is extremely multi-threaded, we need to force uncaught exceptions to
# kill all threads in order to have any kind of sane error handling.
Expand Down Expand Up @@ -37,7 +37,7 @@ module Jetpants
'verify_replication' => true, # raise exception if the 2 repl threads are in different states, or if actual repl topology differs from Jetpants' understanding of it
'plugins' => {}, # hash of plugin name => arbitrary plugin data (usually a nested hash of settings)
'ssh_keys' => nil, # array of SSH key file locations
'sharded_tables' => [], # array of name => {sharding_key=>X, chunks=>Y} hashes
'sharded_tables' => [], # hash of {shard_pool => {name => {sharding_key=>X, chunks=>Y}} hashes
'compress_with' => false, # command line to use for compression in large file transfers
'decompress_with' => false, # command line to use for decompression in large file transfers
'private_interface' => 'bond0', # network interface corresponding to private IP
Expand All @@ -48,6 +48,14 @@ module Jetpants
'log_file' => '/var/log/jetpants.log', # where to log all output from the jetpants commands
'local_private_interface' => nil, # local network interface corresponding to private IP of the machine jetpants is running on
'free_mem_min_mb' => 0, # Minimum amount of free memory in MB to be maintained on the node while performing the task (eg. network copy)
'default_shard_pool' => nil, # default pool for sharding operations
'import_without_indices' => false,
'ssl_ca_path' => '/var/lib/mysql/ca.pem',
'ssl_client_cert_path' => '/var/lib/mysql/client-cert.pem',
'ssl_client_key_path' => '/var/lib/mysql/client-key.pem',
'encrypt_with' => false, # command line stream encryption binary
'decrypt_with' => false, # command line stream decryption binary
'encrypt_file_transfers' => false # flag to use stream encryption
}

config_paths = ["/etc/jetpants.yaml", "~/.jetpants.yml", "~/.jetpants.yaml"]
Expand Down Expand Up @@ -156,5 +164,6 @@ def with_retries(retries = nil, max_retry_backoff = nil)

# Finally, initialize topology object
@topology = Topology.new
@topology.load_shard_pools unless @config['lazy_load_pools']
@topology.load_pools unless @config['lazy_load_pools']
end
39 changes: 36 additions & 3 deletions lib/jetpants/db/import_export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def rebuild!(tables=false, min_id=false, max_id=false)

p = pool
if p.is_a?(Shard)
tables ||= Table.from_config 'sharded_tables'
tables ||= Table.from_config('sharded_tables', p.shard_pool.name)
min_id ||= p.min_id
max_id ||= p.max_id if p.max_id != 'INFINITY'
end
Expand Down Expand Up @@ -307,8 +307,40 @@ def rebuild!(tables=false, min_id=false, max_id=false)
export_schemata tables
export_data tables, min_id, max_id
import_schemata!
alter_schemata if respond_to? :alter_schemata
if respond_to? :alter_schemata
alter_schemata
# re-retrieve table metadata in the case that we alter the tables
pool.probe_tables
tables = pool.tables.select{|t| pool.tables.map(&:name).include?(t.name)}
end

index_list = {}
db_prefix = "USE #{app_schema};"

if Jetpants.import_without_indices
tables.each do |t|
index_list[t] = t.indexes

t.indexes.each do |index_name, index_info|
drop_idx_cmd = t.drop_index_query(index_name)
output "Dropping index #{index_name} from #{t.name} prior to import"
mysql_root_cmd("#{db_prefix}#{drop_idx_cmd}")
end
end
end

import_data tables, min_id, max_id

if Jetpants.import_without_indices
index_list.each do |table, indexes|
next if indexes.keys.empty?

create_idx_cmd = table.create_index_query(indexes)
index_names = indexes.keys.join(", ")
output "Recreating indexes #{index_names} for #{table.name} after import"
mysql_root_cmd("#{db_prefix}#{create_idx_cmd}")
end
end

restart_mysql
catch_up_to_master if is_slave?
Expand Down Expand Up @@ -345,8 +377,9 @@ def clone_to!(*targets)
targets.concurrent_each {|t| t.ssh_cmd "rm -rf #{t.mysql_directory}/ib_logfile*"}

files = (databases + ['ibdata1', app_schema]).uniq
files += ['*.tokudb', 'tokudb.*', 'log*.tokulog*'] if ssh_cmd("test -f #{mysql_directory}/tokudb.environment 2>/dev/null; echo $?").chomp.to_i == 0
files << 'ib_lru_dump' if ssh_cmd("test -f #{mysql_directory}/ib_lru_dump 2>/dev/null; echo $?").chomp.to_i == 0

fast_copy_chain(mysql_directory, destinations, :port => 3306, :files => files, :overwrite => true)
clone_settings_to!(*targets)

Expand Down
28 changes: 25 additions & 3 deletions lib/jetpants/db/replication.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,39 @@ def change_master_to(new_master, option_hash={})

repl_user = option_hash[:user] || replication_credentials[:user]
repl_pass = option_hash[:password] || replication_credentials[:pass]
use_ssl = new_master.use_ssl_replication? && use_ssl_replication?

pause_replication if @master && !@repl_paused
result = mysql_root_cmd "CHANGE MASTER TO " +
cmd_str = "CHANGE MASTER TO " +
"MASTER_HOST='#{new_master.ip}', " +
"MASTER_PORT=#{new_master.port}, " +
"MASTER_LOG_FILE='#{logfile}', " +
"MASTER_LOG_POS=#{pos}, " +
"MASTER_USER='#{repl_user}', " +
"MASTER_PASSWORD='#{repl_pass}'"

output "Changing master to #{new_master} with coordinates (#{logfile}, #{pos}). #{result}"

if use_ssl
ssl_ca_path = option_hash[:ssl_ca_path] || Jetpants.ssl_ca_path
ssl_client_cert_path = option_hash[:ssl_client_cert_path] || Jetpants.ssl_client_cert_path
ssl_client_key_path = option_hash[:ssl_client_key_path] || Jetpants.ssl_client_key_path

cmd_str += ", MASTER_SSL=1"
cmd_str += ", MASTER_SSL_CA='#{ssl_ca_path}'" if ssl_ca_path

if ssl_client_cert_path && ssl_client_key_path
cmd_str +=
", MASTER_SSL_CERT='#{ssl_client_cert_path}', " +
"MASTER_SSL_KEY='#{ssl_client_key_path}'"
end
end

result = mysql_root_cmd cmd_str

msg = "Changing master to #{new_master}"
msg += " using SSL" if use_ssl
msg += " with coordinates (#{logfile}, #{pos}). #{result}"
output msg

@master.slaves.delete(self) if @master rescue nil
@master = new_master
@repl_paused = true
Expand Down
8 changes: 8 additions & 0 deletions lib/jetpants/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ def detect_table_schema(table_name)
'columns' => connection.schema(table_name).map{|schema| schema[0]}
}

if pool.is_a? Shard
config_params = Jetpants.send('sharded_tables')[pool.shard_pool.name.downcase]

unless(config_params[table_name].nil?)
params.merge!(config_params[table_name])
end
end

Table.new(table_name, params)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/jetpants/db/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def stop_mysql
output "Attempting to shutdown MySQL"
disconnect if @db
output service(:stop, 'mysql')
running = ssh_cmd "netstat -ln | grep ':#{@port}' | wc -l"
running = ssh_cmd "netstat -ln | grep \":#{@port}\\s\" | wc -l"
raise "[#{@ip}] Failed to shut down MySQL: Something is still listening on port #{@port}" unless running.chomp == '0'
@options = []
@running = false
Expand Down
5 changes: 5 additions & 0 deletions lib/jetpants/db/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,11 @@ def avg_buffer_pool_hit_rate
((buffer_pool_hit_rate.split[4].to_f * 100) / buffer_pool_hit_rate.split[6].to_f).round(2)
end

# Determine whether a server should use ssl as a replication source
def use_ssl_replication?
global_variables[:have_ssl] && global_variables[:have_ssl].downcase == "yes"
end

###### Private methods #####################################################

private
Expand Down
Loading

0 comments on commit e5bbc19

Please sign in to comment.