php open locking daemon

Don’t you hate that… When it’s 2:00am… and you really should be in bed… But your mind has hold of a problem, and won’t let it go. I have a project where it would be really handy for a process to be able to lock (arbitrary string identifier) and for another process to be able to check whether (arbitrary string identifier) is still locked. And the processes that do the locking can die… so the lock really needs to expire when they do. I could use MySQL’s GET_LOCK(), but I’m already abusing the hell out of that for more distributed things in the originating processes (and since you cannot hold more than one MySQL named lock at a time per connection, I don’t think it would work here…), and these locks are machine-wide, not network-wide…

I don’t like flock because you have to actually create a file before you can try to lock it, which leaves race conditions and the possibility of orphaned files on the file system, which just sucks… I thought about Memcached, but I really need something that can be held open for long periods of inactivity and released if the client dies, which rules out both the never-expiring and the timed methods of Memcached value storage…

After some searching I found old (Open Lock Daemon), which looked like a super good fit… Until I dug into the communication protocol… What a nightmare for just wanting to lock a string… srsly. So, not being able to find anything else (and apparently not being able to sleep until I had a satisfactory answer), I decided to write one. In PHP, naturally. Weighing in at 180 lines, I think it’s a pretty acceptable/workable first pass.

[ edit: code available here ]

class lockd {

        var $sock = null;
        var $lockd_port = 2626;
        var $lockd_addr = '127.0.0.1';
        var $processing = 0;

        var $stat_connects = 0;
        var $stat_orphans  = 0;
        var $stat_commands = 0;
        var $gs = 0;
        var $rs = 0;
        var $is = 0;
        var $qs = 0;

        var $answering = true;
        var $connections = array();
        var $locks = array();

        // PHP 8 no longer treats a method named after the class as its constructor
        function __construct( $args=array() ) {
                foreach ( $args as $i => $v ) {
                        $i = "lockd_$i";
                        $this->$i = $v;
                }

                set_time_limit( 0 );
                pcntl_signal( SIGTERM,  array( $this, 'sig_handler' ) );
                pcntl_signal( SIGHUP,   array( $this, 'sig_handler' ) );
                pcntl_signal( SIGUSR1,  array( $this, 'sig_handler' ) );

                $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
                @socket_set_option( $this->sock, SOL_SOCKET, SO_REUSEADDR, 1 );
                @socket_set_option( $this->sock, SOL_SOCKET, SO_LINGER, array( 'l_onoff' => 0, 'l_linger' => 0 ) );
                socket_set_nonblock( $this->sock );
                if ( !socket_bind( $this->sock, $this->lockd_addr, $this->lockd_port ) )
                        die( "Could not bind socket...\n" );
                if ( !socket_listen( $this->sock, 100 ) )
                        die( "Could not listen on socket...\n" );

                $pid = pcntl_fork();
                if ( $pid == -1 )
                        die( "Error Forking...\n" );
                if ( $pid )
                        die( "Detaching...\n" );
                echo chr(10).getmypid().chr(10);

                register_tick_function( array( &$this, 'process' ) );

                declare(ticks = 1);

                $this->accept_loop();

        }

        function accept_loop() {
                // loop rather than recurse, so a long-running daemon does not keep growing the call stack
                while ( true ) {
                        if ( $this->answering && ( $c = @socket_accept( $this->sock ) ) ) {
                                // echo "Accepting connection #".count( $this->connections )." $c ".chr(10);
                                socket_set_block( $c );
                                socket_set_option( $c, SOL_SOCKET, SO_KEEPALIVE, 1 );
                                socket_set_option( $c, SOL_SOCKET, SO_RCVLOWAT, 2 );
                                $this->stat_connects++;
                                // stored by value: a reference would be overwritten by the next accept
                                $this->connections[] = $c;
                        } else {
                                usleep( 50000 );
                        }
                }
        }

        function sig_handler( $sig ) {
                $this->answering = false;
                //echo "Got signal: $sig\n";
                //echo "\tKilling the listening socket\n";
                socket_close( $this->sock );
                foreach ( $this->connections as $i => $c ) {
                        //echo "\tKilling connection $i\n";
                        socket_close( $this->connections[$i] );
                }
                //echo "Sockets closed. Shutting down...";
                die();
        }

        function process() {
                // begin function locking
                if ( $this->processing )
                        return;
                $this->processing++;
                if ( $this->processing > 1 ) {
                        $this->processing--;
                        return;
                }
                // end function locking

                if ( !count( $this->connections ) ) {
                        $this->processing--;
                        return;
                }

                // process reads
                $r = $this->connections;
                $w = null;
                $e = null; //$this->connections;

                $num_changed_sockets = @socket_select($r, $w, $e, 0);
                if ( $num_changed_sockets === false ) {
                        $this->processing--;
                        return;
                }
                if ( $num_changed_sockets < 1 ) {
                        $this->processing--;
                        return;
                }
                foreach ( $r as $c ) {
                        if ( !socket_get_option( $c, SOL_SOCKET, SO_RCVBUF ) )
                                continue;
                        $n = array_search( $c, $this->connections, true );
                        $d = socket_read( $c, 4096 );
                        if ( $d === false || $d === '' ) {
                                // echo "< < #$c EOF\n";
                                socket_close( $this->connections[$n] );
                                unset( $this->connections[$n] );
                                // the client is gone, so drop every lock it still held
                                foreach ( array_keys( $this->locks, $c, true ) as $lock ) {
                                        $this->stat_orphans++;
                                        unset( $this->locks[$lock] );
                                }
                                continue;
                        }

                        $this->stat_commands++;
                        $cmd = $d[0];
                        $hash = md5( substr( $d, 1 ) );
                        switch ( $cmd ) {
                                case 'g': // get a lock
                                        $this->gs++;
                                        if ( isset( $this->locks[$hash] ) ) {
                                                socket_write( $c, "0 Cannot Get Lock\n" );
                                                break;
                                        }
                                        $this->locks[$hash] = $c;
                                        socket_write( $c, "1 Got Lock\n" );
                                        break;
                                case 'r': // release lock
                                        $this->rs++;
                                        if ( isset( $this->locks[$hash] ) && $this->locks[$hash] === $c ) {
                                                unset( $this->locks[$hash] );
                                                socket_write( $c, "1 Released Lock\n" );
                                                break;
                                        }
                                        socket_write( $c, "0 Cannot Release Lock\n" );
                                        break;
                                case 'i': // inspect lock
                                        $this->is++;
                                        if ( isset( $this->locks[$hash] ) )
                                                socket_write( $c, "1 Locked\n" );
                                        else
                                                socket_write( $c, "0 Not Locked\n" );
                                        break;
                                case 'q': // get system stats
                                        $this->qs++;
                                        socket_write( $c, print_r( array(
                                                'conns' => count( $this->connections ),
                                                'locks' => count( $this->locks ),
                                                'orphans' => $this->stat_orphans,
                                                'commands' => $this->stat_commands,
                                                'command_g' => $this->gs,
                                                'command_r' => $this->rs,
                                                'command_i' => $this->is,
                                                'command_q' => $this->qs,
                                        ), true ) );
                                        break;
                        }

                        // echo "$c < < $d";
                }
                $this->processing--;
        }
}
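
For the other side of the conversation, here is a rough sketch of what a client could look like. It is not part of the daemon above: the helper names (lockd_build_request, lockd_reply_ok, lockd_request) and the lock name 'nightly-import' are made up for illustration, and it assumes the daemon is listening on its default 127.0.0.1:2626. All it relies on is what the class does with incoming data: the first byte is the command (g, r or i), everything after it is the lock name, and the answer is one line starting with 1 or 0.

```php
<?php
// Hypothetical client helpers for the lockd class above; the names are illustrative only.

/** Build the bytes for one request: a one-letter command followed by the lock name. */
function lockd_build_request( $cmd, $name ) {
        return $cmd . $name;
}

/** A reply such as "1 Got Lock\n" or "0 Not Locked\n" counts as success if it starts with "1". */
function lockd_reply_ok( $reply ) {
        return is_string( $reply ) && $reply !== '' && $reply[0] === '1';
}

/** Send one command ('g', 'r' or 'i') and wait for the single-line answer. */
function lockd_request( $conn, $cmd, $name ) {
        fwrite( $conn, lockd_build_request( $cmd, $name ) );
        return lockd_reply_ok( fgets( $conn ) );
}

// Assumption: the daemon is running locally on its default address and port.
$conn = @fsockopen( '127.0.0.1', 2626, $errno, $errstr, 1 );
if ( $conn === false ) {
        echo "lockd not reachable: $errstr\n";
} else {
        if ( lockd_request( $conn, 'g', 'nightly-import' ) ) {
                // ... do the work while holding the lock ...
                lockd_request( $conn, 'r', 'nightly-import' );
        } else {
                echo "someone else holds the lock\n";
        }
        fclose( $conn );
}
```

Two things to keep in mind with a client like this. The daemon hashes whatever follows the command byte exactly as it arrives, so every process has to send the name byte-for-byte the same way (for example, never with a trailing newline in one place and without it in another). And the lock lives only as long as the connection does: keep the socket open while you hold the lock, because closing it, or dying, releases it. A second process that only wants to check can connect and send the 'i' command the same way.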
