このトピックでは、ストリーミングクエリ設定の基本概念と関連パラメーターについて説明します。
クエリ設定
ストリーミングクエリに Spark SQL を使用する前に、次の 2 つの概念を理解する必要があります。
- データソース設定:テーブルの定義。
- クエリインスタンス設定:各ストリーミングクエリを実行するためのパラメーター設定。
テーブルの定義には、Kafka データソースの接続アドレスやトピック名など、データソース設定のみが含まれます。 テーブル内の複数の非ビジネスクエリを同時に実行できます。 したがって、テーブルの定義には、特定のクエリインスタンスを実行するための設定を含めることはできません。
各クエリインスタンスは個別に設定する必要があります。 クエリ SQL 文への不必要な変更を減らすため、クエリインスタンスごとに query name を設定できます。 query name を使用することで、各クエリインスタンスを実行するためのパラメーターを設定できます。 クエリインスタンスのパラメーターは、 SET
構文を使用して設定します。 詳細は、「設定パラメーター」をご参照ください。
クエリ設定に関する規則:各クエリインスタンスの query name は、最も近い SQL SET 文で使用できます。 例を 2 つ示します。
- 例 1
SET streaming.query.name=one_test_job -- query 1 INSERT INO tb_test_1 SELECT ... -- query 2 INSERT INO tb_test_2 SELECT ... -- The names of queries 1 and 2 are both one_test_job. However, this case is invalid because the name of each query instance must be unique.
- 例 2
SET streaming.query.name=one_test_job_1 SET streaming.query.name=one_test_job_2 -- query 1 CREATE TABLE tb_test_1 AS SELECT ... -- The name of query 1 is one_test_job_2.
クエリインスタンスの文には、次のものがあります。
INSERT INTO ...
CREATE TABLE ... AS SELECT ...
設定パラメーター
パラメーター | 対応する DataFrame API | SQL 文の形式 | 説明 | 必須 |
---|---|---|---|---|
queryName | writeStream.queryName(...) | SET streaming.query.name=$queryName | 各ストリーミングクエリの名前。 異なるクエリインスタンスのパラメーターは、クエリ名で区別されます。 | はい |
option | writeStream.option(...) | SET spark.sql.streaming.query.options.$queryName.$optionName=$optionValue | checkpointLocation:checkpoint のディレクトリ。 | はい |
カスタムオプション。 | いいえ | |||
outputMode | writeStream.outputMode(...) | SET spark.sql.streaming.query.outputMode.$queryName=$outputMode | クエリ結果の出力モード。 デフォルト値:append。 | いいえ |
trigger | writeStream.trigger(...) | SET spark.sql.streaming.query.trigger.$queryName=$triggerType | クエリを実行するタイミングを制御するトリガー。 デフォルト値:ProcessingTime。
注 このパラメーターには、ProcessingTime のみを設定できます。
|
いいえ |
SET spark.sql.streaming.query.trigger.intervalMs.$queryName=$intervalMs | クエリバッチ間の間隔。 単位 :ミリ秒。 デフォルト値:0。 | いいえ |