本ガイドでは、バックグラウンドで実行するジョブの作成、キュー登録(エンキュー: enqueue)、実行方法について解説します。
このガイドの内容:
Active Jobは、バックグラウンドジョブを宣言してキューイングバックエンドで実行するために設計されたRailsのフレームワークです。
メールの送信、データの処理、クリーンアップや料金の請求といった定期的なメンテナンス業務の処理などのタスクを実行するための、標準化されたインターフェイスを提供します。
Active Jobは、これらのタスクをメインアプリケーションスレッドで処理する代わりに、デフォルトのSolid Queueなどのキューイングバックエンドに処理させることで、時間のかかる操作がリクエスト・レスポンスのサイクルをブロックしないようにします。これにより、アプリケーションのパフォーマンスと応答性が向上し、タスクを並行して処理できるようになります。
このセクションでは、ジョブの作成方法とジョブの登録(enqueue: エンキュー)方法を手順を追って説明します。
Active Jobは、ジョブ作成用のRailsジェネレータを提供しています。以下を実行すると、app/jobs/ディレクトリの下にジョブが1つ作成されます。
$ bin/rails generate job guests_cleanup invoke test_unit create test/jobs/guests_cleanup_job_test.rb create app/jobs/guests_cleanup_job.rb
以下のようにすると、特定のキューに対してジョブを1件作成できます。
$ bin/rails generate job guests_cleanup --queue urgent
ジェネレータを使いたくない場合は、app/jobs/ディレクトリの下に自分でジョブファイルを作成することもできます。ジョブファイルでは必ずApplicationJobを継承してください。
作成されたジョブは以下のようになります。
class GuestsCleanupJob < ApplicationJob queue_as :default def perform(*guests) # 後で実行するタスクをここに置く end end
なお、performの定義にはいくつでも引数を渡せます。
ApplicationJobと異なる名前の抽象クラスが既に存在する場合、以下のように--parentオプションを渡すことで、別の抽象クラスが必要であることを示せます。
$ bin/rails generate job process_payment --parent=payment_job
class ProcessPaymentJob < PaymentJob queue_as :default def perform(*args) # 後で実行するタスクをここに置く end end
キューへのジョブ登録はperform_laterで以下のように行います。オプションでsetも指定できます。
# 「キューイングシステムが空いたらジョブを実行する」とキューに登録する GuestsCleanupJob.perform_later guest
# 明日正午に実行したいジョブをキューに登録する GuestsCleanupJob.set(wait_until: Date.tomorrow.noon).perform_later(guest)
# 一週間後に実行したいジョブをキューに登録する GuestsCleanupJob.set(wait: 1.week).perform_later(guest)
# `perform_now`と`perform_later`は`perform`を呼び出すので、 # 定義した引数を渡すことができる GuestsCleanupJob.perform_later(guest1, guest2, filter: "some_filter")
以上でジョブ登録は完了です。
perform_all_laterを使うと、複数のジョブを一括登録できます。詳しくは後述の一括登録を参照してください。
Solid Queueは、通常のデータベースを利用するActive Job用のキューイングシステムであり、Rails 8.0からデフォルトで有効になっています。Solid Queueは、Redisなどの追加の依存関係を必要とせずに大量のデータをジョブキューで処理できます。
Solid Queueは、通常のジョブのエンキューや処理に加えて、ジョブの遅延実行やコンカレンシー制御、数値によるジョブごとの優先度指定、キュー実行順序に基づいた優先度などをサポートします。
development環境のRailsは、非同期のインプロセスキューイングシステムを提供し、ジョブをメモリ上に保持します。
デフォルトの非同期バックエンドでは、プロセスがクラッシュしたり開発中のコンピュータがリセットされたりすると、未処理のジョブがすべて失われますが、開発中の小規模なアプリや重要度の低いジョブについては、これで十分です。 しかしSolid Queueを使えば、production環境でもジョブキューシステムを設定できます。
# config/environments/development.rb config.active_job.queue_adapter = :solid_queue config.solid_queue.connects_to = { database: { writing: :queue } }
上の設定では、development環境でActive Jobのデフォルトとして:solid_queueアダプタが設定され、書き込み用にqueueデータベースに接続します。
次に、development環境用のデータベース設定で以下のようにqueueを追加します。
# config/database.yml
development:
primary:
<<: *default
database: storage/development.sqlite3
queue:
<<: *default
database: storage/development_queue.sqlite3
migrations_paths: db/queue_migrate
データベース設定のqueueキーは、config.solid_queue.connects_toの設定で使われているキーと同じにする必要があります。
db:prepareコマンドを実行することで、development環境のqueueデータベースで必要なテーブルがすべて作成されます。
$ bin/rails db:prepare
TIPS: queueデータベースのデフォルトの生成スキーマはdb/queue_schema.rbに配置されます。これらのスキーマファイルには solid_queue_ready_executionsやsolid_queue_scheduled_executionsなどのテーブルが含まれます。
最後に、キューを開始してジョブの処理を開始するには、次のコマンドを実行します。
bin/jobs start
Solid Queueはすでにproduction環境用に設定済みです。config/environments/production.rbファイルを開くと、以下の内容が設定済みであることがわかります。
# config/environments/production.rb # Active Jobのデフォルトのキューイングバックエンド(インプロセスかつ永続化されない)を置き換える config.active_job.queue_adapter = :solid_queue config.solid_queue.connects_to = { database: { writing: :queue } }
さらに、queueデータベースで利用するデータベースコネクションは、config/database.ymlファイルで設定されます。
# config/database.yml
# production環境のデータベースは、デフォルトでstorage/ ディレクトリに保存される
# このディレクトリは、デフォルトでconfig/deploy.ymlで永続的なDockerボリュームとしてマウントされる
production:
primary:
<<: *default
database: storage/production.sqlite3
queue:
<<: *default
database: storage/production_queue.sqlite3
migrations_paths: db/queue_migrate
データベースを利用可能な状態にするために、db:prepareを実行します。
$ bin/rails db:prepare
Solid Queueの設定オプションはconfig/queue.ymlで定義します。
以下はデフォルト設定の例です。
default: &default
dispatchers:
- polling_interval: 1
batch_size: 500
workers:
- queues: "*"
threads: 3
processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
polling_interval: 0.1
Solid Queueの設定オプションを理解するには、さまざまな種類のロール(role: 役割)を理解しておく必要があります。
ディスパッチャ(dispatcher): 今後実行するようにスケジュールされているジョブを選択します。
これらのジョブを実行する時刻になったら、ディスパッチャはそれらのジョブをsolid_queue_scheduled_executionsテーブルからsolid_queue_ready_executionsテーブルに移動し、ワーカーがジョブを取得できるようにします。また、コンカレンシー関連のメンテナンスも管理します。
ワーカー(worker): 実行準備が整ったジョブをsolid_queue_ready_executionsテーブルから取得します。
スケジューラ(scheduler): 定期的なタスクを処理し、期限が来たらジョブをキューに追加します。
スーパーバイザ(supervisor): システム全体を監視して、ワーカーとディスパッチャを管理します。 必要に応じてワーカーやディスパッチャを開始・停止し、健全性を監視し、すべてがスムーズに実行されるようにします。
config/queue.ymlの設定は、すべてがオプションです(つまり必須項目はありません)。設定が指定されていない場合、Solid Queueはデフォルト設定で1つのディスパッチャーと1つのワーカーで実行されます。
以下は、config/queue.ymlで設定できる設定オプションの一部です。
polling_interval: ディスパッチャーやワーカーが次のジョブをチェックするまでの待ち時間を秒で指定します。
ディスパッチャのデフォルト値は1秒、ワーカーのデフォルト値は0.1秒です。
batch_size: 1回のバッチでディスパッチされるジョブの件数です。
デフォルト値は500です。
concurrency_maintenance_interval: ディスパッチャがブロックされたジョブを解除できるかどうかを確認するまでの待ち時間を秒で指定します。
デフォルト値は600秒です。
queues: ワーカーがジョブを取得するキューのリストを指定します。
*を使って、すべてのキューまたはキュー名のプレフィックスを指定できます。
デフォルト値は*です。
threads: 各ワーカーのスレッドプールの最大サイズを指定します。
ワーカーが1回に取得するジョブの件数を決定します。
デフォルト値は3です。
processes: スーパーバイザによってforkされるワーカープロセスの個数を指定します。
各プロセスはCPUコアを専有できます。
デフォルト値は1です。
concurrency_maintenance: ディスパッチャがコンカレンシーメンテナンス作業を行うかどうかを指定します。
デフォルト値はtrueです。
設定オプションについて詳しくは、Solid Queueのドキュメントを参照してください。
また、config/<environment>.rbで設定できる追加の設定オプションを使うことで、RailsアプリケーションでSolid Queueをさらに詳細に設定できます。
設定のqueuesオプションには、後述する設定のオプションに沿って、ワーカーがジョブを選択するキューのリストを記述します。キューのリストでは、キューの順序が重要です。ワーカーはリストの最初のキューからジョブを選択し、最初のキューにジョブがなくなると、2番目のキューに移動し、以下同様に繰り返します。
# config/queue.yml
production:
workers:
- queues:[active_storage*, mailers]
threads: 3
polling_interval: 5
上の例では、ワーカーは最初に「active_storage」で始まるキュー(active_storage_analyseキューやactive_storage_transformキューなど)からジョブを取得します。「active_storage」で始まるキューにジョブが残っていない場合にのみ、ワーカーはmailersキューに移動します。
ワイルドカード*の利用は、「単独」または「キュー名の末尾に配置することで同じプレフィックスを持つすべてのキューに一致させる」形(active_storage*など)のみが可能です。*_some_queueなどのようなキュー名の冒頭への追加はできません。
queues: active_storage*のようにキュー名でワイルドカードを利用すると、SQLiteやPostgreSQLのポーリングのパフォーマンスが低下する可能性があります。一致するすべてのキューを識別するためにDISTINCTクエリが必要になるため、これらのRDBMSの大規模なテーブルではワイルドカードクエリが遅くなる可能性があります。パフォーマンスを落とさないためには、ワイルドカードを使わずに正確なキュー名を指定することが推奨されます。詳しくは、Solid Queueドキュメントのキューの仕様とパフォーマンスを参照してください。
Active Jobは、ジョブをエンキューするときに正の整数の優先度をサポートします(後述の優先度セクションを参照)。 1件のキュー内では、優先度に基づいてジョブが選択されます(整数が小さいほど優先度が高くなります)。
ただしキューが複数ある場合は、キュー自体の順序が優先されます。たとえば、productionとbackgroundという2つのキューがこの順序で設定されている場合、場合、backgroundキュー内の一部のジョブの優先度の方が高い場合でも、productionキュー内のジョブが常に最初に処理されます。
Solid Queueの並列処理は、スレッド(threadsパラメータで設定可能)、プロセス(processesパラメータで設定可能)、または水平スケーリングによって実現されます。
スーパーバイザーはプロセスを管理し、以下のシグナルに応答します。
TERM、INT: 正常な終了処理を開始し、TERMシグナルを送信してSolidQueue.shutdown_timeoutに達するまで待機します。
終了しない場合は、QUITシグナルでプロセスを強制終了します。
QUIT: プロセスを強制的に即時終了します。
ワーカーがKILLシグナルなどによって予期せず強制終了された場合、実行中のジョブは失敗としてマークされ、SolidQueue::Processes::ProcessExitErrorやSolidQueue::Processes::ProcessPrunedErrorなどのエラーが発生します。
ハートビート設定は、期限切れのプロセスを管理および検出するのに有用です。
スレッド、プロセス、シグナルについて詳しくはSolid Queueのドキュメントを参照してください。
Solid Queueは、ジョブのエンキュー中にActive Recordエラーが発生すると、SolidQueue::Job::EnqueueErrorを発生させます。
このエラーは、Active Jobによって発生するActiveJob::EnqueueError(エラーを処理してperform_laterがfalseを返すようにする)とは異なることにご注意ください。このため、RailsやTurbo::Streams::BroadcastJobなどのサードパーティgemによってエンキューされたジョブのエラー処理が複雑になります。
定期的なタスクの場合、エンキュー中に発生したエラーはすべてログに出力されますが、エラーをraiseしません。エンキュー時のエラーについて詳しくはSolid Queueのドキュメントを参照してください。
Solid Queueは、Active Jobをコンカレンシー制御で拡張し、特定の種類のジョブや特定の引数を持つジョブの同時実行数を制限できるようにします。ジョブがこの制限を超えると、別のジョブが終了するか期間が終了するまでブロックされます。
class MyJob < ApplicationJob limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes def perform(contact) # ジョブのロジックを実行する end end
上の例では、同じアカウントの2つのMyJobインスタンスのみが同時に実行されます。その後、ジョブの1つが完了するまで、他のジョブはブロックされます。
以下のようにgroupパラメータを指定すると、異なるジョブタイプ間での同時実行を制御できます。たとえば、同じグループに属する2つの異なるジョブクラスは、まとめて同時実行が制限されます。
class Box::MovePostingsByContactToDesignatedBoxJob < ApplicationJob limits_concurrency key: ->(contact) { contact }, duration: 15.minutes, group: "ContactActions" end class Bundle::RebundlePostingsJob < ApplicationJob limits_concurrency key: ->(bundle) { bundle.contact }, duration: 15.minutes, group: "ContactActions" end
これにより、特定の連絡先(contact)に対して一度に実行できるジョブは、ジョブクラスにかかわらず1件だけになります。
コンカレンシー制御について詳しくはSolid Queueのドキュメントを参照してください。
利用しているエラートラッキングサービスがジョブエラーを自動的に報告しない場合は、Active Jobに手動でフックする形で報告できます。たとえば、ApplicationJobで以下のようにrescue_fromブロックを追加できます。
class ApplicationJob < ActiveJob::Base rescue_from(Exception) do |exception| Rails.error.report(exception) raise exception end end
Action Mailerを利用している場合は、以下のようにMailDeliveryJobのエラーを個別に処理する必要があります。
class ApplicationMailer < ActionMailer::Base ActionMailer::MailDeliveryJob.rescue_from(Exception) do |exception| Rails.error.report(exception) raise exception end end
⚠️ ジョブをアプリケーションデータと同じACID準拠のデータベースに保存することで、強力かつ効果的なツールが実現します。トランザクション整合性を利用して、ジョブがコミットされない限りアプリ内の一部のアクションはコミットされず、その逆も同様です。また、ジョブをキューに登録するトランザクションがコミットされるまで、ジョブはキューに登録されません。 これは非常に強力で便利な機能ですが、ロジックの一部でこの振る舞いをベースにしている場合、今後別のActive Jobバックエンドに移行したり、Solid Queueを独自のデータベースに移行したりすると、突然振る舞いが変わるなど逆効果になる可能性があります。
これは非常に厄介な問題になる可能性がありますが、多くの人が気にする必要はないため、デフォルトではSolid Queueはメインアプリとは別のデータベースに設定されています。
ただし、Solid Queueをアプリと同一のデータベースで利用する場合は、Active Jobのenqueue_after_transaction_commitオプションを使うことで、トランザクション整合性に誤って依存することのないようにできます。このオプションは、ジョブごとに有効にすることも、以下のようにApplicationJobですべてのジョブに対して有効にすることも可能です。
class ApplicationJob < ActiveJob::Base self.enqueue_after_transaction_commit = true end
また、Solid Queueジョブ用のデータベースコネクションを別途設定することで、トランザクション整合性への依存を回避しながら、アプリと同一のデータベースを利用するようにSolid Queueを構成することも可能です。
トランザクションの整合性について詳しくはSolid Queueのドキュメントを参照してください。
Solid Queueは、cronジョブに似た定期的なタスクをサポートしています。定期タスクは設定ファイル(デフォルトではconfig/recurring.yml)で定義され、特定の時間にスケジューリングできます。タスク設定の例を以下に示します。
production:
a_periodic_job:
class: MyJob
args: [42, { status: "custom_status" }]
schedule: every second
a_cleanup_task:
command: "DeletedStuff.clear_all"
schedule: every day at 9am
各タスクには、class(またはcommand)とscheduleを指定します(スケジュール指定文字列の解析にはFugit gemが使われます)。
上の設定例のMyJobのように、argsオプションでジョブに引数を渡すことも可能です。argsオプションには「単一の引数」「ハッシュ」「引数の配列」のいずれかを渡すことが可能で、配列の場合は最後の要素にキーワード引数も含められます。
このようにして、指定の時間にジョブを定期実行できます。
定期的なタスクについて詳しくはSolid Queueのドキュメントを参照してください。
失敗したジョブの監視や管理を一元化するには、mission_control-jobsなどのツールが有用です、ジョブのステータス、ジョブ失敗の理由、ジョブ再試行の動作に関する洞察を提供し、問題をより効果的にトラッキングして解決を支援します。
mission_control-jobsツールを使うと、たとえば大きなファイルを処理するジョブがタイムアウトで失敗したときに、失敗を検査し、ジョブの引数や実行履歴を確認して、「再試行」「再キューイング」「破棄」のいずれかを決定するのに役立ちます。
Active Jobのqueue_asを使って、特定のキューに入っているジョブをスケジューリングできます。
class GuestsCleanupJob < ApplicationJob queue_as :low_priority # ... end
application.rbで以下のようにconfig.active_job.queue_name_prefixを使うことで、すべてのジョブでキュー名の前に特定の文字列を追加できます。
# config/application.rb module YourApp class Application < Rails::Application config.active_job.queue_name_prefix = Rails.env end end
# app/jobs/guests_cleanup_job.rb class GuestsCleanupJob < ApplicationJob queue_as :low_priority # ... end # 以上で、production環境ではproduction_low_priorityというキューでジョブが # 実行されるようになり、staging環境ではstaging_low_priorityというキューで # ジョブが実行されるようになります
以下のようにジョブごとにプレフィックスを設定することも可能です。
class GuestsCleanupJob < ApplicationJob queue_as :low_priority self.queue_name_prefix = nil # ... end # これで`config.active_job.queue_name_prefix`の設定が上書きされ、 # 自分のジョブキューにプレフィックスが設定されなくなります
キュー名のプレフィックスのデフォルト区切り文字はアンダースコア_です。
この区切り文字は、config.active_job.queue_name_delimiter設定で変更できます。
# config/application.rb module YourApp class Application < Rails::Application config.active_job.queue_name_prefix = Rails.env config.active_job.queue_name_delimiter = "." end end
# app/jobs/guests_cleanup_job.rb class GuestsCleanupJob < ApplicationJob queue_as :low_priority # ... end # 以上で、production環境ではproduction.low_priorityというキューでジョブが # 実行されるようになり、staging環境ではstaging.low_priorityというキューでジョブが実行されるようになります
#queue_asにブロックを渡すと、キューをそのジョブレベルで制御できます。渡されたブロックは、そのジョブのコンテキストで実行されるため、キュー名を返す必要があります(これによりself.argumentsにアクセス可能になります)。
class ProcessVideoJob < ApplicationJob queue_as do video = self.arguments.first if video.owner.premium? :premium_videojobs else :videojobs end end def perform(video) # 動画を処理する end end
ProcessVideoJob.perform_later(Video.last)
ジョブを実行するキューをさらに細かく制御したい場合は、#setに:queueオプションを渡せます。
MyJob.set(queue: :another_queue).perform_later(record)
Solid Queue以外の一部のバックエンドでは、リッスンするキューを指定する必要が生じることもあります。
優先度(priority)を指定してジョブをスケジューリングするには、queue_with_priorityメソッドを使います。
class GuestsCleanupJob < ApplicationJob queue_with_priority 10 # ... end
デフォルトのキューイングバックエンドであるSolid Queueは、キューの順序に基づいてジョブの優先順位を決定します。詳しくは前述のキューの順序セクションを参照してください。
Solid Queueでキューの順序と優先度オプションを両方使っている場合、キューの順序が優先され、優先度オプションは個別のキュー内でのみ適用されます。
Solid Queue以外のキューイングバックエンドでは、ジョブを同じキュー内や複数のキュー間の他のジョブと比較する形で優先度を指定できる場合があります。詳しくは、利用するバックエンドのドキュメントを参照してください。
以下のようにqueue_with_priorityにブロックを渡すことで、queue_asの場合と同様にブロックをジョブコンテキストで評価することも可能です。
class ProcessVideoJob < ApplicationJob queue_with_priority do video = self.arguments.first if video.owner.premium? 0 else 10 end end def perform(video) # 動画を処理する end end
ProcessVideoJob.perform_later(Video.last)
以下のようにsetに:priorityオプションを渡すことも可能です。
MyJob.set(priority: 50).perform_later(record)
優先度の低い番号が、優先度の高い番号より先に実行されるか後に実行されるかは、アダプタの実装によって異なります。詳しくは利用しているバックエンドのドキュメントを参照してください。アダプタの作成者は、小さい番号ほど重要度が高いものとして扱うことが推奨されます。
Active Jobのジョブ継続機能を使うと、ジョブを中断可能なステップに分割できます。これは、キューのシャットダウン中など、ジョブが中断される可能性がある場合に便利です。継続機能を使うと、ジョブは最後に完了したステップから再開できるため、最初からやり直す必要がなくなります。
ジョブの継続機能を使うには、ActiveJob::Continuableモジュールをincludeします。次に、performメソッド内でstepメソッドを使って各ステップを定義できます。各ステップはブロックで宣言することも、メソッド名を参照して宣言することも可能です。
class ProcessImportJob < ApplicationJob include ActiveJob::Continuable def perform(import_id) # 常にジョブ開始時に実行される(中断されたステップから再開する場合でも実行される) @import = Import.find(import_id) # ステップをブロックで定義する場合 step :initialize do @import.initialize end # カーソルを持つステップ: 進行状況を保存して、ジョブが中断されたら再開する step :process do |step| @import.records.find_each(start: step.cursor) do |record| record.process step.advance! from: record.id end end # メソッドを参照してステップを定義する場合 step :finalize end private def finalize @import.finalize end end
個別のステップはシーケンシャルに実行されます。ジョブがステップ間で中断された場合や、カーソルを使っているステップ内で中断された場合は、最後に記録された位置からジョブを再開します。これにより、長時間実行されるジョブや複数フェーズで構成されるジョブを、安全に一時停止・再開できるようになります。詳しくは、APIドキュメントのActiveJob::Continuationを参照してください。
Active Jobが提供するフックを用いて、ジョブのライフサイクル中にロジックをトリガーできます。これらのコールバックは、Railsの他のコールバックと同様に通常のメソッドとして実装し、マクロ風のクラスメソッドでコールバックとして登録できます。
class GuestsCleanupJob < ApplicationJob queue_as :default around_perform :around_cleanup def perform # 後で行なう end private def around_cleanup # performの直前に何か実行 yield # performの直後に何か実行 end end
このマクロスタイルのクラスメソッドは、ブロックを1つ受け取ることもできます。ブロック内のコード量が1行以内に収まるほど少ない場合は、この書き方をご検討ください。 たとえば、登録されたジョブごとの測定値を送信する場合は次のようにします。
class ApplicationJob < ActiveJob::Base before_enqueue { |job| $statsd.increment "#{job.class.name.underscore}.enqueue" } end
before_enqueuearound_enqueueafter_enqueuebefore_performaround_performafter_performafter_discardperform_all_laterでジョブをキューに一括登録すると、個別のジョブではaround_enqueueなどのコールバックがトリガーされなくなる点にご注意ください。
詳しくは一括登録のコールバックを参照してください。
perform_all_laterを使うことで、複数のジョブをキューに一括登録(bulk enqueue: バルクエンキュー)できます。一括登録により、Redisやデータベースなどのキューデータストアとのジョブの往復回数を減らせるので、同じジョブを個別に登録するよりもパフォーマンスが向上します。
perform_all_laterはActive JobのトップレベルAPIで、インスタンス化されたジョブを引数として受け取ります(この点がperform_laterと異なることにご注意ください)。perform_all_laterは内部でperformを呼び出します。newに渡された引数は、最終的にperformが呼び出されるときにperformに渡されます。
以下は、GuestsCleanupJobインスタンスを用いてperform_all_laterを呼び出すコード例です。
# `perform_all_later`に渡すジョブを作成する # この`new`に渡した引数は`perform`に渡される cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest) } # `GuestsCleanupJob`の個別のインスタンスごとにジョブをキューに登録する ActiveJob.perform_all_later(cleanup_jobs) # `set`メソッドでオプションを設定してからジョブを一括登録してもよい cleanup_jobs = Guest.all.map { |guest| GuestsCleanupJob.new(guest).set(wait: 1.day) } ActiveJob.perform_all_later(cleanup_jobs)
perform_all_laterは、正常にエンキューされたジョブの個数をログ出力します。
たとえば、上のGuest.all.mapの結果cleanup_jobsが3個になった場合、Enqueued 3 jobs to Async (3 GuestsCleanupJob)とログ出力されます(エンキューがすべて成功した場合)。
perform_all_laterの戻り値はnilです。これは、perform_laterの戻り値が、エンキューされたジョブクラスのインスタンスであるのと異なる点にご注意ください。
perform_all_laterを使えば、同じ呼び出しでさまざまなActive Jobクラスのインスタンスを以下のようにエンキューすることも可能です。
class ExportDataJob < ApplicationJob def perform(*args) # データをエクスポートする end end class NotifyGuestsJob < ApplicationJob def perform(*guests) # ゲストにメールを送信する end end # ジョブインスタンスをインスタンス化する cleanup_job = GuestsCleanupJob.new(guest) export_job = ExportDataJob.new(data) notify_job = NotifyGuestsJob.new(guest) # さまざまなクラスのジョブインスタンスをまとめてキューに登録する ActiveJob.perform_all_later(cleanup_job, export_job, notify_job)
perform_all_laterでジョブをキューに一括登録すると、個別のジョブではaround_enqueueなどのコールバックがトリガーされません。この振る舞いは、Active Recordの他の一括処理系メソッドと一貫しています。コールバックは個別のジョブに対して個々に実行されるので、perform_all_laterメソッドが持つ一括処理の性質の恩恵を受けられません。
ただし、perform_all_laterメソッドは、ActiveSupport::Notificationsでサブスクライブできるenqueue_all.active_jobイベントをトリガーします。
ジョブのエンキューが成功したかどうかを知るには、successfully_enqueued?メソッドを利用できます。
perform_all_laterによるキューへの一括登録(バルクエンキュー)は、キューバックエンド側でのサポートが必要ですす。デフォルトのキューバックエンドであるSolid Queueは、enqueue_allで一括登録をサポートします。
Sidekiqなどの他のバックエンドにはpush_bulkメソッドがあり、大量のジョブをRedisにプッシュして、ラウンドトリップネットワークの遅延を防ぐようになっています。GoodJobもGoodJob::Bulk.enqueueメソッドで一括登録をサポートします。
キューへの一括登録がキューバックエンドでサポートされていない場合、perform_all_laterはジョブを1件ずつキューに登録します。
最近のWebアプリケーションでよく実行されるジョブといえば、リクエスト・レスポンスサイクルの外でメールを送信することで、アプリケーションのユーザーが待たされないようにすることでしょう。 Active JobはAction Mailerと統合されているので、非同期メール送信を手軽に行えます。
# すぐにメール送信するなら#deliver_now UserMailer.welcome(@user).deliver_now # Active Jobで後でメール送信するなら#deliver_later UserMailer.welcome(@user).deliver_later
一般に、非同期キュー(.deliver_laterでメールを送信するなど)はRakeタスクに書いても動きません。Rakeが終了すると、.deliver_laterがメールの処理を完了する前にインプロセスのスレッドプールを削除する可能性があるためです。この問題を回避するには、.deliver_nowを用いるか、development環境で永続的キューを実行してください。
各ジョブでは、ジョブ作成時に設定されたI18n.localeを使います。これはメールを非同期的に送信する場合に便利です。
I18n.locale = :eo UserMailer.welcome(@user).deliver_later # メールがエスペラント語にローカライズされる
Active Jobの引数では、デフォルトで以下の型をサポートします。
NilClass、String、Integer、Float、BigDecimal、TrueClass、FalseClass)SymbolDateTimeDateTimeActiveSupport::TimeWithZoneActiveSupport::DurationHash(キーの型はStringかSymbolにすべき)ActiveSupport::HashWithIndifferentAccessArrayRangeModuleClassActive JobではGlobalIDがパラメータとしてサポートされています。 GlobalIDを使えば、動作中のActive Recordオブジェクトをジョブに渡す際にクラスとidを指定する必要がなくなります。クラスとidを指定する従来の方法では、後で明示的にデシリアライズ(deserialize)する必要がありました。
従来のジョブが以下のようなものだったとします。
class TrashableCleanupJob < ApplicationJob def perform(trashable_class, trashable_id, depth) trashable = trashable_class.constantize.find(trashable_id) trashable.cleanup(depth) end end
現在は、上を以下のように簡潔に書けます。
class TrashableCleanupJob < ApplicationJob def perform(trashable, depth) trashable.cleanup(depth) end end
このコードは、GlobalID::Identificationをミックスインするすべてのクラスで動作します。このモジュールはActive Recordクラスにデフォルトでミックスインされます。
サポートされる引数の型は、以下のような独自のシリアライザを定義するだけで拡張できます。
# app/serializers/money_serializer.rb class MoneySerializer < ActiveJob::Serializers::ObjectSerializer # あるオブジェクトを、サポートされているオブジェクト型を使用して、よりシンプルな表現形式に変換する。 # 推奨される表現形式は、特定のキーを持つハッシュ。キーには基本型のみが利用可能。 # カスタムシリアライザ型をこのハッシュに追加するには、`super`を呼ぶ必要がある。 def serialize(money) super( "amount" => money.amount, "currency" => money.currency ) end # シリアライズされた値を正しいオブジェクトに逆変換する def deserialize(hash) Money.new(hash["amount"], hash["currency"]) end private # ある引数がこのシリアライザでシリアライズされるべきかどうかをチェックする def klass Money end end
続いてこのシリアライザをリストに追加します。
# config/initializers/custom_serializers.rb Rails.application.config.active_job.custom_serializers << MoneySerializer
初期化中は、再読み込み可能なコードの自動読み込みがサポートされていない点にご注意ください。
そのため、たとえば以下のようにconfig/application.rbを修正するなどして、シリアライザが1度だけ読み込まれるように設定することをおすすめします。
# config/application.rb module YourApp class Application < Rails::Application config.autoload_once_paths << "#{root}/app/serializers" end end
Active Jobでは、ジョブ実行時に発生する例外をrescue_fromでキャッチする方法が提供されています。
class GuestsCleanupJob < ApplicationJob queue_as :default rescue_from(ActiveRecord::RecordNotFound) do |exception| # ここに例外処理を書く end def perform # 後で実行する処理を書く end end
ジョブで発生した例外が回復されなかった場合、このジョブは「失敗(failed)」と呼ばれます。
失敗したジョブは、それ用の設定を行わない限り自動ではリトライされません。
実行中に例外が発生したジョブは、以下のようにretry_onでリトライすることも、discard_onで廃棄することもできます。
class RemoteServiceJob < ApplicationJob retry_on CustomAppException # デフォルトは「3秒ずつ待って5回リトライする」 discard_on ActiveJob::DeserializationError def perform(*args) # CustomAppExceptionかActiveJob::DeserializationErrorをraiseする可能性があるとする end end
GlobalIDを使うと、#performに渡された完全なActive Recordオブジェクトをシリアライズできるようになります。
ジョブがキューに登録された後で、#performメソッドが呼び出される前に、渡されたレコードが削除された場合は、Active JobはActiveJob::DeserializationError例外をraiseします。
ジョブのテスト方法について詳しくは、テスティングガイドをご覧ください。
ジョブがどこから来ているのかを把握したい場合は、詳細なログを有効にできます。
Active Jobでは、複数のキューイングバックエンド(Sidekiq、Resque、Delayed Job など)用の組み込みアダプタも利用できます。
アダプタの最新リストについてはActiveJob::QueueAdaptersのAPIドキュメントを参照してください。
キューイングバックエンドは、config.active_job.queue_adapterで以下のように設定できます。
# config/application.rb module YourApp class Application < Rails::Application # 必ずアダプタのgemをGemfileに追加し、 # アダプタ固有のインストールおよびデプロイメント手順を実行すること config.active_job.queue_adapter = :sidekiq end end
以下のように、バックエンドをジョブごとに設定することも可能です。
class GuestsCleanupJob < ApplicationJob self.queue_adapter = :resque # ... end # これで、このジョブは`resque`をバックエンドキューアダプタとして使い # デフォルトのSolid Queueアダプタをオーバーライドする
ジョブはRailsアプリケーションと並行して実行されるため、ほとんどのキューイングライブラリでは、ジョブ処理を機能させるために、Railsアプリの起動とは別に、ライブラリ固有のキューイングサービスも起動しておく必要があります。
キューバックエンドの起動手順については、利用するライブラリのドキュメントを参照してください。主なドキュメントの一覧を以下に示します(すべてを網羅しているわけではありません)。
Railsガイドは GitHub の yasslab/railsguides.jp で管理・公開されております。本ガイドを読んで気になる文章や間違ったコードを見かけたら、気軽に Pull Request を出して頂けると嬉しいです。Pull Request の送り方については GitHub の README をご参照ください。
原著における間違いを見つけたら『Rails のドキュメントに貢献する』を参考にしながらぜひ Rails コミュニティに貢献してみてください 🛠💨✨
本ガイドの品質向上に向けて、皆さまのご協力が得られれば嬉しいです。
Railsガイド運営チーム (@RailsGuidesJP)
Railsガイドは下記の協賛企業から継続的な支援を受けています。支援・協賛にご興味あれば協賛プランからお問い合わせいただけると嬉しいです。