Efficient copying between DynamoDB tables using Parallel Scans and Batch Write
Oksana Horlock
Posted on August 7, 2021
Recently we needed to copy a large amount of data from one DynamoDB table into another table in a different account. Originally we used the Scan operation to read items, following the ExclusiveStartKey/LastEvaluatedKey markers to check whether more records needed to be fetched, and then the PutItem API call to insert each item into the destination table. Because the table held several hundred thousand records, the copy took several hours; at that point we decided we needed a faster method. We achieved a massive improvement in copy time by using Parallel Scans and BatchWriteItem. This post provides an example of both approaches and compares the results.
For demo purposes, I’m using a single account rather than two different ones. I have written a method that creates a simple DynamoDB table programmatically. It will contain data about temperatures in 2020 in a few cities:
private static async Task<TableDescription> CreateTable(string name)
{
    // client is a shared AmazonDynamoDBClient instance declared at class level
    var request = new CreateTableRequest
    {
        AttributeDefinitions = new List<AttributeDefinition>
        {
            new AttributeDefinition
            {
                AttributeName = "City",
                AttributeType = "S"
            },
            new AttributeDefinition
            {
                AttributeName = "Date",
                AttributeType = "S"
            }
        },
        TableName = name,
        ProvisionedThroughput = new ProvisionedThroughput
        {
            ReadCapacityUnits = 15,
            WriteCapacityUnits = 15
        },
        KeySchema = new List<KeySchemaElement>
        {
            new KeySchemaElement
            {
                AttributeName = "City",
                KeyType = "HASH"   // partition key
            },
            new KeySchemaElement
            {
                AttributeName = "Date",
                KeyType = "RANGE"  // sort key; the value is case-sensitive
            }
        }
    };
    var response = await client.CreateTableAsync(request);
    return response.TableDescription;
}
I use the method above to create several tables, and then wait to ensure that the source table, which I need to populate first, has become ACTIVE:
await CreateTable(sourceTableName);
await CreateTable(destinationTableNameSlow);
await CreateTable(destinationTableNameFast);

var describeTableRequest = new DescribeTableRequest
{
    TableName = sourceTableName
};
DescribeTableResponse describeTableResponse;
do
{
    // Poll once a second until the table becomes ACTIVE
    await Task.Delay(1000);
    describeTableResponse = await client.DescribeTableAsync(describeTableRequest);
}
while (describeTableResponse.Table.TableStatus != TableStatus.ACTIVE);
Then I populate the source table with some data about temperatures:
// A single Random instance avoids the repeated-seed problem of
// creating a new one on every iteration
var random = new Random();
for (var i = 0; i < 100; i++)
{
    var putItemRequest = new PutItemRequest
    {
        TableName = sourceTableName,
        Item = new Dictionary<string, AttributeValue>
        {
            { "City", new AttributeValue { S = cities[random.Next(cities.Length)] } },
            { "Date", new AttributeValue { S = GetRandom2020Date() } },
            { "Highest", new AttributeValue { N = random.Next(20, 30).ToString() } },
            { "Lowest", new AttributeValue { N = random.Next(1, 10).ToString() } }
        }
    };
    await client.PutItemAsync(putItemRequest);
}
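The cities array and the GetRandom2020Date helper are referenced but not shown in the snippets above. A minimal sketch of what they might look like, where the city list and the date format are my own illustration:

// Assumed supporting members - not shown in the original snippets
private static readonly Random rng = new Random();
private static readonly string[] cities = { "London", "Kyiv", "Madrid", "Tokyo" };

private static string GetRandom2020Date()
{
    // Pick a random day in 2020 (a leap year, so 366 days)
    // and format it as an ISO-8601 date string
    return new DateTime(2020, 1, 1).AddDays(rng.Next(366)).ToString("yyyy-MM-dd");
}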
Now the most interesting part starts. First, I call a method that copies the data slowly, using the Scan and PutItem API calls:
private static async Task CopySlowly()
{
    Stopwatch sw = new Stopwatch();
    sw.Start();
    var request = new ScanRequest
    {
        TableName = sourceTableName
    };
    var result = await client.ScanAsync(request);
    // Write the items back one at a time - one PutItem call per item
    foreach (var item in result.Items)
    {
        var putItemRequest = new PutItemRequest
        {
            TableName = destinationTableNameSlow,
            Item = item
        };
        await client.PutItemAsync(putItemRequest);
    }
    sw.Stop();
    Console.WriteLine($"Copy slow - {sw.ElapsedMilliseconds} milliseconds elapsed");
    Console.ReadLine();
}
Since the demo table only has 100 items, I’m not using ExclusiveStartKey/LastEvaluatedKey with the Scan operation; they are definitely necessary for large tables, though, because a single Scan call returns at most 1 MB of data.
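For completeness, here is a minimal sketch of what that pagination could look like; the CopyWithPagination helper name is my own and is not part of the original code:

// Hypothetical helper showing Scan pagination
private static async Task CopyWithPagination()
{
    Dictionary<string, AttributeValue> lastEvaluatedKey = null;
    do
    {
        var request = new ScanRequest
        {
            TableName = sourceTableName,
            // Resume where the previous page ended; null on the first call
            ExclusiveStartKey = lastEvaluatedKey
        };
        var result = await client.ScanAsync(request);
        foreach (var item in result.Items)
        {
            await client.PutItemAsync(new PutItemRequest
            {
                TableName = destinationTableNameSlow,
                Item = item
            });
        }
        // LastEvaluatedKey is empty when the scan has reached the end of the table
        lastEvaluatedKey = result.LastEvaluatedKey;
    }
    while (lastEvaluatedKey != null && lastEvaluatedKey.Count > 0);
}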
I then call another method that copies the data using Parallel Scans. The totalSegments variable specifies how many parallel worker threads to create; in this case it was set to 3, but DynamoDB allows up to 1,000,000 segments, so in practice you pick a value that suits your table size and client resources:
// totalSegments is a class-level field; in this demo it is set to 3
private static void CopyFast()
{
    Stopwatch sw = new Stopwatch();
    sw.Start();
    Task[] tasks = new Task[totalSegments];
    for (int segment = 0; segment < totalSegments; segment++)
    {
        // Copy the loop variable so each task captures its own segment number
        int tmpSegment = segment;
        tasks[segment] = Task.Run(() => ScanSegment(tmpSegment));
    }
    Task.WaitAll(tasks);
    sw.Stop();
    Console.WriteLine($"Copy fast - {sw.ElapsedMilliseconds} milliseconds elapsed");
    Console.ReadLine();
}
private static async Task ScanSegment(int tmpSegment)
{
    // Scan only this worker's segment of the table
    var request = new ScanRequest
    {
        TableName = sourceTableName,
        Segment = tmpSegment,
        TotalSegments = totalSegments
    };
    var result = await client.ScanAsync(request);
    // BatchWriteItem accepts at most 25 put/delete requests per call,
    // so write the segment's items in chunks of 25
    for (var i = 0; i < result.Items.Count; i += 25)
    {
        var items = result.Items.Skip(i).Take(25).ToArray();
        var req = new BatchWriteItemRequest
        {
            RequestItems = new Dictionary<string, List<WriteRequest>>
            {
                {
                    destinationTableNameFast,
                    items.Select(item => new WriteRequest(new PutRequest(item))).ToList()
                }
            }
        };
        await client.BatchWriteItemAsync(req);
    }
}
While a simple Scan works through the table sequentially, a Parallel Scan logically divides the table into segments, and each worker thread scans its own segment at the same time. The BatchWriteItem operation then lets you submit up to 25 put (or delete) requests in a single call. One caveat: BatchWriteItem can return UnprocessedItems when the table is throttled, and production code should retry those, as in the sketch below.
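Here is a minimal sketch of such a retry loop, reusing the client from the earlier snippets; the BatchWriteWithRetry helper name is my own:

// Hypothetical helper - retries UnprocessedItems returned by BatchWriteItem
private static async Task BatchWriteWithRetry(BatchWriteItemRequest req)
{
    var response = await client.BatchWriteItemAsync(req);
    // UnprocessedItems holds any requests DynamoDB could not process
    // (for example, because of throttling); resubmit them until none remain
    while (response.UnprocessedItems != null && response.UnprocessedItems.Count > 0)
    {
        // A short pause (ideally with exponential backoff) gives the
        // table time to recover before retrying
        await Task.Delay(500);
        response = await client.BatchWriteItemAsync(new BatchWriteItemRequest
        {
            RequestItems = response.UnprocessedItems
        });
    }
}

In ScanSegment, the direct BatchWriteItemAsync call could then be replaced with await BatchWriteWithRetry(req);.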
As a result, the difference in copying speed reported by the two stopwatch timings is clearly noticeable.
Another point worth mentioning: this task was helpful in preparing for the AWS Developer Associate exam, which I took (and passed!) in February 2021. There were a couple of questions about Parallel Scans and BatchWriteItem in the practice tests, and I was very happy that I had come across this scenario at work and knew the correct answers straight away!
Happy learning!