Day 4 Elasticsearch: Simple Kafka Producer

Arthur Lee
3 min read · Jan 22, 2019

Yelp Business Data

Next, we want to output the Yelp data to Elasticsearch!

First, we need the JSON file.

You can download the data from:

https://www.yelp.com/dataset

The JSON file contains 192,609 businesses.

Why Kafka

Today I will set up a Kafka producer.

And why?

Actually, there are several ways to pass the JSON data to Elasticsearch:

  1. Typing it in manually
  2. Using a JSON server (it is very simple to use via JS)
  3. Using Kafka to simulate streaming data

We choose approach 3.

There are many ways to launch Kafka; I choose a Docker Compose yml file.

Launch Kafka

First, you have to install Kafka if you don't already have it.

Install Docker Compose

Please make sure you have already installed Docker and Docker Compose.

If not, please go to https://docs.docker.com/compose/install/ to install them.

Set up docker-compose to create ZooKeeper & Kafka

Run vim docker-compose.yml and set up ZooKeeper and Kafka:

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1
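
The compose file by itself doesn't start anything, so bring the stack up first, from the directory that holds the yml file:

docker-compose up -d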

Check that Kafka and ZooKeeper are available

Type docker ps and you will find both containers running.

Create a Kafka Producer in Java

Actually, you can also create a Kafka producer in Scala.

Here I'll just show a Java example because I am more familiar with Java :)

Import Dependency

compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.1.0'

And the build.gradle in our Java project will be the following:
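
A minimal sketch, using the old compile/testCompile configurations; the JUnit dependency is my assumption, needed for the @Test annotations below:

plugins {
    id 'java'
}

repositories {
    mavenCentral()
}

dependencies {
    // the Kafka client dependency shown above
    compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.1.0'
    // assumed: JUnit for the producer tests below
    testCompile group: 'junit', name: 'junit', version: '4.12'
}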

Create an IKafkaConstants file to set the environment parameters
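
A minimal sketch: the broker address matches the port we exposed in docker-compose, while the client id, topic name, and message count are placeholder values I'm assuming for the demo:

public interface IKafkaConstants {
    // broker exposed by the docker-compose file above
    String KAFKA_BROKERS = "localhost:9092";
    // assumed values for the examples below
    String CLIENT_ID = "demo-client";
    String TOPIC_NAME = "yelpBusiness";
    int MESSAGE_COUNT = 10;
}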

Create a ProducerCreator to set the properties for the Kafka producer
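
A minimal sketch, wired up with the standard kafka-clients ProducerConfig keys and the Long/String serializers that match the Producer<Long, String> type used in the tests:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCreator {
    public static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        // point the client at the broker from IKafkaConstants
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
        // keys are Long, values are the raw JSON strings
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}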

A simple ProducerTest

@Test
public void runSimpleProducerTest() {
    Producer<Long, String> producer = ProducerCreator.createProducer();
    for (int index = 0; index < IKafkaConstants.MESSAGE_COUNT; index++) {
        ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(IKafkaConstants.TOPIC_NAME,
                "This is record " + index);
        try {
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Record sent with key " + index + " to partition " + metadata.partition()
                    + " with offset " + metadata.offset());
        } catch (ExecutionException | InterruptedException e) {
            System.out.println("Error in sending record");
            System.out.println(e);
        }
    }
}

When we run the test, we can see each record acknowledged with its partition and offset!

runJSONProducerTest()

Read the JSON data line by line and use a Java stream to produce a record for each business:

@Test
public void runJSONProducerTest() throws Exception {
    Producer<Long, String> producer = ProducerCreator.createProducer();
    String filePath = "business.json";
    try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
        final AtomicInteger count = new AtomicInteger();
        stream.forEach(line -> {
            try {
                doThing(line, producer);
                System.out.println(count.incrementAndGet());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

doThing()

private void doThing(String line, Producer<Long, String> producer) {
    ProducerRecord<Long, String> record = new ProducerRecord<Long, String>("yelpBusiness", line);
    try {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println(String.format("Record sent with value %s to %s", record.value(), metadata.offset()));
    } catch (ExecutionException | InterruptedException e) {
        System.out.println("Error in sending record");
        System.out.println(e);
    }
}
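
One thing the tests above leave out is closing the producer. A small addition once all records are sent (my suggestion, not in the original code):

// after the stream has been fully consumed
producer.flush(); // push any buffered records to the broker
producer.close(); // release network resources and threads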

What's next?

Next, we will consume the data with Spark, do some transformations, and publish it to Elasticsearch!
