Function Compute は、サーバーの動作条件などを管理しなくても、ユーザーがコードを書いてアップロードできるようにするイベント駆動型サービスです。

ここでは、Function Compute を使用して、Table Store データテーブル内の増分データに対してリアルタイム計算を実行する方法を紹介します。

Function Compute は、ユーザーコードを実行するために適切な量のコンピューティングリソースを準備し、自動スケーリングします。 ユーザーは自分のコードを実行するために必要なリソースに対してのみ支払います。 詳細については、 「」「Function Compute とは」「」をご参照ください。

Table Store Stream は、Table Store データテーブルの増分データを取得するために使用されるデータチャネルです。 Table Store トリガーを作成することで、Table Store Stream と Function Compute を自動的にドッキングすることができます。 これにより、コンピューティング機能内のカスタムプログラムロジックは、Table Store データテーブル内の変更されたデータを自動的に処理することができます。 詳細については、 「Table Store Stream」をご参照ください。

ここでは、Function Compute を使用して、Table Store データテーブル内の増分データに対してリアルタイム計算を実行する方法を紹介します。

Table Store トリガーの設定

コンソールを使用して、Table Store データテーブルからのリアルタイムデータストリームを処理する Table Store トリガーを作成できます。

  1. ストリーミングを有効にしてデータテーブルを作成します。
    1. Table Store コンソールでインスタンスを作成します。

    2. このインスタンスの下にデータテーブルを作成し、ストリーミングを有効にします。

  2. Function Compute 関数を作成します。
    1. Function Compute コンソールでサービスを作成します。

    2. サービスの詳細設定では、ログを収集し、コンピューティング機能内のユーザーの他のリソースに引き続きアクセスするために、認証機能に対するサービスのロールを設定できます。 詳細については、 「」「ユーザー権限」「」を参照ください。

    3. 新しく作成されたサービスの下で [新規関数] をクリックします。
    4. 機能テンプレート」ページで [空白の関数] をクリックします。
    5. トリガー設定」ページで、 [トリガーを作成しない]をクリックして、 [次へ] をクリックします。
    6. 機能情報を設定します。

      Table Store トリガーでは CBOR 形式を使用して、増分データを Function Compute イベントとしてエンコードし、ユーザー関数を呼び出します。 次の関数例は、イベントをデコードしてログセンターに出力します。 データがデコードされた後に、必要に応じてデータを処理できます。



  3. Table Store トリガーを作成してテストします。
    1. Table Store コンソールで新しく作成されたデータテーブルの下で、 [既存の関数を使用] をクリックして、トリガーを作成します。
    2. 作成中に、イベント通知を送信するように Table Store を認証する必要があります。

      クリックすると、自動的に作成された認証ロール AliyunTableStoreStreamNotificationRole をRAM コンソールで確認できます。

情報処理

  • データフォーマット

    Table Store トリガーではCBOR 形式を使用しています。増分データをエンコードして関数計算イベントを形成します。 増分データの具体的なデータ形式は次のとおりです。

    {
        "Version": "string",
        "Records": [
            {
                "Type": "string",
                "Info": {
                    "Timestamp": int64
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "string",
                        "Value": formated_value
                    }
                ],
                "Columns": [
                    {
                        "Type": "string",
                        "ColumnName": "string",
                        "Value": formated_value,
                        "Timestamp": int64
                    }
                ]
            }
        ]
    }
  • メンバー定義
    • バージョン
      • 説明: ペイロードのバージョン番号は、現在は Sync-v1 です。
      • タイプ:文字列
    • 記録
      • 説明: データテーブル内の増分データ行配列です
      • 以下を含みます。
        • タイプ
          • 説明: データ行タイプ。PutRow、UpdateRow または DeleteRow です。
          • タイプ:文字列
        • 情報
          • 説明: データ行に関する基本情報です。
          • 以下を含みます。
            • タイムスタンプ
            • 説明: 行の最終更新時刻 (UTC) です。
            • タイプ: int64
        • プライマリキー
          • 説明: プライマリキー列の配列
          • 以下を含みます。
            • ColumnName
              • 説明: プライマリキー列名です
              • 型:文字列
              • 説明: プライマリキー列の内容です。
              • 型: Formated_value です。整数、文字列、または BLOB です。
        • カラム
          • 説明: 属性カラム配列
          • 以下を含みます。
            • タイプ
              • 説明: 属性列タイプ。Put、DeleteOneVersion または DeleteAllVersions のいずれかです。
              • 型:文字列
            • ColumnName
              • 説明: 属性列名
              • 型:文字列
              • 説明: 属性列の内容
              • タイプ: Formated_value です。整数、ブール値、ダブル、文字列または BLOB です。
            • タイムスタンプ
              • 説明: 属性列の最終更新日時 (UTC) です。
              • タイプ: int64
  • データ例
    {
        "Version": "Sync-v1",
        "Records": [
            {
                "Type": "PutRow",
                "Info": {
                    "Timestamp": 1506416585740836
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "pk_0",
                        "Value": 1506416585881590900
                    },
                    {
                        "ColumnName": "pk_1",
                        "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
                    },
                    {
                        "ColumnName": "pk_2",
                        "Value": 1506416585741000
                    }
                ],
                "Columns": [
                    {
                        "Type": "Put",
                        "ColumnName": "attr_0",
                        "Value": "hello_table_store",
                        "Timestamp": 1506416585741
                    },
                    {
                        "Type": "Put",
                        "ColumnName": "attr_1",
                        "Value": 1506416585881590900,
                        "Timestamp": 1506416585741
                    }
                ]
            }
        ]
    }

オンラインデバッグ

Function Compute は関数のオンラインデバッグをサポートします。 ユーザーはトリガーされたイベントを作成してコードロジックをテストできます。

Table Store トリガー機能サービスのイベントは CBOR 形式であるため、データ形式は JSON に似たバイナリ形式であり、次のようにオンラインでデバッグできます。

  1. コードに CBOR と JSON を同時にインポートします。
  2. [トリガーイベント] をクリックし、 [カスタム] をクリックします。上記のデータ例から JSON ファイルを編集ボックスに貼り付けます。要件に基づいて変更して保存します。
  3. コードでは、最初に "records = json.loads(event)" を使用して、カスタマイズしたテストトリガーイベントを処理します。その後、 [実行] をクリックして、コードをテストします。
  4. "records=json.loads(event)" のテストが成功したら、コードを "records = cbor.loads(event)" に変更して、保存することができます。 その後、データが Table Store に書き込まれると、関連する関数ロジックがトリガーされます。

サンプルコード
import logging
import cbor
import json
def get_attrbute_value(record, column):
    attrs = record[u'Columns']
    for x in attrs:
        if x[u'ColumnName'] == column:
            return x['Value']
def get_pk_value(record, column):
    attrs = record[u'PrimaryKey']
    for x in attrs:
        if x['ColumnName'] == column:
            return x['Value']
def handler(event, context):
    logger = logging.getLogger()
    logger.info("Begin to handle event")
    #records = cbor.loads(event)
    records = json.loads(event)
    for record in records['Records']:
        logger.info("Handle record: %s", record)
        pk_0 = get_pk_value(record, "pk_0")
        attr_0 = get_attrbute_value(record, "attr_0")
    return 'OK'