世界上最伟大的投资就是投资自己的教育
PostgreSQL 的 listen/notify 之 queue_classic (十四)
1. 介绍
queue_classic是一个 ruby 的 gem,用来实现 PostgreSQL 的消息队列。它基于PostgreSQL 的 listen/notify,有很多接口,使用起来比较简单。
如果你有使用过sidekiq,resque,delayed_job,就会发现基本每个这种消息队列的 gem 的使用方法都是类似的,里面的概念也是差不多,也就是说,学会了queue_classic就等于会其中三种,只要掌握思想就好了。
2. 安装
假如我们已经有一个 rails 项目了。
在 Gemfile 添加下面这行。
gem "queue_classic", "~> 3.0.0"
执行bundle install
正如我们上面所说的,任务是需要存储的。所以要创建相应的表来存。
# 创建queue_classic_jobs表
rails generate queue_classic:install
# username替换为你自己的数据库的用户名,password是数据库的密码,rails365_dev是数据库名
export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev"
bundle exec rake db:migrate
3. 测试
我们先在rails console
里测试。
前面说过,消息队列是跑在一个进程里的。所以要启动那个进程。
bundle exec rake qc:work
你会发现 delayed_job,sidekiq 也是差不多的启动方法。
还可以指定队列来启动。
QUEUES="priority_queue,secondary_queue" bundle exec rake qc:work
启动好后,我们进入rails console
中,执行下面这行语句。
➜ rails365 git:(master) ✗ rails c
Loading development environment (Rails 4.2.3)
2.2.2 :001 > QC.enqueue("Kernel.puts", "hello world")
nil
你会在进程里看到类似这样的输出,就说明成功了。
➜ rails365 git:(master) ✗ bundle exec rake qc:work
hello world
QC.enqueue 就是后面的命令加上参数作为任务 push 到队列中。
上面只是个最简单的例子,还有可以在指定时间,指定队列来执行,具体更为详细的命令要看官方的 readme 文档。
如果需要调试或查看日志,可以开启调试功能,有两种不同的日志,分别是:
export QC_MEASURE="true"
# or
export DEBUG="true"
在运行rake qc:work
之前运行,具体的效果,尝试下就知道的。
4. 在 rails 中使用
以本站为例,是一个放博客的网站,文章是存放在 articles 这张表,当时为了查看哪篇文章最受欢迎,就在 articles 存了一个字段叫 visit_count,但每次用户查看文章时,就会往这个字段加 1。这个动作是用 rails 的ActiveSupport::Notifications配合 sidekiq 的消息队列来做的,现在要改成用 queue_classic 来做。
我们来看下相关的代码。
# app/workers/update_article_visit_count_worker.rb
class UpdateArticleVisitCountWorker
include Sidekiq::Worker
def perform(article_id)
logger.info 'update article visit count begin'
@article = Article.find(article_id)
@article.visit_count += 1
@article.save!(validate: false)
logger.info 'update article visit count end'
end
end
在哪里调用呢,我们是结合 ActiveSupport::Notifications 来做的,这个先不管,你也可以在 articles_controller 的 show action 直接调用。
# config/initializers/notification.rb
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload|
Rails.logger.info payload
if payload[:controller] == "ArticlesController" && payload[:action] == "show"
UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?
end
end
上文提过,queue_classic 主要是利用QC.enqueue
这条命令把任务 push 到队列中的。只需要把这行UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?
改成我们需要的就可以了。
把增加 visit_count 的值的逻辑移动 model 中去,然后在ActiveSupport::Notifications.subscribe
用QC.enqueue
中调用就好了。
改造之后是这样的。
# app/models/article.rb
class Article < ActiveRecord::Base
def self.update_article_visit_count(article_id)
article = Article.find(article_id)
article.visit_count += 1
article.save!(validate: false)
end
end
# config/initializers/notification.rb
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload|
Rails.logger.info payload
if payload[:controller] == "ArticlesController" && payload[:action] == "show"
QC.enqueue "Article.update_article_visit_count",payload[:params]["id"] if payload[:params]["id"].present?
end
end
现在需要重启下rails server
和bundle exec rake qc:work
。
重启rails server
运行好export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev"
这个命令。
为了让rake qc:work
更能明显地看到日志信息,在运行rake qc:work
前先执行export QC_MEASURE="true"
。
现在可以去页面上测试的。
5. 注意事项
第一点是关于环境变量,也就是QC_DATABASE_URL
、QC_MEASURE
、DEBUG
这三个,当部署到线上环境时,就要把这三个变量写进 shell 的配置文件,如比 ubuntu 系统,就写进~/.bashrc_profile 就好了。
第二点是关于错误的任务,任务也是有可能会报错的,但是我们不知道哪个任务报错了,所以很不方便,其实官方提供了接口的,你自己可以捕获那个错误信息,捕获后就可以进行自己想要的处理了。其实错误的任务都会一直存在表 queue_classic_jobs 中,这样查看就好,至于那个接口,就是worker.rb中的 handle_failure 方法,官方 readme 文档也有示例,这里不再深究。
完结。
本站文章均为原创内容,如需转载请注明出处,谢谢。
© 汕尾市求知科技有限公司 | Rails365 Gitlab | 知乎 | b 站 | csdn
粤公网安备 44152102000088号 | 粤ICP备19038915号
Top