I have ~50 files, each of which contains records that look like the following:
{
  "urls": [
    "domain.com/id/emp-name-123abc456",
    "domain.com/id/emp/name/456/abc/123"
  ],
  "name": "George Lucas"
}
The records differ from file to file in which fields they contain, but all of them contain the "urls" field. The total size of all the files is ~2TB.
What I am trying to do is read each file in and write the records out to a large number of files (I will end up with ~200m of them) on an XFS mount. I have working code, but the problem is that it is slow (it would take at least 2 weeks to finish). I'm just trying to see if there's a faster way to do this.
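The Profile class the reader binds each line to isn't shown below; it's just a Jackson POJO, roughly along these lines (trimmed-down sketch with only the fields that matter here):

import java.util.List;

public class Profile {
    // Only the fields used by the pipeline are shown; other JSON properties are ignored
    private List<String> urls;
    private String name;

    public List<String> getUrls() { return urls; }
    public void setUrls(List<String> urls) { this.urls = urls; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}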
Here's the code:
The main method puts the input files on a queue and starts the producer/consumer threads:
public class GroupBy {
    public static void main(String[] args) throws IOException {
        String directory = args[0];
        BlockingQueue<String> fileQueue = new ArrayBlockingQueue<>(10000000);
        BlockingQueue<DedupeInstruction> writeQueue = new ArrayBlockingQueue<>(10000000);
        Path path = Paths.get(directory);
        // Enqueue every *.json file under the input directory
        Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException {
                try {
                    if (file.toString().contains("json")) {
                        fileQueue.put(file.toString());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException e)
                    throws IOException {
                if (e == null) {
                    return FileVisitResult.CONTINUE;
                } else {
                    throw e;
                }
            }
        });
        // Start the reader (producer) and writer (consumer) threads
        for (int i = 0; i <= 10; i++) {
            new Thread(new GroupByReadFile(fileQueue, writeQueue)).start();
            new Thread(new GroupByWriteFile(writeQueue)).start();
        }
    }
}
Producer thread: deserializes the JSON, loops through the URLs, and determines the output filename:
public class GroupByReadFile implements Runnable {
    private final BlockingQueue<String> fileQueue;
    private final BlockingQueue<DedupeInstruction> writeQueue;
    // p0 extracts the slug from "id"-style URLs, p1 the path from "id2"-style URLs
    private static Pattern p0 = Pattern.compile("domain.com/+id/+(.+?-.+?-.{5,10})(?:/+|$)");
    private static Pattern p1 = Pattern.compile("domain.com/+id2/+(.+?/+.+?/+.+)(?:/+|$)");

    public GroupByReadFile(BlockingQueue<String> fileQueue, BlockingQueue<DedupeInstruction> writeQueue) {
        this.fileQueue = fileQueue;
        this.writeQueue = writeQueue;
    }

    @Override
    public void run() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        while (true) {
            try {
                String file = fileQueue.take();
                try (BufferedReader br = new BufferedReader(new FileReader(file))) {
                    String line;
                    int count = 0;
                    while ((line = br.readLine()) != null) {
                        // Progress logging: file name and line number
                        System.out.println(file + ";" + count);
                        count++;
                        Profile p = mapper.readValue(line, Profile.class);
                        if (p.getUrls() != null && !p.getUrls().isEmpty()) {
                            String idUrl = null;
                            String id2Url = null;
                            for (String u : p.getUrls()) {
                                Matcher m0 = p0.matcher(u);
                                Matcher m1 = p1.matcher(u);
                                if (m0.find()) {
                                    idUrl = m0.group(1);
                                } else if (m1.find()) {
                                    id2Url = ProfileUtils.Id1ToId2(m1.group(1));
                                }
                            }
                            // Prefer the "id" URL; fall back to the converted "id2" URL
                            if (idUrl != null) {
                                writeQueue.put(new DedupeInstruction("/output/".concat(idUrl), line));
                            } else if (id2Url != null) {
                                writeQueue.put(new DedupeInstruction("/output/".concat(id2Url), line));
                            } else {
                                System.err.println(line);
                            }
                        }
                    }
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
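To make the regexes concrete: on the first sample URL above, p0 captures the slug that becomes the output file name (standalone illustration only, not part of the pipeline):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PatternDemo {
    public static void main(String[] args) {
        Pattern p0 = Pattern.compile("domain.com/+id/+(.+?-.+?-.{5,10})(?:/+|$)");
        Matcher m0 = p0.matcher("domain.com/id/emp-name-123abc456");
        if (m0.find()) {
            // prints "emp-name-123abc456"
            System.out.println(m0.group(1));
        }
    }
}

So that record ends up being appended to /output/emp-name-123abc456.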
Thread responsible for writing output files:
public class GroupByWriteFile implements Runnable {
    private final BlockingQueue<DedupeInstruction> writeQueue;

    public GroupByWriteFile(BlockingQueue<DedupeInstruction> writeQueue) {
        this.writeQueue = writeQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                DedupeInstruction d = writeQueue.take();
                FileOp.appendContents(d.getOutFile(), d.getLine());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
FileOp does the actual writing:
public class FileOp {
    public static void appendContents(String sFile, String sContent) throws IOException {
        // Truncate paths longer than 255 characters
        if (sFile.length() > 255) {
            sFile = StringUtils.left(sFile, 255);
        }
        File file = new File(sFile);
        java.nio.file.Files.write(
                Paths.get(file.toURI()),
                sContent.concat(System.lineSeparator()).getBytes("utf-8"),
                StandardOpenOption.CREATE,
                StandardOpenOption.APPEND
        );
    }
}
The DedupeInstruction class:
public class DedupeInstruction {
    private String outFile;
    private String line;

    public DedupeInstruction(String outFile, String line) {
        this.outFile = outFile;
        this.line = line;
    }

    public String getOutFile() {
        return outFile;
    }

    public void setOutFile(String outFile) {
        this.outFile = outFile;
    }

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }
}
I'm running this on an m4.4xlarge AWS EC2 instance with two 10TB SSD drives (one holds the input files; the other is mounted as /output and holds the output files). The /output drive is formatted as XFS.
I noticed that this runs pretty fast to begin with but seems to slow down after a few days. My first thought was that the slowdown might be due to too many files accumulating in a single directory.
I'm just trying to figure out whether something in my code is the bottleneck, or whether there's a better way to do this altogether.
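If the single-directory theory is right, one idea I've been toying with (not in the code above; it takes the bare id rather than the full /output path) is to bucket the output files into subdirectories by a short hash prefix, roughly like this hypothetical variant of FileOp:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ShardedFileOp {
    // Sketch only: bucket output files into 256 subdirectories keyed by a hash
    // prefix of the id, so no single directory has to hold all ~200m files.
    public static void appendContents(String id, String line) throws IOException {
        String shard = String.format("%02x", id.hashCode() & 0xff); // "00".."ff"
        Path outFile = Paths.get("/output", shard, id);
        Files.createDirectories(outFile.getParent());
        Files.write(outFile,
                line.concat(System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}

That would leave each directory with roughly 200m / 256 ≈ 800k files, though I don't know whether that's small enough to keep XFS lookups fast.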