1. Introduction
In today’s article, I would like to walk you step by step through the process of creating a reactive MongoDB REST API CRUD with Spring Boot and Kotlin.
Reactive programming is becoming more and more popular nowadays and plenty of projects incorporated the Spring WebFlux framework, which provides support for creating non-blocking web applications. The reactive approach has plenty of advantages and I personally believe every programmer should know it, at least at a basic level. However, we need to keep in mind, that it is not the Golden Bullet and for some projects, the well-known Spring Web MVC will be a better choice.
After finishing this tutorial, you will be able to create a simple Spring Boot REST API CRUD and connect it to the MongoDB with Spring Data Reactive MongoDB (if you’d like to see how to connect to the MongoDB in a standard Web MVC application, please see my previous article). Additionally, if this is your first meeting with the WebFlux or Project Reactor, I highly encourage you to check out this article covering the basics.
2. Run MongoDB Server
Just like in the previous article, we need to make sure, that our MongoDB server is up and running. For simplicity, we will deploy it as a docker container. Nevertheless, if you would like to install a MongoDB on your local machine, please refer to this official manual.
2.1. Deploy MongoDB as a Container
Firstly, let’s pull the mongo image from the Docker Hub registry:
docker pull mongo
After that, let’s deploy it in a detached mode ( -d flag):
docker run -d -p 27017-27019:27017-27019 --name mongodb mongo
Besides deploying in a detached mode, the above command publishes the container’s ports to the host- in simple terms, we will be able to connect to the MongoDB using localhost.
Finally, let’s check if everything is OK with the following command:
docker ps #command output CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 26bd5c8dcc79 mongo "docker-entrypoint.sโฆ" 6 seconds ago Up 5 seconds 0.0.0.0:27017-27019->27017-27019/tcp mongodb
3. Imports
As the next step, let’s create a Spring Boot project and add necessary imports (I highly encourage you to use the Spring Initializr when doing that):
implementation("org.springframework.boot:spring-boot-starter-data-mongodb-reactive") implementation("org.springframework.boot:spring-boot-starter-webflux") implementation("io.projectreactor.kotlin:reactor-kotlin-extensions") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") testImplementation("io.projectreactor:reactor-test")
4. Configure Application Properties
After we’ve configured the imports, let’s add an additional configuration to the application.yaml file:
server: error: include-message: always
Since the Spring Boot 2.3 release, error messages are no longer included in the server’s responses by default. In this guide, I would like to show you how to create a custom exception class with a message, so these 3 lines are necessary.
5. Create Models
After all of the above being done, we can start defining models, which will be used to persist and fetch objects from the database. In our example, we will define two separate classes- a Company and an Employee:
@Document("companies") data class Company( @Id val id: String? = null, var name: String, @Field("company_address") var address: String )
data class Employee( @Id val id: ObjectId? = null, var firstName: String, var lastName: String, var email: String, var company: Company? )
As you might have noticed, the Company class definition has two (optional) annotations more, than the Employee. Let me explain them:
- a @Document annotation- usually, we will use it to change the default name of the MongoDB collection used to store the document. By default, the name of the collection will be the class name changed to start with a lower-case letter. So, in our database, we will have two collections created: the companies and the employee.
- a @Field annotation- it’s used to define custom metadata for the document fields, like a key name, order, or a target type. In our example, we’ve changed the key name from the default “address” to the “company_address”.
5.1. What is ObjectId?
You might be wondering, what exactly the ObjectId is? Well, by default MongoDB uses ObjectIds as the default value of the _id field of each document. An ObjectId is a 12-byte BSON type having the following structure:
- The first 4 bytes represent the seconds since the Unix epoch
- The next 3 bytes are the machine identifier
- The next 2 bytes consists of process id
- The last 3 bytes are a random counter value
These 12 bytes altogether uniquely identify a document within the MongoDB collection and serve as a primary key for that document as well. From the Spring Boot perspective, we can treat them as ObjectIds or Strings– the decision is up to you.
6. Add Custom Exception
Nextly, let’s create a custom exception class named NotFoundException:
@ResponseStatus(HttpStatus.NOT_FOUND) class NotFoundException(msg: String) : RuntimeException(msg)
The @ResponseStatus annotation allows us to specify the status code to use for the response- in this case- 404 Not Found.
7. Create Repositories
As the next step, let’s create repositories for our data. Let’s start by adding the CompanyRepository first:
interface CompanyRepository : ReactiveMongoRepository<Company, String>
As you can see, the CompanyRepository interface extends the ReactiveMongoRepository, which provides us with basic CRUD operations (and of course a reactive support, as well). Similarly, let’s create the EmployeeRepository:
interface EmployeeRepository : ReactiveMongoRepository<Employee, ObjectId> { fun findByCompanyId(id: String): Flux<Employee> }
This time, we’ve provided additionally the definition of findByCompanyId function, which will be used later to find employees of a specific company.
If you have any prior experience with Spring Data repositories, you might have noticed the crucial difference between the standard and reactive repositories- they are using Fluxes and Monos, instead of Objects and Collections.
8. Create Services
8.1. Implement CompanyService
Before we will create the service, let’s add the CompanyRequest class, which will be used to store the data passed by the user:
class CompanyRequest( val name: String, val address: String )
Nextly, let’s create the CompanyService class. Please don’t worry, if anything is confusing to you, I’ll cover all functions later.
@Service class CompanyService( private val companyRepository: CompanyRepository, private val employeeRepository: EmployeeRepository ) { fun createCompany(request: CompanyRequest): Mono<Company> = companyRepository.save( Company( name = request.name, address = request.address ) ) fun findAll(): Flux<Company> = companyRepository.findAll() fun findById(id: String): Mono<Company> = companyRepository.findById(id) .switchIfEmpty { Mono.error( NotFoundException("Company with id $id not found") ) } fun updateCompany(id: String, request: CompanyRequest): Mono<Company> = findById(id) .flatMap { companyRepository.save( it.apply { name = request.name address = request.address } ) } .doOnSuccess { updateCompanyEmployees(it).subscribe() } fun deleteById(id: String): Mono<Void> = findById(id) .flatMap(companyRepository::delete) private fun updateCompanyEmployees(updatedCompany: Company): Flux<Employee> = employeeRepository.saveAll( employeeRepository.findByCompanyId(updatedCompany.id!!) .map { it.apply { company = updatedCompany } } ) }
Let’s start with the two easiest functions:
fun createCompany(request: CompanyRequest): Mono<Company> = companyRepository.save( Company( name = request.name, address = request.address ) ) fun findAll(): Flux<Company> = companyRepository.findAll()
They are responsible for creating and querying all companies, but instead of returning elements, they wrap them into Mono and Flux, which can be processed later.
What about a findById function?
fun findById(id: String): Mono<Company> = companyRepository.findById(id) .switchIfEmpty { Mono.error( NotFoundException("Company with id $id not found") ) }
The findById function from CompanyRepository returns a Mono with the existing data or an empty Mono in case if no data has been found. You might think about it as something similar to the Optional in Java. So in our case, if we would like to “throw” the exception, we need to create a new Mono instance with our Throwable- the NotFoundException object and pass it to the switchIfEmpty function.
Nextly, let’s check the updateCompany function:
fun updateCompany(id: String, request: CompanyRequest): Mono<Company> = findById(id) .flatMap { companyRepository.save( it.apply { name = request.name address = request.address } ) } .doOnSuccess { updateCompanyEmployees(it).subscribe() } private fun updateCompanyEmployees(updatedCompany: Company): Flux<Employee> = employeeRepository.saveAll( employeeRepository.findByCompanyId(updatedCompany.id!!) .map { it.apply { company = updatedCompany } } )
Firstly, it invokes the findById function to get the Mono object of the desired company. After that, we use the flatMap to transform the value emitted by it (the company) asynchronously and return the value emitted by the save method. If everything executed successfully, the updateCompanyEmployees function will be invoked. As the last step, we need to subscribe to the Flux returned from this function.
8.2. MongoDB Relations
You might be wondering why do we need to update all employees connected with the company, each time we are updating it? That’s because we keep the related company in denormalized form- inside the employee document. To visualize that, let’s check the example document from our database:
{ "firstName": "Piotr", "lastName": "Wolak", "email": "contact@codersee.com", "company": { "_id": { "$oid": "5fcb90f830e3af4497f5de14" }, "name": "Company Two", "company_address": "Address Two" }, "_class": "com.codersee.mongocrud.model.Employee" }
As you can see, the company document is stored as an inner document of each employee. This might seem pretty weird, especially when you have any previous experience working with SQL databases, but this is the optimal choice for most real-life scenarios when using MongoDB (if you would like to learn more about MongoDB relations, please let me know and I will create another tutorial about it :).
As the last step, let’s check the deleteById function:
fun deleteById(id: String): Mono<Void> = findById(id) .flatMap(companyRepository::delete)
All it does is to fetch the company’s Mono and delete its value returning the instance of Mono<Void>.
8.3. Create EmployeeService
Similarly, let’s create the EmployeeRequest responsible for data handling and EmployeeService:
class EmployeeRequest( val firstName: String, val lastName: String, val email: String, val companyId: String? )
@Service class EmployeeService( private val companyService: CompanyService, private val employeeRepository: EmployeeRepository ) { fun createEmployee(request: EmployeeRequest): Mono<Employee> { val companyId = request.companyId return if (companyId == null) { createEmployeeWithoutCompany(request) } else { createEmployeeWithCompany(companyId, request) } } private fun createEmployeeWithoutCompany(request: EmployeeRequest): Mono<Employee> { return employeeRepository.save( Employee( firstName = request.firstName, lastName = request.lastName, email = request.email, company = null ) ) } private fun createEmployeeWithCompany(companyId: String, request: EmployeeRequest) = companyService.findById(companyId) .flatMap { employeeRepository.save( Employee( firstName = request.firstName, lastName = request.lastName, email = request.email, company = it ) ) } fun findAll(): Flux<Employee> = employeeRepository.findAll() fun findAllByCompanyId(id: String): Flux<Employee> = employeeRepository.findByCompanyId(id) fun findById(id: ObjectId): Mono<Employee> = employeeRepository.findById(id) .switchIfEmpty { Mono.error( NotFoundException("Employee with id $id not found") ) } fun updateEmployee(id: ObjectId, request: EmployeeRequest): Mono<Employee> { val employeeToUpdate = findById(id) val companyId = request.companyId return if (companyId == null) { updateEmployeeWithoutCompany(employeeToUpdate, request) } else { updateEmployeeWithCompany(companyId, employeeToUpdate, request) } } private fun updateEmployeeWithoutCompany(employeeToUpdate: Mono, request: EmployeeRequest) = employeeToUpdate.flatMap { employeeRepository.save( it.apply { firstName = request.firstName lastName = request.lastName email = request.email company = null } ) } private fun updateEmployeeWithCompany( companyId: String, employeeToUpdate: Mono, request: EmployeeRequest ) = companyService.findById(companyId) .zipWith(employeeToUpdate) .flatMap { employeeRepository.save( it.t2.apply { firstName = request.firstName lastName = request.lastName email = request.email company = it.t1 } ) } fun deleteById(id: ObjectId): Mono<Employee> { return findById(id) .flatMap(employeeRepository::delete) } }
After implementing the previous chapter, most of the concepts used in the code above should be familiar to you, so I won’t be diving into the details here. However, you might be interested in what exactly the zipWith function does inside the updateEmployeeWithCompany?
private fun updateEmployeeWithCompany( companyId: String, employeeToUpdate: Mono<Employee>, request: EmployeeRequest ) = companyService.findById(companyId) .zipWith(employeeToUpdate) .flatMap { employeeRepository.save( it.t2.apply { firstName = request.firstName lastName = request.lastName email = request.email company = it.t1 } ) }
Well, the zipWith function combines the result of two Monos into a Tuple2 (a data structure holding two, non-null values). In our case, firstly, we obtain the Company Mono from the CompanyService and then couple it with Employee Mono passed to the function. After this operation, our tuple instance is passed to the flatMap function allowing us to reference the Employee (t2) and Company (t1) instances.
9. Implement REST Controllers
As the last step, we will implement the controller layer responsible for request handling. But before that, let’s prepare two classes: CompanyResponse and EmployeeResponse responsible for returning the data to the user:
class CompanyResponse( val id: String, val name: String, val address: String ) { companion object { fun fromEntity(company: Company): CompanyResponse = CompanyResponse( id = company.id!!, name = company.name, address = company.address ) } }
Important note: although the creation of separate classes for transferring the incoming and outgoing data adds some redundancy to the code, I personally believe it is a good way to separate these concepts and make the code cleaner.
class EmployeeResponse( val id: String, val firstName: String, val lastName: String, val email: String, val company: CompanyResponse? ) { companion object { fun fromEntity(employee: Employee): EmployeeResponse = EmployeeResponse( id = employee.id!!.toHexString(), firstName = employee.firstName, lastName = employee.lastName, email = employee.email, company = employee.company?.let { CompanyResponse.fromEntity(it) } ) } }
As you can see, both classes contain fromEntity functions inside companion objects. As the name suggests, they will be used to convert entity objects to response instances.
9.1. Create CompanyController
With that being done, let’s add the CompanyController class to our project:
@RestController @RequestMapping("/api/company") class CompanyController( private val companyService: CompanyService ) { @PostMapping fun createCompany(@RequestBody request: CompanyRequest): Mono<CompanyResponse> { return companyService.createCompany(request) .map { CompanyResponse.fromEntity(it) } } @GetMapping fun findAllCompanies(): Flux<CompanyResponse> { return companyService.findAll() .map { CompanyResponse.fromEntity(it) } .delayElements(Duration.ofSeconds(2)) } @GetMapping("/{id}") fun findCompanyById(@PathVariable id: String): Mono<CompanyResponse> { return companyService.findById(id) .map { CompanyResponse.fromEntity(it) } } @PutMapping("/{id}") fun updateCompany( @PathVariable id: String, @RequestBody request: CompanyRequest ): Mono<CompanyResponse> { return companyService.updateCompany(id, request) .map { CompanyResponse.fromEntity(it) } } @DeleteMapping("/{id}") fun deleteCompany(@PathVariable id: String): Mono<Void> { return companyService.deleteById(id) } }
You might have noticed the usage of the delayElements function. It adds the pause between emitting each element from our Flux instance. I’ve added it to simulate the additional delay, which might happen in real-life scenarios (for instance, when processing the data, or calling other services).
Additionally, all functions return either the Flux or Mono instance, but why do we do that?
Well, with this approach, the behavior of the WebFlux will be dependent on the type of the incoming request:
- if we call the endpoint without specifying the Accept header, or when we set it to application/json, the request will be handled in a standard, blocking way returning a JSON in response
- however, if we set the Accept header value to the text/event-stream,ย then the Server-Sent Event channel will be opened allowing the server to push the data to the client (in our case- with 2 seconds of the delay between each company response)
9.2. Create EmployeeController
Similarly, let’s implement the EmployeeController.
@RestController @RequestMapping("/api/employee") class EmployeeController( private val employeeService: EmployeeService ) { @PostMapping fun createEmployee(@RequestBody request: EmployeeRequest): Mono<EmployeeResponse> { return employeeService.createEmployee(request) .map { EmployeeResponse.fromEntity(it) } } @GetMapping fun findAllEmployees(): Flux<EmployeeResponse> { return employeeService.findAll() .map { EmployeeResponse.fromEntity(it) } } @GetMapping("/{id}") fun findEmployeeById(@PathVariable id: ObjectId): Mono<EmployeeResponse> { return employeeService.findById(id) .map { EmployeeResponse.fromEntity(it) } } @GetMapping("/company/{companyId}") fun findAllByCompanyId(@PathVariable companyId: String): Flux<EmployeeResponse> { return employeeService.findAllByCompanyId(companyId) .map { EmployeeResponse.fromEntity(it) } } @PutMapping("/{id}") fun updateUpdateEmployee( @PathVariable id: ObjectId, @RequestBody request: EmployeeRequest ): Mono<EmployeeResponse> { return employeeService.updateEmployee(id, request) .map { EmployeeResponse.fromEntity(it) } } @DeleteMapping("/{id}") fun deleteEmployee(@PathVariable id: ObjectId): Mono<Void> { return employeeService.deleteById(id) } }
10. Test with CURL
Finally, we can run our application and test the endpoints using cURL. If you would like to explore the database, I highly recommend the MongoDB Compass, which is the dedicated GUI for MongoDB.
Let’s start by adding two companies:
# 1st company curl --location --request POST 'localhost:8080/api/company' \ --header 'Content-Type: application/json' \ --data-raw '{ "name": "Company 1", "address": "Address 1" }' # 2nd company curl --location --request POST 'localhost:8080/api/company' \ --header 'Content-Type: application/json' \ --data-raw '{ "name": "Company 2", "address": "Address 2" }'
Nextly, let’s validate if the data has been added correctly in two ways. Let’s check out the blocking approach first:
curl 'localhost:8080/api/company' # Example output [ { "id": "5fcba07a30e3af4497f5de16", "name": "Company 1", "address": "Address 1" }, { "id": "5fcba07c30e3af4497f5de17", "name": "Company 2", "address": "Address 2" } ]
As the next step, let’s test the endpoint utilizing the Server-Sent Event channel:
curl http://localhost:8080/api/company -H "Accept: text/event-stream" # Example output data:{"id":"5fcba07a30e3af4497f5de16","name":"Company 1","address":"Address 1"} data:{"id":"5fcba07c30e3af4497f5de17","name":"Company 2","address":"Address 2"}
In the first case, we had to wait around 4 seconds for the server response (2 companies * 2 seconds of the delay). On the other hand, utilizing the SSE channel in the second case allows the server to push the data to the client chunk by chunk, with the delay between each entry.
Awesome, isn’t it? Let’s imagine that our endpoint returns thousands of data or the computation takes much longer. With the blocking request, we’d have to wait until everything will be processed to consume it. With the SSE, the client can process the data (for instance, displaying it immediately to the user) as soon, as it receives the first chunk.
After that, let’s validate if other endpoints are working as well. Let’s start by querying a specific company by the ID:
curl --location --request GET 'localhost:8080/api/company/5fcba07c30e3af4497f5de17' # Expected output { "id": "5fcba07c30e3af4497f5de17", "name": "Company 2", "address": "Address 2" }
To update the company, let’s use the PUT request:
curl --location --request PUT 'localhost:8080/api/company/5fcba07c30e3af4497f5de17' \ --header 'Content-Type: application/json' \ --data-raw '{ "name": "Company 2 Updated", "address": "Address 2 Updated" }'
If everything went well, we should see that our company data has been updated- you can always check it with another endpoint as well.
To delete the company, let’s use the DELETE handling endpoint:
curl --location --request DELETE 'localhost:8080/api/company/5fcba07c30e3af4497f5de17'
This time, we should receive an empty response.
Similarly, we can test the rest of the endpoints responsible for employees management- let it be homework for you- I’ve always believed there is no other way to learn anything, than trying on our own ๐
11. Summary
And that’s all for this article ๐ We’ve gone step by step through the process of deploying the MongoDB and a Docker container and creating theย Reactive MongoDB REST API CRUD with Spring Boot and Kotlin.
Please don’t worry, if some of the topics covered in this article seem to be difficult or unclear. The WebFlux and reactive approach, in general, are really complex topics and they require some time and practice to fully understand them. If you would like to ask about anything or need some explanation, please do it in the comment section below, or by using the contact form.
Traditionally, you can find the working project in this GitHub repository.
Finally, if you found this article useful, then you may want to check out how to generate test reports with Jacoco.