DTS データサブスクリプションは、DTS SDK を使用して変更データのサブスクライブおよび消費を行います。SDK を消費に使用する前に、DTS コンソールでサブスクリプションインスタンスを作成し、RDS インスタンスにサブスクライブする必要があります。
サブスクリプションインスタンスの作成後、SDK を使用すれば、サブスクリプションインスタンス内の変更データをリアルタイムに取得できます。なお、次の点にご注意ください。
- DTS は Java 版 SDK のみをサポートします。
- 1 つのサブスクリプションチャネルは、1 つの SDK でのみ消費できます。1 つのサブスクリプションインスタンスに複数の SDK が接続されている場合、1 つの SDK プロセスだけが変更データを消費できます。
- 複数の SDK を 1 つの RDS インスタンスに登録する必要がある場合は、各下流 SDK に対応するサブスクリプションインスタンスを作成できます。
SDK は、多くの型のクラスオブジェクトを定義しています。このドキュメントでは、これらの SDK クラスオブジェクトのインターフェイス定義について説明します。
RegionContext
setAccessKey(AccessKey)
セキュリティ権限情報を設定します。このパラメーターは、サブスクリプションインスタンスを所有する Alibaba Cloud アカウントの AccessKey に対応します。
setSecret(AccessKeySecret)
セキュリティ権限情報を設定します。このパラメーターは、Alibaba Cloud アカウントの AccessKeySecret です。これは AccessKey ページで作成し、取得可能です。
setUsePublicIp(usePublicIp)
SDK 実行サーバーによるデータサブスクリプションにインターネットを使用するかどうかを設定します。インターネットを使用する場合、usePublicIp の値は true です。それ以外の場合は false です。DTS はインターネット経由のデータサブスクリプションのみをサポートします。
ClusterClient
void addConcurrentListener(ClusterListener arg0)
下流リスナーを ClusterClient に追加します。リスナーは、サブスクリプションチャネルの変更データをサブスクライブすることができます。パラメーターの
ClusterListener arg0
は、ClusterListener クラスのオブジェクトです。void askForGUID(String arg0)
サブスクリプションインスタンスから変更データを要求します。パラメーター
String arg0
はサブスクリプションインスタンス ID であり、次の図に示すように、DTS コンソールから取得します。List<ClusterListener> getConcurrentListeners()
現在の ClusterClient のリスナーリストを取得します。インタフェースの戻り値の型は List<ClusterListener> です。
void start()
SDK クライアントを起動し、変更データのサブスクリプションを開始します。
void stop()
SDK クライアントをシャットダウンし、変更データのサブスクリプションを停止します。データの取得と通知のコールバックは、SDK の同じスレッドで実行されます。通知と消費を行う notify 関数内にシグナル割り込みを禁止する関数が含まれている場合、stop 関数はクライアントをスムーズに停止できないことがあります。
ClusterListener
void notify(List<ClusterMessage> arg0)
この関数は、変更データの消費方法を定義します。データを受け取った後、SDK は
notify
を使って ClusterListner にデータを消費するよう通知します。たとえば、デモプログラムで使われている消費方法は、サブスクリプションデータを画面に表示することです。この関数の入力パラメーターの型は、List<ClusterMessage> です。ここで
ClusterMessage
はサブスクリプションデータのストレージ構造オブジェクトです。詳細な定義については、ClusterMessage をご参照ください。
ClusterMessage
各 ClusterMessage は 1 つのトランザクションのデータレコードを持ち、各レコードは Record クラスを介して RDS に格納されます。この節では、ClusterMessage の主要なインターフェイス関数について説明します。
Record getRecord()
ClusterMessage から変更レコードを取得します。変更レコードには、begin、commit、update、insert など、RDS binlog ファイルのすべてのレコードが含まれます。
void ackAsConsumed()
下流 SDK のディザスタリカバリプロセスを簡素化するため、データサブスクリプションサービスエンドポイントは消費ポイントの保存をサポートしています。異常終了と再起動後、SDK は自動的にサブスクライブを再開し、異常終了前の最後の消費タイムスタンプ以降のデータを消費します。
注意: メッセージの消費が完了した後、このインターフェイスを呼び出して ACK を返し、DTS サービスエンドポイントに SDK の消費タイムスタンプを更新するよう通知します。これにより、SDK の異常からの再起動後の消費データの整合性が保証されます。
Record
Record は、begin、commit、update など、サブスクリプションインスタンスの RDS binlog のすべてのレコードを表します。
String getAttribute(String key)
レコードのメインプロパティー値を取得します。入力パラメーターはプロパティー名で、戻り値はプロパティー値です。
次の表は、この関数を呼び出すことによって取得可能なすべてのプロパティーを示しています。
キー 説明 record_id レコード ID。この ID 番号は、サブスクリプション実行中に値が上昇することはありません。 instance このレコードのデータベースインスタンスの接続アドレス。形式は IP:Port
です。source_type このレコードのデータベースインスタンスのエンジンタイプ。現在の値は mysql
です。source_category このレコードのタイプ。現在の値は full_recorded
です。timestamp binlog でのレコードの作成時刻。この時刻は RDS の SQL 実行時刻でもあります。 checkpoint このレコードの対応する binlog ファイルポイント。形式は file_offset@file_name
です。ここでfile_name
は binlog ファイルの数値サフィックスです。record_type このレコードの対応する操作タイプ。主な値として、insert、update、delete、replace、ddl、begin、commit および heartbeat があります。 db このレコードが更新するテーブルに対応するデータベース名。 table_name このレコードが更新するテーブルの名前。 record_recording このレコードのエンコーディング。 primary このレコードが更新するテーブルの主キーのカラム名。 fields_enc このレコードの各エンコーディングフィールドの値。各フィールドはコンマで区切られています。非文字型の値は空です。 Type getOpt()
insert、delete、update、replace、ddl、begin、commit および heartbeat を含む、このレコードの変更タイプを取得します。
heartbeat は、サブスクリプションインスタンスの正常性状態を反映するためだけに定義されたインジケーターです。理論上、DTS は毎秒 1 つの heartbeat レコードを生成します。
String getCheckpoint()
この変更レコードの binlog 内のチェックポイントを取得します。返されるチェックポイントのフォーマットは
binlog_offset@binlog_fid
です。binlog_offset
は、binlog ファイル内の変更レコードのオフセットを示します。binlog_fid
は、binlog ファイルの数値サフィックスを示します。たとえば、binlog ファイル名が mysql-bin.0008 の場合、binlog_fid は 8 です。String gettimestamp()
この変更レコードの binlog 内でのタイムスタンプを取得します。
String getDbname()
変更レコードで変更されたテーブルに対応するデータベース名を取得します。
String getTablename()
変更レコードの対応するテーブル名を取得します。
String getPrimaryKeys()
変更レコードの対応する主キーのカラム名を取得します。結合された主キーでは、カラム名はコンマで区切られます。
DBType getDbType()
サブスクリプションインスタンスのデータベースタイプを取得します。DTS は RDS MySQL のみをサポートしているため、値は MySQL です。
String getServerId()
変更レコードのプロセスを実行している RDS MySQL インスタンスに対応する IP:PORT を取得します。
int getFieldCount()
変更レコードのフィールド数を取得します。
List<Field> getFieldList()
この関数の返される結果のデータ型は、List<Field> です。これには、変更レコードのフィールド定義と、変更を適用する前後の値が含まれます。Field オブジェクトの定義については、Field をご参照ください。
Boolean isFirstInLogevent()
このレコードがデータベースのバッチ変更中の最初のトランザクションログかどうかをチェックします。真の場合、true が返されます。それ以外の場合は、false が返されます。
Field
Field クラスは、各フィールドのエンコーディング、タイプ、フィールド名、フィールド値、主キーであるかどうか等のプロパティーを定義します。この節では、Field クラスの各インターフェイス定義について説明します。
String getEncoding()
このフィールド値のエンコーディング形式を取得します。
String getFieldname()
このフィールドの名前を取得します。
Type getType()
このフィールドのデータ型を取得します。
ByteString getValue()
フィールドの値を取得します。返される型は ByteString です。値が空の場合、null が返されます。
Boolean isPrimary()
フィールドがテーブルの主キーのカラムであるかどうかをチェックします。真の場合、true が返されます。それ以外の場合は、false が返されます。