Leverage the "J" in JRuby for Powerful Concurrency
I was working on a project that required some heavyweight processing of links to determine whether they were valid based on a variety of criteria. Since the standard Ruby interpreter can't run Ruby threads in parallel, the naive sequential approach was taking a prohibitively long time. Fortunately, this was a pure Ruby problem and each record was processed independently of the others, so it was a good opportunity to use JRuby and its Java interoperability to get some real concurrency power.
Using the thread_safe gem (part of the concurrent-ruby project) and JRuby's Java interop, I first import the Java concurrency classes.
require 'csv'
require 'java'        # enables JRuby's java_import
require 'thread_safe' # provides ThreadSafe::Array

java_import java.util.concurrent.Callable
java_import java.util.concurrent.Executors
java_import java.util.concurrent.FutureTask
java_import java.util.concurrent.LinkedBlockingQueue
java_import java.util.concurrent.ThreadPoolExecutor
java_import java.util.concurrent.TimeUnit
Since the goal is to squeeze out as much parallel execution as possible, I set the pool size well beyond the number of available cores. The work is dominated by network I/O, so oversubscribing each core by a factor of eight keeps the CPUs busy while other threads wait on remote hosts.
POOL_SIZE = java.lang.Runtime.getRuntime.availableProcessors * 8
The application reads in a CSV and pushes each row onto a thread-safe array for later processing. (ThreadSafe::Array is a synchronized Array that multiple threads can safely pop from.) Since the input itself wasn't the bottleneck, sequentially reading in the file had little impact on the execution time.
class App
  def initialize(file_to_process)
    @queue = ThreadSafe::Array.new
    CSV.foreach(file_to_process) do |row|
      @queue.push(row)
    end
  end
App#work sets up the creation and execution of the futures. I loop over the queue of rows, create a new Worker (defined below) for each, and hand it off to a Java FutureTask in preparation for execution. Again, the list of tasks in the queue wasn't the bottleneck, so enumerating over the queue had no significant impact on execution performance.
  def work
    tasks = ThreadSafe::Array.new
    # A fixed-size pool backed by an unbounded queue of pending tasks.
    executor = ThreadPoolExecutor.new(POOL_SIZE, POOL_SIZE, 60, TimeUnit::SECONDS, LinkedBlockingQueue.new)

    while @queue.size > 0
      row = @queue.pop
      email = row.shift
      task = FutureTask.new(Worker.new(email, row))
      executor.execute(task)
      tasks.push(task)
    end
As each future completes its work, the tasks array is enumerated to collect the results of the worker executions; FutureTask#get blocks until that task's result is available. In this case I'm just writing passing results to STDOUT and failures to STDERR.
    tasks.each do |t|
      result = t.get # blocks until this task has finished
      if result[:status] == :passed
        $stdout.puts([result[:email], result[:data]].flatten.to_csv)
      else
        $stderr.puts([result[:input], result[:data], result[:reason]].flatten.to_csv)
      end
    end

    executor.shutdown
  end
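One detail worth calling out: shutdown doesn't block; it only stops the executor from accepting new tasks. That's fine here, because every FutureTask#get above has already waited for its task to finish. If the results weren't being collected that way, a sketch using the standard awaitTermination API would look like this:

    executor.shutdown
    # Wait up to a minute for any in-flight tasks to drain.
    executor.await_termination(60, TimeUnit::SECONDS)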
Worker implements the actual work of the script. We include Java's Callable interface so instances can be handed to the FutureTask used above.
  class Worker
    include java.util.concurrent.Callable
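As a quick aside, the interop contract here is simple: any Ruby object that includes Callable and defines #call can be wrapped in a FutureTask. A minimal, standalone sketch (Echo is purely illustrative, not part of the app):

class Echo
  include java.util.concurrent.Callable

  def initialize(message)
    @message = message
  end

  # FutureTask invokes #call and stores the return value for #get.
  def call
    @message.upcase
  end
end

task = FutureTask.new(Echo.new('hello'))
task.run      # run inline; App#work hands its tasks to the executor instead
puts task.get # => "HELLO"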
This code is less important, but it represents some of the work being done. It was a rough experiment and the data was a mess; scrub_input was very much a work in progress.
    attr_reader :input
    attr_reader :data

    def initialize(email, data)
      @input = email
      @data = data
    end

    def scrub_input
      @input
        .sub(/^(%20|%3c|%22)+/, '')
        .sub(/^(\\)+/, '')
        .sub(/(%20|%3c|%22)+$/, '')
        .sub(/^[-.]+/, '')
        .sub(/^#+/, '')
    end
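To give a feel for what scrub_input strips, here are a couple of hypothetical inputs (the addresses are made up):

Worker.new('%20%3c%22bob@example.com', []).scrub_input
# => "bob@example.com" (URL-encoded junk stripped from the front)
Worker.new('--.alice@example.com', []).scrub_input
# => "alice@example.com" (leading dashes and dots stripped)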
Since the rules were being implemented in a plugin style, I had a little fun with the naming. I'll leave the references up to you.
    def call
      email = scrub_input
      inspector = Schrute::BeetInspector.new(email, [
        Schrute::Beets::TldBeet,
        Schrute::Beets::DomainBeet,
        Schrute::Beets::LocalpartBeet,
        Schrute::Beets::SuppressionListBeet,
        Schrute::Beets::KeywordBeet,
        Schrute::Beets::TelnetBeet
      ]).call
Here I take the result of the inspector and format it as a plain hash so it can be returned from the future and handled back in App#work.
      status = inspector.passed? ? :passed : :failed
      {
        status: status,
        email: email,
        input: input,
        data: data,
        reason: inspector.result[:evaluation].last.reasons
      }
    end
  end
end
Since the code is implemented in a single script file, the execution is defined at the bottom.
file_to_process = ARGV[0]
fail "Usage: process <file_to_process.csv>" unless file_to_process
App.new(file_to_process).work
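Because passing rows go to STDOUT and failures to STDERR, shell redirection splits the two streams cleanly. Assuming the script is saved as process.rb (a placeholder name on my part):

jruby process.rb emails.csv > passed.csv 2> failed.csv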
The original gist is up on GitHub. In the end, using this technique reduced a multiple-hour process to a few minutes. There was some tweaking to handle particularly slow Beets, but those were separate issues entirely.