1 はじめに

Active Jobは、ジョブを宣言し、それによってバックエンドでさまざまな方法によるキュー操作を実行するためのフレームワークです。ジョブには、定期的なクリーンアップを始めとして、請求書発行やメール配信など、あらゆる処理がジョブになります。これらのジョブをより細かな作業単位に分割して並列実行することもできます。

2 Active Jobの目的

Active Jobの主要な目的は、あらゆるRailsアプリケーションにジョブ管理インフラを配置することです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。

デフォルトのRailsは非同期キューを実装します。これは、インプロセスのスレッドプールでジョブを実行します。ジョブは非同期に実行されますが、再起動するとすべてのジョブは失われます。

3 ジョブを作成して登録する

このセクションでは、ジョブの作成方法とジョブの登録 (enqueue: エンキュー) 方法を手順を追って説明します。

3.1 ジョブを作成する

Active Jobは、ジョブ作成用のRailsジェネレータを提供しています。以下を実行すると、app/jobsにジョブが1つ作成されます。

$ bin/rails generate job guests_cleanup
invoke  test_unit
create    test/jobs/guests_cleanup_job_test.rb
create  app/jobs/guests_cleanup_job.rb

以下のようにすると、特定のキューに対してジョブを1件作成できます。

$ bin/rails generate job guests_cleanup --queue urgent

ジェネレータを使いたくない場合は、app/jobsの下に自分でジョブファイルを作成することもできます。ジョブファイルでは必ずApplicationJobを継承してください。

作成されたジョブは以下のようになります。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  def perform(*guests)
    # 後で実行するタスクをここに置く
  end
end

なお、performの定義にはいくつでも引数を渡せます。

ApplicationJobと異なる名前の抽象クラスが既に存在する場合、以下のように--parentオプションを渡すことで、別の抽象クラスが必要であることを示せます。

$ bin/rails generate job process_payment --parent=payment_job
class ProcessPaymentJob < PaymentJob
  queue_as :default

  def perform(*args)
    # 後で実行するタスクをここに置く
  end
end

3.2 ジョブをキューに登録する

キューへのジョブ登録はperform_laterで以下のように行います。オプションでsetも指定できます。

# 「キューイングシステムが空いたらジョブを実行する」とキューに登録する
GuestsCleanupJob.perform_later guest
# 明日正午に実行したいジョブをキューに登録する
GuestsCleanupJob.set(wait_until: Date.tomorrow.noon).perform_later(guest)
# 一週間後に実行したいジョブをキューに登録する
GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
# `perform_now`と`perform_later`は`perform`を呼び出すので、
# 定義した引数を渡すことができる
GuestsCleanupJob.perform_later(guest1, guest2, filter: 'some_filter')

以上でジョブ登録は完了です。

3.3 複数のジョブを一括登録する

perform_all_laterを使うと、複数のジョブを一括登録できます。詳しくは一括登録を参照してください。

4 ジョブを実行する

production環境でのジョブのキュー登録と実行では、キューイングのバックエンドを用意しておく必要があります。具体的には、Railsで使うべきサードパーティのキューイングライブラリを決める必要があります。 Rails自身が提供するのは、ジョブをメモリに保持するインプロセスのキューイングシステムだけです。 プロセスがクラッシュしたりコンピュータをリセットしたりすると、デフォルトの非同期バックエンドの振る舞いによって主要なジョブが失われてしまいます。アプリケーションが小規模な場合やミッションクリティカルでないジョブであればこれでも構いませんが、多くのproductionでは永続的なバックエンドを選ぶ必要があります。

4.1 バックエンド

Active Jobには、Sidekiq、Resque、Delayed Jobなどさまざまなキューイングバックエンドに接続できるアダプタがビルトインで用意されています。利用可能な最新のアダプタのリストについては、APIドキュメントのActiveJob::QueueAdaptersを参照してください。

4.2 バックエンドを設定する

キューイングバックエンドは、[config.active_job.queue_adapter]で手軽に設定できます。

# config/application.rb
module YourApp
  class Application < Rails::Application
    # 必ずGemfileにアダプタのgemを追加し、
    # アダプタ固有のインストール方法や
    # デプロイ方法に従うこと。
    config.active_job.queue_adapter = :sidekiq
  end
end

次のように、ジョブごとにバックエンドを設定することもできます。

class GuestsCleanupJob < ApplicationJob
  self.queue_adapter = :resque
  # ...
end

# これでジョブが`resque`を使うようになります
# `config.active_job.queue_adapter`で設定された内容が
# バックエンドキューアダプタでオーバーライドされるためです

4.3 バックエンドを起動する

ジョブはRailsアプリケーションに対して並列で実行されるので、多くのキューイングライブラリでは、ジョブを処理するためにライブラリ固有のキューイングサービスを (Railsアプリケーションの起動とは別に) 起動しておくことが求められます。キューのバックエンドの起動方法については、ライブラリのドキュメントを参照してください。

以下はドキュメントのリストの一部です(すべてを網羅しているわけではありません)。

5 キュー

多くのアダプタでは複数のキューを扱えます。Active Jobのqueue_asを使って、特定のキューに入っているジョブをスケジューリングできます。

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

application.rbで以下のようにconfig.active_job.queue_name_prefixを使うことで、すべてのジョブでキュー名の前に特定の文字列を追加できます。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
  end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

# 以上で、production環境ではproduction_low_priorityというキューでジョブが
# 実行されるようになり、staging環境ではstaging_low_priorityというキューで
# ジョブが実行されるようになります

以下のようにジョブごとにプレフィックスを設定することもできます。

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  self.queue_name_prefix = nil
  # ...
end

# これで自分のジョブキューにプレフィックスが設定されなくなり
# `config.active_job.queue_name_prefix`の設定が上書きされます

キュー名のプレフィックスのデフォルト区切り文字は'_'です。config.active_job.queue_name_delimiterを設定することでこの区切り文字を変更できます。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
    config.active_job.queue_name_delimiter = '.'
  end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

# 以上で、production環境ではproduction.low_priorityというキューでジョブが
# 実行されるようになり、staging環境ではstaging.low_priorityというキューでジョブが実行されるようになります

#queue_asにブロックを渡すと、キューをそのジョブレベルで制御できます。与えられたブロックは、そのジョブのコンテキストで実行されます (これによりself.argumentsにアクセスできるようになります)。そしてキュー名を返さなくてはなりません。

class ProcessVideoJob < ApplicationJob
  queue_as do
    video = self.arguments.first
    if video.owner.premium?
      :premium_videojobs
    else
      :videojobs
    end
  end

  def perform(video)
    # 動画を処理する
  end
end
ProcessVideoJob.perform_later(Video.last)

ジョブを実行するキューをさらに細かく制御したい場合は、#set:queueオプションを渡せます。

MyJob.set(queue: :another_queue).perform_later(record)

設定したキュー名をキューイングバックエンドが「リッスンする」ようにしてください。一部のバックエンドでは、リッスンするキューを指定する必要が生じることもあります。

6 優先順位付け

アダプタによってはジョブレベルでの優先順位付けをサポートしており、キュー内の別のジョブや、すべてのキュー内にある他のジョブに対してジョブを優先できます。

優先順位を指定してジョブをスケジューリングするには、queue_with_priorityメソッドを使います。

class GuestsCleanupJob < ApplicationJob
  queue_with_priority 10
  # ...
end

このメソッドは、優先順位付けをサポートしていないアダプタでは無効です。

queue_asの場合と同様に、queue_with_priorityにブロックを渡してジョブのコンテキストで評価することも可能です。

class ProcessVideoJob < ApplicationJob
  queue_with_priority do
    video = self.arguments.first
    if video.owner.premium?
      0
    else
      10
    end
  end

  def perform(video)
    # Process video
  end
end
ProcessVideoJob.perform_later(Video.last)

以下のようにset:priorityオプションを渡すことも可能です。

MyJob.set(priority: 50).perform_later(record)

優先度の低い番号が、優先度の高い番号より先に実行されるか後に実行されるかは、アダプタの実装によって異なります。詳しくはバックエンドのドキュメントを参照してください。アダプタの作成者は、優先度の低い番号をより重要視することをオススメします。

7 コールバック

Active Jobが提供するフックを用いて、ジョブのライフサイクル中にロジックをトリガできます。これらのコールバックは、Railsの他のコールバックと同様に通常のメソッドとして実装し、マクロ風のクラスメソッドでコールバックとして登録できます。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  around_perform :around_cleanup

  def perform
    # 後で行なう
  end

  private

  def around_cleanup
    # performの直前に何か実行
    yield
    # performの直後に何か実行
  end
end

このマクロスタイルのクラスメソッドは、ブロックを1つ受け取ることもできます。ブロック内のコード量が1行以内に収まるほど少ない場合は、この書き方をご検討ください。 たとえば、登録されたジョブごとの測定値を送信する場合は次のようにします。

class ApplicationJob < ActiveJob::Base
  before_enqueue { |job| $statsd.increment "#{job.class.name.underscore}.enqueue" }
end

7.1 利用できるコールバック

perform_all_laterでジョブをキューに一括登録すると、個別のジョブではaround_enqueueなどのコールバックがトリガーされなくなる点にご注意ください。 詳しくは一括登録のコールバックを参照してください。

8 一括登録

perform_all_laterを使うことで、複数のジョブをキューに一括登録(bulk enqueue: バルクエンキュー)できます。一括登録により、Redisやデータベースなどのキューデータストアとのジョブの往復が削減され、同じジョブを個別に登録するよりもパフォーマンスが向上します。

perform_all_laterはActive JobのトップレベルAPIで、インスタンス化されたジョブを引数として受け取ります(この点がperform_laterと異なることにご注意ください)。 perform_all_laterは内部でperformを呼び出します。 newに渡された引数は、最終的にperformが呼び出されるときにperformに渡されます。

以下は、GuestCleanupJobインスタンスを用いてperform_all_laterを呼び出すコード例です。

# `perform_all_later`に渡すジョブを作成する
# この`new`に渡した引数は`perform`に渡される
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest) }

# `GuestCleanupJob`の個別のインスタンスごとにジョブをキューに登録する
ActiveJob.perform_all_later(guest_cleanup_jobs)

# `set`メソッドでオプションを設定してからジョブを一括登録してもよい
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest).set(wait: 1.day) }

ActiveJob.perform_all_later(guest_cleanup_jobs)

perform_all_laterは、正常にキューに登録されたジョブの個数をログ出力します。たとえば、上のGuest.all.mapの結果guest_cleanup_jobsが3個になった場合、Enqueued 3 jobs to Async (3 GuestsCleanupJob)とログ出力されます(キュー登録がすべて成功した場合)。

perform_all_laterの戻り値はnilです。これは、perform_laterがキューに登録したジョブクラスのインスタンスを返すのと異なる点にご注意ください。

8.1 複数のActive Jobクラスを登録する

perform_all_laterを使えば、同じ呼び出しでさまざまなActive Jobクラスのインスタンスを以下のようにキューに登録することも可能です。

class ExportDataJob < ApplicationJob
  def perform(*args)
    # データをエクスポートする
  end
end

class NotifyGuestsJob < ApplicationJob
  def perform(*guests)
    # ゲストにメールを送信する
  end
end

# ジョブインスタンスをインスタンス化する
cleanup_job = GuestsCleanupJob.new(guest)
export_job = ExportDataJob.new(data)
notify_job = NotifyGuestsJob.new(guest)

# さまざまなクラスのジョブインスタンスをまとめてキューに登録する
ActiveJob.perform_all_later(cleanup_job, export_job, notify_job)

8.2 一括登録のコールバック

perform_all_laterでジョブをキューに一括登録すると、個別のジョブではaround_enqueueなどのコールバックがトリガーされません。この振る舞いは、Active Recordの他の一括処理系メソッドと一貫しています。コールバックは個別のジョブに対して実行されるので、perform_all_laterメソッドでは一括処理の恩恵を受けられません。

ただし、perform_all_laterメソッドは、ActiveSupport::Notificationsでサブスクライブできるenqueue_all.active_jobイベントをトリガーします。

ジョブのキューへの登録が成功したかどうかを知るには、successfully_enqueued?メソッドが利用できます。

8.3 キューバックエンドのサポート

perform_all_laterによるキューへの一括登録を行うには、キューのバックエンドにるサポートが必要です。

たとえば、Sidekiqにはpush_bulkメソッドがあるので、これを用いて多数のジョブをRedisにプッシュして、往復の増加によるネットワーク遅延を避けられます。GoodJobではGoodJob::Bulk.enqueueメソッドによるキューへの一括登録もサポートしています。新しいキューバックエンドであるSolid Queueでもキューへの一括登録のサポートが追加されました。

キューへの一括登録がキューバックエンドでサポートされていない場合、perform_all_laterはジョブを1件ずつキューに登録します。

9 Action Mailer

最近のWebアプリケーションでよく実行されるジョブといえば、リクエスト-レスポンスのサイクルの外でメールを送信することでしょう。これにより、ユーザーが送信を待つ必要がなくなります。Active JobはAction Mailerと統合されているので、非同期メール送信を簡単に行えます。

# すぐにメール送信するなら#deliver_now
UserMailer.welcome(@user).deliver_now

# Active Jobで後でメール送信するなら#deliver_later
UserMailer.welcome(@user).deliver_later

一般に、非同期キュー(.deliver_laterでメールを送信するなど)はRakeタスクに書いても動きません。Rakeが終了すると、.deliver_laterがメールの処理を完了する前にインプロセスのスレッドプールを削除する可能性があるためです。この問題を回避するには、.deliver_nowを用いるか、development環境で永続的キューを実行してください。

10 国際化(i18n)

各ジョブでは、ジョブ作成時に設定されたI18n.localeを使います。これはメールを非同期的に送信する場合に便利です。

I18n.locale = :eo

UserMailer.welcome(@user).deliver_later # メールがエスペラント語にローカライズされる

11 引数でサポートされる型

Active Jobの引数では、デフォルトで以下の型をサポートします。

  • 基本型(NilClassStringIntegerFloatBigDecimalTrueClassFalseClass
  • Symbol
  • Date
  • Time
  • DateTime
  • ActiveSupport::TimeWithZone
  • ActiveSupport::Duration
  • Hash(キーの型はStringSymbolにすべき)
  • ActiveSupport::HashWithIndifferentAccess
  • Array
  • Range
  • Module
  • Class

12 GlobalID

Active JobではGlobalIDがパラメータとしてサポートされています。GlobalIDを使えば、動作中のActive Recordオブジェクトをジョブに渡す際にクラスとidを指定する必要がありません。クラスとidを指定する従来の方法では、後で明示的にデシリアライズ (deserialize) する必要がありました。従来のジョブが以下のようなものだったとします。

class TrashableCleanupJob < ApplicationJob
  def perform(trashable_class, trashable_id, depth)
    trashable = trashable_class.constantize.find(trashable_id)
    trashable.cleanup(depth)
  end
end

上は以下のように簡潔に書けます。

class TrashableCleanupJob < ApplicationJob
  def perform(trashable, depth)
    trashable.cleanup(depth)
  end
end

このコードは、GlobalID::Identificationをミックスインするすべてのクラスで動作します。このモジュールはActive Recordクラスにデフォルトでミックスインされます。

12.1 シリアライザ

サポートされる引数の型は、以下のような独自のシリアライザを定義するだけで拡張できます。

# app/serializers/money_serializer.rb
class MoneySerializer < ActiveJob::Serializers::ObjectSerializer
  # ある引数がこのシリアライザでシリアライズされるべきかどうかをチェックする
  def serialize?(argument)
    argument.is_a? Money
  end
  # あるオブジェクトを、オブジェクト型をサポートするもっとシンプルな表現形式に変換する。
  # 表現形式としては特定のキーを持つハッシュが推奨される。キーには基本型のみが利用可能。
  # `super`を読んでカスタムシリアライザ型をハッシュに追加すべき
  def serialize(money)
    super(
      "amount" => money.amount,
      "currency" => money.currency
    )
  end
  # シリアライズされた値を正しいオブジェクトに逆変換する
  def deserialize(hash)
    Money.new(hash["amount"], hash["currency"])
  end
end

続いてこのシリアライザをリストに追加します。

# config/initializers/custom_serializers.rb
Rails.application.config.active_job.custom_serializers << MoneySerializer

初期化中は、再読み込み可能なコードの自動読み込みがサポートされていない点にご注意ください。そのため、たとえば以下のようにconfig/application.rbを修正するなどして、シリアライザが1度だけ読み込まれるように設定することをおすすめします。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.autoload_once_paths << Rails.root.join('app', 'serializers')
  end
end

13 例外処理

Active Jobでは、ジョブ実行時に発生する例外をrescue_fromでキャッチする方法が提供されています。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  rescue_from(ActiveRecord::RecordNotFound) do |exception|
   # ここに例外処理を書く
  end

  def perform
    # 後で実行する処理を書く
  end
end

13.1 失敗したジョブをリトライまたは廃棄する

実行中に例外が発生したジョブは、以下のようにretry_onでリトライすることも、discard_onで廃棄することもできます。

class RemoteServiceJob < ApplicationJob
  retry_on CustomAppException # defaults to 3s wait, 5 attempts

  discard_on ActiveJob::DeserializationError

  def perform(*args)
    # CustomAppExceptionかActiveJob::DeserializationErrorをraiseする可能性があるとする
  end
end

詳しくは、ActiveJob::Exceptions APIドキュメントを参照してください。

13.2 デシリアライズ

GlobalIDによって#performに渡された完全なActive Recordオブジェクトのシリアライズが可能になります。

ジョブがキューに登録された後で、渡したレコードが1件削除され、かつ#performメソッドをまだ呼び出していない場合は、Active JobによってActiveJob::DeserializationErrorエラーがraiseされます。

14 ジョブをテストする

ジョブのテスト方法について詳しくは、テスティングガイドをご覧ください。

15 デバッグ

ジョブがどこから来ているのかを把握したい場合は、詳細なログを有効にできます。

フィードバックについて

Railsガイドは GitHub の yasslab/railsguides.jp で管理・公開されております。本ガイドを読んで気になる文章や間違ったコードを見かけたら、気軽に Pull Request を出して頂けると嬉しいです。Pull Request の送り方については GitHub の README をご参照ください。

原著における間違いを見つけたら『Rails のドキュメントに貢献する』を参考にしながらぜひ Rails コミュニティに貢献してみてください 🛠💨✨

本ガイドの品質向上に向けて、皆さまのご協力が得られれば嬉しいです。

Railsガイド運営チーム (@RailsGuidesJP)

支援・協賛

Railsガイドは下記の協賛企業から継続的な支援を受けています。支援・協賛にご興味あれば協賛プランからお問い合わせいただけると嬉しいです。

  1. Star
  2. このエントリーをはてなブックマークに追加