世界上最伟大的投资就是投资自己的教育
消息队列之 active_job 结合 Sidekiq (一)
1. 简介
sidekiq是用 ruby 实现的消息列队的 gem,相关的 gem 还有resque,delayed_job,queue_classic等。关于消息队列的解释可以查阅这篇文章PostgreSQL 的 listen/notify 之消息队列 (一),这里不再详述。每个 gem 的存储介质和原理都不太一样,比如 queue_classic 只用 PostgreSQL 来存,delayed_job 可以用 Mysql,PostgreSQL,Sqlite 等,而 sidekiq 用 redis。原理也不一样,比如 queue_classic 是基于 PostgreSQL 的 listen/notify,sidekiq 是基于 redis 的 pub/sub 模式。
有这么多的消息队列,每个都要用,都要去学,是一件很耗精力的事情。而且,这些 gem 的使用方法都差不多,很多使用上的概念很类似,比如指定队列,优先级,执行时间等都很相似。何不用一种接口或方法来统一呢,就像 activerecord 一样,假如要换数据库,那只是换个 gem 加一个配置文件而已,因为他们的使用方法都差不多。
只要把相同的使用方法抽出来,形成一个接口,而这些 gem 就成了适配器 (adapter)。这个就像汽车的引擎或发动机一样,可以随时更换。
所以有了active_job,这个原本是一个 gem,不过从 4.2 开始被合作到 rails,成为 rails 的一部分了。
2. sidekiq
现在我们来单独使用 sidekiq,也就是不用 active_job 的情况下。
2.1 安装
在 Gemfile 文件添加下面这行。
gem 'sidekiq', '~> 3.5.1'
# 因为依赖于redis
sudo apt-get install redis-server
由于消息队列是开了另一个独立于 web 的进程,在那个进程中,是在时刻接收进来的任务,然后排队处理,所以需要在 controller 或 model 把那个任务丢给消息队列的进程就好了。详情可参阅PostgreSQL 的 listen/notify 之消息队列 (一)的详述。
而 controller 或 model 是通过什么和那进程连接的呢?就是通过 worker,一个 worker 是暴露在外面的类,可以在 controller 或 model 中全局调用。
首先来定义一个 worker。
# app/workers/update_article_visit_count_worker.rb
class UpdateArticleVisitCountWorker
include Sidekiq::Worker
def perform(article_id)
logger.info 'update article visit count begin'
@article = Article.find(article_id)
@article.visit_count += 1
@article.save!(validate: false)
logger.info 'update article visit count end'
end
end
现在就可以把 sidekiq 的进程启动起来,准备测试。
bundle exec sidekiq
进入rails console
中。
article = Article.find(:first)
UpdateArticleVisitCountWorker.perform_async(article.id)
如果没有报错且数据库被作了相应的改变,说明是有效果的。
这里有个要注意的地方,在上面的例子中,不传 article 整个对象,而是传 article 的 id,是有原因的,主要是基于两点。
第一,无论 article 还是 article_id 是作为参数传给 sidekiq 进程的,你传对象不好处理的,那是 ruby 的对象,你传到另一个进程需要反序列化之类的,而传 id,只是一个数字就好传多了,也好处理。
第二,worker 中是放耗时的动作的,那个 find 可以比较耗时,这取决于数据库的记录,所以尽量将这种需要时间的动作放到 worker 中的 perform。
整个 sidekiq 的使用很简单,就是建立一个 worker 的类,然后把逻辑写到 perform 方法,在需要的地方用 perform_async 调用就好了。
关于 sidekiq 的线上部署,如果你是使用 mina 的话,那就简单了,使用mina-sidekiq,也就几行代码搞定。
sidekiq 默认会使用 redis 的默认 ip 地址,端口等,不过这些都是可以改变的。
sidekiq 远不止这么简单,它是有强大的功能却易用,关于 sidekiq 的更加详细的信息,比如错误处理,日志记录,Batches,监控,高级选项等可以参考sidekiq-wiki。还有,sidekiq 还有很多插件,你往 github 搜一下可能就可以找到一些。
3. active_job
把上面的例子改成用 active_job 来实现。
生成类似 worker 的文件。
rails generate job update_article_visit_count
生成了下面这个文件。
# app/jobs/update_article_visit_count_job.rb
class UpdateArticleVisitCountJob < ActiveJob::Base
queue_as :default
def perform(*args)
# Do something later
end
end
UpdateArticleVisitCountJob
类继承自ActiveJob::Base
。
现在把上面的那段逻辑放进去。
# app/jobs/update_article_visit_count_job.rb
class UpdateArticleVisitCountJob < ActiveJob::Base
queue_as :default
def perform(article_id)
logger.info 'update article visit count begin'
@article = Article.find(article_id)
@article.visit_count += 1
@article.save!(validate: false)
logger.info 'update article visit count end'
end
end
指定用 sidekiq 作为 adapter。
# config/application.rb
module YourApp
class Application < Rails::Application
config.active_job.queue_adapter = :sidekiq
end
end
进入rails console
中测试。
article = Article.first
UpdateArticleVisitCountJob.perform_later article.id
跟上面的调用也是类似。
这是基本的功能,active_job 还支持指定队列,回调函数,异常捕获等。具体地参考官方文档
下一篇:消息队列之 RabbitMQ 结合 sneakers(二)
完结。
本站文章均为原创内容,如需转载请注明出处,谢谢。
© 汕尾市求知科技有限公司 | Rails365 Gitlab | 知乎 | b 站 | csdn
粤公网安备 44152102000088号 | 粤ICP备19038915号
Top