主页 > 知识库 > 网络编程 > Ruby >

Rails 4.0 先睹为快:作业队列

来源:oschina 作者:红薯 发表于:2012-06-26 08:32  点击:
Rails 最近增加了一个作业队列系统,让我们来看看如何使用。 Run, baby, run! 这个队列 API 非常简单,你将对象放到队列中,而这个对象需要提供一个名为 run 的方法,下面是个简单例子: 1 class TestJob 2 def run 3 puts I am running! 4 end 5 end 6 7 Rai
Rails 最近增加了一个作业队列系统,让我们来看看如何使用。

Run, baby, run!

这个队列 API 非常简单,你将对象放到队列中,而这个对象需要提供一个名为 run 的方法,下面是个简单例子:
1 class TestJob
 
2   def run
 
3     puts "I am running!"
 
4   end
 
5 end
 
6  
 
7 Rails.queue.push(TestJob.new)
 
8 => "I am running!"
 
对大多数人来说,这已足够。队列是在一个独立的线程中运行,因此你的应用不会因为一些很复杂的作业而导致无响应
Rails 中的基本队列并不是一个长期的解决方案,其目的仅仅是提供一个通用的 API 可以用来支持更可靠的队列系统。例如当你需要从 Resque 切换到 Sidekiq,你不需要更改你应用代码,你只需关心对象进入队列以及编组。
你也可以编写自己的队列,下面是一个简单的队列实现:
1 class MyQueue
 
2   def push(job)
 
3     job.run
 
4   end
 
5 end
 
要使用你自定义的队列,只需要在 application.rb 中设置:
1 config.queue = MyQueue
 
上面例子来源于 Rails 的测试套件,它定义了一个非异步的队列,当作业被放到队列中便立即执行。下面让我们开发一个实际的作业,无需依赖于 Queue 类。
01 class MyQueue
 
02   def initialize
 
03     @queue = []
 
04   end
 
05  
 
06   def push(job)
 
07     @queue.push(job)
 
08   end
 
09  
 
10   def pop
 
11     @queue.pop
 
12   end
 
13 end
 
这个例子我们实现了一个简单的队列,接下来你需要告诉 Rails 的 QueueConsumer 来使用这个队列,可以在 application.rb 的 initializer 块中设置:
1 intializer 'start queue consumer' do |app|
 
2   qpp.queue_consumer = config.queue_consumer.start(app.queue)
 
3   at_exit { app.queue.consumer.shutdown }
 
4 end
 
然后我们将新的作业放到队列中:
1 Rails.queue.push(TestJob.new)
 
啥也没有,为什么?检查 QueueConsumer:
1 Rails.application.queue_consumer
 
2 => #<Rails::Queueing::ThreadedConsumer @queue=#<MyQueue @queue=[]>, @thread=#<Thread dead>>
 
然后你会发现线程死了,我们可以强行要求队列处理:
1 Rails.application.queue_consumer.start
 
2 => "I am running!"
 
回过头来看看到底发生了什么。首先我们找到 ThreadedConsumer#start
01 def start
 
02   @thread = Thread.new do
 
03     while job = @queue.pop
 
04       begin
 
05         job.run
 
06       rescue Exception => e
 
07         handle_exception e
 
08       end
 
09     end
 
10   end
 
11   self
 
12 end
 
这个线程只有在 @queue.pop 返回一个 true 值的时候才会运行,这不太合理,我们需要不断的将对象推到队列中,让我们来看看 Queue#pop 发生了什么:
01 # Retrieves data from the queue.  If the queue is empty, the calling thread is
 
02 # suspended until data is pushed onto the queue.  If +non_block+ is true, the
 
03 # thread isn't suspended, and an exception is raised.
 
04 #
 
05 def pop(non_block=false)
 
06   while true
 
07     @mutex.synchronize do
 
08       @waiting.delete(Thread.current)
 
09       if @que.empty?
 
10         raise ThreadError, "queue empty" if non_block
 
11         @waiting.push Thread.current
 
12         @resource.wait(@mutex)
 
13       else
 
14         retval = @que.shift
 
15         @resource.signal
 
16         return retval
 
17       end
 
18     end
 
19   end
 
20 end
 
终于有点明白了,Queue#pop 是一个无限的循环在等待其需要的内容。而我们简单的 MyQueue 类在 ThreadConsumer#start 调用的时候会返回 nil,因此队列里没对象,线程就结束了。甚至当我们往队列里放对象时,再次 pop 操作后也会结束。
简单起见,只需要让 MyQueue 继承 Queue 即可:
1 class MyQueue < Queue
 
2 end
 
现在我们可以:
1 Rails.queue.push(TestJob.new)
 
2 => "I am running!"
 
Rails 4.0 中的队列系统是一个非常简单的解决方案,我们期待正式版的发布,能够支持更多更好的后台作业处理库。
需要注意的是,目前的队列还是 beta 版本,API 可能还有有所更改。
英文原文,OSCHINA原创翻译

    有帮助
    (0)
    0%
    没帮助
    (1)
    100%