このページでは、Spark Streaming を使用して MNS 内のデータを消費し、各バッチの単語数を計算する方法について説明します。
Spark への MNS アクセス許可
サンプルコードは以下のとおりです。
val conf = new SparkConf().setAppName("Test MNS Streaming")
val batchInterval = Seconds(10)
val ssc = new StreamingContext(conf, batchInterval)
val queuename = "queuename"
val accessKeyId = "<accessKeyId>"
val accessKeySecret = "<accessKeySecret>"
val endpoint = "http://xxx.yyy.zzzz/abc"
val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
StorageLevel.MEMORY_ONLY)
mnsStream.foreachRDD( rdd => {
rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
})
ssc.start()
ssc.awaitTermination()
Spark Streaming を MetaService と併用
上記の例では、AccessKey を明示的に API に渡しています。 E-MapReduce SDK 1.3.2 以降、Spark Streaming は AccessKey
なしで MetaService を使用して MNS データを処理できます。 詳細は、E-MapReduce SDK の MnsUtils クラスの説明をご参照ください。
MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)
付録
完全なサンプルコードについては、以下をご参照ください。Spark への MNS アクセス許可