I'm trying to use the DynamoDB Parallel Scan Example:
http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/LowLevelJavaScanning.html
I have 200,000 items, and I've taken the sequential code scan, and modified it slightly for my usage:
Map<String, AttributeValue> lastKeyEvaluated = null;
do
{
ScanRequest scanRequest = new ScanRequest()
.withTableName(tableName)
.withExclusiveStartKey(lastKeyEvaluated);
ScanResult result = client.scan(scanRequest);
double counter = 0;
for(Map<String, AttributeValue> item : result.getItems())
{
itemSerialize.add("Set:"+counter);
for (Map.Entry<String, AttributeValue> getItem : item.entrySet())
{
String attributeName = getItem.getKey();
AttributeValue value = getItem.getValue();
itemSerialize.add(attributeName
+ (value.getS() == null ? "" : ":" + value.getS())
+ (value.getN() == null ? "" : ":" + value.getN())
+ (value.getB() == null ? "" : ":" + value.getB())
+ (value.getSS() == null ? "" : ":" + value.getSS())
+ (value.getNS() == null ? "" : ":" + value.getNS())
+ (value.getBS() == null ? "" : ":" + value.getBS()));
}
counter += 1;
}
lastKeyEvaluated = result.getLastEvaluatedKey();
}
while(lastKeyEvaluated != null);
The counter gives exactly 200,000 when this code has finished, however, I also wanted to try the parallel scan.
Function Call:
ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();
try
{
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
int totalSegments = numberOfThreads;
for (int segment = 0; segment < totalSegments; segment++)
{
// Runnable task that will only scan one segment
task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment, list);
// Execute the task
executor.execute(task);
}
shutDownExecutorService(executor);
}
.......Catches something if error
return list;
Class:
I have a static list that the data is shared with all the threads. I was able to retrieve the lists, and output the amount of data.
// Runnable task for scanning a single segment of a DynamoDB table
private static class ScanSegmentTask implements Runnable
{
// DynamoDB table to scan
private String tableName;
// number of items each scan request should return
private int itemLimit;
// Total number of segments
// Equals to total number of threads scanning the table in parallel
private int totalSegments;
// Segment that will be scanned with by this task
private int segment;
static ArrayList<String> list_2;
Object lock = new Object();
public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment, ArrayList<String> list)
{
this.tableName = tableName;
this.itemLimit = itemLimit;
this.totalSegments = totalSegments;
this.segment = segment;
list_2 = list;
}
public void run()
{
System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
Map<String, AttributeValue> exclusiveStartKey = null;
int totalScannedItemCount = 0;
int totalScanRequestCount = 0;
int counter = 0;
try
{
while(true)
{
ScanRequest scanRequest = new ScanRequest()
.withTableName(tableName)
.withLimit(itemLimit)
.withExclusiveStartKey(exclusiveStartKey)
.withTotalSegments(totalSegments)
.withSegment(segment);
ScanResult result = client.scan(scanRequest);
totalScanRequestCount++;
totalScannedItemCount += result.getScannedCount();
synchronized(lock)
{
for(Map<String, AttributeValue> item : result.getItems())
{
list_2.add("Set:"+counter);
for (Map.Entry<String, AttributeValue> getItem : item.entrySet())
{
String attributeName = getItem.getKey();
AttributeValue value = getItem.getValue();
list_2.add(attributeName
+ (value.getS() == null ? "" : ":" + value.getS())
+ (value.getN() == null ? "" : ":" + value.getN())
+ (value.getB() == null ? "" : ":" + value.getB())
+ (value.getSS() == null ? "" : ":" + value.getSS())
+ (value.getNS() == null ? "" : ":" + value.getNS())
+ (value.getBS() == null ? "" : ":" + value.getBS()));
}
counter += 1;
}
}
exclusiveStartKey = result.getLastEvaluatedKey();
if (exclusiveStartKey == null)
{
break;
}
}
}
catch (AmazonServiceException ase)
{
System.err.println(ase.getMessage());
}
finally
{
System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of " + totalSegments + " of " + tableName + " with " + totalScanRequestCount + " scan requests");
}
}
}
Executor Service Shut Down:
public static void shutDownExecutorService(ExecutorService executor)
{
executor.shutdown();
try
{
if (!executor.awaitTermination(10, TimeUnit.SECONDS))
{
executor.shutdownNow();
}
}
catch (InterruptedException e)
{
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
However, the amount of items changes every time I run this piece of code (Varies around 60000 in total, 6000 per threads, with 10 created threads). Removing synchronization does not change the result too.
Is there a bug with the synchronization or with the Amazon AWS API?
Thanks All
EDIT:
The new function call:
ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();
try
{
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
int totalSegments = numberOfThreads;
for (int segment = 0; segment < totalSegments; segment++)
{
// Runnable task that will only scan one segment
task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);
// Execute the task
Future<ArrayList<String>> future = executor.submit(task);
list.addAll(future.get());
}
shutDownExecutorService(executor);
}
The new class:
// Runnable task for scanning a single segment of a DynamoDB table
private static class ScanSegmentTask implements Callable<ArrayList<String>>
{
// DynamoDB table to scan
private String tableName;
// number of items each scan request should return
private int itemLimit;
// Total number of segments
// Equals to total number of threads scanning the table in parallel
private int totalSegments;
// Segment that will be scanned with by this task
private int segment;
ArrayList<String> list_2 = new ArrayList<String>();
static int counter = 0;
public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment)
{
this.tableName = tableName;
this.itemLimit = itemLimit;
this.totalSegments = totalSegments;
this.segment = segment;
}
@SuppressWarnings("finally")
public ArrayList<String> call()
{
System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
Map<String, AttributeValue> exclusiveStartKey = null;
try
{
while(true)
{
ScanRequest scanRequest = new ScanRequest()
.withTableName(tableName)
.withLimit(itemLimit)
.withExclusiveStartKey(exclusiveStartKey)
.withTotalSegments(totalSegments)
.withSegment(segment);
ScanResult result = client.scan(scanRequest);
for(Map<String, AttributeValue> item : result.getItems())
{
list_2.add("Set:"+counter);
for (Map.Entry<String, AttributeValue> getItem : item.entrySet())
{
String attributeName = getItem.getKey();
AttributeValue value = getItem.getValue();
list_2.add(attributeName
+ (value.getS() == null ? "" : ":" + value.getS())
+ (value.getN() == null ? "" : ":" + value.getN())
+ (value.getB() == null ? "" : ":" + value.getB())
+ (value.getSS() == null ? "" : ":" + value.getSS())
+ (value.getNS() == null ? "" : ":" + value.getNS())
+ (value.getBS() == null ? "" : ":" + value.getBS()));
}
counter += 1;
}
exclusiveStartKey = result.getLastEvaluatedKey();
if (exclusiveStartKey == null)
{
break;
}
}
}
catch (AmazonServiceException ase)
{
System.err.println(ase.getMessage());
}
finally
{
return list_2;
}
}
}
Final EDIT:
Function Call:
ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();
ArrayList<Future<ArrayList<String>>> holdFuture = new ArrayList<Future<ArrayList<String>>>();
try
{
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
int totalSegments = numberOfThreads;
for (int segment = 0; segment < totalSegments; segment++)
{
// Runnable task that will only scan one segment
task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);
// Execute the task
Future<ArrayList<String>> future = executor.submit(task);
holdFuture.add(future);
}
for (int i = 0 ; i < holdFuture.size(); i++)
{
boolean flag = false;
while(flag == false)
{
Thread.sleep(1000);
if(holdFuture.get(i).isDone())
{
list.addAll(holdFuture.get(i).get());
flag = true;
}
}
}
shutDownExecutorService(executor);
}
Class: private static class ScanSegmentTask implements Callable> {
// DynamoDB table to scan
private String tableName;
// number of items each scan request should return
private int itemLimit;
// Total number of segments
// Equals to total number of threads scanning the table in parallel
private int totalSegments;
// Segment that will be scanned with by this task
private int segment;
ArrayList<String> list_2 = new ArrayList<String>();
static AtomicInteger counter = new AtomicInteger(0);
public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment)
{
this.tableName = tableName;
this.itemLimit = itemLimit;
this.totalSegments = totalSegments;
this.segment = segment;
}
@SuppressWarnings("finally")
public ArrayList<String> call()
{
System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
Map<String, AttributeValue> exclusiveStartKey = null;
try
{
while(true)
{
ScanRequest scanRequest = new ScanRequest()
.withTableName(tableName)
.withLimit(itemLimit)
.withExclusiveStartKey(exclusiveStartKey)
.withTotalSegments(totalSegments)
.withSegment(segment);
ScanResult result = client.scan(scanRequest);
for(Map<String, AttributeValue> item : result.getItems())
{
list_2.add("Set:"+counter);
for (Map.Entry<String, AttributeValue> getItem : item.entrySet())
{
String attributeName = getItem.getKey();
AttributeValue value = getItem.getValue();
list_2.add(attributeName
+ (value.getS() == null ? "" : ":" + value.getS())
+ (value.getN() == null ? "" : ":" + value.getN())
+ (value.getB() == null ? "" : ":" + value.getB())
+ (value.getSS() == null ? "" : ":" + value.getSS())
+ (value.getNS() == null ? "" : ":" + value.getNS())
+ (value.getBS() == null ? "" : ":" + value.getBS()));
}
counter.addAndGet(1);
}
exclusiveStartKey = result.getLastEvaluatedKey();
if (exclusiveStartKey == null)
{
break;
}
}
}
catch (AmazonServiceException ase)
{
System.err.println(ase.getMessage());
}
finally
{
return list_2;
}
}
}