RethinkDB 2.0 drivers: native support for Tornado and EventMachine
Asynchronous web frameworks like Tornado and EventMachine simplify realtime application development in Python and Ruby. They support an event-driven programming model that fits the realtime web, making it easy to use WebSocket connections without blocking. You can plug in RethinkDB changefeeds—which allow you to subscribe to changes on database queries—to extend that event-driven model to your persistence layer, offering a full backend stack for pushing live updates to your frontend.
The upcoming RethinkDB 2.0 release introduces support for using Tornado and EventMachine to perform asynchronous queries in the Python and Ruby client drivers. When we announced the availability of the RethinkDB 2.0 release candidate last week, the updated client drivers weren’t quite ready yet. Today, we’re issuing fresh RethinkDB 2.0 RC client drivers that fully incorporate the new functionality (download them here).
Asynchronous query execution is particularly useful in applications that consume changefeeds. You can use asynchronous queries to keep multiple changefeeds running in the background, operating continuously without blocking execution.
In this blog post, I’ll demonstrate basic RethinkDB usage with Tornado and EventMachine and I’ll show you how I used the EventMachine integration in Ruby to build a realtime application with RethinkDB and Faye.
Asynchronous queries in Python
Tornado is a web framework and collection of asynchronous networking libraries for Python. In RethinkDB’s new Python client driver, you can use Tornado coroutines to perform queries in the background:
import rethinkdb as r
from tornado import ioloop, gen
r.set_loop_type("tornado")
@gen.coroutine
def print_changes():
conn = yield r.connect(host="localhost", port=28015)
feed = yield r.table("table").changes().run(conn)
while (yield feed.fetch_next()):
change = yield feed.next()
print(change)
ioloop.IOLoop.current().add_callback(print_changes)
The example demonstrates how to create a changefeed that tracks updates on a given table. It uses a Tornado coroutine that runs in the background to print each update that occurs on the table.
The r.set_loop_type
method tells the client driver to integrate with the
Tornado event loop. In the future, users will be able to specify other
asynchronous programming frameworks to use instead.
You can apply the gen.coroutine
decorator to turn a function into a Tornado
coroutine. When you run a coroutine, the yield
keyword causes execution of
the function to suspend temporarily until the operation after the yield keyword
is complete. Tornado coroutines let you express asynchronous operations in a
relatively flat, synchronous style.
The print_changes
function iterates over a cursor returned by a changefeed,
using the yield
keyword to pause execution until the next item is available
from the changefeed. The ioloop.IOLoop
line tells Tornado to run the
coroutine in the background. For more details, you can refer to our preliminary
Tornado integration docs.
Asynchronous queries in Ruby
EventMachine is an event-driven concurrency library for Ruby. In RethinkDB’s new Ruby client driver, you can use EventMachine to perform queries in the background:
require "rethinkdb"
include RethinkDB::Shortcuts
conn = r.connect host: "localhost", port: 28015
EM.run do
r.table("table").changes.em_run(conn) do |err, change|
puts change
end
end
The example above demonstrates how to create a changefeed that tracks updates on a particular table. It uses EventMachine to run in the background and print each update that occurs on the table.
To perform queries with EventMachine, use em_run
instead of the conventional
run
method. When you call em_run
, you can use a Ruby block to handle the
output. For a command with a single value output, the block will execute once.
When you consume a changefeed or perform a command that returns a cursor, the
block will execute once for each item. Always remember to wrap an EM.run
block around code that uses EventMachine. For more details about how to use
EventMachine in the Ruby client driver, you can refer to our preliminary
EventMachine integration docs.
Build a realtime Ruby app with Faye and RethinkDB
To take the asynchronous query support for a test drive, I built a simple realtime Ruby app with Faye and RethinkDB. Faye is an open source publish/subscribe framework that includes a backend server component and a frontend JavaScript library that runs in the web browser.
Faye makes it easy to do realtime messaging between client and server. It uses
WebSockets where available, but can also fall back on long polling. My simple
demo app is a realtime todo list. When any user adds a new item to the list or
checks off an existing item, the updates propagate to every user. The
application stores the global todo list in RethinkDB. A changefeed on the
todo
table uses Faye to send updates to all of the users.
The following code is responsible for initializing Faye, attaching a changefeed
to the todo
table, and broadcasting all updates to realtime clients:
EM.run do
...
# Initialize Faye and bind its URLs to the path /faye
App = Faye::RackAdapter.new Sinatra::Application, mount: "/faye"
# Establish a connection to the database
conn = r.connect host: "localhost", port: 28015
# Attach a changefeed to the "todo" table
r.table("todo").changes.em_run(conn) do |err, change|
# Publish each change to the "/todo/update" channel
App.get_client.publish("/todo/update", change)
end
...
end
Faye applications define path-delimited channels where messages are sent and
received. When an application publishes a message to a channel, all subscribers
receive the published data. In the example above, the application publishes
changefeed updates to the /todo/update
channel. The frontend client
subscribes to that channel in order to receive the updates:
faye.subscribe("/todo/update", function(data) {
console.log(data);
});
Both the client and the server are capable of publishing messages. The server can subscribe to a channel in order to receive instructions from the frontend. For example, when a user adds a todo list item, the frontend JavaScript client publishes a message to a Faye channel:
todo.add = function() {
faye.publish("/todo/add", todo.newItem);
};
The server subscribes to that channel in order to receive and handle the update:
App.get_client.subscribe "/todo/add" do |m|
if m.is_a? String
r.table("todo").insert(text: m, done: false).run(Conn)
end
end
In theory, I could have the frontend client subscribe to the /todo/add
channel in order to see new items added by other users. But I want to relay all
operations through the server so that I can update the backend data store and
perform proper validation.
In addition to realtime communication with Faye, the application also includes an embedded Sinatra server. It uses Sinatra to expose a REST endpoint with a JSON representation of the todo list. The frontend, which is built with Polymer, uses the JSON API to populate its initial state.
The following code demonstrates how to embed Sinatra in a Faye application. It
includes the /todo
URL endpoint, which returns the JSON todo list data:
EM.run do
Conn = r.connect host: "localhost", port: 28015
class SinatraApp < Sinatra::Base
get "/" do
redirect "/index.html"
end
get "/todo" do
r.table("todo").coerce_to("array").run(Conn).to_json
end
end
App = Faye::RackAdapter.new SinatraApp, mount: "/faye"
...
Rack::Server.start app: App, Port: 4000
end
It’s worth noting that the individual URL endpoints handled by Sinatra can also
get in on the realtime action. You can call App.get_client.publish
in any of
the URL route handlers if you want to make them pass realtime data to the
frontend.
Asynchronous programming frameworks take a lot of the complexity out of realtime application development. Used with RethinkDB changefeeds and WebSocket abstraction libraries, you have a fairly compelling stack for propagating live updates and handling realtime events.
Resources:
- See the complete source code of the todo list application
- Download the RethinkDB 2.0 release candidate
- Read the RethinkDB async docs