このページでは、Spark が HBase にデータを書き込む方法について説明します。 注記 :
重要 コンピューティングクラスターは、HBase クラスターと同じセキュリティグループに属している必要があります。 別のグループの場合、ネットワークに接続できません。 E-MapReduce
でクラスターを作成するときは、必ず HBase クラスターが配置されているセキュリティグループを選択します。
Spark への HBase アクセス許可
以下のコードを使用します。
object ConnectionUtil extends Serializable {
private val conf = HBaseConfiguration.create()
conf.set(HConstants.ZOOKEEPER_QUORUM,"ecs1,ecs1,ecs3")
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
private val connection = ConnectionFactory.createConnection(conf)
def getDefaultConn: Connection = connection
}
//Create data streaming unionStreams
unionStreams.foreachRDD(rdd => {
rdd.map(bytes => new String(bytes))
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.mapPartitions {words => {
val conn = ConnectionUtil.getDefaultConn
val tableName = TableName.valueOf(tname)
val t = conn.getTable(tableName)
try {
words.sliding(100, 100).foreach(slice => {
val puts = slice.map(word => {
println(s"word: $word")
val put = new Put(Bytes.toBytes(word. _1 + System.currentTimeMillis()))
put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
System.currentTimeMillis(), Bytes.toBytes(word._2))
put
}).toList
t.put(puts)
})
} finally {
t.close()
}
Iterator.empty
}}.count()
})
ssc.start()
ssc.awaitTermination()
付録
完全なサンプルコードについては、以下をご参照ください。