Active Job の基礎

本ガイドでは、バックグラウンドで実行するジョブの作成、キュー登録(エンキュー: enqueue)、実行方法について解説します。

このガイドの内容:

  • ジョブの作成とキューへの登録方法
  • Solid Queueの設定と利用方法
  • バックグラウンドでのジョブ実行方法
  • アプリケーションから非同期にメールを送信する方法

1 Active Jobについて

Active Jobは、バックグラウンドジョブを宣言してキューイングバックエンドで実行するために設計されたRailsのフレームワークです。

メールの送信、データの処理、クリーンアップや料金の請求といった定期的なメンテナンス業務の処理などのタスクを実行するための、標準化されたインターフェイスを提供します。

Active Jobは、これらのタスクをメインアプリケーションスレッドで処理する代わりに、デフォルトのSolid Queueなどのキューイングバックエンドに処理させることで、時間のかかる操作がリクエスト・レスポンスのサイクルをブロックしないようにします。これにより、アプリケーションのパフォーマンスと応答性が向上し、タスクを並行して処理できるようになります。

2 ジョブを作成して登録する

このセクションでは、ジョブの作成方法とジョブの登録(enqueue: エンキュー)方法を手順を追って説明します。

2.1 ジョブを作成する

Active Jobは、ジョブ作成用のRailsジェネレータを提供しています。以下を実行すると、app/jobs/ディレクトリの下にジョブが1つ作成されます。

$ bin/rails generate job guests_cleanup
invoke  test_unit
create    test/jobs/guests_cleanup_job_test.rb
create  app/jobs/guests_cleanup_job.rb

以下のようにすると、特定のキューに対してジョブを1件作成できます。

$ bin/rails generate job guests_cleanup --queue urgent

ジェネレータを使いたくない場合は、app/jobs/ディレクトリの下に自分でジョブファイルを作成することもできます。ジョブファイルでは必ずApplicationJobを継承してください。

作成されたジョブは以下のようになります。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  def perform(*guests)
    # 後で実行するタスクをここに置く
  end
end

なお、performの定義にはいくつでも引数を渡せます。

ApplicationJobと異なる名前の抽象クラスが既に存在する場合、以下のように--parentオプションを渡すことで、別の抽象クラスが必要であることを示せます。

$ bin/rails generate job process_payment --parent=payment_job
class ProcessPaymentJob < PaymentJob
  queue_as :default

  def perform(*args)
    # 後で実行するタスクをここに置く
  end
end

2.2 ジョブをキューに登録する

キューへのジョブ登録はperform_laterで以下のように行います。オプションでsetも指定できます。

# 「キューイングシステムが空いたらジョブを実行する」とキューに登録する
GuestsCleanupJob.perform_later guest
# 明日正午に実行したいジョブをキューに登録する
GuestsCleanupJob.set(wait_until: Date.tomorrow.noon).perform_later(guest)
# 一週間後に実行したいジョブをキューに登録する
GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
# `perform_now`と`perform_later`は`perform`を呼び出すので、
# 定義した引数を渡すことができる
GuestsCleanupJob.perform_later(guest1, guest2, filter: "some_filter")

以上でジョブ登録は完了です。

2.3 複数のジョブを一括登録する

perform_all_laterを使うと、複数のジョブを一括登録できます。詳しくは後述の一括登録を参照してください。

3 Solid Queue: デフォルトのバックエンド

Solid Queueは、通常のデータベースを利用するActive Job用のキューイングシステムであり、Rails 8.0からデフォルトで有効になっています。Solid Queueは、Redisなどの追加の依存関係を必要とせずに大量のデータをジョブキューで処理できます。

Solid Queueは、通常のジョブのエンキューや処理に加えて、ジョブの遅延実行やコンカレンシー制御、数値によるジョブごとの優先度指定、キュー実行順序に基づいた優先度などをサポートします。

3.1 セットアップ

3.1.1 development環境の場合

development環境のRailsは、非同期のインプロセスキューイングシステムを提供し、ジョブをメモリ上に保持します。

デフォルトの非同期バックエンドでは、プロセスがクラッシュしたり開発中のコンピュータがリセットされたりすると、未処理のジョブがすべて失われますが、開発中の小規模なアプリや重要度の低いジョブについては、これで十分です。

しかし、Solid Queueを使えば、production環境と同じ方法で以下のようにdevelopment環境のジョブキューシステムを設定できます。

# config/environments/development.rb
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }

上の設定では、production環境におけるActive Jobのデフォルトと同様に、development環境に:solid_queueアダプタが設定され、書き込み用にqueueデータベースに接続します。

次に、development環境用のデータベース設定で以下のようにqueueを追加します。

# config/database.yml
development
  primary:
    <<: *default
    database: storage/development.sqlite3
  queue:
    <<: *default
    database: storage/development_queue.sqlite3
    migrations_paths: db/queue_migrate

データベース設定のqueueキーは、config.solid_queue.connects_toの設定で使われているキーと同じにする必要があります。

queueデータベースのマイグレーションを実行すれば、キューデータベース内のすべてのテーブルが作成されるようになります。

$ bin/rails db:migrate:queue

TIPS: queueデータベースのデフォルトの生成スキーマはdb/queue_schema.rbに配置されます。これらのスキーマファイルには solid_queue_ready_executionssolid_queue_scheduled_executionsなどのテーブルが含まれます。

最後に、キューを開始してジョブの処理を開始するには、次のコマンドを実行します。

bin/jobs start

3.1.2 production環境の場合

Solid Queueはすでにproduction環境用に設定済みです。config/environments/production.rbファイルを開くと、以下の内容が設定済みであることがわかります。

# config/environments/production.rb
# Active Jobのデフォルトのキューイングバックエンド(インプロセスかつ永続化されない)を置き換える
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }

さらに、queueデータベースで利用するデータベースコネクションは、config/database.ymlファイルで設定されます。

# config/database.yml
# production環境のデータベースは、デフォルトでstorage/ ディレクトリに保存される
# このディレクトリは、デフォルトでconfig/deploy.ymlで永続的なDockerボリュームとしてマウントされる
production:
  primary:
    <<: *default
    database: storage/production.sqlite3
  queue:
    <<: *default
    database: storage/production_queue.sqlite3
    migrations_paths: db/queue_migrate

3.2 設定オプション

Solid Queueの設定オプションはconfig/queue.ymlで定義します。 以下はデフォルト設定の例です。

default: &default
  dispatchers:
    - polling_interval: 1
      batch_size: 500
  workers:
    - queues: "*"
      threads: 3
      processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
      polling_interval: 0.1

Solid Queueの設定オプションを理解するには、さまざまな種類のロール(role: 役割)を理解しておく必要があります。

  • ディスパッチャ(dispatcher): 今後実行するようにスケジュールされているジョブを選択します。 これらのジョブを実行する時刻になったら、ディスパッチャはそれらのジョブをsolid_queue_scheduled_executionsテーブルからsolid_queue_ready_executionsテーブルに移動し、ワーカーがジョブを取得できるようにします。また、コンカレンシー関連のメンテナンスも管理します。

  • ワーカー(worker): 実行準備が整ったジョブをsolid_queue_ready_executionsテーブルから取得します。

  • スケジューラ(scheduler): 定期的なタスクを処理し、期限が来たらジョブをキューに追加します。

  • スーパーバイザ(supervisor): システム全体を監視して、ワーカーとディスパッチャを管理します。 必要に応じてワーカーやディスパッチャを開始・停止し、健全性を監視し、すべてがスムーズに実行されるようにします。

config/queue.ymlの設定は、すべてがオプションです(つまり必須項目はありません)。設定が指定されていない場合、Solid Queueはデフォルト設定で1つのディスパッチャーと1つのワーカーで実行されます。

以下は、config/queue.ymlで設定できる設定オプションの一部です。

  • polling_interval: ディスパッチャーやワーカーが次のジョブをチェックするまでの待ち時間を秒で指定します。 ディスパッチャのデフォルト値は1秒、ワーカーのデフォルト値は0.1秒です。

  • batch_size: 1回のバッチでディスパッチされるジョブの件数です。 デフォルト値は500です。

  • concurrency_maintenance_interval: ディスパッチャがブロックされたジョブを解除できるかどうかを確認するまでの待ち時間を秒で指定します。 デフォルト値は600秒です。

  • queues: ワーカーがジョブを取得するキューのリストを指定します。 *を使って、すべてのキューまたはキュー名のプレフィックスを指定できます。 デフォルト値は*です。

  • threads: 各ワーカーのスレッドプールの最大サイズを指定します。 ワーカーが1回に取得するジョブの件数を決定します。 デフォルト値は3です。

  • processes: スーパーバイザによってforkされるワーカープロセスの個数を指定します。 各プロセスはCPUコアを専有できます。 デフォルト値は1です。

  • concurrency_maintenance: ディスパッチャがコンカレンシーメンテナンス作業を行うかどうかを指定します。 デフォルト値はtrueです。

設定オプションについて詳しくは、Solid Queueのドキュメントを参照してください。 また、config/<environment>.rbで設定できる追加の設定オプションを使うことで、RailsアプリケーションでSolid Queueをさらに詳細に設定できます。

3.3 キューの順序

設定のqueuesオプションには、後述する設定のオプションに沿って、ワーカーがジョブを選択するキューのリストを記述します。キューのリストでは、キューの順序が重要です。ワーカーはリストの最初のキューからジョブを選択し、最初のキューにジョブがなくなると、2番目のキューに移動し、以下同様に繰り返します。

# config/queue.yml
production:
  workers:
    - queues:[active_storage*, mailers]
      threads: 3
      polling_interval: 5

上の例では、ワーカーは最初に「active_storage」で始まるキュー(active_storage_analyseキューやactive_storage_transformキューなど)からジョブを取得します。「active_storage」で始まるキューにジョブが残っていない場合にのみ、ワーカーはmailersキューに移動します。

ワイルドカード*の利用は、「単独」または「キュー名の末尾に配置することで同じプレフィックスを持つすべてのキューに一致させる」形(active_storage*など)のみが可能です。*_some_queueなどのようなキュー名の冒頭への追加はできません。

queues: active_storage*のようにキュー名でワイルドカードを利用すると、マッチするすべてのキューを識別するためにDISTINCTクエリが必要になるため、ポーリングのパフォーマンスが低下して大きなテーブルでは遅くなる可能性があります。パフォーマンスを落とさないためには、ワイルドカードを使わずに正確なキュー名を指定することが推奨されます。

Active Jobは、ジョブをエンキューするときに正の整数の優先度をサポートします(後述の優先度セクションを参照)。 1件のキュー内では、優先度に基づいてジョブが選択されます(整数が小さいほど優先度が高くなります)。

ただしキューが複数ある場合は、キュー自体の順序が優先されます。たとえば、productionbackgroundという2つのキューがこの順序で設定されている場合、場合、backgroundキュー内の一部のジョブの優先度の方が高い場合でも、productionキュー内のジョブが常に最初に処理されます。

3.4 スレッド、プロセス、シグナル

Solid Queueの並列処理は、スレッドthreadsパラメータで設定可能)、プロセスprocessesパラメータで設定可能)、または水平スケーリングによって実現されます。

スーパーバイザーはプロセスを管理し、以下のシグナルに応答します。

  • TERMINT: 正常な終了処理を開始し、TERMシグナルを送信してSolidQueue.shutdown_timeoutに達するまで待機します。 終了しない場合は、QUITシグナルでプロセスを強制終了します。

  • QUIT: プロセスを強制的に即時終了します。

ワーカーがKILLシグナルなどによって予期せず強制終了された場合、実行中のジョブは失敗としてマークされ、SolidQueue::Processes::ProcessExitErrorSolidQueue::Processes::ProcessPrunedErrorなどのエラーが発生します。 ハートビート設定は、期限切れのプロセスを管理および検出するのに有用です。

スレッド、プロセス、シグナルについて詳しくはSolid Queueのドキュメントを参照してください。

3.5 キュー登録時のエラー

Solid Queueは、ジョブのエンキュー中にActive Recordエラーが発生すると、SolidQueue::Job::EnqueueErrorを発生させます。

このエラーは、Active Jobによって発生するActiveJob::EnqueueError(エラーを処理してperform_laterfalseを返すようにする)とは異なることにご注意ください。このため、RailsやTurbo::Streams::BroadcastJobなどのサードパーティgemによってエンキューされたジョブのエラー処理が複雑になります。

定期的なタスクの場合、エンキュー中に発生したエラーはすべてログに出力されますが、エラーをraiseしません。エンキュー時のエラーについて詳しくはSolid Queueのドキュメントを参照してください。

3.6 コンカレンシーの制御

Solid Queueは、Active Jobをコンカレンシー制御で拡張し、特定の種類のジョブや特定の引数を持つジョブの同時実行数を制限できるようにします。ジョブがこの制限を超えると、別のジョブが終了するか期間が終了するまでブロックされます。

class MyJob < ApplicationJob
  limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes

  def perform(contact)
    # ジョブのロジックを実行する
  end
end

上の例では、同じアカウントの2つのMyJobインスタンスのみが同時に実行されます。その後、ジョブの1つが完了するまで、他のジョブはブロックされます。

以下のようにgroupパラメータを指定すると、異なるジョブタイプ間での同時実行を制御できます。たとえば、同じグループに属する2つの異なるジョブクラスは、まとめて同時実行が制限されます。

class Box::MovePostingsByContactToDesignatedBoxJob < ApplicationJob
  limits_concurrency key: ->(contact) { contact }, duration: 15.minutes, group: "ContactActions"
end

class Bundle::RebundlePostingsJob < ApplicationJob
  limits_concurrency key: ->(bundle) { bundle.contact }, duration: 15.minutes, group: "ContactActions"
end

これにより、特定の連絡先(contact)に対して一度に実行できるジョブは、ジョブクラスにかかわらず1件だけになります。

コンカレンシー制御について詳しくはSolid Queueのドキュメントを参照してください。

3.7 ジョブのエラー報告

利用しているエラートラッキングサービスがジョブエラーを自動的に報告しない場合は、Active Jobに手動でフックする形で報告できます。たとえば、ApplicationJobで以下のようにrescue_fromブロックを追加できます。

class ApplicationJob < ActiveJob::Base
  rescue_from(Exception) do |exception|
    Rails.error.report(exception)
    raise exception
  end
end

Action Mailerを利用している場合は、以下のようにMailDeliveryJobのエラーを個別に処理する必要があります。

class ApplicationMailer < ActionMailer::Base
  ActionMailer::MailDeliveryJob.rescue_from(Exception) do |exception|
    Rails.error.report(exception)
    raise exception
  end
end

3.8 ジョブのトランザクション整合性

Solid Queueは、デフォルトではメインアプリケーションとは別のデータベースを利用します。これにより、トランザクションの整合性に関する問題が回避され、トランザクションがコミットされた場合にのみジョブがエンキューされるようになります。

ただし、Solid Queueをアプリと同一のデータベースで利用する場合は、Active Jobのenqueue_after_transaction_commitオプションでトランザクションの整合性を有効にできます。このオプションは、ジョブごとに有効にすることも、以下のようにApplicationJobですべてのジョブに対して有効にすることも可能です。

class ApplicationJob < ActiveJob::Base
  self.enqueue_after_transaction_commit = true
end

また、Solid Queueジョブ用のデータベースコネクションを別途設定することで、トランザクションの整合性の問題を回避しながら、アプリと同一のデータベースを利用するようにSolid Queueを構成することも可能です。

トランザクションの整合性について詳しくはSolid Queueのドキュメントを参照してください。

3.9 定期的なタスク

Solid Queueは、cronジョブに似た定期的なタスクをサポートしています。定期タスクは設定ファイル(デフォルトではconfig/recurring.yml)で定義され、特定の時間にスケジューリングできます。タスク設定の例を以下に示します。

production:
  a_periodic_job:
    class: MyJob
    args: [42, { status: "custom_status" }]
    schedule: every second
  a_cleanup_task:
    command: "DeletedStuff.clear_all"
    schedule: every day at 9am

各タスクには、class(またはcommand)とscheduleを指定します(スケジュール指定文字列の解析にはFugit gemが使われます)。 上の設定例のMyJobのように、argsオプションでジョブに引数を渡すことも可能です。argsオプションには「単一の引数」「ハッシュ」「引数の配列」のいずれかを渡すことが可能で、配列の場合は最後の要素にキーワード引数も含められます。 このようにして、ジョブを定期実行したり、指定の時間に実行したりできます。

定期的なタスクについて詳しくはSolid Queueのドキュメントを参照してください。

3.10 ジョブのトラッキングと管理

失敗したジョブの監視や管理を一元化するには、mission_control-jobsなどのツールが有用です、ジョブのステータス、ジョブ失敗の理由、ジョブ再試行の動作に関する洞察を提供し、問題をより効果的にトラッキングして解決を支援します。

mission_control-jobsツールを使うと、たとえば大きなファイルを処理するジョブがタイムアウトで失敗したときに、失敗を検査し、ジョブの引数や実行履歴を確認して、「再試行」「再キューイング」「破棄」のいずれかを決定するのに役立ちます。

4 キュー

Active Jobのqueue_asを使って、特定のキューに入っているジョブをスケジューリングできます。

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

application.rbで以下のようにconfig.active_job.queue_name_prefixを使うことで、すべてのジョブでキュー名の前に特定の文字列を追加できます。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
  end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

# 以上で、production環境ではproduction_low_priorityというキューでジョブが
# 実行されるようになり、staging環境ではstaging_low_priorityというキューで
# ジョブが実行されるようになります

以下のようにジョブごとにプレフィックスを設定することも可能です。

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  self.queue_name_prefix = nil
  # ...
end

# これで`config.active_job.queue_name_prefix`の設定が上書きされ、
# 自分のジョブキューにプレフィックスが設定されなくなります

キュー名のプレフィックスのデフォルト区切り文字はアンダースコア_です。 この区切り文字は、config.active_job.queue_name_delimiter設定で変更できます。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
    config.active_job.queue_name_delimiter = "."
  end
end
# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  # ...
end

# 以上で、production環境ではproduction.low_priorityというキューでジョブが
# 実行されるようになり、staging環境ではstaging.low_priorityというキューでジョブが実行されるようになります

#queue_asにブロックを渡すと、キューをそのジョブレベルで制御できます。渡されたブロックは、そのジョブのコンテキストで実行されるため、キュー名を返す必要があります(これによりself.argumentsにアクセス可能になります)。

class ProcessVideoJob < ApplicationJob
  queue_as do
    video = self.arguments.first
    if video.owner.premium?
      :premium_videojobs
    else
      :videojobs
    end
  end

  def perform(video)
    # 動画を処理する
  end
end
ProcessVideoJob.perform_later(Video.last)

ジョブを実行するキューをさらに細かく制御したい場合は、#set:queueオプションを渡せます。

MyJob.set(queue: :another_queue).perform_later(record)

SOLID QUEUE以外の一部のバックエンドでは、リッスンするキューを指定する必要が生じることもあります。

5 優先度

優先度(priority)を指定してジョブをスケジューリングするには、queue_with_priorityメソッドを使います。

class GuestsCleanupJob < ApplicationJob
  queue_with_priority 10
  # ...
end

デフォルトのキューイングバックエンドであるSolid Queueは、キューの順序に基づいてジョブの優先順位を決定します。詳しくは前述のキューの順序セクションを参照してください。

Solid Queueでキューの順序と優先度オプションを両方使っている場合、キューの順序が優先され、優先度オプションは個別のキュー内でのみ適用されます。

Solid Queue以外のキューイングバックエンドでは、ジョブを同じキュー内や複数のキュー間の他のジョブと比較する形で優先度を指定できる場合があります。詳しくは、利用するバックエンドのドキュメントを参照してください。

以下のようにqueue_with_priorityにブロックを渡すことで、queue_asの場合と同様にブロックをジョブコンテキストで評価することも可能です。

class ProcessVideoJob < ApplicationJob
  queue_with_priority do
    video = self.arguments.first
    if video.owner.premium?
      0
    else
      10
    end
  end

  def perform(video)
    # 動画を処理する
  end
end
ProcessVideoJob.perform_later(Video.last)

以下のようにset:priorityオプションを渡すことも可能です。

MyJob.set(priority: 50).perform_later(record)

優先度の低い番号が、優先度の高い番号より先に実行されるか後に実行されるかは、アダプタの実装によって異なります。詳しくは利用しているバックエンドのドキュメントを参照してください。アダプタの作成者は、小さい番号ほど重要度が高いものとして扱うことが推奨されます。

6 コールバック

Active Jobが提供するフックを用いて、ジョブのライフサイクル中にロジックをトリガーできます。これらのコールバックは、Railsの他のコールバックと同様に通常のメソッドとして実装し、マクロ風のクラスメソッドでコールバックとして登録できます。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  around_perform :around_cleanup

  def perform
    # 後で行なう
  end

  private

  def around_cleanup
    # performの直前に何か実行
    yield
    # performの直後に何か実行
  end
end

このマクロスタイルのクラスメソッドは、ブロックを1つ受け取ることもできます。ブロック内のコード量が1行以内に収まるほど少ない場合は、この書き方をご検討ください。 たとえば、登録されたジョブごとの測定値を送信する場合は次のようにします。

class ApplicationJob < ActiveJob::Base
  before_enqueue { |job| $statsd.increment "#{job.class.name.underscore}.enqueue" }
end

6.1 利用できるコールバック

perform_all_laterでジョブをキューに一括登録すると、個別のジョブではaround_enqueueなどのコールバックがトリガーされなくなる点にご注意ください。

詳しくは一括登録のコールバックを参照してください。

7 一括登録

perform_all_laterを使うことで、複数のジョブをキューに一括登録(bulk enqueue: バルクエンキュー)できます。一括登録により、Redisやデータベースなどのキューデータストアとのジョブの往復回数を減らせるので、同じジョブを個別に登録するよりもパフォーマンスが向上します。

perform_all_laterはActive JobのトップレベルAPIで、インスタンス化されたジョブを引数として受け取ります(この点がperform_laterと異なることにご注意ください)。perform_all_laterは内部でperformを呼び出します。newに渡された引数は、最終的にperformが呼び出されるときにperformに渡されます。

以下は、GuestCleanupJobインスタンスを用いてperform_all_laterを呼び出すコード例です。

# `perform_all_later`に渡すジョブを作成する
# この`new`に渡した引数は`perform`に渡される
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest) }

# `GuestCleanupJob`の個別のインスタンスごとにジョブをキューに登録する
ActiveJob.perform_all_later(guest_cleanup_jobs)

# `set`メソッドでオプションを設定してからジョブを一括登録してもよい
guest_cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest).set(wait: 1.day) }

ActiveJob.perform_all_later(guest_cleanup_jobs)

perform_all_laterは、正常にエンキューされたジョブの個数をログ出力します。 たとえば、上のGuest.all.mapの結果guest_cleanup_jobsが3個になった場合、Enqueued 3 jobs to Async (3 GuestsCleanupJob)とログ出力されます(エンキューがすべて成功した場合)。

perform_all_laterの戻り値はnilです。これは、perform_laterの戻り値が、エンキューされたジョブクラスのインスタンスであるのと異なる点にご注意ください。

7.1 複数のActive Jobクラスをキューに登録する

perform_all_laterを使えば、同じ呼び出しでさまざまなActive Jobクラスのインスタンスを以下のようにエンキューすることも可能です。

class ExportDataJob < ApplicationJob
  def perform(*args)
    # データをエクスポートする
  end
end

class NotifyGuestsJob < ApplicationJob
  def perform(*guests)
    # ゲストにメールを送信する
  end
end

# ジョブインスタンスをインスタンス化する
cleanup_job = GuestsCleanupJob.new(guest)
export_job = ExportDataJob.new(data)
notify_job = NotifyGuestsJob.new(guest)

# さまざまなクラスのジョブインスタンスをまとめてキューに登録する
ActiveJob.perform_all_later(cleanup_job, export_job, notify_job)

7.2 一括登録のコールバック

perform_all_laterでジョブをキューに一括登録すると、個別のジョブではaround_enqueueなどのコールバックがトリガーされません。この振る舞いは、Active Recordの他の一括処理系メソッドと一貫しています。コールバックは個別のジョブに対して個々に実行されるので、perform_all_laterメソッドが持つ一括処理の性質の恩恵を受けられません。

ただし、perform_all_laterメソッドは、ActiveSupport::Notificationsでサブスクライブできるenqueue_all.active_jobイベントをトリガーします。

ジョブのエンキューが成功したかどうかを知るには、successfully_enqueued?メソッドを利用できます。

7.3 キューバックエンドのサポート

perform_all_laterによるキューへの一括登録(バルクエンキュー)は、キューバックエンド側でのサポートが必要ですす。デフォルトのキューバックエンドであるSolid Queueは、enqueue_allで一括登録をサポートします。

Sidekiqなどの他のバックエンドにはpush_bulkメソッドがあり、大量のジョブをRedisにプッシュして、ラウンドトリップネットワークの遅延を防ぐようになっています。GoodJobもGoodJob::Bulk.enqueueメソッドで一括登録をサポートします。

キューへの一括登録がキューバックエンドでサポートされていない場合、perform_all_laterはジョブを1件ずつキューに登録します。

8 Action Mailer

最近のWebアプリケーションでよく実行されるジョブといえば、リクエスト・レスポンスサイクルの外でメールを送信することで、アプリケーションのユーザーが待たされないようにすることでしょう。 Active JobはAction Mailerと統合されているので、非同期メール送信を手軽に行えます。

# すぐにメール送信するなら#deliver_now
UserMailer.welcome(@user).deliver_now

# Active Jobで後でメール送信するなら#deliver_later
UserMailer.welcome(@user).deliver_later

一般に、非同期キュー(.deliver_laterでメールを送信するなど)はRakeタスクに書いても動きません。Rakeが終了すると、.deliver_laterがメールの処理を完了する前にインプロセスのスレッドプールを削除する可能性があるためです。この問題を回避するには、.deliver_nowを用いるか、development環境で永続的キューを実行してください。

9 国際化(i18n)

各ジョブでは、ジョブ作成時に設定されたI18n.localeを使います。これはメールを非同期的に送信する場合に便利です。

I18n.locale = :eo

UserMailer.welcome(@user).deliver_later # メールがエスペラント語にローカライズされる

10 引数でサポートされる型

Active Jobの引数では、デフォルトで以下の型をサポートします。

  • 基本型(NilClassStringIntegerFloatBigDecimalTrueClassFalseClass
  • Symbol
  • Date
  • Time
  • DateTime
  • ActiveSupport::TimeWithZone
  • ActiveSupport::Duration
  • Hash(キーの型はStringSymbolにすべき)
  • ActiveSupport::HashWithIndifferentAccess
  • Array
  • Range
  • Module
  • Class

11 GlobalID

Active JobではGlobalIDがパラメータとしてサポートされています。 GlobalIDを使えば、動作中のActive Recordオブジェクトをジョブに渡す際にクラスとidを指定する必要がなくなります。クラスとidを指定する従来の方法では、後で明示的にデシリアライズ(deserialize)する必要がありました。

従来のジョブが以下のようなものだったとします。

class TrashableCleanupJob < ApplicationJob
  def perform(trashable_class, trashable_id, depth)
    trashable = trashable_class.constantize.find(trashable_id)
    trashable.cleanup(depth)
  end
end

現在は、上を以下のように簡潔に書けます。

class TrashableCleanupJob < ApplicationJob
  def perform(trashable, depth)
    trashable.cleanup(depth)
  end
end

このコードは、GlobalID::Identificationをミックスインするすべてのクラスで動作します。このモジュールはActive Recordクラスにデフォルトでミックスインされます。

11.1 シリアライザ

サポートされる引数の型は、以下のような独自のシリアライザを定義するだけで拡張できます。

# app/serializers/money_serializer.rb
class MoneySerializer < ActiveJob::Serializers::ObjectSerializer
  # あるオブジェクトを、サポートされているオブジェクト型を使用して、よりシンプルな表現形式に変換する。
  # 推奨される表現形式は、特定のキーを持つハッシュ。キーには基本型のみが利用可能。
  # カスタムシリアライザ型をこのハッシュに追加するには、`super`を呼ぶ必要がある。
  def serialize(money)
    super(
      "amount" => money.amount,
      "currency" => money.currency
    )
  end

  # シリアライズされた値を正しいオブジェクトに逆変換する
  def deserialize(hash)
    Money.new(hash["amount"], hash["currency"])
  end

  private
    # ある引数がこのシリアライザでシリアライズされるべきかどうかをチェックする
    def klass
      Money
    end
end

続いてこのシリアライザをリストに追加します。

# config/initializers/custom_serializers.rb
Rails.application.config.active_job.custom_serializers << MoneySerializer

初期化中は、再読み込み可能なコードの自動読み込みがサポートされていない点にご注意ください。

そのため、たとえば以下のようにconfig/application.rbを修正するなどして、シリアライザが1度だけ読み込まれるように設定することをおすすめします。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.autoload_once_paths << "#{root}/app/serializers"
  end
end

12 例外処理

Active Jobでは、ジョブ実行時に発生する例外をrescue_fromでキャッチする方法が提供されています。

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  rescue_from(ActiveRecord::RecordNotFound) do |exception|
   # ここに例外処理を書く
  end

  def perform
    # 後で実行する処理を書く
  end
end

ジョブで発生した例外が回復されなかった場合、このジョブは「失敗(failed)」と呼ばれます。

12.1 失敗したジョブをリトライまたは廃棄する

失敗したジョブは、それ用の設定を行わない限り自動ではリトライされません。

実行中に例外が発生したジョブは、以下のようにretry_onでリトライすることも、discard_onで廃棄することもできます。

class RemoteServiceJob < ApplicationJob
  retry_on CustomAppException # デフォルトは「3秒ずつ待って5回リトライする」

  discard_on ActiveJob::DeserializationError

  def perform(*args)
    # CustomAppExceptionかActiveJob::DeserializationErrorをraiseする可能性があるとする
  end
end

12.2 デシリアライズ

GlobalIDを使うと、#performに渡された完全なActive Recordオブジェクトをシリアライズできるようになります。

ジョブがキューに登録された後で、#performメソッドが呼び出される前に、渡されたレコードが削除された場合は、Active JobはActiveJob::DeserializationError例外をraiseします。

13 ジョブをテストする

ジョブのテスト方法について詳しくは、テスティングガイドをご覧ください。

14 デバッグ

ジョブがどこから来ているのかを把握したい場合は、詳細なログを有効にできます。

15 代替キューイングバックエンド

Active Jobでは、複数のキューイングバックエンド(Sidekiq、Resque、Delayed Job など)用の組み込みアダプタも利用できます。

アダプタの最新リストについてはActiveJob::QueueAdaptersのAPIドキュメントを参照してください。

15.1 バックエンドを設定する

キューイングバックエンドは、config.active_job.queue_adapterで以下のように設定できます。

# config/application.rb
module YourApp
  class Application < Rails::Application
    # 必ずアダプタのgemをGemfileに追加し、
    # アダプタ固有のインストールおよびデプロイメント手順を実行すること
    config.active_job.queue_adapter = :sidekiq
  end
end

以下のように、バックエンドをジョブごとに設定することも可能です。

class GuestsCleanupJob < ApplicationJob
  self.queue_adapter = :resque
  # ...
end

# これで、このジョブは`resque`をバックエンドキューアダプタとして使い
# デフォルトのSolid Queueアダプタをオーバーライドする

15.2 バックエンドを起動する

ジョブはRailsアプリケーションと並行して実行されるため、ほとんどのキューイングライブラリでは、ジョブ処理を機能させるために、Railsアプリの起動とは別に、ライブラリ固有のキューイングサービスも起動しておく必要があります。

キューバックエンドの起動手順については、利用するライブラリのドキュメントを参照してください。主なドキュメントの一覧を以下に示します(すべてを網羅しているわけではありません)。

フィードバックについて

Railsガイドは GitHub の yasslab/railsguides.jp で管理・公開されております。本ガイドを読んで気になる文章や間違ったコードを見かけたら、気軽に Pull Request を出して頂けると嬉しいです。Pull Request の送り方については GitHub の README をご参照ください。

原著における間違いを見つけたら『Rails のドキュメントに貢献する』を参考にしながらぜひ Rails コミュニティに貢献してみてください 🛠💨✨

本ガイドの品質向上に向けて、皆さまのご協力が得られれば嬉しいです。

Railsガイド運営チーム (@RailsGuidesJP)

支援・協賛

Railsガイドは下記の協賛企業から継続的な支援を受けています。支援・協賛にご興味あれば協賛プランからお問い合わせいただけると嬉しいです。

  1. Star
  2. このエントリーをはてなブックマークに追加