I would like someone to review my code and give me feedback.
Each Kafka message looks as follows:
[{guid=id.Value1, timestamp=1386394980000, booleanValue=null, longValue=null, floatValue=null, stringValue=value1, source=X, metadata=null}]
This is what I implemented:
ExcelReader
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Reads the first sheet of an .xlsx workbook and turns every value cell into a
 * {@link KafkaMessage}.
 *
 * <p>Layout expected by this reader (all indexes are zero-based):
 * <ul>
 *   <li>column 5 holds the timestamp as text, parsed with {@code DateUtil.provideDateFormat()};</li>
 *   <li>column 6 holds a string value, column 7 a numeric value stored as {@code long},
 *       column 8 a boolean value and column 9 a numeric value stored as {@code float};</li>
 *   <li>reading starts at row index 3.</li>
 * </ul>
 * Each value cell produces one message carrying the row's timestamp plus the guid and source
 * configured for that column.
 */
public class ExcelReader{
    /** Index of the first row that is read; rows above it are skipped. */
    private static final int FIRST_DATA_ROW = 3;
    /** Column holding the row's timestamp as text. */
    private static final int TIMESTAMP_COLUMN = 5;
    private static final int STRING_VALUE_COLUMN = 6;
    private static final int LONG_VALUE_COLUMN = 7;
    private static final int BOOLEAN_VALUE_COLUMN = 8;
    private static final int FLOAT_VALUE_COLUMN = 9;

    // Built once instead of once per cell; never exposed, so never modified after class init.
    private static final Map<Integer, String> GUIDS = conditionParametersIds();
    private static final Map<Integer, String> SOURCES = sources();

    /**
     * Reads all messages from the first sheet of the workbook supplied by {@code inputStream}.
     *
     * <p>The workbook is closed before returning; the stream itself stays owned by the caller.
     *
     * @param inputStream stream positioned at the start of an .xlsx workbook
     * @return the messages in row order, then column order; empty when the sheet has no data rows
     * @throws ParseException if a timestamp cell cannot be parsed
     * @throws UncheckedIOException if the workbook cannot be read (previously this was only
     *         printed and an empty list was returned, which hid broken input files)
     * @throws IllegalStateException if a row has value cells but no timestamp cell, or (raised
     *         by POI) if a cell does not have the type expected for its column
     */
    public static List<KafkaMessage> readExcel(final InputStream inputStream) throws ParseException{
        try(XSSFWorkbook workbook = new XSSFWorkbook(inputStream)) {
            return readRows(workbook.getSheetAt(0), new ArrayList<>());
        }catch(IOException e){
            throw new UncheckedIOException("Could not read the Excel workbook", e);
        }
    }

    /**
     * Appends one message per value cell of every data row of {@code sheet} to {@code rows}.
     *
     * @return the same list instance that was passed in
     */
    private static List<KafkaMessage> readRows(XSSFSheet sheet, List<KafkaMessage> rows) throws ParseException{
        // NOTE(review): getLastRowNum() is the zero-based index of the last row, so this bound
        // leaves the sheet's final row unread. Kept as in the original in case the sheet ends
        // with a footer row - confirm, and drop the "- 1" if the last row is data.
        for(int i = FIRST_DATA_ROW; i <= sheet.getLastRowNum() - 1; i++) {
            Row row = sheet.getRow(i);
            if(row == null) {
                continue; // POI returns null for rows that were never written (blank rows)
            }
            // getLastCellNum() is the index of the last cell PLUS ONE (-1 for an empty row).
            int lastValueColumn = Math.min(FLOAT_VALUE_COLUMN, row.getLastCellNum() - 1);
            if(lastValueColumn < STRING_VALUE_COLUMN) {
                continue; // the row has no value cells at all
            }
            Long timestamp = readTimestamp(row, i);
            for(int j = STRING_VALUE_COLUMN; j <= lastValueColumn; j++) {
                Cell cell = row.getCell(j);
                if(cell != null) { // gaps inside a row are returned as null as well
                    rows.add(toMessage(cell, j, timestamp));
                }
            }
        }
        return rows;
    }

    /** Reads and parses the timestamp cell of {@code row}; {@code rowIndex} is only used for the error text. */
    private static Long readTimestamp(Row row, int rowIndex) throws ParseException{
        Cell timestampCell = row.getCell(TIMESTAMP_COLUMN);
        if(timestampCell == null) {
            throw new IllegalStateException("Row " + (rowIndex + 1)
                    + " has value cells but no timestamp cell (column index " + TIMESTAMP_COLUMN + ")");
        }
        return convertToTimeStamp(timestampCell.getStringCellValue());
    }

    /** Builds the message for one value cell, choosing the value field from the cell's column. */
    private static KafkaMessage toMessage(Cell cell, int column, Long timestamp){
        KafkaMessage kafkaMessage = new KafkaMessage();
        kafkaMessage.setTimestamp(timestamp);
        switch(column) {
            case STRING_VALUE_COLUMN:
                kafkaMessage.setStringValue(cell.getStringCellValue());
                break;
            case LONG_VALUE_COLUMN:
                kafkaMessage.setLongValue((long) cell.getNumericCellValue());
                break;
            case BOOLEAN_VALUE_COLUMN:
                kafkaMessage.setBooleanValue(cell.getBooleanCellValue());
                break;
            case FLOAT_VALUE_COLUMN:
                kafkaMessage.setFloatValue((float) cell.getNumericCellValue());
                break;
            default:
                throw new IllegalArgumentException("Unsupported value column: " + column);
        }
        kafkaMessage.setSource(SOURCES.get(column));
        kafkaMessage.setGuid(GUIDS.get(column));
        return kafkaMessage;
    }

    /** Parses {@code dateTime} with the project's date format and returns the result of {@code getTime()}. */
    private static Long convertToTimeStamp(String dateTime) throws ParseException{
        return DateUtil.provideDateFormat().parse(dateTime).getTime();
    }

    /** Maps each value column to the guid put on its messages. */
    private static Map<Integer, String> conditionParametersIds(){
        Map<Integer, String> map = new HashMap<>();
        MapHelper.repeatPut(map,
                new Integer[]{STRING_VALUE_COLUMN, LONG_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, FLOAT_VALUE_COLUMN},
                new String[]{"id.Value1", "id.Value2", "id.Value3", "id.Value4"});
        return map;
    }

    /** Maps each value column to the source put on its messages. */
    private static Map<Integer, String> sources(){
        Map<Integer, String> map = new HashMap<>();
        MapHelper.repeatPut(map, new Integer[]{STRING_VALUE_COLUMN, LONG_VALUE_COLUMN}, "X");
        MapHelper.repeatPut(map, new Integer[]{BOOLEAN_VALUE_COLUMN, FLOAT_VALUE_COLUMN}, "Y");
        return map;
    }

    public enum MapHelper{
        ; // Utility class for working with maps

        /** Puts the same {@code value} into {@code map} under every key in {@code keys}. */
        public static <K, V> void repeatPut(Map<? super K, ? super V> map, K[] keys, V value){
            for(K key : keys) {
                map.put(key, value);
            }
        }

        /**
         * Puts {@code keys[i] -> values[i]} into {@code map} for every index present in both
         * arrays; surplus entries of the longer array are ignored.
         */
        public static <K, V> void repeatPut(Map<? super K, ? super V> map, K[] keys, V[] values){
            int pairs = Math.min(keys.length, values.length);
            for(int index = 0; index < pairs; index++) {
                map.put(keys[index], values[index]);
            }
        }
    }
}
SendKafkaMessage
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.List;
/**
 * Converts {@link KafkaMessage}s to JSON text and passes them to a {@link Producer}.
 */
public class SendKafkaMessage{
    /**
     * Creates one {@link Producer} and calls its {@code start} method once per message, in
     * list order, with the JSON text returned by {@link #getMessage(KafkaMessage)}.
     *
     * @param messages the messages to send
     */
    public void sendToProducer(List<KafkaMessage> messages){
        final Producer kafkaSink = new Producer();
        for(KafkaMessage current : messages) {
            kafkaSink.start(getMessage(current));
        }
    }

    /**
     * Wraps {@code message} in a {@link JSONObject}, puts that object into a fresh
     * {@link JSONArray} and returns the array's string form.
     *
     * @param message the message to serialise
     * @return the text of a JSON array containing exactly one object
     */
    public String getMessage(KafkaMessage message){
        JSONObject payload = new JSONObject(message);
        JSONArray envelope = new JSONArray();
        envelope.put(payload);
        return envelope.toString();
    }
}
Producer (wrapper around KafkaProducer)
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Small wrapper that publishes string values to a fixed Kafka topic.
 *
 * <p>The client configuration is assembled once per instance. Every call to
 * {@link #start(String)} opens a {@link KafkaProducer}, sends one record and closes the
 * producer again.
 */
public class Producer {
    private static final String BOOTSTRAP_SERVER = "url";
    private static final String PRODUCER_ID = "simpleProducer";
    /** Every record is sent with this same key. */
    private static final String RECORD_KEY = "1";
    private static final int RETRIES = 2;

    private final Properties properties = new Properties();
    String topicName = "topicname";

    /** Fills in the client configuration (servers, string serializers, client id, retries). */
    public Producer(){
        String serializer = StringSerializer.class.getName();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_ID);
        properties.put(ProducerConfig.RETRIES_CONFIG, RETRIES);
    }

    /**
     * Sends {@code value} as a single record to {@link #topicName}.
     *
     * @param value the record value
     */
    public void start(String value){
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, RECORD_KEY, value);
        // try-with-resources: the producer is closed even when send() throws, so its
        // background thread and connections are not leaked on failure.
        // NOTE(review): opening a producer per record is costly; a long-lived producer that the
        // caller closes once would be cheaper, but that needs a change on the calling side too.
        try(KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            // NOTE(review): the Future returned by send() is ignored, so a failed asynchronous
            // send is not reported to the caller - confirm this is acceptable.
            kafkaProducer.send(producerRecord);
        }
    }
}
Test Class
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.text.ParseException;
/**
 * Manual entry point: reads the workbook at {@code "FilePath"} and sends its messages to Kafka.
 */
public class ExcelToJsonTest {
    /**
     * Opens the workbook file, converts it with {@code ExcelReader.readExcel} and passes the
     * result to {@code SendKafkaMessage.sendToProducer}.
     *
     * @param args not used
     * @throws FileNotFoundException if the workbook file cannot be opened
     * @throws ParseException if a timestamp in the workbook cannot be parsed
     * @throws java.io.IOException if closing the file fails
     */
    public static void main(String[] args) throws FileNotFoundException, JsonProcessingException, ParseException, java.io.IOException{
        // The constructor either returns a stream or throws, so no null check is needed;
        // try-with-resources makes sure the file handle is released.
        try(FileInputStream fis = new FileInputStream("FilePath")) {
            SendKafkaMessage sendKafkaMessage = new SendKafkaMessage();
            sendKafkaMessage.sendToProducer(ExcelReader.readExcel(fis));
        }
    }
}
````