Google Cloud Workflowsとは?サーバーレスで実現する業務自動化の完全ガイド
Google Cloud Workflows サーバーレス オーケストレーション DX

Google Cloud Workflowsとは?サーバーレスで実現する業務自動化の完全ガイド

ArcHack
8分で読めます

はじめに

現代のクラウドアプリケーション開発において、複数のサービスやAPIを連携させて業務を自動化することは不可欠です。しかし、複雑な処理フローを管理し、エラーハンドリングやリトライ処理を実装するのは容易ではありません。

Google Cloud Workflowsは、こうした課題を解決するフルマネージドのサーバーレスオーケストレーションサービスです。本記事では、Google Cloud Workflowsの基本から実践的な活用方法まで、徹底的に解説します。

Google Cloud Workflowsとは

概要

Google Cloud Workflows(以下、Workflows)は、Google Cloudが提供するサーバーレスなワークフローオーケストレーションサービスです。複数のCloud FunctionsやCloud Run、BigQueryなどのGoogle Cloudサービス、さらには外部APIを定義した順序で実行できます。

主な特徴

Workflowsには以下のような特徴があります:

  1. フルマネージド: インフラの管理が不要で、運用負荷を大幅に削減
  2. サーバーレス: 自動スケーリングにより、トラフィックに応じて柔軟に対応
  3. 低コスト: 従量課金制で、無料枠も充実(内部ステップ5,000回/月、外部ステップ2,000回/月)
  4. シンプルな定義: YAMLまたはJSON形式で直感的にワークフローを記述
  5. 豊富な機能: 条件分岐、繰り返し、並列実行、エラーハンドリングをサポート

なぜWorkflowsが必要なのか

従来の課題

従来、複数のサービスを連携させる際には以下のような課題がありました:

  • 複雑なコード: 各サービスの呼び出しロジックをアプリケーションコードに埋め込む必要がある
  • エラーハンドリング: リトライ処理や例外処理を個別に実装する必要がある
  • スケーラビリティ: トラフィック増加時のスケーリング対応が必要
  • 可視性の欠如: 処理フローの全体像が把握しづらい

Workflowsによる解決

Workflowsを使用することで、これらの課題を以下のように解決できます:

  • 宣言的な定義: YAMLで処理フローを明示的に定義
  • 組み込みのエラーハンドリング: リトライポリシーを簡単に設定
  • 自動スケーリング: サーバーレスアーキテクチャによる自動対応
  • 実行履歴: すべての実行ログを自動的に記録し、可視化

主なユースケース

Workflowsは、さまざまなシーンで活用できます。

1. データパイプライン

最も一般的なユースケースの一つが、データパイプラインの構築です。

具体例:

  • 業務システムからデータを抽出(Extract)
  • Cloud Storageにデータを一時保存
  • BigQueryにデータをロード(Load)
  • SQLクエリでデータを変換(Transform)
  • 変換結果を別のテーブルに保存

このようなETL/ELTプロセスを、Workflowsで一連のフローとして定義できます。

2. バッチ処理

定期的に実行する必要があるバッチジョブの自動化にも最適です。

具体例:

  • 日次でのレポート生成
  • 定期的なデータバックアップ
  • 機械学習モデルの再トレーニング
  • ログファイルのアーカイブ処理

Cloud Schedulerと組み合わせることで、cron形式で定期実行を設定できます。

3. イベントドリブン処理

特定のイベントをトリガーとした自動処理も実現できます。

具体例:

  • Cloud Storageへのファイルアップロード時の自動処理
  • Pub/Subメッセージ受信時の処理実行
  • HTTPリクエスト受信時のワークフロー起動

Eventarcと連携することで、さまざまなイベントソースからワークフローを起動できます。

4. マイクロサービスオーケストレーション

複数のマイクロサービスを連携させる際のオーケストレーターとしても機能します。

具体例:

  • 注文処理フロー(在庫確認→決済→配送手配→通知)
  • ユーザー登録フロー(検証→DB登録→メール送信→ウェルカムメッセージ)
  • 承認ワークフロー(申請→承認待ち→承認/却下→通知)

Workflowsの基本構成

ワークフロー定義

Workflowsは、YAMLまたはJSON形式で定義します。以下は基本的な例です:

main:
  steps:
    - step1:
        call: http.get
        args:
          url: https://api.example.com/data
        result: apiResponse
    - step2:
        call: sys.log
        args:
          text: ${"Received: " + apiResponse.body}

主要な構成要素

1. ステップ(Steps)

ワークフローの個別の処理単位です。各ステップでは以下のような操作が可能です:

  • 変数の定義と割り当て
  • サービスの呼び出し(HTTP API、Cloud Functions、BigQuery等)
  • 条件分岐
  • 繰り返し処理

2. 条件分岐(Switch)

switch文を使用して、条件に応じた処理を実行できます:

- checkStatus:
    switch:
      - condition: ${status == "success"}
        next: handleSuccess
      - condition: ${status == "error"}
        next: handleError

3. 繰り返し(For Loop)

forループを使用して、配列要素の反復処理が可能です:

- processItems:
    for:
      value: item
      in: ${items}
      steps:
        - processItem:
            call: processFunction
            args:
              data: ${item}

4. 並列実行(Parallel)

複数のステップを並列に実行することで、処理時間を短縮できます:

- parallelProcessing:
    parallel:
      branches:
        - branch1:
            steps:
              - callService1:
                  call: http.get
                  args:
                    url: https://api1.example.com
        - branch2:
            steps:
              - callService2:
                  call: http.get
                  args:
                    url: https://api2.example.com

実践例: データパイプラインの構築

ここでは、実際のデータパイプラインを構築する例を紹介します。

シナリオ

以下のような処理フローを実装します:

  1. Cloud Storageから新しいCSVファイルを検出
  2. ファイルをBigQueryにロード
  3. SQLクエリでデータを変換
  4. 変換結果を別のテーブルに保存
  5. 完了通知をSlackに送信

ワークフロー定義

main:
  params: [event]
  steps:
    - init:
        assign:
          - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - datasetId: "my_dataset"
          - sourceTable: "raw_data"
          - targetTable: "processed_data"
          - bucketName: ${event.bucket}
          - fileName: ${event.name}
    
    - loadToBigQuery:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${projectId}
          body:
            configuration:
              load:
                sourceUris: [${"gs://" + bucketName + "/" + fileName}]
                destinationTable:
                  projectId: ${projectId}
                  datasetId: ${datasetId}
                  tableId: ${sourceTable}
                autodetect: true
                writeDisposition: "WRITE_APPEND"
        result: loadJob
    
    - waitForLoad:
        call: googleapis.bigquery.v2.jobs.get
        args:
          projectId: ${projectId}
          jobId: ${loadJob.jobReference.jobId}
        result: loadStatus
    
    - checkLoadStatus:
        switch:
          - condition: ${loadStatus.status.state != "DONE"}
            next: waitForLoad
    
    - transformData:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${projectId}
          body:
            query: |
              INSERT INTO `${projectId}.${datasetId}.${targetTable}`
              SELECT 
                id,
                name,
                TIMESTAMP_TRUNC(created_at, DAY) as date,
                amount
              FROM `${projectId}.${datasetId}.${sourceTable}`
              WHERE DATE(created_at) = CURRENT_DATE()
            useLegacySql: false
        result: transformJob
    
    - notifySlack:
        call: http.post
        args:
          url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
          body:
            text: ${"Data pipeline completed: " + fileName}

ポイント解説

  1. イベントパラメータ: Cloud Storageのイベントからファイル情報を取得
  2. BigQueryコネクタ: googleapis.bigquery.v2を使用してBigQueryを操作
  3. ジョブの待機: ロードジョブの完了を待機してから次のステップへ
  4. SQLクエリ: データ変換ロジックをSQLで記述
  5. 外部通知: Slack WebhookでHTTP POSTリクエストを送信

エラーハンドリングとリトライ

Workflowsには、強力なエラーハンドリング機能が組み込まれています。

Try-Catch構文

- tryStep:
    try:
      steps:
        - callAPI:
            call: http.get
            args:
              url: https://api.example.com/data
    except:
      as: e
      steps:
        - logError:
            call: sys.log
            args:
              severity: ERROR
              text: ${"Error occurred: " + e.message}
        - handleError:
            call: sendErrorNotification
            args:
              error: ${e}

リトライポリシー

HTTPリクエストには、自動的にリトライポリシーを設定できます:

- callAPIWithRetry:
    call: http.get
    args:
      url: https://api.example.com/data
      timeout: 30
    retry:
      predicate: ${http.default_retry_predicate}
      max_retries: 5
      backoff:
        initial_delay: 1
        max_delay: 60
        multiplier: 2

料金体系

Workflowsは、実行されたステップ数に応じた従量課金制です。

料金の詳細

  • 内部ステップ: 1,000ステップあたり$0.01
  • 外部ステップ: 1,000ステップあたり$0.025

無料枠

  • 内部ステップ: 5,000回/月
  • 外部ステップ: 2,000回/月

内部ステップと外部ステップの違い

  • 内部ステップ: *.googleapis.comへのリクエスト、Cloud FunctionsやCloud Runの呼び出し、変数操作など
  • 外部ステップ: Google Cloud外部へのHTTPリクエスト、カスタムドメインへのリクエスト、コールバック待機

多くの一般的なユースケースでは、月額数ドル以下で運用可能です。

実行方法

Workflowsには、複数の実行方法があります。

1. Cloud Schedulerによる定期実行

cron形式で定期実行を設定できます:

gcloud scheduler jobs create http my-workflow-job \
  --schedule="0 2 * * *" \
  --uri="https://workflowexecutions.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/workflows/WORKFLOW_NAME/executions" \
  --message-body='{"argument":"value"}' \
  --oauth-service-account-email=SERVICE_ACCOUNT_EMAIL

2. Eventarcによるイベントドリブン実行

Cloud Storageへのファイルアップロードなど、イベントをトリガーに実行:

gcloud eventarc triggers create storage-trigger \
  --destination-workflow=WORKFLOW_NAME \
  --destination-workflow-location=LOCATION \
  --event-filters="type=google.cloud.storage.object.v1.finalized" \
  --event-filters="bucket=BUCKET_NAME"

3. API経由のオンデマンド実行

REST APIまたはgcloudコマンドで手動実行:

gcloud workflows execute WORKFLOW_NAME \
  --data='{"key":"value"}'

ベストプラクティス

Workflowsを効果的に活用するためのベストプラクティスを紹介します。

1. サブワークフローの活用

再利用可能なロジックは、サブワークフローとして分離しましょう:

main:
  steps:
    - callSubworkflow:
        call: validateInput
        args:
          data: ${inputData}
        result: validationResult

validateInput:
  params: [data]
  steps:
    - validate:
        switch:
          - condition: ${data == null}
            raise: "Input data is null"
    - return:
        return: true

2. 適切なエラーハンドリング

すべての外部呼び出しには、エラーハンドリングを実装しましょう。

3. ログの活用

sys.logを使用して、デバッグ情報を記録しましょう:

- logStep:
    call: sys.log
    args:
      severity: INFO
      json:
        message: "Processing item"
        itemId: ${item.id}

4. タイムアウトの設定

長時間実行される可能性のあるステップには、タイムアウトを設定しましょう:

- callWithTimeout:
    call: http.get
    args:
      url: https://api.example.com/data
      timeout: 300  # 5分

5. 並列実行の活用

独立した処理は並列実行することで、全体の実行時間を短縮できます。

まとめ

Google Cloud Workflowsは、サーバーレスで低コストなワークフローオーケストレーションサービスです。以下のような特徴があります:

  • フルマネージド: インフラ管理不要
  • シンプルな定義: YAMLで直感的に記述
  • 豊富な機能: 条件分岐、繰り返し、並列実行、エラーハンドリング
  • 低コスト: 無料枠が充実し、従量課金制
  • 高い拡張性: 自動スケーリングで大規模処理にも対応

データパイプライン、バッチ処理、イベントドリブン処理など、さまざまなユースケースで活用できます。ぜひ、あなたのプロジェクトでもWorkflowsを活用して、業務自動化を実現してください。

参考リンク


ArcHackでは、Google Cloudを活用したシステム開発やDX推進を支援しています。Workflowsを使った業務自動化やデータパイプラインの構築にご興味がある方は、お気軽にお問い合わせください。