Certain RethinkDB drivers support asynchronous connections by integrating with popular async libraries. This is particularly useful with changefeeds and other real-time applications.
The official RethinkDB drivers currently support integration with EventMachine for Ruby, and with Tornado and Twisted for Python.
Due to its event-driven nature, JavaScript can easily execute RethinkDB queries asynchronously; no special procedures or commands are necessary. Read about using callbacks and promises with RethinkDB in the documentation for the run command.
In addition, RethinkDB’s cursors and feeds implement an EventEmitter interface compatible with Node’s. This allows your application to set up listeners to receive data from queries as the data becomes available.
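For example, here is a minimal sketch with the official JavaScript driver, using the promise form of run and consuming a changefeed with the cursor's each method (the table name is illustrative):
var r = require('rethinkdb');

r.connect({host: 'localhost', port: 28015}).then(function(conn) {
  // run returns a promise that resolves to a cursor (here, a feed)
  return r.table('test').changes().run(conn);
}).then(function(cursor) {
  // each invokes the callback as rows arrive
  cursor.each(function(err, row) {
    if (err) throw err;
    console.log(row);
  });
});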
The RethinkDB Ruby driver adds a new ReQL command, em_run, designed to work with EventMachine. In addition, it provides a superclass, RethinkDB::Handler, with event-specific methods (e.g., on_open, on_close) that may be overridden by a class your application defines and passes to em_run.
The easiest way to use RethinkDB with EventMachine is simply by passing a block to em_run. If RethinkDB returns a sequence (including a stream), the block will be called once with each element of the sequence. Otherwise, the block will be called just once with the returned value.
Example: Iterate over a stream
require 'eventmachine'
require 'rethinkdb'
include RethinkDB::Shortcuts
conn = r.connect(host: 'localhost', port: 28015)
EventMachine.run {
r.table('test').order_by(:index => 'id').em_run(conn) { |row|
# do something with returned row data
p row
}
}
The em_run command returns a QueryHandle instance. The QueryHandle will be closed when all results have been received, or when EventMachine stops running. You can explicitly close it with the close method.
EventMachine.run {
printed = 0
handle = r.table('test').order_by(:index => 'id').em_run(conn) { |row|
printed += 1
if printed > 3
handle.close
else
p row
end
}
}
In the form above, with a block that accepts a single argument, RethinkDB's EventMachine adapter will throw errors back up to your application to handle in the same fashion as you would using RethinkDB without EventMachine. If the table test did not exist in the database above, you would receive the standard ReqlRuntimeError:
RethinkDB::ReqlRuntimeError: Table `test.test` does not exist.
Backtrace:
r.table('test')
^^^^^^^^^^^^^^^
You can also choose to receive errors in the block by accepting two arguments.
EventMachine.run {
r.table('test').order_by(:index => 'id').em_run(conn) { |err, row|
if err
p [:err, err.to_s]
else
p [:row, row]
end
}
}
In this form, the block will receive nil as the first argument if there is no error. In the case of an error, the second argument will be nil.
To gain more precise control, write a class that inherits from RethinkDB::Handler and overrides the event handling methods, then pass an instance of that class to em_run.
Example: Iterate over a stream using a handler
require 'eventmachine'
require 'rethinkdb'
include RethinkDB::Shortcuts
conn = r.connect(host: 'localhost', port: 28015)
class Printer < RethinkDB::Handler
def on_open
p :open
end
def on_close
p :closed
end
def on_error(err)
p [:err, err.to_s]
end
def on_val(val)
p [:val, val]
end
end
EventMachine.run {
r.table('test').order_by(:index => 'id').em_run(conn, Printer)
}
# Sample output
:open
[:val, {"id"=>1}]
[:val, {"id"=>2}]
[:val, {"id"=>3}]
:closed
In addition to the simple on_val method, you can provide methods that specifically apply to arrays, streams, and atoms.
class Printer < RethinkDB::Handler
def on_open
p :open
end
def on_close
p :closed
end
def on_error(err)
p [:err, err.to_s]
end
# Handle arrays
def on_array(array)
p [:array, array]
end
# Handle atoms
def on_atom(atom)
p [:atom, atom]
end
# Handle individual values received from streams
def on_stream_val(val)
p [:stream_val, val]
end
def on_val(val)
p [:val, val]
end
end
EventMachine.run {
r.table('test').order_by(:index => 'id').em_run(conn, Printer)
# print an array
r.expr([1, 2, 3]).em_run(conn, Printer)
# print a single row
r.table('test').get(1).em_run(conn, Printer)
}
# Sample output
:open
[:stream_val, {"id"=>0}]
[:stream_val, {"id"=>1}]
[:stream_val, {"id"=>2}]
:closed
:open
[:array, [1, 2, 3]]
:closed
:open
[:atom, {"id"=>0}]
:closed
The various on_* methods provide fallbacks for one another:
- An array will be handled by on_array if defined; otherwise it will be handled by on_atom. If neither of those is defined, the individual elements of the array will be handled by on_stream_val or, if that is not defined, by on_val.
- A value received from a stream will be handled by on_stream_val if defined; otherwise it will be handled by on_val.
- An atom will be handled by on_atom if defined; otherwise it will be handled by on_val.
Thus, on_val acts as a “catch-all” for any data that is not handled by a more specific method.
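For instance, in this minimal sketch only on_val is defined, so the elements of an array fall back to it one at a time (the CatchAll name is illustrative):
class CatchAll < RethinkDB::Handler
  def on_val(val)
    p [:val, val]
  end
end

EventMachine.run {
  # With no on_array, on_atom or on_stream_val defined, on_val is
  # called once per element: 1, then 2, then 3
  r.expr([1, 2, 3]).em_run(conn, CatchAll)
}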
The order in which callbacks are called in the EventMachine.run block is not guaranteed; in the sample output above, [:array, [1, 2, 3]] might have printed first.
A changefeed is handled like any other stream; when you pass a block to em_run, the block is called with each document received on the feed. If you pass a Handler that defines on_stream_val (or on_val), those methods will be called with each document.
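For example, a minimal sketch that prints every document received on a feed using the block form (assumes the test table exists):
EventMachine.run {
  r.table('test').changes.em_run(conn) { |change|
    # Each change document contains old_val and new_val keys
    p change
  }
}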
In addition, there are changefeed-specific methods that may be defined:
- on_initial_val: if the changefeed returns initial values (include_initial has been specified as an option to changes), those values will be passed to this method.
- on_uninitial_val: a changefeed that returns initial values may also return “uninitial” values to indicate that a document already sent as an initial value has been changed (see the changes documentation for details); those values, if any, will be passed to this method.
- on_change: changes will be passed to this method.
- on_change_error: if the feed includes a document specifying errors that do not cause the feed to abort (for instance, a notification that the server discarded some changes), those errors will be passed to this method.
- on_state: a feed may include documents specifying the state of the stream; those documents will be passed to this method if defined.
For example:
class FeedPrinter < RethinkDB::Handler
def on_open
p :open
end
def on_close
p :closed
end
def on_error(err)
p [:err, err.to_s]
end
def on_initial_val(val)
p [:initial, val]
end
def on_state(state)
p [:state, state]
end
def on_change(old, new)
p [:change, old, new]
end
end
# Subscribe to changes on the documents with the two lowest ids
EventMachine.run {
r.table('test').order_by(:index => 'id').limit(2).changes
.em_run(conn, FeedPrinter)
}
# Sample output
:open
[:state, "initializing"]
[:initial, {"id"=>1}]
[:initial, {"id"=>0}]
[:state, "ready"]
# Execute: r.table('test').insert({id: 0.5}).run(conn)
[:change, {"id"=>1}, {"id"=>0.5}]
# Execute: r.table_drop('test').run(conn)
[:err, "Changefeed aborted (table unavailable).\nBacktrace..."]
:closed
You can register multiple queries with the same Handler instance. If you define Handler methods with an additional argument (two arguments instead of one, or one argument instead of zero), that argument will receive the appropriate QueryHandle instance.
class MultiQueryPrinter < RethinkDB::Handler
  def initialize(names)
    @names = names
    @closed = 0
  end
  def on_open(qh)
    p [:open, @names[qh]]
  end
  def on_close(qh)
    p [:close, @names[qh]]
    @closed += 1
    EventMachine.stop if @closed == 2
  end
  def on_val(val, qh)
    p [:val, val, @names[qh]]
  end
end
EventMachine.run {
  names = {}
  printer = MultiQueryPrinter.new(names)
  h1 = r.expr(1).em_run(conn, printer)
  h2 = r.expr(2).em_run(conn, printer)
  # EventMachine does not dispatch events until this block returns,
  # so the handles can be registered after em_run is called
  names[h1] = "h1"
  names[h2] = "h2"
}
# Sample output
[:open, "h1"]
[:val, 1, "h1"]
[:close, "h1"]
[:open, "h2"]
[:val, 2, "h2"]
[:close, "h2"]
If you call the stop method on a Handler, it will stop processing changes, and open streams using that handler will be closed. Queries registered with that handler instance will not be interrupted if they are currently processing (e.g., a batch write), but will close rather than execute after the handler has been stopped.
Example: Print the first five changes to a table. After the handler has been stopped, the changefeed query will be closed on the next change to the table rather than returning a value.
class FeedPrinter < RethinkDB::Handler
  def initialize(conn, max)
    @conn = conn
    @counter = max
    stop if @counter <= 0
  end
  def on_open
    # Once the changefeed is open, insert 10 rows
    r.table('test').insert([{}] * 10).run(@conn, noreply: true)
  end
  def on_val(val)
    # Every time we print a change, decrement @counter and stop if we hit 0
    p val
    @counter -= 1
    stop if @counter <= 0
  end
end
EventMachine.run {
  r.table('test').changes.em_run(conn, FeedPrinter.new(conn, 5))
}
The RethinkDB Python driver integrates with both the Tornado web framework and the Twisted networking engine. By using the set_loop_type command, you can select either the 'tornado' or 'twisted' event loop model, returning Tornado Future objects or Twisted Deferred objects respectively.
Before connect, use the set_loop_type("tornado") command to set RethinkDB to use asynchronous event loops compatible with Tornado.
from rethinkdb import RethinkDB
from tornado import ioloop, gen
from tornado.concurrent import Future, chain_future
r = RethinkDB()
r.set_loop_type("tornado")
connection = r.connect(host='localhost', port=28015)
After executing set_loop_type, r.connect will return a Tornado Future, as will r.run.
Example: Simple use
@gen.coroutine
def single_row(connection_future):
# Wait for the connection to be ready
connection = yield connection_future
# Insert some data
yield r.table('test').insert([{"id": 0}, {"id": 1}, {"id": 2}]).run(connection)
# Print the first row in the table
row = yield r.table('test').get(0).run(connection)
print(row)
# Output
{u'id': 0}
Example: Using a cursor
@gen.coroutine
def use_cursor(connection_future):
# Wait for the connection to be ready
connection = yield connection_future
# Insert some data
yield r.table('test').insert([{"id": 0}, {"id": 1}, {"id": 2}]).run(connection)
# Print every row in the table.
cursor = yield r.table('test').order_by(index="id").run(connection)
while (yield cursor.fetch_next()):
item = yield cursor.next()
print(item)
# Output
{u'id': 0}
{u'id': 1}
{u'id': 2}
Note that looping over a cursor must be done with while and fetch_next, rather than using a for x in cursor loop.
If an error occurs during an asynchronous operation, the yield
statement will throw an exception as normal. This may happen immediately (for example, you might reference a table that doesn’t exist), but your application might receive large amounts of data before the error (for example, your network might be disrupted after the connection is established).
One error in particular is notable. If you have a coroutine set to consume a changefeed indefinitely, and the connection closes, the coroutine will experience a ReqlRuntimeError.
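A minimal sketch of guarding such a coroutine against that error (the watch_forever name is hypothetical):
@gen.coroutine
def watch_forever(connection, table):
    feed = yield r.table(table).changes().run(connection)
    try:
        while (yield feed.fetch_next()):
            item = yield feed.next()
            print(item)
    except r.ReqlRuntimeError:
        # Raised if the connection closes while the feed is still open
        print("changefeed aborted")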
Example: Re-thrown errors
@gen.coroutine
def bad_table(connection):
yield r.table('non_existent').run(connection)
Traceback (most recent call last):
... elided ...
rethinkdb.errors.ReqlRuntimeError: Table `test.non_existent` does not exist. in:
r.table('non_existent')
^^^^^^^^^^^^^^^^^^^^^^^
Example: Catching errors in the coroutine
@gen.coroutine
def catch_bad_table(connection):
try:
yield r.table('non_existent').run(connection)
except r.ReqlRuntimeError:
print("Saw error")
# Output
Saw error
The asynchronous database API allows you to handle multiple changefeeds simultaneously by scheduling background coroutines. As an example, consider this changefeed handler:
@gen.coroutine
def print_cfeed_data(connection_future, table):
connection = yield connection_future
feed = yield r.table(table).changes().run(connection)
while (yield feed.fetch_next()):
item = yield feed.next()
print(item)
We can schedule it on the Tornado IO loop with this code:
ioloop.IOLoop.current().add_callback(print_cfeed_data, connection, table)
Now the coroutine will run in the background, printing out changes. When we alter the table, the changes will be noticed.
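For instance, a write such as this hypothetical trigger coroutine would appear in the printed output:
@gen.coroutine
def trigger(connection, table):
    # Any write to the watched table is delivered on the changefeed
    yield r.table(table).insert({'id': 100}).run(connection)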
Now, consider a larger example.
class ChangefeedNoticer(object):
def __init__(self, connection):
self._connection = connection
self._sentinel = object()
self._cancel_future = Future()
@gen.coroutine
def print_cfeed_data(self, table):
feed = yield r.table(table).changes().run(self._connection)
self._feeds_ready[table].set_result(True)
while (yield feed.fetch_next()):
cursor = feed.next()
chain_future(self._cancel_future, cursor)
item = yield cursor
if item is self._sentinel:
return
print("Seen on table %s: %s" % (table, item))
@gen.coroutine
def table_write(self, table):
for i in range(10):
yield r.table(table).insert({'id': i}).run(self._connection)
@gen.coroutine
def exercise_changefeeds(self):
self._feeds_ready = {'a': Future(), 'b': Future()}
loop = ioloop.IOLoop.current()
loop.add_callback(self.print_cfeed_data, 'a')
loop.add_callback(self.print_cfeed_data, 'b')
yield self._feeds_ready
yield [self.table_write('a'), self.table_write('b')]
self._cancel_future.set_result(self._sentinel)
@classmethod
@gen.coroutine
def run(cls, connection_future):
connection = yield connection_future
if 'a' in (yield r.table_list().run(connection)):
yield r.table_drop('a').run(connection)
yield r.table_create('a').run(connection)
if 'b' in (yield r.table_list().run(connection)):
yield r.table_drop('b').run(connection)
yield r.table_create('b').run(connection)
noticer = cls(connection)
yield noticer.exercise_changefeeds()
# Output
Seen on table a: {u'old_val': None, u'new_val': {u'id': 0}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 0}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 1}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 1}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 2}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 2}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 3}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 3}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 4}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 4}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 5}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 6}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 5}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 7}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 6}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 8}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 7}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 9}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 8}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 9}}
Here, we listen for changes on multiple tables at once. We simultaneously write into the tables, and observe our writes appear in the changefeeds. We then cancel the changefeeds after we’ve written 10 items into each of the tables.
To use Twisted instead, use the set_loop_type("twisted") command before connect to set RethinkDB to use asynchronous event loops compatible with the Twisted reactor.
from rethinkdb import RethinkDB
from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue
r = RethinkDB()
r.set_loop_type('twisted')
connection = r.connect(host='localhost', port=28015)
After executing set_loop_type, r.connect will return a Twisted Deferred, as will r.run.
Example: Simple use
@inlineCallbacks
def single_row(conn_deferred):
# Wait for the connection to be ready
conn = yield conn_deferred
# Insert some data
yield r.table('test').insert([{"id": 0}, {"id": 1}, {"id": 2}]).run(conn)
# Print the first row in the table
row = yield r.table('test').get(0).run(conn)
print(row)
# Output
{u'id': 0}
Example: Using a cursor
@inlineCallbacks
def use_cursor(conn):
# Insert some data
yield r.table('test').insert([{"id": 0}, {"id": 1}, {"id": 2}]).run(conn)
# Print every row in the table.
cursor = yield r.table('test').order_by(index="id").run(conn)
while (yield cursor.fetch_next()):
item = yield cursor.next()
print(item)
# Output:
{u'id': 0}
{u'id': 1}
{u'id': 2}
Note that looping over a cursor must be done with while and fetch_next, rather than using a for x in cursor loop.
If an error occurs during an asynchronous operation, the yield
statement will throw an exception as normal. This may happen immediately (for example, you might reference a table that doesn’t exist), but your application might receive large amounts of data before the error (for example, your network might be disrupted after the connection is established).
One error in particular is notable. If you have a task that consumes a changefeed indefinitely, and the connection closes, the task will experience a ReqlRuntimeError.
Example: Re-thrown errors
@inlineCallbacks
def bad_table(conn):
yield r.table('non_existent').run(conn)
Unhandled error in Deferred:
Traceback (most recent call last):
Failure: rethinkdb.errors.ReqlOpFailedError: Table `test.non_existent` does not exist in:
r.table('non_existent')
^^^^^^^^^^^^^^^^^^^^^^^
Example: Catching runtime errors
@inlineCallbacks
def catch_bad_table(conn):
try:
yield r.table('non_existent').run(conn)
except r.ReqlRuntimeError:
print("Saw error")
# Output
Saw error
The asynchronous database API allows you to handle multiple changefeeds simultaneously by running multiple background tasks. As an example, consider this changefeed handler:
@inlineCallbacks
def print_feed(conn, table):
feed = yield r.table(table).changes().run(conn)
while (yield feed.fetch_next()):
item = yield feed.next()
print("Seen on table %s: %s" % (table, str(item)))
We can schedule it on the Twisted reactor with this code:
reactor.callLater(0, print_feed, conn, table)
Now the task will run in the background, printing out changes. When we alter the table, the changes will be noticed.
Now consider a larger example:
@inlineCallbacks
def print_feed(conn, table, ready, cancel):
def errback_feed(feed, err):
feed.close()
return err
feed = yield r.table(table).changes().run(conn)
cancel.addErrback(lambda err: errback_feed(feed, err))
ready.callback(None)
while (yield feed.fetch_next()):
item = yield feed.next()
print("Seen on table %s: %s" % (table, str(item)))
@inlineCallbacks
def table_write(conn, table):
for i in range(10):
yield r.table(table).insert({'id': i}).run(conn)
@inlineCallbacks
def notice_changes(conn, *tables):
# Reset the state of the tables on the server
if len(tables) > 0:
table_list = yield r.table_list().run(conn)
yield defer.DeferredList([r.table_drop(t).run(conn) for t in tables if t in table_list])
yield defer.DeferredList([r.table_create(t).run(conn) for t in tables])
readies = [defer.Deferred() for t in tables]
cancel = defer.Deferred()
feeds = [print_feed(conn, table, ready, cancel) for table, ready in zip(tables, readies)]
# Wait for the feeds to become ready
yield defer.gatherResults(readies)
yield defer.gatherResults([table_write(conn, table) for table in tables])
# Cancel the feeds and wait for them to exit
cancel.addErrback(lambda err: None)
cancel.cancel()
yield defer.DeferredList(feeds)
yield notice_changes(conn, 'a', 'b')
# Output
Seen on table b: {u'old_val': None, u'new_val': {u'id': 0}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 0}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 1}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 1}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 2}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 2}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 3}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 3}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 4}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 4}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 5}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 5}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 6}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 6}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 7}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 7}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 8}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 8}}
Seen on table a: {u'old_val': None, u'new_val': {u'id': 9}}
Seen on table b: {u'old_val': None, u'new_val': {u'id': 9}}
Here, we listen for changes on multiple tables at once. We simultaneously write into the tables, and observe our writes appear in the changefeeds. We then cancel the changefeeds after we’ve written 10 items into each of the tables.