
Tablestore: Use Function Compute to cleanse data

Last updated: Jul 03, 2023

Tablestore provides highly concurrent write performance at a low storage cost, which makes it suitable for storing IoT data, logs, and monitoring data. When you write data to a Tablestore data table, you can use Function Compute to cleanse the data and write the cleansed data to another data table in Tablestore. Both the raw data and the cleansed data can be accessed in Tablestore in real time.

Sample scenarios

Suppose that you want to write log data that contains three fields to Tablestore. To query important logs efficiently, you want to write only the logs whose level value is greater than 1 to a separate data table named result. The following table describes the fields in the logs.

Field     Type      Description
id        Integer   The ID of the log.
level     Integer   The level of the log. A larger value indicates a higher level.
message   String    The content of the log.
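The cleansing rule in this scenario boils down to a single predicate on the level field. The following is a minimal sketch in plain Python; the LogEntry class and the should_copy_to_result and select_for_result functions are hypothetical names used only for illustration and are not part of Tablestore or Function Compute.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class LogEntry:
    """One log row with the three fields described in the table above."""
    id: int        # Primary key of the log.
    level: int     # A larger value indicates a higher level.
    message: str   # Content of the log.


def should_copy_to_result(entry: LogEntry) -> bool:
    """Return True if the log belongs in the result table (level > 1)."""
    return entry.level > 1


def select_for_result(entries: List[LogEntry]) -> List[LogEntry]:
    """Keep only the logs that the cleansing function would copy to result."""
    return [e for e in entries if should_copy_to_result(e)]


logs = [
    LogEntry(1, 0, "heartbeat"),
    LogEntry(2, 1, "cache miss"),
    LogEntry(3, 2, "disk usage above threshold"),
]
print([e.id for e in select_for_result(logs)])  # prints [3]
```

Note that a log with level 1 is excluded because the condition is strictly greater than 1.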

Step 1: Enable the Stream feature for the data table

Before you create a trigger, you must enable the Stream feature for the data table in the Tablestore console to allow the function to process incremental data that is written to the table.

  1. Log on to the Tablestore console.
  2. On the Overview page, click the name of the instance that you want to manage, or click Manage Instance in the Actions column of the instance.
  3. In the Tables section of the Instance details tab, click the name of the required data table and then click the Tunnels tab. Alternatively, click the more icon in the Actions column of the table and then click Tunnels.
  4. On the Tunnels tab, click Enable in the Stream Information section.
  5. In the Enable Stream dialog box, configure the Log Expiration Time parameter and click Enable.

    The value of the Log Expiration Time parameter must be a non-zero integer and cannot be changed after it is specified. Unit: hours. Maximum value: 168.

    Note Specify a value for the Log Expiration Time parameter based on your business requirements.
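Because the Log Expiration Time value cannot be changed after it is specified, it can help to check it against the documented limits first. This is a minimal sketch; the function name is hypothetical, and it assumes that "non-zero integer" means a positive whole number of hours, because a negative retention period is not meaningful.

```python
def validate_log_expiration_hours(hours) -> int:
    """Check a Log Expiration Time value (in hours) against the documented limits.

    Assumption: valid values are whole numbers from 1 to 168 (7 days).
    Returns the value if it is valid, otherwise raises ValueError.
    """
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(hours, bool) or not isinstance(hours, int):
        raise ValueError("Log Expiration Time must be an integer number of hours")
    if hours <= 0:
        raise ValueError("Log Expiration Time must be greater than 0")
    if hours > 168:
        raise ValueError("Log Expiration Time cannot exceed 168 hours")
    return hours


print(validate_log_expiration_hours(24))  # prints 24
```

The maximum of 168 hours corresponds to 7 days (7 × 24 = 168) of retained stream logs.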

Step 2: Configure a Tablestore trigger

You can create a Tablestore trigger in the Function Compute console to process the real-time data stream in a Tablestore data table.
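When the trigger fires, the function receives the incremental rows as a list of stream records. The sketch below flattens one such record into a plain dictionary. The layout it assumes (a Records list whose items contain PrimaryKey and Columns entries with ColumnName and Value keys) mirrors what the sample function in Step 3 reads; the Type field, the sample values, and the flatten_record helper are assumptions for illustration.

```python
from typing import Any, Dict


def flatten_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Merge the primary key and attribute columns of one stream record into a dict.

    Assumed layout: {"Type": ..., "PrimaryKey": [{"ColumnName": ..., "Value": ...}],
                     "Columns": [{"ColumnName": ..., "Value": ...}]}
    """
    row: Dict[str, Any] = {}
    for column in record.get("PrimaryKey", []):
        row[column["ColumnName"]] = column["Value"]
    # Some records (for example, deleted rows) may carry no attribute columns.
    for column in record.get("Columns", []):
        row[column["ColumnName"]] = column["Value"]
    return row


# A hand-written, already-decoded event with one record (hypothetical values).
event = {
    "Records": [
        {
            "Type": "PutRow",
            "PrimaryKey": [{"ColumnName": "id", "Value": 3}],
            "Columns": [
                {"ColumnName": "level", "Value": 2},
                {"ColumnName": "message", "Value": "disk usage above threshold"},
            ],
        }
    ]
}
print([flatten_record(r) for r in event["Records"]])
# prints [{'id': 3, 'level': 2, 'message': 'disk usage above threshold'}]
```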

  1. Create a Function Compute service.
    1. Log on to the Function Compute console.
    2. In the left-side navigation pane, click Services & Functions.
    3. In the top navigation bar, select a region.
    4. On the Services page, click Create Service.
    5. In the Create Service panel, configure the Name and Description parameters and configure the Logging and Tracing Analysis parameters based on your business requirements.
      For more information about the parameters, see Manage services.
    6. Click OK.
      After you create the service, you can view the service and service configurations on the Services page.
  2. Create a Function Compute function.
    Note You can create a function by selecting Create from Scratch with a Standard Runtime, Use a Custom Runtime to Migrate Web Server, or Use Container Images (for Kubernetes and Docker Users), or by using a template. The following procedure uses Create from Scratch with a Standard Runtime. For information about the other methods, see Create a function and Use function templates to create functions.
    1. On the Services page, click the name of the service that you want to manage.
    2. In the left-side navigation pane, click Functions and then click Create Function.
    3. On the Create Function page, select Create from Scratch with a Standard Runtime.
    4. In the Basic Settings section, configure the parameters. The following list describes the parameters.
      • Function Name (optional): Enter a name for the function. If you leave this parameter empty, Function Compute automatically generates a name for your function. Example: Function
      • Runtime Environments (required): Select a language, for example, Python, Java, PHP, or Node.js. For information about the runtime environments that are supported by Function Compute, see Manage functions. Example: Python 3.6
      • Code Upload Method (optional): By default, Use Sample Code is selected, and the function is created with built-in sample code. You can also select one of the following methods to upload your code:
        • Upload ZIP: Select a ZIP package that contains your code.
        • Upload Folder: Select the folder that contains the function code.
        • OSS: Specify the Bucket Name and Object Name parameters of the code that you want to upload.
        Example: Use Sample Code
      • Startup Command (required only if you select Use a Custom Runtime to Migrate Web Server): Configure the startup command of the program. If you do not configure the startup command, you must manually create a bootstrap startup script in the root directory of the code, and your program starts by using this script. Example: npm run start
      • Listening Port (required only if you select Use a Custom Runtime to Migrate Web Server): Specify the port on which the HTTP server in your code listens. Example: 9000
      • Request Type (required): Select the handler type. Valid values:
        • Event Requests: The function is triggered by a timer, by calls to API operations or SDKs, or by triggers integrated with other Alibaba Cloud services.
        • HTTP Requests: The function processes HTTP requests or WebSocket requests. For web scenarios, we recommend that you select Use a Custom Runtime to Migrate Web Server. For more information, see Function Compute console.
        If you want to use a Tablestore trigger, select Event Requests. Example: Event Requests
      • Instance Category (required): Select an instance category for the function. Valid values: Elastic Instance and Performance Instance. For more information, see Instance types and instance modes. For information about the billing of each instance category, see Billing overview. Example: Elastic Instance
      • Memory Capacity (required): Specify the memory that is required to execute the function by using one of the following methods:
        • Select a value: Select a value from the drop-down list.
        • Input a value: Click Enter Memory Size and enter a memory size. This method is available only for elastic instances. Valid values: 128 to 3072. Unit: MB. The value must be a multiple of 64 MB.
        Example: 512 MB
      • Instance Concurrency (required): Specify the concurrency of the function instance. For more information, see Configure instance concurrency. Example: 1
      • Handler (required): Specify the handler of the function. The Function Compute runtime loads and invokes the handler to process requests. Example: index.handler

      After you create the function, you can view the function on the Functions page.

    5. In the Trigger Configurations section, configure the parameters. The following list describes the parameters.
      • Trigger Type: Select Tablestore. Note If Request Type is set to HTTP Requests, the value of this parameter is HTTP Trigger by default. Example: Tablestore
      • Name: Enter a name for the trigger. Example: Tablestore-trigger
      • Instance: Select a Tablestore instance from the drop-down list. Example: distribute-test
      • Table: Select a data table from the drop-down list. Example: source_data
      • Role Name: Select AliyunTableStoreStreamNotificationRole. Note After you configure the preceding parameters, click OK. The first time you create a trigger of this type, click Authorize Now in the message that appears, create the role, and grant permissions to the role as prompted. Example: AliyunTableStoreStreamNotificationRole
    6. Click Create.
      The trigger that you created is displayed on the Triggers tab.
      Note You can also view and create Tablestore triggers on the Trigger tab of the table in the Tablestore console.
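The Memory Capacity rules from the Basic Settings step (a manually entered value is allowed only for elastic instances, must be between 128 MB and 3,072 MB, and must be a multiple of 64 MB) can be summarized as a single check. This is a hypothetical helper for illustration only; the console performs its own validation.

```python
def validate_custom_memory_mb(memory_mb: int,
                              instance_category: str = "Elastic Instance") -> int:
    """Validate a manually entered Memory Capacity value, in MB."""
    # Entering a custom value is supported only for elastic instances.
    if instance_category != "Elastic Instance":
        raise ValueError("Custom memory sizes can be entered only for elastic instances")
    if not 128 <= memory_mb <= 3072:
        raise ValueError("Memory must be between 128 MB and 3072 MB")
    if memory_mb % 64 != 0:
        raise ValueError("Memory must be a multiple of 64 MB")
    return memory_mb


print(validate_custom_memory_mb(512))  # prints 512
```

For example, 500 MB is rejected because it is not a multiple of 64, while the example value of 512 MB (8 × 64) passes.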

Step 3: Verify data cleansing

After you create a trigger, you can write data to Tablestore and query the data to verify whether the data is cleansed as expected.

  1. Write code.

    1. On the Functions page, click the name of the required function.

    2. On the function details page, click the Code tab to write code in the code editor.

      In this example, the function code is written in Python. Replace the values of INSTANCE_NAME, REGION, and ENDPOINT with the actual values of your instance.

      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      import cbor
      import json  # Needed only if you decode the event with json.loads() instead of cbor.loads().
      import tablestore as ots

      # Replace these values with the actual values of your instance.
      INSTANCE_NAME = 'distribute-test'
      REGION = 'cn-shanghai'
      ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com' % (INSTANCE_NAME, REGION)
      RESULT_TABLENAME = 'result'

      def get_attribute_value(record, column):
          # Return the value of an attribute column in a stream record, or None if it is absent.
          for x in record.get('Columns', []):
              if x['ColumnName'] == column:
                  return x['Value']
          return None

      def get_pk_value(record, column):
          # Return the value of a primary key column in a stream record, or None if it is absent.
          for x in record.get('PrimaryKey', []):
              if x['ColumnName'] == column:
                  return x['Value']
          return None

      # The temporary credentials in the context can be used to access Tablestore only if
      # the service is configured with a role to which the AliyunOTSFullAccess policy is attached.
      def get_ots_client(context):
          creds = context.credentials
          return ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret,
                               INSTANCE_NAME, sts_token=creds.securityToken)

      def save_to_ots(client, record):
          # Write the id, level, and message columns of the record to the result table.
          row_id = int(get_pk_value(record, 'id'))
          level = int(get_attribute_value(record, 'level'))
          attrs = [('level', level)]
          msg = get_attribute_value(record, 'message')
          if msg is not None:
              attrs.append(('message', msg))
          pk = [('id', row_id)]
          client.put_row(RESULT_TABLENAME, ots.Row(pk, attrs))

      def handler(event, context):
          # The Tablestore trigger delivers the event in CBOR format.
          records = cbor.loads(event)
          # records = json.loads(event)
          client = get_ots_client(context)
          for record in records['Records']:
              level = get_attribute_value(record, 'level')
              # Skip records that carry no level column, for example, deleted rows.
              if level is None:
                  continue
              if int(level) > 1:
                  save_to_ots(client, record)
              else:
                  print("Level <= 1, ignore.")
  2. Write data to the source_data table by entering values for the id, level, and message fields, and then query the cleansed data in the result table.

    • When you write a log in which the value of the level field is greater than 1 to the source_data table, the log is synchronized to the result table.

    • When you write a log in which the value of the level field is less than or equal to 1 to the source_data table, the log is not synchronized to the result table.
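Before writing test rows to source_data, you can check the filtering decision locally without Tablestore or Function Compute. The following sketch applies the same level > 1 rule as the handler above to a hand-built, already-decoded event, and it writes to FakeClient, a hypothetical in-memory stand-in for OTSClient, so it needs neither the cbor nor the tablestore package. FakeClient.put_row takes a row ID and a dict rather than an ots.Row, which is a simplification; the event layout matches the keys the sample function reads, and the values are made up.

```python
from typing import Any, Dict, List, Optional


def get_column(columns: List[Dict[str, Any]], name: str) -> Optional[Any]:
    """Return the Value of the column named `name`, or None if it is absent."""
    for column in columns:
        if column["ColumnName"] == name:
            return column["Value"]
    return None


class FakeClient:
    """Hypothetical stand-in for OTSClient that keeps rows in nested dicts."""

    def __init__(self) -> None:
        self.tables: Dict[str, Dict[int, Dict[str, Any]]] = {}

    def put_row(self, table: str, row_id: int, attrs: Dict[str, Any]) -> None:
        self.tables.setdefault(table, {})[row_id] = attrs


def cleanse(event: Dict[str, Any], client: FakeClient) -> None:
    """Copy records whose level is greater than 1 to the result table."""
    for record in event["Records"]:
        level = get_column(record.get("Columns", []), "level")
        # Skip records without a level column and logs with level <= 1.
        if level is None or int(level) <= 1:
            continue
        row_id = int(get_column(record["PrimaryKey"], "id"))
        message = get_column(record["Columns"], "message")
        client.put_row("result", row_id, {"level": int(level), "message": message})


def make_record(row_id: int, level: int, message: str) -> Dict[str, Any]:
    """Build one stream record in the assumed layout."""
    return {
        "PrimaryKey": [{"ColumnName": "id", "Value": row_id}],
        "Columns": [
            {"ColumnName": "level", "Value": level},
            {"ColumnName": "message", "Value": message},
        ],
    }


event = {"Records": [make_record(1, 1, "cache miss"),
                     make_record(2, 3, "service unavailable")]}
client = FakeClient()
cleanse(event, client)
print(client.tables)  # prints {'result': {2: {'level': 3, 'message': 'service unavailable'}}}
```

The record with level 1 is dropped and the record with level 3 reaches the result table, which is the same outcome you should observe in Tablestore after writing equivalent rows to source_data.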