Day 4 Elasticsearch: Simple Kafka Producer
Yelp Business Data
Next, we want to push the Yelp data into Elasticsearch!
We already have the JSON file. You can download the data from:
The JSON file contains 192,609 businesses.
Why Kafka
Today I will set up a Kafka producer. And why?
There are several ways to pass the JSON data to our Elasticsearch:
- typing it in manually
- using a JSON server (it is very simple to use via JS)
- using Kafka to simulate streaming data
We choose the third approach.
There are many ways to launch Kafka; I choose a Docker Compose YAML file.
Launch Kafka
First, we have to get Kafka running if you don't have it already.
Install Docker Compose
Please make sure you already have Docker installed.
If not, please go to https://docs.docker.com/compose/install/ to install Docker Compose.
Set up docker-compose to create ZooKeeper & Kafka
vim docker-compose.yml
and configure the ZooKeeper and Kafka services:
version: '2.1'
services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
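Then bring both containers up in the background (run this from the directory that contains docker-compose.yml):
docker-compose up -d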
Check that Kafka and ZooKeeper are available
Type docker ps and you will see both containers in the list of running processes.
Create a Kafka Producer in Java
You can also create a Kafka producer in Scala.
Here I use Java for the example because I am more familiar with Java :)
Import Dependency
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.1.0'
Our build.gradle in the Java project will then look like the following.
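A minimal sketch of the file, assuming the java plugin and a JUnit dependency (for the @Test methods used later in this post):

plugins {
    id 'java'
}

repositories {
    mavenCentral()
}

dependencies {
    // Kafka client library (the dependency shown above)
    compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.1.0'
    // Assumed: JUnit for the @Test methods used below
    testCompile group: 'junit', name: 'junit', version: '4.12'
}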
Create an IKafkaConstants file to set the environment parameters
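The constants are referenced by the producer code below; here is a minimal sketch of IKafkaConstants, assuming the broker address from the docker-compose file above, an illustrative client id, the yelpBusiness topic, and an arbitrary message count:

public interface IKafkaConstants {
    // External listener exposed by the docker-compose setup above
    String KAFKA_BROKERS = "127.0.0.1:9092";
    // Assumed client id; any descriptive name works
    String CLIENT_ID = "yelp-producer";
    // Topic the records are published to
    String TOPIC_NAME = "yelpBusiness";
    // Number of records sent by the simple producer test (assumed value)
    int MESSAGE_COUNT = 10;
}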
Create a ProducerCreator to set the properties for the Kafka producer
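Again a sketch rather than the exact original: the tests below use Producer<Long, String>, so a LongSerializer key and a StringSerializer value are assumed, and the broker address and client id come from IKafkaConstants:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCreator {

    public static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        // Point the producer at the broker started by docker-compose
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
        // Serializers matching Producer<Long, String>
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}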
A simple ProducerTest
@Test
public void runSimpleProducerTest() {
    Producer<Long, String> producer = ProducerCreator.createProducer();
    for (int index = 0; index < IKafkaConstants.MESSAGE_COUNT; index++) {
        ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(IKafkaConstants.TOPIC_NAME,
                "This is record " + index);
        try {
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Record sent with key " + index + " to partition " + metadata.partition()
                    + " with offset " + metadata.offset());
        } catch (ExecutionException | InterruptedException e) {
            System.out.println("Error in sending record");
            System.out.println(e);
        }
    }
}
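Note that producer.send(record).get() blocks until the broker acknowledges each record; that keeps the example easy to follow at the cost of throughput.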
When we run the test, we can see the partition and offset of each record printed out!
runJSONProducerTest()
Read the JSON data line by line with a stream and produce a record for each line:
public void runJSONProducerTest() throws Exception {
    Producer<Long, String> producer = ProducerCreator.createProducer();
    String filePath = "business.json";
    try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
        final AtomicInteger count = new AtomicInteger();
        stream.forEach(line -> {
            try {
                doThing(line, producer);
                System.out.println(count.incrementAndGet());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}
doThing()
private void doThing(String line, Producer<Long, String> producer) throws InterruptedException {
    ProducerRecord<Long, String> record = new ProducerRecord<Long, String>("yelpBusiness", line);
    try {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println(String.format("Record sent with value %s to %s", record.value(), metadata.offset()));
    } catch (ExecutionException | InterruptedException e) {
        System.out.println("Error in sending record");
        System.out.println(e);
    }
}
What's next?
Next, we will consume the data with Spark, do some transformations, and publish it to Elasticsearch!