E-MapReduce:Consume data in Simple Log Service

Last Updated: Aug 15, 2023

This topic describes how to use Spark Streaming to consume log data in Simple Log Service and calculate the number of log entries.

Use Spark to access Simple Log Service

  • Method 1: Receiver-based DStream

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils

    val logServiceProject = args(0)        // The name of a project in Simple Log Service.
    val logStoreName = args(1)             // The name of a Logstore in Simple Log Service.
    val loghubConsumerGroupName = args(2)  // Jobs that have the same consumer group name jointly consume the data in the Logstore.
    val loghubEndpoint = args(3)           // The endpoint of Simple Log Service.
    val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")         // The AccessKey ID that is used to access Simple Log Service.
    val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // The AccessKey secret that is used to access Simple Log Service.
    val numReceivers = args(4).toInt       // The number of receivers to be started to read data from the Logstore.
    val batchInterval = Milliseconds(args(5).toInt * 1000) // The data processing interval of Spark Streaming. args(5) is in seconds.
    val conf = new SparkConf().setAppName("Test Loghub Streaming")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      logServiceProject,
      logStoreName,
      loghubConsumerGroupName,
      loghubEndpoint,
      numReceivers,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    // Print the number of log entries in each batch.
    loghubStream.foreachRDD(rdd => println(rdd.count()))
    ssc.start()
    ssc.awaitTermination()
    Note

    You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see the Configure environment variables section in this topic.

  • Method 2: Direct API-based DStream

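    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}

    val logServiceProject = args(0)        // The name of a project in Simple Log Service.
    val logStoreName = args(1)             // The name of a Logstore in Simple Log Service.
    val loghubConsumerGroupName = args(2)  // The name of the consumer group.
    val loghubEndpoint = args(3)           // The endpoint of Simple Log Service.
    val accessKeyId = args(4)              // The AccessKey ID that is used to access Simple Log Service.
    val accessKeySecret = args(5)          // The AccessKey secret that is used to access Simple Log Service.
    val batchInterval = Milliseconds(args(6).toInt * 1000) // The data processing interval. args(6) is in seconds.
    val zkConnect = args(7)                // The connection string of the ZooKeeper service.
    val checkpointPath = args(8)           // The directory in which Spark Streaming stores checkpoints.
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
      val ssc = new StreamingContext(conf, batchInterval)
      val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
      val loghubStream = LoghubUtils.createDirectStream(
        ssc,
        logServiceProject,
        logStoreName,
        loghubConsumerGroupName,
        accessKeyId,
        accessKeySecret,
        loghubEndpoint,
        zkParas,
        LogHubCursorPosition.END_CURSOR)
      ssc.checkpoint(checkpointPath)
      val stream = loghubStream.checkpoint(batchInterval)
      stream.foreachRDD(rdd => {
        println(rdd.count())
        // Commit after the action on this batch has finished.
        loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
      })
      ssc
    }
    // Restore the context from the checkpoint if one exists. Otherwise, create a new context.
    val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()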

    In E-MapReduce (EMR) SDK V1.4.0 and later, Spark Streaming can use Direct API-based DStreams to process data. With this method, data from LogHub is not stored a second time as write-ahead log (WAL) files, so you get at-least-once processing semantics without enabling the WAL feature of Spark Streaming. This method is still experimental. When you use this method, take note of the following points:

    • After each DStream action, you must make a commit, as the commitAsync() call in the preceding sample code does.

    • In a Spark Streaming job, you can perform only one action on a Logstore.

    • This method requires the support of the ZooKeeper service.
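
The at-least-once behavior of Method 2 follows from the order of operations: the commit is made only after the action on a batch has finished. The following minimal sketch models that ordering in plain Scala. It does not use Spark or EMR SDK classes, and `OffsetStore`, `runBatch`, and `crashBeforeCommit` are hypothetical names introduced only for illustration.

```scala
// A self-contained model of commit-after-processing. No Spark or EMR SDK classes are used.
object AtLeastOnceSketch {

  // Hypothetical in-memory stand-in for the position that a consumer group stores.
  final class OffsetStore {
    private var committed: Int = 0
    def read: Int = committed
    def commit(offset: Int): Unit = { committed = offset }
  }

  // Reads one batch starting at the committed offset and returns it.
  // The new offset is committed only if the attempt does not "crash" first.
  def runBatch(
      log: Vector[String],
      store: OffsetStore,
      batchSize: Int,
      crashBeforeCommit: Boolean): Vector[String] = {
    val start = store.read
    val end = math.min(start + batchSize, log.length)
    val batch = log.slice(start, end) // the "action" on the batch would run here
    if (!crashBeforeCommit) store.commit(end)
    batch
  }
}
```

In this model, an attempt that stops before the commit leaves the stored offset unchanged, so the next attempt reads the same records again. Records are never skipped but can be processed more than once, which is why downstream writes are usually made idempotent.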

Use MetaService for access

In the preceding sample code, an AccessKey pair is explicitly passed to the interface. In EMR SDK V1.3.2 and later, Spark Streaming can use MetaService to process data in Simple Log Service. An AccessKey pair is not required. For more information, see the description of the LoghubUtils class in EMR SDK.

LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
Note
  • EMR SDK supports three consumption modes for Simple Log Service, which are BEGIN_CURSOR, END_CURSOR, and SPECIAL_TIMER_CURSOR. By default, END_CURSOR is used.

    • BEGIN_CURSOR: consumes data from the beginning of the log. If a checkpoint record exists, the consumption starts from the checkpoint.

    • END_CURSOR: consumes data from the end of the log. If a checkpoint record exists, the consumption starts from the checkpoint.

    • SPECIAL_TIMER_CURSOR: consumes data from a specified point in time. The point in time is specified in seconds. If a checkpoint record exists, the consumption starts from the checkpoint.

    All of the consumption modes are affected by checkpoint records. If a checkpoint record exists, the consumption always starts from the checkpoint. The SPECIAL_TIMER_CURSOR mode allows you to forcefully start data consumption from a specified point in time. To use this mode, you must configure the following parameters based on your business requirements in the createStream method of the LoghubUtils class:

    • cursorPosition: Set this parameter to LogHubCursorPosition.SPECIAL_TIMER_CURSOR.

    • forceSpecial: Set this parameter to true.

  • In an EMR cluster, only the master node can access the Internet. Therefore, you must configure an internal endpoint of Simple Log Service. Otherwise, jobs that run on the other nodes cannot request data from Simple Log Service.
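
The interaction between the consumption modes, checkpoint records, and forceSpecial described in the preceding note can be summarized as a small decision function. The following sketch is a plain Scala model of the documented behavior, not the EMR SDK implementation. All type and function names (`CursorMode`, `StartPoint`, `resolveStart`) are hypothetical, and the model assumes that forceSpecial has no effect in the BEGIN_CURSOR and END_CURSOR modes.

```scala
// A model of how the start position is chosen, based only on the rules in the note above.
object CursorSketch {

  sealed trait CursorMode
  case object BeginCursor extends CursorMode        // models BEGIN_CURSOR
  case object EndCursor extends CursorMode          // models END_CURSOR
  case object SpecialTimerCursor extends CursorMode // models SPECIAL_TIMER_CURSOR

  sealed trait StartPoint
  case object FromBeginning extends StartPoint
  case object FromEnd extends StartPoint
  final case class FromCheckpoint(cursor: String) extends StartPoint
  final case class FromTime(seconds: Long) extends StartPoint

  def resolveStart(
      mode: CursorMode,
      checkpoint: Option[String],
      startTimeSeconds: Long,
      forceSpecial: Boolean): StartPoint =
    (mode, checkpoint) match {
      // Forced start: the specified time is used even if a checkpoint record exists.
      case (SpecialTimerCursor, _) if forceSpecial => FromTime(startTimeSeconds)
      // In every other case, an existing checkpoint record takes precedence.
      case (_, Some(cursor))                       => FromCheckpoint(cursor)
      case (BeginCursor, None)                     => FromBeginning
      case (EndCursor, None)                       => FromEnd
      case (SpecialTimerCursor, None)              => FromTime(startTimeSeconds)
    }
}
```

For example, with an existing checkpoint record, SpecialTimerCursor resolves to the checkpoint unless forceSpecial is true, in which case it resolves to the specified time.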

Appendix

For the complete sample code, visit GitHub.