How can I easily test my MongoDB multi-document transaction code without setting up MongoDB on my device? One might argue that MongoDB has to be set up first, because such a transaction needs a session, which in turn requires a replica set. Thankfully, there is no need to create a 3-node replica set: we can run these transactions against a single database instance.
To achieve this, we may do the following:
Run a MongoDB container of version 4 or higher and specify the --replSet option;
Initialize a single-node replica set by executing the proper command;
Wait for the initialization to complete;
Connect to the node as a standalone, without specifying a replica set, so that we do not have to modify the hosts file of our OS.
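To make these four steps more tangible, here is a minimal sketch of the commands one might issue by hand. It only builds the command lines and does not execute them. The container name, the published port, the replica set name and the database name are assumptions made for illustration, and the mongo shell binary is assumed to be the one shipped with the 4.x images.

```java
import java.util.Arrays;
import java.util.List;

// A sketch only: container name, port and replica set name are assumptions.
class SingleNodeReplicaSetCommands {
    static final String CONTAINER = "mongo-rs";     // hypothetical container name
    static final String REPLICA_SET = "docker-rs";  // hypothetical replica set name

    // Step 1: start mongod (version 4+) with the --replSet option.
    static List<String> runContainer(String imageTag) {
        return Arrays.asList("docker", "run", "-d", "--name", CONTAINER,
            "-p", "27017:27017", "mongo:" + imageTag, "--replSet", REPLICA_SET);
    }

    // Step 2: initialize the single-node replica set from the mongo shell.
    static List<String> initiate() {
        return Arrays.asList("docker", "exec", CONTAINER, "mongo", "--eval", "rs.initiate()");
    }

    // Step 3: a check to poll until the node reports itself as primary.
    static List<String> isPrimary() {
        return Arrays.asList("docker", "exec", CONTAINER, "mongo", "--quiet",
            "--eval", "db.isMaster().ismaster");
    }

    // Step 4: connect directly, without a replicaSet parameter in the URL.
    static String connectionString() {
        return "mongodb://localhost:27017/test";
    }

    public static void main(String[] args) {
        // Print the commands; each list could be handed to a ProcessBuilder instead.
        System.out.println(String.join(" ", runContainer("4.2.8")));
        System.out.println(String.join(" ", initiate()));
        System.out.println(String.join(" ", isPrimary()));
        System.out.println(connectionString());
    }
}
```

Scripting this by hand also means dealing with port clashes and retries, which is exactly the kind of work we would rather delegate.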
It is worth mentioning that a replica set is not the only option here, because MongoDB 4.2 introduced distributed transactions in sharded clusters, which are beyond the scope of this article.
There are many ways to initialize a replica set, including Docker Compose, bash scripts, services in a CI/CD pipeline, etc. However, they all take some extra work in terms of scripting, handling random ports, and making the setup part of the CI/CD process. Fortunately, starting from Testcontainers 1.14.2, we are able to delegate all the heavy lifting to the MongoDB Module.
Let us try it out on a small warehouse management system based on Spring Boot 2.3. In the recent past one had to use ReactiveMongoOperations and its inTransaction method, but since Spring Data MongoDB 2.2 M4 we have been able to leverage the good old @Transactional annotation or the more advanced TransactionalOperator.
Our application should have a REST API that provides information on successfully processed files, including the number of documents modified. Files that cause errors along the way should be skipped so that all the remaining files are still processed.
It may be noted that even though duplicated articles and their sizes within a single file are a rare case, this possibility is quite realistic, and therefore should be handled as well.
According to the business requirements for our system, we already have some products in our database, and we upload a bunch of Excel (xlsx) files to update some fields of the matched documents in our storage. Data is supposed to be only on the first sheet of any workbook. Each file is processed in a separate multi-document transaction to prevent simultaneous modifications of the same documents. For example, Figure 1 shows how colliding transactions may end up, leaving aside the scenario in which the transactions are executed sequentially (the JSON representation is shortened here for the sake of simplicity). Transactional behavior helps us to avoid data clashes and guarantees consistency.
As for the product collection, the article serves as a unique index. At the same time, each article is bound to a concrete size. Therefore, it is important for our application to verify that both of them are in the database before updating. Figure 2 gives an insight into this collection.
2. Business logic implementation
Let us elaborate on the major points of the above-mentioned business logic and start with ProductController as an entry point for the processing. You can find the complete project on GitHub. Prerequisites are Java 8+ and Docker.
1) Wrap a response in a ResponseEntity and return the flux of the FileUploadDto;
2) Get a current authentication principal, coming in handy later on;
3) Pass the flux of the FilePart to process.
Here is the patchProductQuantity method of the UploadProductServiceImpl:
1) Use the name of the user as the root directory name;
2) Do the blocking initialization of the root directory on a separate elastic thread;
3) For each Excel file:
3.1) Save it on a disk;
3.2) Then update the quantity of the products on a separate elastic thread, because the file is processed in a blocking manner.
The saveFileToDiskAndUpdate method does the following:
private Mono<FileUploadDto> saveFileToDiskAndUpdate(final FilePart file, final String userName) {
    final String fileName = file.filename();
    final Path path = Paths.get(pathToStorage, userName, fileName);
    return Mono.just(path)
        .log(String.format("A file: %s has been uploaded", fileName))
        .flatMap(file::transferTo)
        .log(String.format("A file: %s has been saved", fileName))
        .then(processExcelFile(fileName, userName, path));
}
1) Copy the content of the file to the user’s directory;
2) After the copy stage is completed, call the processExcelFile method.
At this point, we are going to split the logic according to the size of the file:
1) Wrap the blocking Files.size(path) call in Mono.fromCallable;
2) bigFileSizeThreshold is injected from a proper application.yml file via @Value("${upload-file.bigFileSizeThreshold}").
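The size-based split can be illustrated with a small, library-free sketch. The class name, the Route enum and the choice of >= for the comparison are assumptions made for illustration. In the reactive service the blocking Files.size(path) call would be wrapped in Mono.fromCallable, as described above, rather than called directly.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: decides which processing branch a file should take.
class FileSizeRouter {
    enum Route { SMALL, BIG }

    // Pure decision: files at or above the threshold go to the lightweight "big file" branch.
    static Route route(long sizeInBytes, long bigFileSizeThreshold) {
        return sizeInBytes >= bigFileSizeThreshold ? Route.BIG : Route.SMALL;
    }

    // Blocking variant: reads the size from disk, then delegates to the pure decision.
    static Route route(Path path, long bigFileSizeThreshold) {
        try {
            return route(Files.size(path), bigFileSizeThreshold);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(route(10L, 100L));
        System.out.println(route(100L, 100L));
    }
}
```

Keeping the decision separate from the I/O call makes the threshold logic trivial to unit test without touching the file system.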
Before going into detail on processing Excel files depending on their size, we should take a look at the getProducts method of the ExcelFileDaoImpl:
@Override
public Flux<Product> getProducts(
    final String pathToStorage,
    final String fileName,
    final String userName
) {
    return Flux.defer(() -> {
        FileInputStream is;
        Workbook workbook;
        try {
            final File file = Paths.get(pathToStorage, userName, fileName).toFile();
            verifyFileAttributes(file);
            is = new FileInputStream(file);
            workbook = StreamingReader.builder()
                .rowCacheSize(ROW_CACHE_SIZE)
                .bufferSize(BUFFER_SIZE)
                .open(is);
        } catch (IOException e) {
            return Mono.error(new UploadProductException(
                String.format(
                    "An exception has been occurred while parsing a file: %s " + "has been saved",
                    fileName
                ),
                e
            ));
        }

        try {
            final Sheet datatypeSheet = workbook.getSheetAt(0);
            final Iterator<Row> iterator = datatypeSheet.iterator();

            final AtomicInteger rowCounter = new AtomicInteger();
            if (iterator.hasNext()) {
                final Row currentRow = iterator.next();
                rowCounter.incrementAndGet();
                verifyExcelFileHeader(fileName, currentRow);
            }
            return Flux.<Product>create(fluxSink -> fluxSink.onRequest(value -> {
                try {
                    for (int i = 0; i < value; i++) {
                        if (!iterator.hasNext()) {
                            fluxSink.complete();
                            return;
                        }
                        final Row currentRow = iterator.next();
                        final Product product = Objects.requireNonNull(
                            getProduct(
                                FileRow.builder()
                                    .fileName(fileName)
                                    .currentRow(currentRow)
                                    .rowCounter(rowCounter.incrementAndGet())
                                    .build()
                            ),
                            "product is not supposed to be null"
                        );
                        fluxSink.next(product);
                    }
                } catch (Exception e1) {
                    fluxSink.error(e1);
                }
            })).doFinally(signalType -> {
                try {
                    is.close();
                    workbook.close();
                } catch (IOException e1) {
                    log.error("Error has occurred while releasing {} resources: {}", fileName, e1);
                }
            });
        } catch (Exception e) {
            return Mono.error(e);
        }
    });
}
1) Defer the whole logic until there is a new subscriber;
2) Verify the Excel file header;
3) Create a flux to provide the requested number of products;
4) Convert an Excel row into a Product domain object;
5) Finally, close all of the opened resources.
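The demand-driven loop inside onRequest is the heart of this method, so a library-free analogue may help to see what it does. This is a sketch with hypothetical names, not Reactor's API: each request emits at most the requested number of items and signals completion once a request finds the iterator exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical analogue of the onRequest loop: emit at most n items per request.
class DemandDrivenSource<T> {
    private final Iterator<T> iterator;
    private boolean completed;

    DemandDrivenSource(Iterator<T> iterator) {
        this.iterator = iterator;
    }

    // Emits up to n items to onNext; returns true once the source is known to be exhausted.
    boolean request(long n, Consumer<T> onNext) {
        for (long i = 0; i < n && !completed; i++) {
            if (!iterator.hasNext()) {
                completed = true; // corresponds to fluxSink.complete()
                break;
            }
            onNext.accept(iterator.next()); // corresponds to fluxSink.next(product)
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        DemandDrivenSource<String> source =
            new DemandDrivenSource<>(Arrays.asList("row1", "row2", "row3").iterator());
        System.out.println(source.request(2, received::add));
        System.out.println(source.request(2, received::add));
        System.out.println(received);
    }
}
```

Because rows are only read when the subscriber asks for them, the whole file never has to be held in memory at this stage.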
Getting back to the processing of the Excel files in the UploadProductServiceImpl, we are going to use MongoDB's bulkWrite method on a collection to update products in bulk, which requires an eagerly evaluated list of UpdateOneModel. In practice, collecting such a list is a memory-consuming operation, especially for big files.
Regarding small Excel files, we provide a more detailed log and perform an additional validation check:
private Mono<FileUploadDto> processSmallExcelFile(final String fileName, final String userName) {
    log.debug("processSmallExcelFile: {}", fileName);
    return excelFileDao.getProducts(pathToStorage, fileName, userName)
        .reduce(
            new ConcurrentHashMap<ProductArticleSizeDto, Tuple2<UpdateOneModel<Document>, BigInteger>>(),
            (indexMap, product) -> {
                final BigInteger quantity = product.getQuantity();
                indexMap.merge(
                    new ProductArticleSizeDto(product.getArticle(), product.getSize()),
                    Tuples.of(
                        updateOneModelConverter.convert(Tuples.of(product, quantity, userName)),
                        quantity
                    ),
                    (oldValue, newValue) -> {
                        final BigInteger mergedQuantity = oldValue.getT2().add(newValue.getT2());
                        return Tuples.of(
                            updateOneModelConverter.convert(Tuples.of(product, mergedQuantity, userName)),
                            mergedQuantity
                        );
                    }
                );
                return indexMap;
            }
        )
        .filterWhen(productIndexFile ->
            productDao.findByArticleIn(extractArticles(productIndexFile.keySet()))
                .<ProductArticleSizeDto>handle((productArticleSizeDto, synchronousSink) -> {
                    if (productIndexFile.containsKey(productArticleSizeDto)) {
                        synchronousSink.next(productArticleSizeDto);
                    } else {
                        synchronousSink.error(new UploadProductException(
                            String.format(
                                "A file %s does not have an article: %d with size: %s",
                                fileName,
                                productArticleSizeDto.getArticle(),
                                productArticleSizeDto.getSize()
                            )
                        ));
                    }
                })
                .count()
                .handle((sizeDb, synchronousSink) -> {
                    final int sizeFile = productIndexFile.size();
                    if (sizeDb == sizeFile) {
                        synchronousSink.next(Boolean.TRUE);
                    } else {
                        synchronousSink.error(new UploadProductException(
                            String.format(
                                "Inconsistency between total element size in MongoDB: %d and a file %s: %d",
                                sizeDb,
                                fileName,
                                sizeFile
                            )
                        ));
                    }
                })
        )
        .onErrorResume(e -> {
            log.debug("Exception while processExcelFile fileName: {}: {}", fileName, e);
            return Mono.empty();
        })
        .flatMap(productIndexFile ->
            productPatcherService.incrementProductQuantity(
                fileName,
                productIndexFile.values().stream().map(Tuple2::getT1).collect(Collectors.toList()),
                userName
            )
        )
        .map(bulkWriteResult ->
            FileUploadDto.builder()
                .fileName(fileName)
                .matchedCount(bulkWriteResult.getMatchedCount())
                .modifiedCount(bulkWriteResult.getModifiedCount())
                .build()
        );
}
1) reduce helps us handle duplicate products whose quantities should be summed up;
2) Collect a map whose keys are ProductArticleSizeDto objects and whose values are pairs of an UpdateOneModel and the total quantity for a product. The keys are used for matching an article and its size in the file with those in the database via the ProductArticleSizeDto projection;
3) Use the atomic merge method of the ConcurrentMap to sum up the quantity of the same products and create a new UpdateOneModel;
4) Fetch from the database the products whose articles are present in the file;
5) Each ProductArticleSizeDto found in the storage has to match a ProductArticleSizeDto from the file, summed up by quantity;
6) Then count the results after filtration; the count should be equal to the number of distinct products in the file;
7) Use the onErrorResume method to continue when any error occurs because we need to process all files as mentioned in the requirements;
8) Extract the list of the UpdateOneModel from the map collected earlier to be further used in the incrementProductQuantity method;
9) Then run the incrementProductQuantity method as a sub-process within flatMap and map its result in FileUploadDto that our business users are in need of.
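The duplicate handling in steps 1 to 3 boils down to Map.merge with an addition as the remapping function. Here is a minimal, library-free sketch of just that part. ProductRow and QuantityMerger are hypothetical stand-ins, and the UpdateOneModel half of the tuple is left out for brevity.

```java
import java.math.BigInteger;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical row read from a file: an article, its size and a quantity.
final class ProductRow {
    final long article;
    final String size;
    final BigInteger quantity;

    ProductRow(long article, String size, long quantity) {
        this.article = article;
        this.size = size;
        this.quantity = BigInteger.valueOf(quantity);
    }
}

class QuantityMerger {
    // The (article, size) pair plays the role of ProductArticleSizeDto as the map key.
    static Map.Entry<Long, String> key(long article, String size) {
        return new SimpleImmutableEntry<>(article, size);
    }

    // Sums up the quantities of rows sharing the same article and size.
    static Map<Map.Entry<Long, String>, BigInteger> merge(List<ProductRow> rows) {
        Map<Map.Entry<Long, String>, BigInteger> index = new ConcurrentHashMap<>();
        for (ProductRow row : rows) {
            // merge() stores the quantity for a new key or atomically adds it to the existing one.
            index.merge(key(row.article, row.size), row.quantity, BigInteger::add);
        }
        return index;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
            new ProductRow(1L, "M", 3), new ProductRow(1L, "M", 4), new ProductRow(2L, "L", 5))));
    }
}
```

The size of the resulting map is the number of distinct products in the file, which is the figure the later consistency checks compare against.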
Even though the filterWhen and the subsequent productDao.findByArticleIn allow us to do some additional validation at an early stage, they come at a price, which is especially noticeable while processing big files in practice. However, the incrementProductQuantity method can compare the number of modified documents and match them against the number of the distinct products in the file. Knowing that, we can implement a more lightweight option to process big files:
private Mono<FileUploadDto> processBigExcelFile(final String fileName, final String userName) {
    log.debug("processBigExcelFile: {}", fileName);
    return excelFileDao.getProducts(pathToStorage, fileName, userName)
        .reduce(
            new ConcurrentHashMap<Product, Tuple2<UpdateOneModel<Document>, BigInteger>>(),
            (indexMap, product) -> {
                final BigInteger quantity = product.getQuantity();
                indexMap.merge(
                    product,
                    Tuples.of(
                        updateOneModelConverter.convert(Tuples.of(product, quantity, userName)),
                        quantity
                    ),
                    (oldValue, newValue) -> {
                        final BigInteger mergedQuantity = oldValue.getT2().add(newValue.getT2());
                        return Tuples.of(
                            updateOneModelConverter.convert(Tuples.of(product, mergedQuantity, userName)),
                            mergedQuantity
                        );
                    }
                );
                return indexMap;
            }
        )
        .map(indexMap ->
            indexMap.values().stream().map(Tuple2::getT1).collect(Collectors.toList())
        )
        .onErrorResume(e -> {
            log.debug("Exception while processExcelFile: {}: {}", fileName, e);
            return Mono.empty();
        })
        .flatMap(dtoList ->
            productPatcherService.incrementProductQuantity(fileName, dtoList, userName)
        )
        .map(bulkWriteResult ->
            FileUploadDto.builder()
                .fileName(fileName)
                .matchedCount(bulkWriteResult.getMatchedCount())
                .modifiedCount(bulkWriteResult.getModifiedCount())
                .build()
        );
}
Here is the ProductAndUserNameToUpdateOneModelConverter that we have used to create an UpdateOneModel:
1) Firstly, find a document by article and size. Figure 2 shows that we have a compound index on the size and article fields of the product collection to facilitate such a search;
2) Increment the quantity of the found document and set the name of the user in the lastModifiedBy field;
3) It is also possible to upsert a document here, but we are interested only in the modification of the existing documents in the storage.
Now we are ready to implement the central part of our processing which is the incrementProductQuantity method of the ProductPatcherDaoImpl:
@Override
public Mono<BulkWriteResult> incrementProductQuantity(
    final String fileName,
    final List<UpdateOneModel<Document>> models,
    final String userName
) {
    return transactionalOperator.execute(
        action -> reactiveMongoOperations.getCollection(Product.COLLECTION_NAME)
            .flatMap(collection ->
                Mono.from(collection.bulkWrite(models, new BulkWriteOptions().ordered(true)))
            )
            .<BulkWriteResult>handle((bulkWriteResult, synchronousSink) -> {
                final int fileCount = models.size();
                if (Objects.equals(bulkWriteResult.getModifiedCount(), fileCount)) {
                    synchronousSink.next(bulkWriteResult);
                } else {
                    synchronousSink.error(new IllegalStateException(
                        String.format(
                            "Inconsistency between modified doc count: %d and file doc count: %d. Please, check file: %s",
                            bulkWriteResult.getModifiedCount(),
                            fileCount,
                            fileName
                        )
                    ));
                }
            })
            .onErrorResume(e ->
                Mono.fromRunnable(action::setRollbackOnly)
                    .log("Exception while incrementProductQuantity: " + fileName + ": " + e)
                    .then(Mono.empty())
            )
    ).singleOrEmpty();
}
1) Use a transactionalOperator to roll back a transaction manually. As has been mentioned before, our goal is to process all files while skipping those causing exceptions;
2) Run a single sub-process to bulk write modifications to the database sequentially for fail-fast and less resource-intensive behavior. The word "single" is of paramount importance here because we avoid the dangerous "N+1 Query Problem" leading to spawning a lot of sub-processes on a flux within flatMap;
3) Handle the situation when the number of the documents processed does not match the one coming from the distinct number of the products in the file;
4) The onErrorResume method handles the rollback of the transaction and then returns Mono.empty() to skip the current processing;
5) Expect either a single item or an empty Mono as the result of the transactionalOperator.execute method.
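To see the intended outcome of steps 3 to 5 in isolation, consider the following in-memory analogy. It is not MongoDB or Spring API, and all names are hypothetical. Updates are applied to a working copy, and the result is kept only if the number of modified entries equals the number of requested updates. Otherwise nothing changes and an empty result is returned, so the caller can move on to the next file.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// In-memory analogy of "commit only if every update matched, otherwise roll back and skip".
class AllOrNothingPatcher {

    // Returns the number of modified entries, or Optional.empty() if the batch was rolled back.
    static Optional<Integer> incrementAll(Map<String, Integer> store, Map<String, Integer> increments) {
        Map<String, Integer> working = new HashMap<>(store); // changes stay invisible until "commit"
        int modified = 0;
        for (Map.Entry<String, Integer> e : increments.entrySet()) {
            // computeIfPresent returns null when the key does not exist, i.e. nothing was modified.
            if (working.computeIfPresent(e.getKey(), (k, v) -> v + e.getValue()) != null) {
                modified++;
            }
        }
        if (modified != increments.size()) {
            return Optional.empty(); // "rollback": the store is left untouched
        }
        store.clear();
        store.putAll(working); // "commit"
        return Optional.of(modified);
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        store.put("article-1", 10);
        Map<String, Integer> batch = new HashMap<>();
        batch.put("article-1", 5);
        batch.put("article-404", 1);
        System.out.println(incrementAll(store, batch).isPresent());
        System.out.println(store);
    }
}
```

In the real method the database transaction provides this isolation, and setRollbackOnly together with Mono.empty() plays the role of the early return.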
One might ask: "You called collection.bulkWrite(models, new BulkWriteOptions().ordered(true)), what about setting a session?". The thing is that the SessionAwareMethodInterceptor of Spring Data MongoDB does it via reflection:
So far so good, we can create integration tests to cover our logic.
To begin with, we create ProductControllerITTest to test our public API via Spring's WebTestClient and initialize a MongoDB instance to run tests against:
1) Use a static field to have a single Testcontainers MongoDBContainer shared by all test methods in ProductControllerITTest;
2) We use the 4.2.8 MongoDB container version from Docker Hub as it is the latest stable one; otherwise MongoDBContainer defaults to 4.0.10.
Then in the static methods setUpAll and tearDownAll we start and stop the MongoDBContainer respectively. Even though we do not use Testcontainers' reusable feature here, we leave open the possibility of setting it, which is why we call MONGO_DB_CONTAINER.stop() only if the reusable feature is turned off.
Now we are ready to write a first test without any transaction collision, because our test files (see Figure 3) have products whose articles do not clash with one another.
1) We can specify the maximum amount of time in milliseconds that multi-document transactions should wait to acquire locks required by the operations in the transaction (by default, multi-document transactions wait 5 milliseconds);
2) As an example here, we might use a helper method to replace 5 ms with 20 ms (see the implementation details below).
Note that the maxTransactionLockRequestTimeoutMillis setting makes no sense for this particular test case and only serves the purpose of the example. After running this test class 120 times via a script ./ 120 ProductControllerITTest.shouldPatchProductQuantityConcurrently in the tools directory of the project, I got the following figures:
Figure 5. Running the shouldPatchProductQuantityConcurrently test 120 times with 20 ms and 5 ms (default) maxTransactionLockRequestTimeoutMillis respectively (table columns: "20 ms, times" and "5 ms (default), times"; table rows: "T1 successes", "T2 successes", "T1 and T2 success")
While going through logs, we may come across something like:
Exception while incrementProductQuantity: products1.xlsx: com.mongodb.MongoCommandException: Command failed with error 112 (WriteConflict): 'WriteConflict' on server…
Initiating transaction rollback...
Initiating transaction commit...
About to abort transaction for session...
About to commit transaction for session...
Then, let us test the processing of a big file containing 1 million products in a separate PatchProductLoadITTest:
1) The general setup is similar to the ProductControllerITTest;
2) Unzip a JSON file containing 1 million products, which requires about 254 MB of disk space;
3) Transactions have a lifetime limit as specified by transactionLifetimeLimitSeconds which is 60 seconds by default. We need to increase it here, because generally it takes more than 60 s to process such a file. For this, we use a helper method to change this lifespan to 900 s (see the implementation details below). For your information, the REST call with the file takes GitHub Actions about 9-12 minutes;
4) Before processing, we clean up the product collection, insert 1 million products from the JSON file and then get the total quantity;
5) Given that the products in the JSON file and the big Excel file are equal, we assert that the total quantity of the products after processing should double.
Such a test requires a relatively big heap of about 4GB (see Figure 6) and Docker's memory resource of about 6GB (see Figure 7):
As we can see, it is sensible to configure the maximum amount of disk space allowed for file parts and the maximum number of parts allowed in a given multipart request. That is why I added properties to the proper application.yml file and then set them in the configureHttpMessageCodecs method of the implemented WebFluxConfigurer. However, adding a rate limiter and configuring Schedulers might be a better solution in a production environment. Note that we use Schedulers.boundedElastic() here, which has a pool of 10 * Runtime.getRuntime().availableProcessors() threads by default.
Here is TransactionUtil containing the above-mentioned helper methods:
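As a rough, library-free sketch of what such helpers boil down to, the class below builds the two setParameter command documents and renders them the way they would be typed into the mongo shell. The class and method names are hypothetical. With the Java driver, an equivalent document would presumably be sent to the admin database via runCommand, which is an assumption about the wiring rather than the project's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical builder for the two server parameters discussed in this article.
class TransactionParameterCommands {

    // { setParameter: 1, <parameter>: <value> }; insertion order matters, hence LinkedHashMap.
    static Map<String, Object> setParameter(String parameter, int value) {
        Map<String, Object> command = new LinkedHashMap<>();
        command.put("setParameter", 1);
        command.put(parameter, value);
        return command;
    }

    static Map<String, Object> lockRequestTimeout(int millis) {
        return setParameter("maxTransactionLockRequestTimeoutMillis", millis);
    }

    static Map<String, Object> transactionLifetimeLimit(int seconds) {
        return setParameter("transactionLifetimeLimitSeconds", seconds);
    }

    // Renders the command as a mongo shell call against the admin database.
    static String toShell(Map<String, Object> command) {
        return command.entrySet().stream()
            .map(e -> e.getKey() + ": " + e.getValue())
            .collect(Collectors.joining(", ", "db.adminCommand({ ", " })"));
    }

    public static void main(String[] args) {
        System.out.println(toShell(lockRequestTimeout(20)));
        System.out.println(toShell(transactionLifetimeLimit(900)));
    }
}
```

The values 20 and 900 are the ones used in the tests above: 20 ms for the lock request timeout and 900 s for the transaction lifetime.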
1) The MongoDBContainer takes care of the complexity of the MongoDB replica set initialization, allowing the developer to focus on testing. Now we can simply make MongoDB transaction testing part of our CI/CD process;
2) While processing data, it is sensible to favor MongoDB's bulk methods, reducing the number of sub-processes within the flatMap method of the Flux and thus avoiding the "N+1 Query problem". However, it also comes at a price, because here we need to collect a list of UpdateOneModel and keep it in memory, which lacks reactive flexibility;
4) Even though we are allowed to set maxTransactionLockRequestTimeoutMillis and transactionLifetimeLimitSeconds as start-up parameters of mongod, we may achieve the same effect by calling MongoDB's adminCommand via helper methods;
5) Processing big files is resource-consuming and should therefore be limited.
6. Want to go deeper?
