このトピックでは、Spark SQL を使用してストリーミングジョブを開発する方法について説明します。

クエリ文ブロック

SQL クエリ文でジョブ関連パラメーターを適切に表すことは困難です。 したがって、必須パラメーターを設定するには、SQL クエリ文の前に SET 文を追加する必要があります。 重要なパラメーターは、streaming.query.name です。 各 SQL クエリは、一意の streaming.query.name に関連付ける必要があります。 このクエリ文に基づき、各 SQL クエリに他のパラメーター (checkpoint など) を設定できます。 規則に基づき、SET streaming.query.name 文を各 SQL クエリの先頭に追加する必要があります。 そうしないと、クエリ時にエラーが返される場合があります。 有効なクエリ文ブロックは次のとおりです。
SET streaming.query.name=${queryName};

queryStatement

ジョブテンプレート

-- dbName: the name of the database where a table is to be created.
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- Create a Log Service table.
-- slsTableName: the name of the Log Service table.
-- logProjectName: the name of the Log Service project.
-- logStoreName: the name of the Logstore in Log Service.
-- accessKeyId: the AccessKey ID provided by Alibaba Cloud.
-- accessKeySecret: the AccessKey secret provided by Alibaba Cloud.
-- endpoint: the endpoint of the Logstore in Log Service. For more information, see サービスエンドポイント.
CREATE TABLE IF NOT EXISTS ${slsTableName}
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');

-- Create an HDFS table and define the column fields in the table.
-- hdfsTableName: the name of the HDFS table.
-- location: the storage path of data. Both HDFS and OSS paths are supported.
-- Supported data formats: CSX, JSON, ORC, and Parquet. The default format is Parquet.
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING PARQUET
LOCATION '${location}';

-- Define some parameters for running each streaming query. Such parameters include:
-- streaming.query.name: the name of the streaming query job.
-- spark.sql.streaming.checkpointLocation.${queryName}: the directory of the checkpoint for the streaming query job.
SET streaming.query.name=${queryName};
SET spark.sql.streaming.query.options.${queryName}.checkpointLocation=${checkpointLocation};
-- The following parameters are optional and can be defined as required:
-- outputMode: the output mode of the query result. Default value: append.
-- trigger: the trigger controlling the moment where the query is executed. Default value: ProcessingTime. Currently, this parameter can only be set to ProcessingTime.
-- trigger.intervalMs: the interval between query batches. Unit: milliseconds. Default value: 0.
-- SET spark.sql.streaming.query.outputMode.${queryName}=${outputMode};
SET spark.sql.streaming.query.trigger.${queryName}=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.${queryName}=30;

INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM ${slsTableName}
WHERE ${condition}

パラメーター

次の表に、主要なパラメーターを示します。
パラメーター 説明 デフォルト値
streaming.query.name クエリの名前。 このパラメーターは明示的に設定する必要があります。
spark.sql.streaming.query.options.${queryName}.checkpointLocation ストリーミングクエリジョブの checkpoint のディレクトリ。 このパラメーターは明示的に設定する必要があります。
spark.sql.streaming.query.outputMode.${queryName} クエリ結果の出力モード。 append
spark.sql.streaming.query.trigger.${queryName} クエリを実行するタイミングを制御するトリガー。 このパラメーターには、ProcessingTime のみを設定できます。 ProcessingTime
spark.sql.streaming.query.trigger.intervalMs.${queryName} クエリバッチ間の間隔。 単位:ミリ秒。 0