I'm trying to use the DynamoDB Parallel Scan Example:

http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/LowLevelJavaScanning.html

I have 200,000 items. I took the sequential scan code and modified it slightly for my use:

Map<String, AttributeValue> lastKeyEvaluated = null;
do
{
    ScanRequest scanRequest = new ScanRequest()
    .withTableName(tableName)
    .withExclusiveStartKey(lastKeyEvaluated);

    ScanResult result = client.scan(scanRequest);


    double counter = 0;
    for(Map<String, AttributeValue> item : result.getItems())
    {
        itemSerialize.add("Set:"+counter);
        for (Map.Entry<String, AttributeValue> getItem : item.entrySet()) 
        {
            String attributeName = getItem.getKey();
            AttributeValue value = getItem.getValue();

            itemSerialize.add(attributeName
                    + (value.getS() == null ? "" : ":" + value.getS())
                    + (value.getN() == null ? "" : ":" + value.getN())
                    + (value.getB() == null ? "" : ":" + value.getB())
                    + (value.getSS() == null ? "" : ":" + value.getSS())
                    + (value.getNS() == null ? "" : ":" + value.getNS())
                    + (value.getBS() == null ? "" : ":" + value.getBS()));
        }
        counter += 1;
    }

    lastKeyEvaluated = result.getLastEvaluatedKey();
}
while(lastKeyEvaluated != null);

The counter gives exactly 200,000 when this code has finished. However, I also wanted to try the parallel scan.

Function Call:

ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();
try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++) 
    {
        // Runnable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment, list);

        // Execute the task
        executor.execute(task);
    }
    shutDownExecutorService(executor);
}
.......Catches something if error
return list;

Class:

I have a static list that is shared by all the threads. I was able to retrieve the list and output the number of items.

// Runnable task for scanning a single segment of a DynamoDB table
private static class ScanSegmentTask implements Runnable 
{

    // DynamoDB table to scan
    private String tableName;

    // number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equals to total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned with by this task
    private int segment;

    static ArrayList<String> list_2;

    Object lock = new Object();

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment, ArrayList<String> list) 
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
        list_2 = list;
    }

    public void run() 
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;
        int totalScannedItemCount = 0;
        int totalScanRequestCount = 0;
        int counter = 0;

        try 
        {
            while(true) 
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);

                totalScanRequestCount++;
                totalScannedItemCount += result.getScannedCount();

                synchronized(lock)
                {
                    for(Map<String, AttributeValue> item : result.getItems())
                    {
                        list_2.add("Set:"+counter);
                        for (Map.Entry<String, AttributeValue> getItem : item.entrySet()) 
                        {
                            String attributeName = getItem.getKey();
                            AttributeValue value = getItem.getValue();

                            list_2.add(attributeName
                                    + (value.getS() == null ? "" : ":" + value.getS())
                                    + (value.getN() == null ? "" : ":" + value.getN())
                                    + (value.getB() == null ? "" : ":" + value.getB())
                                    + (value.getSS() == null ? "" : ":" + value.getSS())
                                    + (value.getNS() == null ? "" : ":" + value.getNS())
                                    + (value.getBS() == null ? "" : ":" + value.getBS()));
                        }
                        counter += 1;
                    }
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null) 
                {
                    break;
                }
            }
        } 
        catch (AmazonServiceException ase) 
        {
            System.err.println(ase.getMessage());
        } 
        finally 
        {
            System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of " + totalSegments + " of " + tableName + " with " + totalScanRequestCount + " scan requests");
        }
    }
}

Executor Service Shut Down:

public static void shutDownExecutorService(ExecutorService executor) 
{
    executor.shutdown();
    try 
    {
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) 
        {
            executor.shutdownNow();
        }
    } 
    catch (InterruptedException e) 
    {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

However, the number of items changes every time I run this code (it varies around 60,000 in total, about 6,000 per thread, with 10 threads created). Removing the synchronization does not change the result either.

Is there a bug with the synchronization or with the Amazon AWS API?

Thanks All

EDIT:

The new function call:

ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();

try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++) 
    {
        // Runnable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);

        // Execute the task
        Future<ArrayList<String>> future = executor.submit(task);

        list.addAll(future.get());
    }
    shutDownExecutorService(executor);
}

The new class:

// Callable task for scanning a single segment of a DynamoDB table
private static class ScanSegmentTask implements Callable<ArrayList<String>>
{

    // DynamoDB table to scan
    private String tableName;

    // number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equals to total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned with by this task
    private int segment;

    ArrayList<String> list_2 = new ArrayList<String>();

    static int counter = 0;

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment)
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
    }

    @SuppressWarnings("finally")
    public ArrayList<String> call() 
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;

        try 
        {
            while(true) 
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);

                for(Map<String, AttributeValue> item : result.getItems())
                {
                    list_2.add("Set:"+counter);
                    for (Map.Entry<String, AttributeValue> getItem : item.entrySet()) 
                    {
                        String attributeName = getItem.getKey();
                        AttributeValue value = getItem.getValue();

                        list_2.add(attributeName
                                + (value.getS() == null ? "" : ":" + value.getS())
                                + (value.getN() == null ? "" : ":" + value.getN())
                                + (value.getB() == null ? "" : ":" + value.getB())
                                + (value.getSS() == null ? "" : ":" + value.getSS())
                                + (value.getNS() == null ? "" : ":" + value.getNS())
                                + (value.getBS() == null ? "" : ":" + value.getBS()));
                    }
                    counter += 1;
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null) 
                {
                    break;
                }
            }
        } 
        catch (AmazonServiceException ase) 
        {
            System.err.println(ase.getMessage());
        } 
        finally 
        {
            return list_2;
        }
    }
}

Final EDIT:

Function Call:

ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();
ArrayList<Future<ArrayList<String>>> holdFuture = new ArrayList<Future<ArrayList<String>>>();

try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++) 
    {
        // Runnable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);

        // Execute the task
        Future<ArrayList<String>> future = executor.submit(task);
        holdFuture.add(future);
    }

    for (int i = 0 ; i < holdFuture.size(); i++)
    {
        boolean flag = false;
        while(flag == false)
        {
            Thread.sleep(1000);
            if(holdFuture.get(i).isDone())
            {
                list.addAll(holdFuture.get(i).get());
                flag = true;
            }
        }
    }
    shutDownExecutorService(executor);
}

Class:

private static class ScanSegmentTask implements Callable<ArrayList<String>>
{

    // DynamoDB table to scan
    private String tableName;

    // number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equals to total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned with by this task
    private int segment;

    ArrayList<String> list_2 = new ArrayList<String>();

    static AtomicInteger counter = new AtomicInteger(0);

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment)
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
    }

    @SuppressWarnings("finally")
    public ArrayList<String> call() 
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;

        try 
        {
            while(true) 
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);

                for(Map<String, AttributeValue> item : result.getItems())
                {
                    list_2.add("Set:"+counter);
                    for (Map.Entry<String, AttributeValue> getItem : item.entrySet()) 
                    {
                        String attributeName = getItem.getKey();
                        AttributeValue value = getItem.getValue();

                        list_2.add(attributeName
                                + (value.getS() == null ? "" : ":" + value.getS())
                                + (value.getN() == null ? "" : ":" + value.getN())
                                + (value.getB() == null ? "" : ":" + value.getB())
                                + (value.getSS() == null ? "" : ":" + value.getSS())
                                + (value.getNS() == null ? "" : ":" + value.getNS())
                                + (value.getBS() == null ? "" : ":" + value.getBS()));
                    }
                    counter.addAndGet(1);
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null) 
                {
                    break;
                }
            }
        } 
        catch (AmazonServiceException ase) 
        {
            System.err.println(ase.getMessage());
        } 
        finally 
        {
            return list_2;
        }
    }
}
  • "I have a static list that the data is shared with all the threads." I'm guessing that this could be the problem. As soon as you share something with mutable state among threads, you have a potential problem. I'll have to take a closer look though...
    – awksp
    Commented May 21, 2014 at 5:24
  • Just wondering, what's the range for the amount of items you see printed out?
    – awksp
    Commented May 21, 2014 at 5:29
  • Thanks for checking the code out. I've put a lock on the Object, which is supposed to prevent any potential problem. The range is around 60000, where each thread prints out 6000 items. Commented May 21, 2014 at 5:30
  • So is 6000 the expected result? If so, what are you seeing instead?
    – awksp
    Commented May 21, 2014 at 5:31
  • 200000 is the total number of items. The sequential version prints out 200000 items with no problem. But in the parallel version, I create 10 threads, and each gets about 6000 items, totalling 60000. Commented May 21, 2014 at 5:33

1 Answer

OK, I believe the issue is in the way you synchronized.

In your case, the lock is pretty much pointless: lock is an instance field, so each ScanSegmentTask has its own lock object, and synchronizing on it never blocks another thread from running the same piece of code. I believe this is why removing synchronization does not change the result -- it never had an effect in the first place.
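For contrast, here is a minimal sketch of a lock that does exclude other threads, because every task synchronizes on the same static object. The class and method names (SharedLockDemo, addFromThreads) are made up for illustration and are not from the question; the DynamoDB scan is replaced by a plain loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: all tasks lock on the SAME object,
// unlike a non-static "lock" field that exists once per task instance.
class SharedLockDemo {
    // A single lock shared by every task.
    private static final Object SHARED_LOCK = new Object();

    static int addFromThreads(int threads, int itemsPerThread) {
        List<String> shared = new ArrayList<>(); // plain, non-thread-safe list
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            executor.execute(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    // Only one thread at a time can be inside this block.
                    synchronized (SHARED_LOCK) {
                        shared.add("thread" + id + ":" + i);
                    }
                }
            });
        }
        executor.shutdown();
        try {
            // Wait for the tasks to finish before reading the list.
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Read under the same lock so the writes are guaranteed to be visible.
        synchronized (SHARED_LOCK) {
            return shared.size();
        }
    }

    public static void main(String[] args) {
        System.out.println(addFromThreads(4, 10_000)); // prints 40000
    }
}
```

With a per-instance lock in place of SHARED_LOCK, the same code would have no mutual exclusion at all, and the final size could come out smaller or an exception could be thrown.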

I believe your issue is in fact due to the static ArrayList<String> that's shared by your threads. ArrayList is not thread-safe, so concurrent operations on it are not guaranteed to behave correctly; you have to synchronize every access to it. Without proper synchronization, two threads can each add an element to an empty ArrayList and leave it with a size of 1, because the unsynchronized updates can overwrite each other.

As I said before, while you do have a synchronized block, it really isn't doing anything. You could synchronize on list_2 instead, but then every thread would contend for the same lock each time it processes a page of results, so much of the work would effectively run in sequence.

There are a few solutions to this. You can use Collections.synchronizedList(list_2) to create a synchronized wrapper around your ArrayList. This way, each add is performed under the wrapper's lock and none are lost. However, this incurs a synchronization cost on every operation, and so isn't ideal.
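A minimal sketch of the synchronized-wrapper approach, assuming plain threads and a hypothetical helper named addFromThreads in place of the real scan tasks:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class, not part of the question's code.
class SynchronizedListDemo {
    static int addFromThreads(int threads, int itemsPerThread) {
        // Every call on the wrapper is synchronized on the wrapper itself.
        List<String> shared = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    shared.add("thread" + id + ":" + i); // safe without extra locking
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait until this worker has added all its items
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return shared.size();
    }

    public static void main(String[] args) {
        System.out.println(addFromThreads(4, 10_000)); // prints 40000
    }
}
```

Note that iterating over such a list still requires manually synchronizing on the wrapper, as the Collections.synchronizedList documentation points out; only the individual method calls are protected.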

What I would do is have ScanSegmentTask implement Callable (technically Callable<ArrayList<String>>). The Callable interface is almost exactly like the Runnable interface, except that its method is call(), which returns a value.

Why is this important? I think that what would produce the best results for you is this:

  1. Make list_2 an instance variable, initialized to a blank list
  2. Have each thread add to this list exactly as you have done
  3. Return list_2 when you are done
  4. Concatenate each resulting ArrayList<String> to the original ArrayList using addAll()

This way, you have no synchronization overhead to deal with!

This will require a few changes to your executor code. Instead of calling execute(), you'll need to call submit(). This returns a Future object (Future<ArrayList<String>> in your case) that holds the results of the call() method. You'll need to store this into some collection -- an array, ArrayList, doesn't matter.

To retrieve the results, simply loop through the collection of Future objects and call get() on each one. This call blocks until the task that the Future object corresponds to is complete.
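Put together, the submit-then-collect pattern might look roughly like the sketch below. FakeSegmentTask and scanAll are hypothetical names, and the task generates fake items instead of calling DynamoDB, so treat it as an outline of the structure rather than a drop-in replacement for ScanSegmentTask.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SegmentCallableDemo {
    // Hypothetical stand-in for ScanSegmentTask: no network call, just fake items.
    static class FakeSegmentTask implements Callable<ArrayList<String>> {
        private final int segment;
        private final int itemsInSegment;

        FakeSegmentTask(int segment, int itemsInSegment) {
            this.segment = segment;
            this.itemsInSegment = itemsInSegment;
        }

        @Override
        public ArrayList<String> call() {
            // This list belongs to one task only, so no synchronization is needed.
            ArrayList<String> local = new ArrayList<>();
            for (int i = 0; i < itemsInSegment; i++) {
                local.add("segment" + segment + ":item" + i);
            }
            return local;
        }
    }

    static List<String> scanAll(int totalSegments, int itemsPerSegment) {
        ExecutorService executor = Executors.newFixedThreadPool(totalSegments);
        List<Future<ArrayList<String>>> futures = new ArrayList<>();
        try {
            // First loop: start every task before waiting on any of them.
            for (int segment = 0; segment < totalSegments; segment++) {
                futures.add(executor.submit(new FakeSegmentTask(segment, itemsPerSegment)));
            }
            // Second loop: get() blocks until the corresponding task has finished.
            List<String> all = new ArrayList<>();
            for (Future<ArrayList<String>> future : futures) {
                all.addAll(future.get());
            }
            return all;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(scanAll(10, 200).size()); // prints 2000
    }
}
```

Because the results are merged in the order the futures were submitted, the combined list is grouped by segment regardless of which task finishes first.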

I think that's it. While this is more complicated, I think that this is the best performance you're going to get, as with enough threads either CPU contention or your network link will become the bottleneck. Please ask if you have any questions, and I'll update as needed.

  • @user1157751 Ah, that's because you're trying to get results from the Future too early. You'll need to put the Future objects into a separate ArrayList<Future>, then get the results after you've started all the threads, in a separate for loop. The way it is now, you start a thread, then immediately try to get the results from it, which blocks your code until the first thread is done. So in effect, you're running your code sequentially.
    – awksp
    Commented May 21, 2014 at 6:09
  • @user1157751 In addition, your counter variable will almost certainly hold an incorrect value if you properly multithread your code, as counter += 1; isn't thread-safe. Use an AtomicInteger instead.
    – awksp
    Commented May 21, 2014 at 6:10
  • @user1157751 To expand on the earlier point, when a thread executes counter += 1 or counter++, it's done in 3 stages: read counter, add 1 to it, write back to counter. The issue is that if one thread reads counter, and another thread also reads counter before the first thread writes back to the variable, you'll end up with a counter that only increased by 1 instead of 2. In addition, changes to counter aren't guaranteed to be seen by other threads, as locally cached copies of counter could get out of date.
    – awksp
    Commented May 21, 2014 at 6:12
  • @user1157751 Ah, I see. If you're on Eclipse, there's an easy way to verify all your threads are actually doing work -- go to the Debug perspective, then launch your program in debug mode. You should see a lot of threads appear in the debug view. If you quickly select the program icon (should have an icon with two gears, while each thread has a single gear and a "play" icon) and hit pause, you'll be able to count how many threads you have going and see if they are waiting on anything (they shouldn't).
    – awksp
    Commented May 21, 2014 at 6:48
  • @user1157751 But in any case, let me know how the test goes! Glad to hear things work. And no need to call me that, just happens that I had some experience with threading. Happy to help though!
    – awksp
    Commented May 21, 2014 at 6:51
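
As a small illustration of the AtomicInteger suggestion in the comments above, the sketch below counts increments from several threads. The class and method names are hypothetical, and the loop stands in for the per-item work in the question.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the question's code.
class AtomicCounterDemo {
    static int countFromThreads(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // Read, add and write happen as one atomic step.
                    counter.incrementAndGet();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait for this worker's increments to complete
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countFromThreads(8, 25_000)); // prints 200000
    }
}
```

Replacing the AtomicInteger with a plain shared int and counter += 1 would make the final value unpredictable, typically lower than expected, for the reasons described in the comments.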
