Site Overlay

Spark Streaming to consume data from Kafka – Spark 2.0 #Day5 #100daysofCode

Hello world,

Today worked on consuming the data from kafka using spark streaming as a consumer.

used direct stream approach for this.

Used Spark 2.2.0 for this development. Previous code for kafka producer also upgraded to 2.2.0 using new dependency jar.

Tomorrow have to work with writing the data to Hbase from spark streaming.

 


Today’s Code


import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object SparkStreaming_Consumer {
def main(args: Array[String]) = {

val ssc = new StreamingContext(“local[*]”, “StreamingKafkaConsumer”, Seconds(2))

val kafkaParams = Map[String, Object](
“bootstrap.servers” -> “localhost:9092”,
“key.deserializer” -> classOf[StringDeserializer],
“value.deserializer” -> classOf[StringDeserializer],
“group.id” -> “group1”,
“auto.offset.reset” -> “latest”
)

val topics = Array( “tamilboomi”)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

//stream.map(record => (record.key, record.value))
stream.foreachRDD { rdd =>
rdd.foreach{ data =>
println(data.value())
}
}
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

}
}


Pom.xml


<project xmlns=”http://maven.apache.org/POM/4.0.0″ xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance” xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>
<modelVersion>4.0.0</modelVersion>
<groupId>SparkStreamingKafkaConsumer</groupId>
<artifactId>SparkStreamingKafkaConsumer</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>

</dependencies>

<build>
<plugins>
<!– mixed scala/java compile –>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>sparkstreaming</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!– for fatjar –>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>fully.qualified.MainClass</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!–This plugin’s configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself. –>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.scala-tools</groupId>
<artifactId> maven-scala-plugin </artifactId>
<versionRange> [2.15.2,) </versionRange>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</pluginExecutionFilter>
<action>
<execute />
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>


Good day!

 

Leave a Reply

Your email address will not be published. Required fields are marked *