
I have ~50 files, each file contains records that look like the following:

{
    "urls":[
        "domain.com/id/emp-name-123abc456",
        "domain.com/id/emp/name/456/abc/123"
    ], 
    "name": "George Lucas"
}

The records differ from file to file in which fields they contain, but all of them contain the URLs. The total size of all files is ~2 TB.

What I am trying to do is read each file in and write out a large number of files (it will end up being ~200 million files) to an XFS mount. I have working code, but the problem is that it is slow (it will take at least two weeks to run). I'm trying to see if there's a faster way to do this.

Here's the code:

Main method: puts the input files on the queue and starts the producer/consumer threads.

public class GroupBy {

    public static void main(String[] args) throws IOException {
        String directory = args[0];
        BlockingQueue<String> fileQueue = new ArrayBlockingQueue<>(10000000);
        BlockingQueue<DedupeInstruction> writeQueue = new ArrayBlockingQueue<>(10000000);

        Path path = Paths.get(directory);
        walkFileTree(path, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException
            {
                try {
                    if (file.toString().contains("json")) {
                        fileQueue.put(file.toString());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return FileVisitResult.CONTINUE;
            }
            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException e)
                    throws IOException
            {
                if (e == null) {
                    return FileVisitResult.CONTINUE;
                }
                else {
                    throw e;
                }
            }
        });

        for (int i = 0; i<=10; i++) {
            new Thread(new GroupByReadFile(fileQueue, writeQueue)).start();
            new Thread(new GroupByWriteFile(writeQueue)).start();
        }


    }
}

Producer thread: Deserializes JSON, loops through URLs and identifies the output filename.

public class GroupByReadFile implements Runnable {
    private final BlockingQueue<String> fileQueue;
    private final BlockingQueue<DedupeInstruction> writeQueue;
    private static Pattern p0 = Pattern.compile("domain.com/+id/+(.+?-.+?-.{5,10})(?:/+|$)");
    private static Pattern p1 = Pattern.compile("domain.com/+id2/+(.+?/+.+?/+.+)(?:/+|$)");

    public GroupByReadFile(BlockingQueue<String> fileQueue, BlockingQueue<DedupeInstruction> writeQueue) {
        this.fileQueue = fileQueue;
        this.writeQueue = writeQueue;
    }

    @Override
    public void run() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        while (true) {
            try {
                String file = fileQueue.take();
                try (BufferedReader br = new BufferedReader(new FileReader(file))) {
                    String line;
                    int count = 0;
                    while ((line = br.readLine()) != null) {
                        System.out.println(file + ";" + count);
                        count++;
                        Profile p = mapper.readValue(line, Profile.class);
                        if (p.getUrls() != null && !p.getUrls().isEmpty()) {
                            String idUrl = null;
                            String id2Url = null;
                            for (String u : p.getUrls()) {
                                Matcher m0 = p0.matcher(u);
                                Matcher m1 = p1.matcher(u);
                                if (m0.find()) {
                                    idUrl = m0.group(1);
                                }
                                else if (m1.find()) {
                                    id2Url = ProfileUtils.Id1ToId2(m1.group(1));

                                }
                            }
                            if (idUrl != null) {
                                writeQueue.put(new DedupeInstruction("/output/".concat(idUrl), line));
                            } else if (id2Url != null) {
                                writeQueue.put(new DedupeInstruction("/output/".concat(id2Url), line));
                            } else {
                                System.err.println(line);
                            }
                        }
                    }
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

Thread responsible for writing output files:

public class GroupByWriteFile implements Runnable {
    private final BlockingQueue<DedupeInstruction> writeQueue;

    public GroupByWriteFile(BlockingQueue<DedupeInstruction> writeQueue) {
        this.writeQueue = writeQueue;
    }


    @Override
    public void run() {
        while (true) {
            try {
                DedupeInstruction d = writeQueue.take();
                FileOp.appendContents(d.getOutFile(), d.getLine());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Does the actual writing:

public class FileOp {


    public static void appendContents(String sFile, String sContent) throws IOException {
        if (sFile.length() > 255) {
            sFile = StringUtils.left(sFile, 255);
        }
        File file = new File(sFile);
        java.nio.file.Files.write(
                Paths.get(file.toURI()),
                sContent.concat(System.lineSeparator()).getBytes("utf-8"),
                StandardOpenOption.CREATE,
                StandardOpenOption.APPEND
        );
    }
}

The DedupeInstruction class:

public class DedupeInstruction {
    private String outFile;
    private String line;

    public DedupeInstruction(String outFile, String line) {
        this.outFile = outFile;
        this.line = line;
    }

    public String getOutFile() {
        return outFile;
    }

    public void setOutFile(String outFile) {
        this.outFile = outFile;
    }

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }

}

I'm running this on an m4.4xlarge AWS EC2 instance with two 10 TB SSD drives (one contains the input files; the other is mounted as /output and holds the output files). The /output drive is formatted as XFS.

I noticed that this runs pretty fast to begin with but seems to slow down after a few days. My first thought was that it might be slowing down because there are too many files in a single directory.

I'm just trying to figure out whether something in my code is the bottleneck, or whether there's a better way to do this.


1 Answer


I am not a performance expert, but here is what I think might help:

  1. Try to avoid creating a new DedupeInstruction on every iteration of the loop, since creating a huge number of object instances is a time-consuming operation. Instead, you could collect the idUrl/id2Url and line pairs in a collection and push them to the write queue in bulk, e.g. every 50 elements (see the first sketch after this list). Also try to avoid the string concatenation you use there; the output directory is the same every time, so just open the File with that directory.
  2. I am not sure how many URLs a single JSON entry can contain, but if there are many, you could also optimize the for-each loop that iterates over them. Just break out of the loop once idUrl and id2Url have been found. You could also avoid creating both matchers every time: check whether idUrl/id2Url is still null and only then create the matcher and run find (see the second sketch after this list).
  3. Do you really need the System.out call just below the while loop? Printing to System.out may also be contributing to your overall performance problem, so just delete it. With this amount of data I doubt it is useful.
  4. I am not really sure how the Files.write method works, but does it automatically close the stream and the file after the operation has ended? If not, you also need to close them.
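
A minimal sketch of the batching idea from point 1, assuming the write queue is changed to carry whole batches (a BlockingQueue<List<DedupeInstruction>>) instead of single instructions. The class name, BATCH_SIZE and the extractId placeholder are hypothetical; extractId stands in for the p0/p1 matching that GroupByReadFile already does.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical variant of the producer's inner loop: the writer is woken once per
// batch instead of once per record, so there are far fewer queue operations.
class BatchingProducer {
    private static final int BATCH_SIZE = 50; // batch size assumed from the suggestion above

    static void drainFile(Iterable<String> lines,
                          BlockingQueue<List<DedupeInstruction>> writeQueue)
            throws InterruptedException {
        List<DedupeInstruction> batch = new ArrayList<>(BATCH_SIZE);
        for (String line : lines) {
            batch.add(new DedupeInstruction("/output/" + extractId(line), line));
            if (batch.size() >= BATCH_SIZE) {
                writeQueue.put(batch);             // one blocking put per 50 records
                batch = new ArrayList<>(BATCH_SIZE);
            }
        }
        if (!batch.isEmpty()) {
            writeQueue.put(batch);                 // flush whatever is left at the end of the file
        }
    }

    private static String extractId(String line) {
        return line; // placeholder for the regex-based id extraction in the real code
    }
}

GroupByWriteFile would then take() a List<DedupeInstruction> and write all of its entries before blocking on the queue again.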

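A sketch of the loop change from point 2, reusing the names from GroupByReadFile. A matcher is only created while the corresponding id is still unknown, and the loop stops as soon as idUrl is found; this assumes the first match is as good as the last one, whereas the original loop keeps the last match.

// Hypothetical rewrite of the URL loop inside GroupByReadFile.run().
String idUrl = null;
String id2Url = null;
for (String u : p.getUrls()) {
    if (idUrl == null) {                       // only build the p0 matcher while it can still help
        Matcher m0 = p0.matcher(u);
        if (m0.find()) {
            idUrl = m0.group(1);
        }
    }
    if (idUrl == null && id2Url == null) {     // p1 is only the fallback, so skip it once either id is known
        Matcher m1 = p1.matcher(u);
        if (m1.find()) {
            id2Url = ProfileUtils.Id1ToId2(m1.group(1));
        }
    }
    if (idUrl != null) {                       // idUrl takes precedence downstream, so stop scanning
        break;
    }
}
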
P.S. I am not doing a full review of the code, since you are asking for performance suggestions and advice.

