このドキュメントでは、Function Compute を使用して Table Store 内のデータをクレンジングする方法について説明します。

Table Store の高度な同時書き込みパフォーマンスと低ストレージコストにより、ログの保存やデータの監視のような、モノのインターネット (IoT) アプリケーションに最適です。 データを Table Store に書き込み、同時に Function Compute に新しく追加されたデータの単純なクレンジングを実行し、クレンジングされたデータを Table Store の結果テーブルに書き戻すことができます。 その間、元のデータと結果データにリアルタイムでアクセスできます。

データ定義

書き込まれるデータが 3 つのフィールドを含むログデータである前提とします。

フィールド名 タイプ 意味
id Integer 型 ログ ID
level Integer 型 ログレベル (数値が大きいほどレベルが高い)
message String 型 ログ内容

専用クエリを実行するには、レベル 1 を超えるログを別のデータテーブルに書き込む必要があります。

インスタンスとデータテーブルの作成

Table Store コンソール (今回は例として East China Node 2 の分散テストを使用) で Table Store インスタンスを作成し、ソーステーブル (source_data) と結果テーブル (result) を作成します。 プライマリキーは id (整数) です。 Table Store はスキーマフリー構造を使用するため、他の属性列フィールドを事前定義する必要はありません。

例として source_data を取り上げ、次の図のように作成します。



データソーステーブルのストリーミング機能を有効にする

トリガー関数では、 Table Store に書き込まれた増分データを Function Compute で処理する前に、データテーブルのストリーム関数が有効化されている必要があります。



ストリームレコードの有効期限は、増分データがストリーム API を通じて読み取られる最大時間です。

トリガーは既存の関数のみをバインドできるため、最初に Function Compute コンソールで同じリージョンにサービスと関数を作成します。

Function Compute サービスの作成

次の手順では、例として East China Node 2 を使用して、 Function Compute コンソールでサービスと処理関数を作成する方法について説明します。

  1. East China Node 2 でサービスを作成します。

  2. 関数を作成して、 [空白の関数] > [トリガーを作成しない] をクリックします。

    • 関数名は "etl_test" です。 Python 2.7 環境を選択して、コードをオンラインで編集してください。
    • 関数のエントリは、" etl_test.handler" です。
    • コードは後で編集されます。 次に [次へ] をクリックします。
  3. サービス認証

    Function Compute は実行中のログをログサービスに書き込むと同時に Table Store データテーブルの読み取りと書き込みを行うため、Function Compute には特定の権限が必要です。 便宜のために、最初に "AliyunOTSFullAccess" および "AliyunLogFullAccess" 許可を追加してください。 実際の使用では、最小特権の原則に基づいて権限を追加することを推奨します。



  4. [認証を完了する] をクリックし、関数を作成します。
  5. 機能コードの修正

    関数を作成した後、対応する [関数] [コード実行] をクリックします。その後、コードを編集し保存します。 必要に応じて、"INSTANCE_NAME" (Table Store のインスタンス名) と "REGION" (使用されているリージョン) を変更します。



    サンプルコード:
    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    import cbor
    import json
    import tablestore as ots
    INSTANCE_NAME = 'distribute-test'
    REGION = 'cn-shanghai'
    ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION)
    RESULT_TABLENAME = 'result'
    def _utf8(input):
        return str(bytearray(input, "utf-8"))
    def get_attrbute_value(record, column):
        attrs = record[u'Columns']
        for x in attrs:
            if x[u'ColumnName'] == column:
                return x['Value']
    def get_pk_value(record, column):
        attrs = record[u'PrimaryKey']
        for x in attrs:
            if x['ColumnName'] == column:
                return x['Value']
    # AliyunOTSFullAccess 許可が付与されているため、ここで取得した認証情報は Table Store へのアクセスを許可されています。
    def get_ots_client(context):
        creds = context.credentials
        client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
        return client
    def save_to_ots(client, record):
        id = int(get_pk_value(record, 'id'))
        level = int(get_attrbute_value(record, 'level'))
        msg = get_attrbute_value(record, 'message')
        pk = [(_utf8('id'), id),]
        attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
        row = ots.Row(pk, attr)
        client.put_row(RESULT_TABLENAME, row)
    def handler(event, context):
        records = cbor.loads(event)
        #records = json.loads(event)
        client = get_ots_client(context)
        for record in records['Records']:
            level = int(get_attrbute_value(record, 'level'))
            if level > 1:
                save_to_ots(client, record)
            else:
                print "Level <= 1, ignore."

トリガーのバインド

  1. Table Store のインスタンス管理ページに戻り、 source_data テーブルの後ろにある [トリガーを使用] ボタンをクリックして、トリガーバインディングインターフェイスに入ります。 [既存の関数を使用] をクリックし、新しく作成したサービスと関数を選択して、 Table Store のイベント通知を送信許可を確認のためにチェックします。

  2. バインドが成功すると、以下の情報が表示されます。

実行の確認

  1. source_data テーブルにデータを書き込みます。

    source_data の「データエディタ」ページで、 [挿入] をクリックし、id、レベルおよびメッセージ情報を次の順序で入力します。



  2. 結果テーブルからクレンジングされたデータのクエリを実行します。

    結果テーブルの「データエディタ」ページをクリックします。ここで、 source_data に新しく書き込まれたデータのクエリを実行できます。

    soure_data に書き込まれたレベル 1 以下のデータは結果テーブルと同期されません。