このドキュメントでは、DTS SDK を使用して基本操作を実行する方法について説明します。
RegionContext の初期化
RegionContext は、セキュリティ認証情報およびネットワークアクセスモードを、保存および設定するために使用します。次のコードは、RegionContext の初期化、セキュリティ認証資格情報の設定、およびネットワークアクセスモードの決定を行います。
import java.util.List;
import com.aliyun.drc.clusterclient.RegionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
public static void main(String[] args) throws Exception {
// Create RegionContext
RegionContext context = new RegionContext();
// Configure AccessKey and AccessKeySecret of the Alibaba Cloud account
context.setAccessKey("<AccessKey>");
context.setSecret("<AccessKeySecret>");
// Whether the server running the SDK uses public network IP address to connect DTS subscription channel
context.setUsePublicIp(true);
// The following is other calling code...
…………
}
}
SDK を使用するには、RegionContext を初期化し、セキュリティ認証を設定し、接続されたサブスクリプションインスタンスのその他の情報を完成させることが不可欠です。
上記のコードでは、
setAccessKey インターフェイスは、Alibaba Cloud アカウントの AccessKey を設定します。
setSecret インターフェイスは、Alibaba Cloud アカウントの AccessKeySecret を設定します。
AccessKey と AccessKeySecret は、ID ペアとして Alibaba Cloud によってユーザに割り当てられます。この ID ペアは Alibaba Cloud のユーザーセンターで作成できます。
setUsePublicIp は、SDK を実行するサーバーが、パブリック IP アドレスを使用してサブスクリプションインスタンスに接続するかどうかを決定します。この決定において、true はパブリック IP アドレスの使用を意味し、false は不使用を意味します。
ClusterClient の初期化
SDK は ClusterClient クラスを使用してサブスクリプションインスタンスに接続し、変更データを受け取ります。次のコードを使用して ClusterClient を作成します。
import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
public class MainClass
{
public static void main(String[] args) throws Exception {
// Create RegionContext
RegionContext context = new RegionContext();
context.setAccessKey("<AccessKey>");
context.setSecret("<AccessKeySecret>");
context.setUsePublicIp(true);
// Create subscription consumers
final ClusterClient client = new DefaultClusterClient(context);
// The following is other calling code...
……………
}
}
Listener の初期化
SDK は Listener クラスを使用して変更データを消費します。
ClusterClient の初期化後にリスナーを追加します。Listener は、サブスクリプションデータを受信し、消費する通知関数を定義します。次のコードは、最も基本的な消費ロジックを示すもので、サブスクリプションデータを画面に出力します。
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
public class MainClass
{
public static void main(String[] args) throws Exception {
// Initialize a RegionContext object
...
//Initialize ClusterClient object
………
ClusterListener listener = new ClusterListener(){
@Override
public void notify(List<ClusterMessage> messages) throws Exception {
for (ClusterMessage message : messages) {
//Print the subscription change data
System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
+ message.getRecord().getOpt());
//After data consumption is completed, report ACK to DTS, and must call
message.ackAsConsumed();
}
}
}
}
DTS は SDK のデータ消費タイムスタンプを DTS サーバーに保存します。これにより、SDK のディザスタリカバリプロセスが簡素化されます。
上記のサンプルコードの askAsConsumed() インターフェイスは、SDK によって消費された最新のデータ位置とタイムスタンプを DTS サーバーに報告します。不意のシステムダウン後に SDK が再起動すると、DTS サーバーから自動的に消費時刻を取得し、この時点からデータ取得を再開することでデータの重複を避けることができます。
ClusterClient の開始
import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
public static void main(String[] args) throws Exception {
//Initialize RegionContext
...
//Initialize ClusterClient
...
//Initialize ClusterListener
...
// Add listeners
client.addConcurrentListener(listener);
// Set requested subscription channel ID
client.askForGUID("dts_rdsrjiei2u2afnb_DSF");
// Start background thread. Note that here will not get blocked and the main thread cannot exit
client.start();
}
上記のコードで askForGUID インターフェイスは、クライアントからの要求により、サブスクリプションインスタンス ID を設定します。サブスクリプションチャネル ID は、DTS コンソールから取得されます。一度サブスクリプションインスタンス ID が設定されれば、SDK はこのサブスクリプションインスタンスから変更データを取得できます。
クライアントを始動する前に、リスナーをクライアントに追加する必要があります。クライアントがサブスクリプションインスタンスから変更データを取得すると、リスナーの notify メソッドを同期的に呼び出すことで、データ消費を開始します。