Throttle your Threads…

Let's say you want to run some command, such as /bin/long-command, on a set of directories, and you have a lot of directories. You know it'll take forever to complete serially, so you want to cook up a way to run these commands in parallel. You know the server CAN handle more than one command at once, but you have no idea how many it can handle without keeling over, and you have thousands of commands to run. Running them all at once in the background will kill the system for sure. You COULD try to stagger them and let the delay in overlap be a natural throttle, but sometimes the command completes in one minute and sometimes in 10, so that's not a good idea either.

So you decide it would be best to set a process concurrency limit. But what if you set that limit too low? Too high? Restarting in the middle would be bad… you COULD keep some sort of completed log and build a skip for completed files into your script, but why? That doesn't seem so elegant. Your car is good at handling variable speed allowances… it goes fast when you say and slow when you say… maybe we can give a simple bash script a gas pedal? That just might work!

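echo '5' > /tmp/threads
for i in fileroot/*; do
  # Wait while the number of running long-command processes is at or
  # above the limit currently stored in /tmp/threads.
  while [ $(pgrep -x long-command | wc -l) -ge $(cat /tmp/threads) ]; do
    sleep 1
  done
  # Launch the next job in the background.
  ( /bin/long-command "$i" ) &
  sleep 1
done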

Now you can speed it up and throttle it back by adjusting the integer value inside /tmp/threads.
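
For anyone who would rather drive this from Ruby, here is a rough sketch of the same gas pedal idea. It is not the script above: it tracks the child processes it spawned itself instead of using pgrep, and the names current_limit and run_throttled, the poll parameter, and the default limit of 1 are all made up for this example. It assumes a Unix-like system.

```ruby
# Same throttle file as the bash version.
LIMIT_FILE = '/tmp/threads'

# Read the current limit; fall back to a default if the file is missing
# or does not contain an integer.
def current_limit(path = LIMIT_FILE, default = 1)
  Integer(File.read(path).strip)
rescue Errno::ENOENT, ArgumentError
  default
end

# Run one job per item. The block must start the job and return its pid.
# The limit file is re-read on every pass, so it can be changed mid-run.
def run_throttled(items, limit_file = LIMIT_FILE, poll: 1)
  running = []
  items.each do |item|
    loop do
      # Drop children that have exited (waitpid returns nil while running).
      running.reject! { |pid| Process.waitpid(pid, Process::WNOHANG) }
      break if running.size < [current_limit(limit_file), 1].max
      sleep poll
    end
    running << yield(item)
  end
  # Wait for whatever is still running.
  running.each { |pid| Process.waitpid(pid) }
end
```

Usage would look something like run_throttled(Dir.glob('fileroot/*')) { |dir| Process.spawn('/bin/long-command', dir) }, with the speed still controlled by writing a new number into /tmp/threads.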

“It was the little old server from Pasadena…”

So you wanna see an image

We’ve been asked how we manage serving files from Amazon’s very cool S3 service at WordPress.com… This is how. (This covers a request for an image already stored on S3, not the upload -> S3 process.)

A request comes into pound for a file. Pound hashes the hostname (via a custom patch which we have not released, but may) to determine which of several backend servers the request should hit, and forwards the request to that server. This, of course, means that a given blog always serves from the same backend server. The only exception to that rule is when the server is, for some reason, unavailable, in which case pound temporarily picks another server to serve that hostname from.
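
To make the hashing step concrete, here is a minimal sketch of hostname-to-backend selection with a temporary fallback. This is not the pound patch (which is unreleased, and written in C anyway); the backend names, the CRC32 hash, and the pick_backend helper are all assumptions made for illustration.

```ruby
require 'zlib'

# Hypothetical backend list; the real server names are not part of this post.
BACKENDS = %w[backend1 backend2 backend3].freeze

# Pick a backend for a hostname. `available` is an assumed health-check
# result: the subset of BACKENDS that is currently reachable.
def pick_backend(hostname, available = BACKENDS)
  return nil if available.empty?
  # The same hostname always hashes to the same preferred slot.
  index = Zlib.crc32(hostname.downcase) % BACKENDS.size
  # If the preferred backend is down, walk forward to the next live one.
  BACKENDS.size.times do |offset|
    candidate = BACKENDS[(index + offset) % BACKENDS.size]
    return candidate if available.include?(candidate)
  end
  nil
end
```

Calling pick_backend with the full list always returns the same server for a given blog; passing a shorter available list shows the temporary fallback.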

The request then comes into varnishd on the backend server. The varnishd daemon checks its 300GB file cache and (for the sake of this example) finds nothing (hey, new images are uploaded all the time!). Varnishd then checks with the web server (running on the same machine, just bound to a different IP/port), and that request is handled by a custom script.

So, an HTTP daemon on the same backend server handles the file request. The custom script checks the DB to gather information on the file (specifically which DCs it is in, its size, its mod time, and whether it's deleted or not). All this info is saved in memcached for 5 minutes. The script then increments and checks the “hawtness” (term courtesy of Barry) of the file in memcached. If the file has been accessed over a certain # of times it is deemed “hawt”, and a special header is sent with the response telling varnishd to put the file into its cache. Once that happens, requests are served directly by varnishd as in the previous paragraph and never hit the httpd or this script again (or at least not until the cache entry expires). At this point, assuming the file should exist (deleted = 0 in the files db), we fetch the file from a backend source.
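
The “hawtness” check boils down to a counter with a threshold. Here is a rough sketch of that logic, not the actual script: a plain Hash stands in for memcached, and the threshold of 10, the X-Hawt header name, and the helper names are all invented for this example (the real values are not given here).

```ruby
# Hypothetical threshold and header name.
HAWT_THRESHOLD = 10
HAWT_HEADER = 'X-Hawt'

# Stand-in for memcached: a Hash mapping file key => hit count.
class HitCounter
  def initialize
    @counts = Hash.new(0)
  end

  # Increment and return the new count, similar to a memcached incr.
  def incr(key)
    @counts[key] += 1
  end
end

# Build the extra response headers for one request. The caching header
# is added only once the file has been requested more than the threshold.
def response_headers(counter, file_key)
  hits = counter.incr(file_key)
  headers = {}
  headers[HAWT_HEADER] = '1' if hits > HAWT_THRESHOLD
  headers
end
```

The cache in front only needs to look for that one header to decide whether to keep the object, so the popularity logic stays in the script.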

Which backend source depends on where the file is available. The order of preference is as follows: always fetch from Amazon S3 if the file lives there, no matter what (the following preference only ever applies if, for some reason, s3 = 0 in the files db); failing that, fetch from the one file server we still have (which has larger, slower disks and is used for archiving and fault tolerance only).
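
One way to read that order of preference, as a small sketch. The record keys (:deleted, :s3, :path) and the two fetcher callables are hypothetical stand-ins for the files db row and the real fetch code, and this follows the reading that the file server is only consulted when s3 = 0.

```ruby
# record is a Hash standing in for the files db row; the key names are
# assumptions made for this sketch.
def fetch_file(record, s3_fetcher, archive_fetcher)
  # Deleted (or unknown) files are never fetched.
  return nil unless record[:deleted] == 0
  if record[:s3] == 1
    # Preferred source: Amazon S3.
    s3_fetcher.call(record[:path])
  else
    # Fallback: the remaining file server with the big slow disks.
    archive_fetcher.call(record[:path])
  end
end
```

For example, fetch_file({ deleted: 0, s3: 1, path: 'a.jpg' }, s3, archive) calls only the S3 fetcher, where s3 and archive are any objects that respond to call.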

After fetching the file from the back end… the custom script hands the data and programmatically generated headers to the HTTP daemon, which hands the data to varnishd, varnishd hands the data to pound, pound hands the data to the requesting client, and the image appears in the web browser.

And there was much rejoicing (yay.)

For the visual people among us who like visuals and stuff… (I like visuals…) here goes…

Autumn Leaves Leaf #3: Commander

This leaf is capable of running a script on the local server in response to the !deploy channel command. For security you have to authenticate first. To do so, you send it a private message containing a password. It then HTTP-authenticates against a specific URL, using your nickname as the username and the message text as the password. If the file fetched matches the predesignated contents, you are added to the internal ACL. Anyone in the ACL can run the !deploy command. If you leave the chan, join the chan, change nicks, or quit IRC you will be removed from the ACL and have to re-authenticate. This could be adapted to any system command for any purpose. I ended up not needing this leaf, but I still wanted to put it out there since it's functional and useful.

require 'net/http'
require 'net/https'

class Commander < AutumnLeaf
  
  before_filter :authenticate, :only => [ :reload, :sync, :quit, :deploy ]
  $authenticated = []

  def authenticate_filter(sender, channel, command, msg, options)
    return true if $authenticated.include?(sender)
    return false
  end

  def did_receive_private_message(sender, msg)
    # assumes there is a file at 
    # http://my.svnserver.com/svn/access 
    # whose contents are "granted" 
    Net::HTTP.start('my.svnserver.com') {|http|
      req = Net::HTTP::Get.new('/svn/access');
      req.basic_auth(sender, msg)
      response = http.request(req)
      $authenticated << sender if response.body == "granted"
    }
  end

  def someone_did_quit(sender, msg)
    $authenticated.delete(sender)
  end

  def someone_did_leave_channel(sender, channel)
    $authenticated.delete(sender)
  end

  def someone_did_join_channel(sender, channel)
    $authenticated.delete(sender)
  end

  def deploy_command(sender, channel, text)
    message "deploying..."
    system("sudo /usr/local/bin/deploy.sh 1>/dev/null 2>/dev/null")
  end

end

Autumn Leaves Leaf #2: Feeder

This handy little bot keeps track of RSS feeds and announces in the channel when one is updated. (Note: be sure to edit the path to the data files.) Each poller runs inside its own Ruby thread and can be run on its own independent schedule.

require 'thread'
require 'rss/1.0'
require 'rss/2.0'
require 'open-uri'
require 'fileutils'
require 'digest/md5'

class Feeder < AutumnLeaf

def watch_feed(url, title, sleepfor=300)
  message "Watching (#{title}) [#{url}] every #{sleepfor} seconds"
  feedid = Digest::MD5.hexdigest(title)
  Thread.new {
    while true
      begin
        content = ""
        # URI.open comes from open-uri; plain open(url) no longer fetches URLs on Ruby 3
        URI.open(url) { |s|
          content = s.read
        }
        rss = RSS::Parser.parse(content, false)
        rss.items.each { |entry|
          digest = Digest::MD5.hexdigest(entry.title)
          if !File.exist?("/tmp/.rss.#{feedid}.#{digest}")
            FileUtils.touch("/tmp/.rss.#{feedid}.#{digest}")
            message "#{entry.title} (#{title}) #{entry.link}"
          end
          sleep(2)
        }
      rescue
        sleep(2)
      end
      sleep(sleepfor)
    end
  }
  sleep(1)
end

def did_start_up
  watch_feed("http://planet.wordpress.org/rss20.xml", "planet", 600)
  watch_feed("http://wordpress.com/feed/", "wpcom", 300)
end

end

Autumn Leaves Leaf #1: Announcer

This bot is perfect for anything where you need to easily build IRC channel notifications into an existing process. It’s simple, clean, and agnostic. Quite simply: you connect to a TCP port, give it one line, the connection closes, and the line you gave shows up in the channel. For example: echo 'hello' | nc -q 1 bothost 22122

require 'socket'
require 'thread'

class Announcer < AutumnLeaf

        def handle_incoming(sock)
                Thread.new {
                        begin
                                line = sock.gets
                                # gets returns nil if the client closes without sending a line
                                message line.chomp if line
                        ensure
                                sock.close
                        end
                }
        end

        def did_start_up
                Thread.new {
                        listener = TCPServer.new('',22122)
                        while (new_socket = listener.accept)
                                handle_incoming(new_socket)
                        end
                }
        end

end
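
If the process doing the announcing is itself written in Ruby, the nc one-liner can be replaced with a few lines of socket code. This is just a sketch: the announce helper is a made-up name, and bothost and 22122 are simply the host and port from the example above.

```ruby
require 'socket'

# Send a single line to the Announcer bot and close the connection.
# host and port default to the values used in the nc example.
def announce(line, host = 'bothost', port = 22122)
  TCPSocket.open(host, port) do |sock|
    # puts appends the newline that the bot's gets call is waiting for.
    sock.puts(line)
  end
end
```

Something like announce('deploy finished') at the end of a script is then enough to get the line into the channel.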