すべてのプロダクト
Search
ドキュメントセンター

:SDK クイックスタート

最終更新日:Mar 22, 2020

このドキュメントでは、DTS SDK を使用して基本操作を実行する方法について説明します。

RegionContext の初期化

RegionContext は、セキュリティ認証情報およびネットワークアクセスモードを、保存および設定するために使用します。次のコードは、RegionContext の初期化、セキュリティ認証資格情報の設定、およびネットワークアクセスモードの決定を行います。

  1. import java.util.List;
  2. import com.aliyun.drc.clusterclient.RegionContext;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. public class MainClass
  6. {
  7. public static void main(String[] args) throws Exception {
  8. // Create RegionContext
  9. RegionContext context = new RegionContext();
  10. // Configure AccessKey and AccessKeySecret of the Alibaba Cloud account
  11. context.setAccessKey("<AccessKey>");
  12. context.setSecret("<AccessKeySecret>");
  13. // Whether the server running the SDK uses public network IP address to connect DTS subscription channel
  14. context.setUsePublicIp(true);
  15. // The following is other calling code...
  16. …………
  17. }
  18. }

SDK を使用するには、RegionContext を初期化し、セキュリティ認証を設定し、接続されたサブスクリプションインスタンスのその他の情報を完成させることが不可欠です。

上記のコードでは、

  • setAccessKey インターフェイスは、Alibaba Cloud アカウントの AccessKey を設定します。

  • setSecret インターフェイスは、Alibaba Cloud アカウントの AccessKeySecret を設定します。

  • AccessKey と AccessKeySecret は、ID ペアとして Alibaba Cloud によってユーザに割り当てられます。この ID ペアは Alibaba Cloud のユーザーセンターで作成できます。

  • setUsePublicIp は、SDK を実行するサーバーが、パブリック IP アドレスを使用してサブスクリプションインスタンスに接続するかどうかを決定します。この決定において、true はパブリック IP アドレスの使用を意味し、false は不使用を意味します。

ClusterClient の初期化

SDK は ClusterClient クラスを使用してサブスクリプションインスタンスに接続し、変更データを受け取ります。次のコードを使用して ClusterClient を作成します。

  1. import java.util.List;
  2. import com.aliyun.drc.clusterclient.ClusterClient;
  3. import com.aliyun.drc.clusterclient.DefaultClusterClient;
  4. import com.aliyun.drc.clusterclient.RegionContext;
  5. public class MainClass
  6. {
  7. public static void main(String[] args) throws Exception {
  8. // Create RegionContext
  9. RegionContext context = new RegionContext();
  10. context.setAccessKey("<AccessKey>");
  11. context.setSecret("<AccessKeySecret>");
  12. context.setUsePublicIp(true);
  13. // Create subscription consumers
  14. final ClusterClient client = new DefaultClusterClient(context);
  15. // The following is other calling code...
  16. ……………
  17. }
  18. }

Listener の初期化

SDK は Listener クラスを使用して変更データを消費します。

ClusterClient の初期化後にリスナーを追加します。Listener は、サブスクリプションデータを受信し、消費する通知関数を定義します。次のコードは、最も基本的な消費ロジックを示すもので、サブスクリプションデータを画面に出力します。

  1. import com.aliyun.drc.clusterclient.ClusterClient;
  2. import com.aliyun.drc.clusterclient.ClusterListener;
  3. import com.aliyun.drc.clusterclient.DefaultClusterClient;
  4. import com.aliyun.drc.clusterclient.RegionContext;
  5. import com.aliyun.drc.clusterclient.message.ClusterMessage;
  6. public class MainClass
  7. {
  8. public static void main(String[] args) throws Exception {
  9. // Initialize a RegionContext object
  10. ...
  11. //Initialize ClusterClient object
  12. ………
  13. ClusterListener listener = new ClusterListener(){
  14. @Override
  15. public void notify(List<ClusterMessage> messages) throws Exception {
  16. for (ClusterMessage message : messages) {
  17. //Print the subscription change data
  18. System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
  19. + message.getRecord().getOpt());
  20. //After data consumption is completed, report ACK to DTS, and must call
  21. message.ackAsConsumed();
  22. }
  23. }
  24. }
  25. }

DTS は SDK のデータ消費タイムスタンプを DTS サーバーに保存します。これにより、SDK のディザスタリカバリプロセスが簡素化されます。

上記のサンプルコードの askAsConsumed() インターフェイスは、SDK によって消費された最新のデータ位置とタイムスタンプを DTS サーバーに報告します。不意のシステムダウン後に SDK が再起動すると、DTS サーバーから自動的に消費時刻を取得し、この時点からデータ取得を再開することでデータの重複を避けることができます。

ClusterClient の開始

  1. import java.util.List;
  2. import com.aliyun.drc.clusterclient.ClusterClient;
  3. import com.aliyun.drc.clusterclient.ClusterListener;
  4. import com.aliyun.drc.clusterclient.DefaultClusterClient;
  5. import com.aliyun.drc.clusterclient.RegionContext;
  6. import com.aliyun.drc.clusterclient.message.ClusterMessage;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. public class MainClass
  10. {
  11. public static void main(String[] args) throws Exception {
  12. //Initialize RegionContext
  13. ...
  14. //Initialize ClusterClient
  15. ...
  16. //Initialize ClusterListener
  17. ...
  18. // Add listeners
  19. client.addConcurrentListener(listener);
  20. // Set requested subscription channel ID
  21. client.askForGUID("dts_rdsrjiei2u2afnb_DSF");
  22. // Start background thread. Note that here will not get blocked and the main thread cannot exit
  23. client.start();
  24. }

上記のコードで askForGUID インターフェイスは、クライアントからの要求により、サブスクリプションインスタンス ID を設定します。サブスクリプションチャネル ID は、DTS コンソールから取得されます。一度サブスクリプションインスタンス ID が設定されれば、SDK はこのサブスクリプションインスタンスから変更データを取得できます。

クライアントを始動する前に、リスナーをクライアントに追加する必要があります。クライアントがサブスクリプションインスタンスから変更データを取得すると、リスナーの notify メソッドを同期的に呼び出すことで、データ消費を開始します。