edit-icon download-icon

Table Store トリガー

最終更新日: Jul 04, 2019

Table Store トリガー

Table Store は、Alibaba Cloud の Apsara 分散システム上に構築された分散 NoSQL データストレージサービスであり、データシャーディングとロードバランシング技術を通じて、同時アクセスでのデータスケーリングをシームレスに実現でき、大規模な構造化データへのストレージとリアルタイムのアクセスを提供します。Table Store は、Function Compute (FC) と統合されており、FC にアクセスするためのイベントソースとして使用されます。Table Store 内のデータが変更されると、データ変更情報をパラメーターとして関数の実行がトリガーされます。

fc-ots

上の図に示す典型的なユースケースのシナリオでは、元のソースデータが元のテーブル A に格納され、A のデータ変更情報に従ってカスタムデータクリーニングが実行されます。クリーニングされたデータはテーブル B に格納され、ユーザーが直接テーブル B のデータを読み込んで表示させると、柔軟でスケーラブルなサーバーレス Web アプリケーションを完成させることができます。

初めて Table Store トリガーを使用する場合は、リージョン制限と注意事項を必ずお読みください。

リージョン制限

現在、 Table Store トリガーをサポートしているリージョン

中国 (北京)、中国 (杭州)、中国 (上海)、中国 (深セン)、日本 (東京)、シンガポール、オーストラリア (シドニー)、ドイツ (フランクフルト),中国 (香港)

注意事項

無限ループの状態を避けるように注意してください。

ユーザーが関数を作成するときは、次のようなロジックがないように注意してください。Table Store テーブル A が 関数 B をトリガーし、関数 B がテーブル A のデータを更新することにより、関数が無限ループで呼び出される現象

Function Compute が VPC ネットワーク上にある場合

現在、作成した関数は Table Store のイントラネットエンドポイントを使用することができず、internal endpoint で Table Store サービスへアクセスすることができません。この問題は 2019 年初頭に解決する予定です。

Table Store トリガーイベントの定義

データ形式

Table Store トリガーは、 CBOR 形式で増分データをエンコードして Function Compute の Event を構成します。増分データの具体的なデータ形式は次のとおりです。

  1. {
  2. "Version": "string",
  3. "Records": [
  4. {
  5. "Type": "string",
  6. "Info": {
  7. "Timestamp": int64
  8. },
  9. "PrimaryKey": [
  10. {
  11. "ColumnName": "string",
  12. "Value": formated_value
  13. }
  14. ],
  15. "Columns": [
  16. {
  17. "Type": "string",
  18. "ColumnName": "string",
  19. "Value": formated_value,
  20. "Timestamp": int64
  21. }
  22. ]
  23. }
  24. ]
  25. }
  • メンバの定義
    • Version
      • 意味: payload のバージョン。現在は Sync-v1 です。
      • データ型: string
    • Records
      • 意味: データテーブル内の増分データの行配列。
      • 内部メンバ:
        • Type
          • 意味: データ行タイプ PutRow 、UpdateRow および DeleteRow
          • データ型: string
        • Info
          • 意味: データ行の基本情報。
          • 内部メンバ:
            • Timestamp
            • 含义: その行の最終修正 UTC 時刻
            • データ型: int64
        • PrimaryKey
          • 意味: 主キー列配列
          • 内部メンバ
            • ColumnName
              • 意味: 主キー列名
              • データ型: string
            • Value
              • 意味: 主キー
              • データ型: formated_value で、 integer、string および blob をサポートします。
        • Colunms
          • 意味: プロパティ列の配列
          • 内部メンバ
            • Type
              • 意味: プロパティ列のタイプ。 Put、DeleteOneVersion および DeleteAllVersions があります。
              • 类型: string
            • ColumnName
              • 意味: プロパティ列名
              • 类型: string
            • Value
              • 意味: プロパティ列の値
              • データ型: formated_value で、 integer、boolean、double、string および blob をサポートします。
            • Timestamp
              • 意味: プロパティー列の最終変更 UTC 時刻
              • データ型: int64
  • データの例

    1. {
    2. "Version": "Sync-v1",
    3. "Records": [
    4. {
    5. "Type": "PutRow",
    6. "Info": {
    7. "Timestamp": 1506416585740836
    8. },
    9. "PrimaryKey": [
    10. {
    11. "ColumnName": "pk_0",
    12. "Value": 1506416585881590900
    13. },
    14. {
    15. "ColumnName": "pk_1",
    16. "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
    17. },
    18. {
    19. "ColumnName": "pk_2",
    20. "Value": 1506416585741000
    21. }
    22. ],
    23. "Columns": [
    24. {
    25. "Type": "Put",
    26. "ColumnName": "attr_0",
    27. "Value": "hello_table_store",
    28. "Timestamp": 1506416585741
    29. },
    30. {
    31. "Type": "Put",
    32. "ColumnName": "attr_1",
    33. "Value": 1506416585881590900,
    34. "Timestamp": 1506416585741
    35. }
    36. ]
    37. }
    38. ]
    39. }

    Table Store トリガーの設定

    トリガーの例:tablestore_trigger.yml

    1. triggerConfig:

    トリガーのパラメーターの説明:なし

    使用例

    関数の Table Store トリガーは、Function Compute コンソールコマンドラインツール fcli および SDK の 3 つの方法で設定できます。以下に、3 つの方法をそれぞれ紹介します。

    以下の関数例は python で書かれています。他のランタイムを使用する場合は、 Table Store で Function Compute をトリガーする例 Nodejs/Php/Java Runtime をご参照ください。

    例1: コンソールで Table Store トリガーの作成

    この例では、コンソールを使用して RDS トリガーを設定する方法を示します。トリガーの設定は関数の作成時でも作成後でも可能です。トリガーとその作成について不明点がある場合は、イベントソースサービスの使用および トリガーの作成をご参照ください。

    まず Function Compute コンソールにログインし、適切なリージョンとサービスを選択します。新しいサービスを作成していない場合は、サービスの作成をご参照ください。

    関数作成時にトリガーを設定

    1. [関数の作成] をクリックして [関数の作成] ページに移動し、 空白の関数を選択して [次へ] をクリックします。

    2. Table Store トリガーを選択して、以下のように設定します。ots-2

    3. 関数を作成して適切な情報を入力し、オンライン編集を選択して次の python ランタイムの関数サンプルコードを貼り付けて、[次へ] をクリックします。

      1. import logging
      2. import cbor
      3. import json
      4. def get_attrbute_value(record, column):
      5. attrs = record[u'Columns']
      6. for x in attrs:
      7. if x[u'ColumnName'] == column:
      8. return x['Value']
      9. def get_pk_value(record, column):
      10. attrs = record[u'PrimaryKey']
      11. for x in attrs:
      12. if x['ColumnName'] == column:
      13. return x['Value']
      14. def handler(event, context):
      15. logger = logging.getLogger()
      16. logger.info("Begin to handle event")
      17. #records = cbor.loads(event)
      18. records = json.loads(event)
      19. for record in records['Records']:
      20. logger.info("Handle record: %s", record)
      21. pk_0 = get_pk_value(record, "pk_0")
      22. attr_0 = get_attrbute_value(record, "attr_0")
      23. return 'OK'
    4. 権限(オプション)を設定します。 [次へ] をクリックし、 入力情報が正しいことを確認したら、 [作成] をクリックします。

    5. オンラインデバッグを行います。Table Store トリガー関数サービスのイベントは CBOR 形式なので、データ形式は JSON のようなバイナリ形式であり、次のようにオンラインでデバッグできます。

      • コードに cbor と json を同時に import します。
      • トリガーイベントをクリックして [カスタマイズ] を選択し、上記のデータ例の JSON ファイルを編集ボックスに貼り付け、実際の状況に応じて変更して保存します。
      • コードでは、まず records = json.loads(event) を使用してカスタマイズしたテストトリガーイベントに対して [実行] をクリックしてそれが期待どおりになるかどうかを確認します。
      • records=json.loads(event) でテスト結果が OK になると、records = cbor.loads(event) に変更されて保存されます。Table Store にデータが書き込まれると、関連する関数ロジックがトリガーされます。

    関数が作成された後にトリガーを設定

    1. 既存の関数を選択して[トリガー] > [トリガーの作成] の順でクリックします。
    2. トリガーの作成ページで次のように、Table Store トリガーを設定して [OK] をクリックします。

      注意:細かい粒度での権限付与を考慮する必要がない場合は、直接クイック承認を実行できます。

    ots-1

    例 2: fcli で TableStore トリガーの作成

    まず、Trigger Config を含む yaml ファイルを作成します。その yaml ファイルの内容は次のとおりです。

    1. triggerConfig:

    対応する fuction のディレクトリの下にトリガーを作成します。

    1. mkt triggerName serviceName/functionName -t tablestore -c TriggerConfig.yaml -r acs:ram::$accountId:role/aliyuntablestorestreamnotificationrole -s acs:mns:cn-hangzhou:$accountId:/topics/$topicName
    • -r —invocation-role string トリガーのロールを設定します。
    • -s —source-arn string イベントソースのリソースシンボル。例:acs:ots:region:$accountId:instance/$instanceName]/table/$tableName。
    • -c —trigger-config string トリガー設定ファイルを設定します。
    • -t —type string トリガーのタイプ。

    fcli の使用方法については、fcli をご参照ください。

    例 3: SDK プログラミングで Table Store トリガーの作成

    fc-python-sdk を例として、SDK で Table Store トリガーを作成する方法を説明します。Function Compute は、 fc-nodejs-sdkfc-java-sdk, fc-php-sdk も提供しています。

    トリガー作成コード

    1. client = fc2.Client(
    2. endpoint='<Your Endpoint>',
    3. accessKeyID='<Your AccessKeyID>',
    4. accessKeySecret='<Your AccessKeySecret>')
    5. service_name = 'serviceName'
    6. function_name = 'functionName'
    7. trigger_name = 'triggerName'
    8. trigger_type = 'tablestore'
    9. source_arn = 'acs:ots:region:<Your Account ID>:instance/<Your instanceName>/table/<Your tableName>'
    10. invocation_role = 'acs:ram::<Your Account ID>:role/<Your Invocation Role>'
    11. trigger_config = {}
    12. client.create_trigger(service_name, function_name, trigger_name, trigger_type, trigger_config, source_arn, invocation_role)

    関数コード

    1. import logging
    2. import cbor
    3. import json
    4. def get_attrbute_value(record, column):
    5. attrs = record[u'Columns']
    6. for x in attrs:
    7. if x[u'ColumnName'] == column:
    8. return x['Value']
    9. def get_pk_value(record, column):
    10. attrs = record[u'PrimaryKey']
    11. for x in attrs:
    12. if x['ColumnName'] == column:
    13. return x['Value']
    14. def handler(event, context):
    15. logger = logging.getLogger()
    16. logger.info("Begin to handle event")
    17. #records = cbor.loads(event)
    18. records = json.loads(event)
    19. for record in records['Records']:
    20. logger.info("Handle record: %s", record)
    21. pk_0 = get_pk_value(record, "pk_0")
    22. attr_0 = get_attrbute_value(record, "attr_0")
    23. return 'OK'