このページでは、Spark が HBase にデータを書き込む方法について説明します。 注記 :

重要 コンピューティングクラスターは、HBase クラスターと同じセキュリティグループに属している必要があります。 別のグループの場合、ネットワークに接続できません。 E-MapReduce でクラスターを作成するときは、必ず HBase クラスターが配置されているセキュリティグループを選択します。

Spark への HBase アクセス許可

以下のコードを使用します。
object ConnectionUtil extends Serializable {
      private val conf = HBaseConfiguration.create()
      conf.set(HConstants.ZOOKEEPER_QUORUM,"ecs1,ecs1,ecs3")
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
      private val connection = ConnectionFactory.createConnection(conf)
      def getDefaultConn: Connection = connection
    }
    //Create data streaming unionStreams
    unionStreams.foreachRDD(rdd => {
      rdd.map(bytes => new String(bytes))
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .mapPartitions {words => {
          val conn = ConnectionUtil.getDefaultConn
          val tableName = TableName.valueOf(tname)
          val t = conn.getTable(tableName)
          try {
            words.sliding(100, 100).foreach(slice => {
              val puts = slice.map(word => {
                println(s"word: $word")
                val put = new Put(Bytes.toBytes(word. _1 + System.currentTimeMillis()))
                put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
                  System.currentTimeMillis(), Bytes.toBytes(word._2))
                put
              }).toList
              t.put(puts)
            })
          } finally {
            t.close()
          }
          Iterator.empty
        }}.count()
     })
    ssc.start()
    ssc.awaitTermination()

付録

完全なサンプルコードについては、以下をご参照ください。