Fighting with EventMachine

Here is one of my old posts (late 2011) from dev.af83.com about EventMachine, from when I worked with Ruby. TL;DR: without futures/promises or fibers, async languages are doomed.

If it’s your first night with EventMachine, you have to fight.

EventMachine in a nutshell

EventMachine is an implementation of the reactor pattern. It’s not Twisted, nor Node, nor Erlang. It follows the Ruby way, hence you have to follow the EventMachine way.

The event-loop pattern is now mainstream

The event-loop pattern is quite simple. Actions are no longer systematically sequential. It’s not thread-based: there is only one thread for the event loop, so execution itself is still sequential. This enforces a single context: accessing variables is still atomic (no concurrency).

Classical asynchronous actions are waiting actions, such as I/O (network or disk access). The flip side is that long-running, CPU-intensive actions jam the flow. Async actions are detached from the flow, and a callback is called when they finish. If you don’t take care, you end up with a staircase of blocks (one for each callback) and trouble handling errors: the now infamous callback spaghetti.
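To make the single-threaded flow concrete, here is a toy event loop in plain Ruby. It is only a sketch: `TinyReactor` and its `next_tick` method are names made up for this illustration, not EventMachine's implementation, and there is no real I/O involved. It shows that callbacks run one at a time, and that a CPU-bound callback holds up everything queued behind it.

```ruby
# A toy single-threaded event loop, for illustration only (not EventMachine).
class TinyReactor
  def initialize
    @queue = [] # pending callbacks, run strictly one at a time
  end

  # Schedule a block to run on a later turn of the loop.
  def next_tick(&block)
    @queue << block
  end

  # Run the setup block, then drain the queue until no callback is left.
  def run
    yield self if block_given?
    @queue.shift.call until @queue.empty?
  end
end

log = []
TinyReactor.new.run do |reactor|
  reactor.next_tick { log << "callback 1" }
  reactor.next_tick do
    # A CPU-bound callback: nothing else can run until it returns.
    100_000.times { |i| i * i }
    log << "slow callback"
  end
  reactor.next_tick { log << "callback 2" }
  log << "setup done" # runs before any callback
end
puts log
```

Because there is a single queue and a single thread, `log` never needs a lock, which is the "unique context" mentioned above.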

The EventMachine way

EventMachine.run do
  http = EventMachine::HttpRequest.new("http://google.com").get
  http.callback do
    puts "got a response"
    puts http.response
    EventMachine.stop
  end
  puts "too early"
end

The first block handles the reactor, the second one is the callback.

What if I want to fetch more than one URL?

EventMachine.run do
  google = EventMachine::HttpRequest.new("http://google.com").get
  google.callback do
    puts google.response
    yahoo  = EventMachine::HttpRequest.new("http://yahoo.com").get
    yahoo.callback do
      puts yahoo.response
      EventMachine.stop
    end
  end
end

As the event-loop pattern forces you to nest callbacks, you’re building an infinite staircase. It also makes the actions sequential, although you may not want to wait for Google’s answer before asking Yahoo. You also have to add one level of indentation per URL.

A workaround is to handle it by hand:

EventMachine.run do
  urls = %w{ google.com yahoo.com}
  finished = urls.length
  responses = urls.map do |url|
    response = EventMachine::HttpRequest.new("http://#{url}").get
    response.callback do
      finished -= 1
      if finished == 0
        puts responses.map{|response| response.response }
        EventMachine.stop
      end
    end
    response
  end
end

Don’t you love flashbacks in movies? This way, the URL fetches run in parallel, and you even get a callback when all actions are finished.
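The counter trick can be isolated from the HTTP part. The following is a minimal sketch in plain Ruby, assuming a hypothetical `Countdown` helper (not part of EventMachine): it collects results in whatever order they arrive and fires a single block once the expected number has been reported.

```ruby
# Hypothetical helper: calls the block once `count` results have been reported.
class Countdown
  def initialize(count, &on_done)
    @remaining = count
    @results = {}
    @on_done = on_done
  end

  # Record one result; trigger the final callback when nothing is pending.
  def report(key, value)
    @results[key] = value
    @remaining -= 1
    @on_done.call(@results) if @remaining.zero?
  end
end

done = nil
countdown = Countdown.new(2) { |results| done = results }

# Simulate answers arriving out of order (placeholder bodies, no network).
countdown.report("yahoo.com", "yahoo body")
pending_after_first = done.nil? # the final callback has not fired yet
countdown.report("google.com", "google body")
puts done.keys.inspect
```

In a real reactor, `report` would be called from each request's callback, which is safe without locking since all callbacks share the same thread.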

EventMachine provides a helper for such common patterns:

EventMachine.run do
  urls = %w{ google.com yahoo.com}
  EventMachine::Iterator.new(urls, urls.length).map(
    proc do |url, iter|
      http = EventMachine::HttpRequest.new("http://#{url}").get
      http.callback { iter.return http.response }
    end,
    proc do |responses|
      puts responses
      EventMachine.stop
    end
  )
end

It is the official way, but IMO it turned out not to be as elegant as it could have been. The first Proc handles each iteration and returns the response; the second Proc is the finished callback.

Em-http-request provides a specific object for doing batch jobs in a simpler form:

EventMachine.run do
  multi = EventMachine::MultiRequest.new
  multi.add(:google, EventMachine::HttpRequest.new("http://google.com").get)
  multi.add(:yahoo, EventMachine::HttpRequest.new("http://yahoo.com").get)
  multi.callback do
    puts multi.responses[:callback]
    puts multi.responses[:errback]
    EventMachine.stop
  end
end

Short, elegant, and specific, isn’t it? As nobody cares about the individual requests’ callbacks, they are left out. The response is deferrable, and MultiRequest collects the scattered responses. You don’t have to handle parallel or sequential multi-actions in your own API. It’s really a job for EventMachine (or Synchrony)!

The mysterious Deferrable behavior

As seen above, an async action returns a deferrable response, which is a mysterious object bound to a callback. Why not a classical Proc? Why not a quick and dirty DefaultDeferrable, as you may have seen in EM’s doc?

The callback is just a trigger, it is not responsible for giving the answer back: the deferrable object is the answer.

But a Deferrable is a complex answer: it provides a state, two (actual) triggers, a success callback and an error callback, and an optional timeout.

In this pattern, an async function returns a response immediately (a Deferrable one), but the actual value of the response will be available later. Think of it as a closed box that will open itself, later on.
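The "closed box" idea can be sketched in a few lines of plain Ruby. `MiniDeferrable` below is a hypothetical, stripped-down imitation written for this post; it is not EventMachine's code. As simplifying assumptions, it can only be settled once and has no timeout.

```ruby
# A stripped-down deferrable, for illustration only.
# It mimics the idea behind EventMachine::Deferrable, not its implementation.
class MiniDeferrable
  attr_reader :state

  def initialize
    @state = :unknown # the box is still closed
    @callbacks = []
    @errbacks = []
    @args = []
  end

  # Register a success handler; run it at once if the box is already open.
  def callback(&block)
    if @state == :succeeded
      block.call(*@args)
    elsif @state == :unknown
      @callbacks << block
    end
    self
  end

  # Register a failure handler, with the same "already settled" shortcut.
  def errback(&block)
    if @state == :failed
      block.call(*@args)
    elsif @state == :unknown
      @errbacks << block
    end
    self
  end

  def succeed(*args)
    settle(:succeeded, args, @callbacks)
  end

  def fail(*args)
    settle(:failed, args, @errbacks)
  end

  private

  # Open the box once, remember the value, and notify the matching handlers.
  def settle(state, args, handlers)
    return unless @state == :unknown
    @state = state
    @args = args
    handlers.each { |handler| handler.call(*args) }
    @callbacks.clear
    @errbacks.clear
  end
end

box = MiniDeferrable.new
seen = []
box.callback { |value| seen << "got #{value}" }
box.errback  { |reason| seen << "failed: #{reason}" }
seen << "still closed (#{box.state})"
box.succeed 42
box.callback { |value| seen << "late listener got #{value}" }
puts seen
```

The caller holds `box` right away, while the value only shows up when `succeed` is called; a listener attached afterwards still receives it.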

Using API functions with a block is just syntactic sugar, but don’t forget to provide both a callback and an errback and, more importantly, you must return a Deferrable.

Example of an API based on EventMachine

Let’s take a simple example: airports are required to give weather information freely (no authentication layer to handle). So here is the first weather web service I found on Google. It’s PHP-based on the server side, and response serialization is done the old-school way, but it is indeed free and it just works.

class Weather
  include EventMachine::Deferrable
  attr_reader :code, :date, :temp, :humidity, :wind_speed

  def _feed(code, raw)
    rough = raw.split('#')
    @code = code
    @date, @temp, @humidity, @wind_speed = rough[0..3]
  end

  def to_s
    "<Weather: code=#{@code} temp=#{@temp} humidity=#{@humidity} wind_speed=#{@wind_speed}>"
  end
end

def meteo(icao, &block)
  response = Weather.new
  response.callback(&block) if block_given?
  http = EventMachine::HttpRequest.new("http://stationmeteo.meteorologic.net/metar/your-metar.php?type=mes&icao=#{icao}").get
  http.errback do |error|
    response.fail :system, error
  end
  http.callback do
    response._feed icao, http.response
    response.succeed response # passing itself as the first argument lets the caller choose how to handle the answer
  end
  response
end

And now an example with EventMachine’s iterator:

EventMachine.run do
  # Airports of Nantes, Lyon and Marseille; US airports don't seem to work.
  EventMachine::Iterator.new(%w{LFRS LFLL LFML}, 3).map(
    proc do |airport, iter|
      meteo airport do |score|
        iter.return score
      end
    end,
    proc do |scores|
      puts scores
      EM.stop
    end
  )
end

Fiber, or the hidden-threaded way

It can be fun to fight with callbacks in a daemon project, at least for some time. But there may come a time when you just yearn for sequential programming. Still, you want to use this cool framework which uses EventMachine to handle lots of parallel connections. What would you do then?

You could strive to parallelize a few requests, but most of the time, you just want to be able to describe your needs in a sequential fashion, everywhere.

Fiber, shipped with Ruby 1.9, is the answer. A Fiber waits for the response for you, pausing execution in the middle of a flat chunk of code with Fiber.yield and resuming it at any time with #resume. We are using Synchrony here:

alias :ameteo :meteo

def meteo(icao)
  f = Fiber.current
  conn = ameteo icao
  conn.callback { |resp| f.resume(resp) }
  conn.errback { |*errors| f.resume(*errors) }
  Fiber.yield
end

Usage is straightforward:

EventMachine.synchrony do
  puts meteo "LFRS"
  puts meteo "LFML"
  EventMachine.stop
end

Sequential actions are now, well, sequential. Revolutionary.
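The pause/resume mechanics can be observed without EventMachine at all. This small sketch uses only Ruby's built-in Fiber; the variable names and the values passed around are arbitrary.

```ruby
trace = []

fiber = Fiber.new do
  trace << "step 1"
  answer = Fiber.yield :waiting # pause here; control goes back to the caller
  trace << "step 2, resumed with #{answer}"
  :done
end

first = fiber.resume                 # runs the block up to Fiber.yield
trace << "caller is free to do other work"
second = fiber.resume("a response")  # continues right after the yield

puts trace
```

The value given to `Fiber.yield` comes out of the first `resume`, and the argument of the second `resume` becomes the return value of `Fiber.yield` inside the fiber. That second direction is what lets a callback hand a response back to paused, sequential-looking code.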

In order to transform your previously async method into a Fiber-aware one, you may prefix the original method’s name with a (as in async), then wrap it in a new method that yields the current fiber and resumes it from the callbacks. This can be made systematic: have a look at how Synchrony monkey-patches common libraries.
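Here is the same recipe reduced to plain Ruby, so it can run without EventMachine or Synchrony. Everything in it is a stand-in invented for the illustration: `PENDING` plays the role of the event loop's queue, `afetch` is a fake callback-style function, and `fetch` is its Fiber-aware wrapper.

```ruby
require 'fiber' # for Fiber.current on older Rubies; harmless on recent ones

# Toy stand-in for the reactor: callbacks waiting to be delivered.
PENDING = []

# Async, callback-style function: the answer arrives on a later loop turn.
def afetch(name, &callback)
  PENDING << -> { callback.call("response for #{name}") }
end

# Fiber-aware variant: reads like a blocking call, but only pauses the
# current fiber until the callback resumes it.
def fetch(name)
  fiber = Fiber.current
  afetch(name) { |response| fiber.resume(response) }
  Fiber.yield # returns whatever the callback passed to resume
end

results = []
Fiber.new do
  results << fetch("LFRS")
  results << fetch("LFML")
end.resume

# Minimal event loop: deliver pending answers until none is left.
PENDING.shift.call until PENDING.empty?
puts results
```

Note that `fetch` only works inside a fiber other than the root one, which is presumably why the Synchrony examples run inside an `EventMachine.synchrony` block rather than a plain `EventMachine.run`.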

You could also explicitly ask for parallel actions, using an async variant of your code.

EventMachine.synchrony do
  responses = EventMachine::Synchrony::Iterator.new(%w{LFRS LFLL LFML}, 3).map do |airport, iter|
    ameteo airport do |response|
      iter.return response
    end
  end
  puts responses
  EventMachine.stop
end

EventMachine.synchrony do
  multi = EventMachine::Synchrony::Multi.new
  multi.add :nantes, ameteo("LFRS")
  multi.add :satolas, ameteo("LFLL")
  responses = multi.perform.responses[:callback].values
  puts responses
  EventMachine.stop
end
