How to Start a Software Company 2.0

by Richard Rodger

       
 
Race Condition in Ruby GServer

I've been playing around with Ruby lately, trying to write a UDP server. Of course, the way to write servers in Ruby is apparently (I'm still a bit new at this Ruby malarky) to subclass GServer.

And what a fine piece of code GServer is! Really does the job and shows just how easy it is to do stuff in Ruby. If you want to a TCP server.

But I needed a UDP server. So after much messing about I came up with the following half-baked solution. Please feel free to tear to pieces. I should remind you however that this code is WOMM certified.

# with apologies to John W. Small

require "socket"
require "thread"

class UServer

  DEFAULT_HOST = "127.0.0.1"
  DEFAULT_TRANSPORT = "TCP";

  def serve(io,content)
  end

  @@services = {}   # Hash of opened ports, i.e. services
  @@servicesMutex = Mutex.new

  def UServer.stop(port, 
                   host = DEFAULT_HOST, 
                   trans = DEFAULT_TRANSPORT)
    @@servicesMutex.synchronize {
      @@services[host][port][trans].stop
    }
  end

  def UServer.in_service?(port, 
                          host = DEFAULT_HOST, 
                          trans = DEFAULT_TRANSPORT)
    @@services.has_key?(host) and 
      @@services[host].has_key?(port) and
      @@services[host][port].has_key?(trans)
  end

  def stop
    @connectionsMutex.synchronize  {
      if @serverThread
        @serverThread.raise "stop"
      end
    }
  end

  def stopped?
    nil == @serverThread
  end

  def shutdown
    @shutdown = true
  end

  def connections
    @connections.size
  end

  def join
    @serverThread.join if @serverThread
  end

  attr_reader :port, :host, :trans, :maxConnections
  attr_accessor :stdlog, :audit, :debug

  def connecting(client)
    addr = client.peeraddr
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} client:#{addr[1]} " +
        "#{addr[2]}<#{addr[3]}> connect")
    true
  end

  def disconnecting(clientPort)
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans}" +
      "client:#{clientPort} disconnect")
  end

  protected :connecting, :disconnecting

  def starting()
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} start")
  end

  def stopping()
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} stop")
  end

  protected :starting, :stopping


  def error(detail)
    log(detail.backtrace.join("\n"))
  end

  def log(msg)
    if @stdlog
      @stdlog.puts("[#{Time.new.ctime}] %s" % msg)
      @stdlog.flush
    end
  end

  protected :error, :log

  def initialize(port, host = DEFAULT_HOST, trans = DEFAULT_TRANSPORT, maxConnections = 4,
    stdlog = $stderr, audit = false, debug = false)
    @serverThread = nil
    @port = port
    @host = host
    @trans = trans
    @maxConnections = maxConnections
    @connections = []
    @connectionsMutex = Mutex.new
    @connectionsCV = ConditionVariable.new
    @stdlog = stdlog
    @audit = audit
    @debug = debug
  end


  def start(maxConnections = -1)
    raise "running" if !stopped?
    @shutdown = false
    @maxConnections = maxConnections if maxConnections > 0
    @@servicesMutex.synchronize  {
      if UServer.in_service?(@port,@host,@trans)
        raise "Port already in use: #{host}:#{@port} #{@trans}!"
      end

      if "TCP" == @trans
        @server = ContentTCPServer.new(@host,@port)
      else
        @server = ContentUDPServer.new(@host,@port)
      end

      @@services[@host] = {} unless @@services.has_key?(@host)
      @@services[@host][@port] = {} unless @@services[@host].has_key?(@port)
      @@services[@host][@port][@trans] = self;
    }
    @serverThread = Thread.new {
      begin
        starting if @audit
        while !@shutdown
          @connectionsMutex.synchronize  {
          puts "start @con.size=#{@connections.size}"
             while @connections.size >= @maxConnections
               @connectionsCV.wait(@connectionsMutex)
             end
          }

          client, port, close, content = @server.accept

          Thread.new(client,port,close,content)  { 
            |myClient, myPort, myClose, myContent|

            @connectionsMutex.synchronize {
              @connections << Thread.current
            }
            begin
              serve(myClient,myContent) if !@audit or connecting(myClient)
              puts "finished serve"
            rescue => detail
              error(detail) if @debug
            ensure
              begin
                if myClose
                  myClient.close
              end
              rescue
              end

              @connectionsMutex.synchronize {
                @connections.delete(Thread.current)
                @connectionsCV.signal
              }

              disconnecting(myPort) if @audit
            end
          }
        end
      rescue => detail
        error(detail) if @debug
      ensure
        begin
          @server.close
        rescue
        end
        if @shutdown
          @connectionsMutex.synchronize  {
             while @connections.size > 0
               @connectionsCV.wait(@connectionsMutex)
             end
          }
        else
          @connections.each { |c| c.raise "stop" }
        end
        @serverThread = nil
        @@servicesMutex.synchronize  {
          @@services[@host][@port].delete(@trans)
        }
        stopping if @audit
      end
    }
    self
  end

end


class ContentTCPServer

  def initialize( host, port )
    @server = TCPServer.new(host,port)
  end
  
  def accept
    client = @server.accept
    return [client,client.peeraddr[1],true,client.gets(nil)]
  end  

  def close 
    @server.close
  end

end


class ContentUDPServer

  def initialize( host, port )
    puts "init"

    @socket = UDPSocket.new
    puts "new s: #{@socket} on #{host}:#{port}"

    @socket.bind(host, port)
    puts "bound: #{@socket}"
  end
  
  def accept
    puts "accept"

    packet = @socket.recvfrom(1024)
    return [@socket, 0, false, packet[0]]
  end  

  def close 
    @socket.close
  end

end

Lovely! Who says Java can't be translated into Ruby? I even used duck typing!

Anyway, see if you can spot the significant difference in thread handling between this and the original GServer.

Well, OK, here it is:

GServer says:

@connections << Thread.new(client)  { |myClient|
  ...
}

I say:

Thread.new(client,port,close,content)  { |myClient, myPort, myClose, myContent|
  @connectionsMutex.synchronize {
    @connections << Thread.current
  }
  ...
}

What happened was that UDP packets were handled so quickly that the thread was (mostly) never placed in the @connections list until after it was (supposedly) removed from the list by

@connectionsMutex.synchronize {
  @connections.delete(Thread.current)
  @connectionsCV.signal
}

at the end of the request thread. Seems like a nasty race condition to me. My code just makes sure that the thread is placed into the @connections list before nasty stuff can happen.

Am I right? You tell me...

@ 03:34 PM GMT+00:00 [ comments [2] ]   email this   links to this

If you liked this entry, please consider bookmarking it &mdash Thanks!
Bookmark Race Condition in Ruby GServer at del.icio.us Digg Race Condition in Ruby GServer at Digg.com Bookmark Race Condition in Ruby GServer at reddit.com Bookmark Race Condition in Ruby GServer at YahooMyWeb Bookmark Race Condition in Ruby GServer at Spurl.net Bookmark Race Condition in Ruby GServer at Simpy.com Bookmark Polyphasic Mutants at NewsVine Blink this Race Condition in Ruby GServer at blinklist.com Bookmark Race Condition in Ruby GServer at Furl.net Fark Race Condition in Ruby GServer at Fark.com

 
 
Trackback URL: http://old.richardrodger.com/roller/trackback/richard/Weblog/race_condition_in_ruby_gserver
Comments:

Python does this really well with a nice hierarchy of classes that includes a UDP Server a TCPServer and a ThreadingMixin that automagically converts your server to a multi-threaded server.

Apologies for the shameless python plug but I just implemented a logging server in about a day using this framework and I was pretty impressed.

Posted by Joe Drumgoole on March 26, 2007 at 11:13 AM GMT+00:00
Website: http://putplace.com #

I've heard that Python is very handy for this sort of thing. This is more of a Ruby learning exercise for me though.

I've already hit some flakiness - for example "gets" on windows blocks *all* threads. Nasty. I know Python would be quite a bit more mature in this area for sure.

I'm going to put the code up on http://sipsanity.sourceforge.net so check back if you're interested in my ruby-based approach. It will be ugly!

Posted by Richard Rodger on March 27, 2007 at 10:52 PM GMT+00:00
Website: http://www.ricebridge.com #

Comments for this have been disabled. Please send me a mail if you want to comment and I will activate comments again.
 
YahooBloglines
NewsgatorMSN
Google Readerdel.icio.us FurlSubscribe with myFeedster
« March 2007 »
SunMonTueWedThuFriSat
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
21
22
23
24
25
26
27
28
29
30
31
       
Today

All | General | Java | Business | Fun | Perl | Rant | Ireland | Web
[This is a Roller site]
[Valid Atom 1.0] [Valid RSS]
Technology Blog Top Sites
Blogarama - The Blogs Directory

Blog Directory & Search engine

Blog Flux Directory
Irish Blogs
 View My Public Stats on MyBlogLog.com

Performancing
Enter your Email


Powered by FeedBlitz
Theme adapted from Sotto.
 
Ricebridge XML Manager
  • Convert XML to a table of data
  • Convert XML to CSV, and CSV to XML
  • High-speed, single-pass XPath
  • Memory-stable and fault-tolerant
  • Loads of documentation
  • Cut-and-paste code examples
  • Find a bug, get a gift cert
Ricebridge Java XML Manager Component


Ricebridge CSV Manager
  • Convert CSV to a table of data
  • Handle any type of delimited file
  • Memory-stable and fault-tolerant
  • Loads of documentation
  • Cut-and-paste code examples
  • Find a bug, get a gift cert
Ricebridge Java CSV Manager Component


Popular Posts

 Sign up for MyBlogLog.com
Alertra Website Monitoring Service
Get Chitika eMiniMalls
Solo Tees
BlogJet