Abstract

"Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application" [Resource 3]. The purpose of this article is to outline a few ways of creating idempotent changes when database modifications are managed with Liquibase.

Throughout the lifetime of a software product that has such a tier, various database modifications are applied as it evolves. The more robust the modifications are, the more maintainable the solution is. To accomplish such a way of working, it is usually good practice to design the executed changesets to have zero side effects, that is, to be able to run as many times as needed with the same end result.

The simple proof of concept built here aims to showcase how Liquibase changesets may be written to be idempotent. Moreover, the article explains in more depth what exactly happens when the application starts.

Set Up

- Java 17
- Spring Boot v.3.1.0
- Liquibase 4.20.0
- PostgreSQL Driver 42.6.0
- Maven 3.6.3

Proof of Concept

As PostgreSQL is the database used here, first and foremost one shall create a new schema — liquidempo. This operation is easy to accomplish by issuing the following SQL command once connected to the database.

SQL
create schema liquidempo;

At the application level:

- The Maven Spring Boot project is created and configured to use the PostgreSQL Driver, Spring Data JPA, and Liquibase dependencies.
- A simple entity is created — Human — with only one attribute, a unique identifier which is also the primary key at the database level.

Java
@Entity
@Table(name = "human")
@SequenceGenerator(sequenceName = "human_seq", name = "CUSTOM_SEQ_GENERATOR", allocationSize = 1)
public class Human {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "CUSTOM_SEQ_GENERATOR")
    @Column(name = "id")
    private Long id;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }
}

For convenience, when entities are stored, their unique identifiers are generated using a database sequence called human_seq.

The data source is configured as usual in the application.properties file.

Properties files
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres?currentSchema=liquidempo&useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.hibernate.ddl-auto=none

The previously created schema is referred to in the connection URL. DDL handling is disabled, as the infrastructure and the data are intended to be persistent when the application is restarted.

As Liquibase is the database migration manager, the changelog path is configured in the application.properties file as well.

Properties files
spring.liquibase.change-log=classpath:/db/changelog/db.changelog-root.xml

For now, the db.changelog-root.xml file is empty. The current state of the project requires a few simple changesets, in order to create the database elements depicted around the Human entity — the table, the sequence, and the primary key constraint.
XML
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <changeSet author="horatiucd" id="100">
        <createSequence sequenceName="human_seq" startValue="1" incrementBy="1"/>
    </changeSet>

    <changeSet author="horatiucd" id="200">
        <createTable tableName="human">
            <column name="id" type="BIGINT">
                <constraints nullable="false"/>
            </column>
        </createTable>
    </changeSet>

    <changeSet author="horatiucd" id="300">
        <addPrimaryKey columnNames="id" constraintName="human_pk" tableName="human"/>
    </changeSet>
</databaseChangeLog>

In order for these to be applied, they need to be recorded as part of the db.changelog-root.xml file, as indicated below.

XML
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <include file="db/changelog/human_init.xml"/>
</databaseChangeLog>

When the application is restarted, the three changesets are executed in the order they are declared.

Plain Text
INFO 9092 --- [main] liquibase.database : Set default schema name to liquidempo
INFO 9092 --- [main] liquibase.lockservice : Successfully acquired change log lock
INFO 9092 --- [main] liquibase.changelog : Creating database history table with name: liquidempo.databasechangelog
INFO 9092 --- [main] liquibase.changelog : Reading from liquidempo.databasechangelog
Running Changeset: db/changelog/human_init.xml::100::horatiucd
INFO 9092 --- [main] liquibase.changelog : Sequence human_seq created
INFO 9092 --- [main] liquibase.changelog : ChangeSet db/changelog/human_init.xml::100::horatiucd ran successfully in 6ms
Running Changeset: db/changelog/human_init.xml::200::horatiucd
INFO 9092 --- [main] liquibase.changelog : Table human created
INFO 9092 --- [main] liquibase.changelog : ChangeSet db/changelog/human_init.xml::200::horatiucd ran successfully in 4ms
Running Changeset: db/changelog/human_init.xml::300::horatiucd
INFO 9092 --- [main] liquibase.changelog : Primary key added to human (id)
INFO 9092 --- [main] liquibase.changelog : ChangeSet db/changelog/human_init.xml::300::horatiucd ran successfully in 8ms
INFO 9092 --- [main] liquibase : Update command completed successfully.
INFO 9092 --- [main] liquibase.lockservice : Successfully released change log lock

Moreover, they are recorded as separate rows in the databasechangelog database table.
Plain Text
+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|id |author   |filename                   |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                           |
+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|100|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.184239|1            |EXECUTED|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|200|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.193031|2            |EXECUTED|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|300|horatiucd|db/changelog/human_init.xml|2023-05-26 16:23:17.204184|3            |EXECUTED|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
+---+---------+---------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+

So far, everything is straightforward, nothing out of the ordinary — a simple Spring Boot application whose database changes are managed with Liquibase. When examining the above human_init.xml file, one can easily identify the three scripts that result from the three changesets. None of them is idempotent: if they are executed again (although there is no reason to do so here), errors will occur, because the human_seq sequence, the human table, and the human_pk primary key already exist.

Idempotent Changesets

If the SQL code that results from the XML changesets had been written directly and aimed to be idempotent, it would have read as follows:

SQL
CREATE SEQUENCE IF NOT EXISTS human_seq INCREMENT 1 MINVALUE 1 MAXVALUE 99999999999;

CREATE TABLE IF NOT EXISTS human (
    id SERIAL CONSTRAINT human_pk PRIMARY KEY
);

If the two commands are executed several times, no errors occur and the outcome remains the same: after the first run, the sequence, the table, and the constraint are created, and every new execution leaves them in the same usable state. The aim is to accomplish the same in the written Liquibase changesets (changelog).

According to the Liquibase documentation [Resource 1]:

"Preconditions are tags you add to your changelog or individual changesets to control the execution of an update based on the state of the database. Preconditions let you specify security and standardization requirements for your changesets. If a precondition on a changeset fails, Liquibase does not deploy that changeset."

These constructs may be configured in various ways, either at the changelog or the changeset level. For simplicity, the three changesets of this proof of concept will be made idempotent. Basically, whenever a changeset fails to execute because the entity (sequence, table, or primary key) already exists, it is convenient to continue and not halt the execution of the entire changelog, which would prevent the application from starting. In this direction, Liquibase preconditions provide at least two options:

- either skip over the changeset and continue with the changelog, or
- skip over the changeset, but mark it as executed, and continue with the changelog.

Either of the two can be configured by adding a preConditions tag in the changeset of interest and setting the onFail attribute to CONTINUE (the former case) or MARK_RAN (the latter case).
In pseudo-code, this looks as below:

XML
<changeSet author="horatiucd" id="100">
    <preConditions onFail="CONTINUE or MARK_RAN">
        ...
    </preConditions>
    ...
</changeSet>

This seems in line with the initial desire — execute the changeset only if the preconditions are met. Next, each of the two situations is analyzed.

onFail="CONTINUE"

The changelog file — human_init_idempo_continue.xml — becomes as below:

XML
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <changeSet author="horatiucd" id="101">
        <preConditions onFail="CONTINUE">
            <not>
                <sequenceExists sequenceName="human_seq"/>
            </not>
        </preConditions>
        <createSequence sequenceName="human_seq" startValue="1" incrementBy="1"/>
    </changeSet>

    <changeSet author="horatiucd" id="201">
        <preConditions onFail="CONTINUE">
            <not>
                <tableExists tableName="human"/>
            </not>
        </preConditions>
        <createTable tableName="human">
            <column name="id" type="BIGINT">
                <constraints nullable="false"/>
            </column>
        </createTable>
    </changeSet>

    <changeSet author="horatiucd" id="301">
        <preConditions onFail="CONTINUE">
            <not>
                <primaryKeyExists primaryKeyName="human_pk" tableName="human"/>
            </not>
        </preConditions>
        <addPrimaryKey columnNames="id" constraintName="human_pk" tableName="human"/>
    </changeSet>
</databaseChangeLog>

For each item, the precondition checks whether it does not already exist. When running the application, the log shows what is executed:

Plain Text
INFO 49016 --- [main] liquibase.database : Set default schema name to liquidempo
INFO 49016 --- [main] liquibase.changelog : Reading from liquidempo.databasechangelog
INFO 49016 --- [main] liquibase.lockservice : Successfully acquired change log lock
Running Changeset: db/changelog/human_init_idempo_continue.xml::101::horatiucd
INFO 49016 --- [main] liquibase.changelog : Continuing past: db/changelog/human_init_idempo_continue.xml::101::horatiucd despite precondition failure due to onFail='CONTINUE': db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::201::horatiucd
INFO 49016 --- [main] liquibase.changelog : Continuing past: db/changelog/human_init_idempo_continue.xml::201::horatiucd despite precondition failure due to onFail='CONTINUE': db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::301::horatiucd
INFO 49016 --- [main] liquibase.changelog : Continuing past: db/changelog/human_init_idempo_continue.xml::301::horatiucd despite precondition failure due to onFail='CONTINUE': db/changelog/db.changelog-root.xml : Not precondition failed
INFO 49016 --- [main] liquibase : Update command completed successfully.
INFO 49016 --- [main] liquibase.lockservice : Successfully released change log lock

As expected, all three preconditions failed and the execution of the changelog continued. The databasechangelog database table does not have any records in addition to the previous three, which means the changesets will be attempted again at the next start-up of the application.

onFail="MARK_RAN"

The changelog file — human_init_idempo_mark_ran.xml — is similar to human_init_idempo_continue.xml. The only difference is the onFail attribute, which is set to onFail="MARK_RAN".
The db.changelog-root.xml root changelog now looks as below:

XML
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog https://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.17.xsd">

    <include file="db/changelog/human_init.xml"/>
    <include file="db/changelog/human_init_idempo_continue.xml"/>
    <include file="db/changelog/human_init_idempo_mark_ran.xml"/>
</databaseChangeLog>

For this proof of concept, all three files were kept on purpose, in order to be able to observe the behavior in detail. If the application is restarted, no errors are encountered, and the log depicts the following:

Plain Text
INFO 38788 --- [main] liquibase.database : Set default schema name to liquidempo
INFO 38788 --- [main] liquibase.changelog : Reading from liquidempo.databasechangelog
INFO 38788 --- [main] liquibase.lockservice : Successfully acquired change log lock
INFO 38788 --- [main] liquibase.changelog : Reading from liquidempo.databasechangelog
Running Changeset: db/changelog/human_init_idempo_continue.xml::101::horatiucd
INFO 38788 --- [main] liquibase.changelog : Continuing past: db/changelog/human_init_idempo_continue.xml::101::horatiucd despite precondition failure due to onFail='CONTINUE': db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::201::horatiucd
INFO 38788 --- [main] liquibase.changelog : Continuing past: db/changelog/human_init_idempo_continue.xml::201::horatiucd despite precondition failure due to onFail='CONTINUE': db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_continue.xml::301::horatiucd
INFO 38788 --- [main] liquibase.changelog : Continuing past: db/changelog/human_init_idempo_continue.xml::301::horatiucd despite precondition failure due to onFail='CONTINUE': db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::101::horatiucd
INFO 38788 --- [main] liquibase.changelog : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::101::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::201::horatiucd
INFO 38788 --- [main] liquibase.changelog : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::201::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': db/changelog/db.changelog-root.xml : Not precondition failed
Running Changeset: db/changelog/human_init_idempo_mark_ran.xml::301::horatiucd
INFO 38788 --- [main] liquibase.changelog : Marking ChangeSet: "db/changelog/human_init_idempo_mark_ran.xml::301::horatiucd" as ran despite precondition failure due to onFail='MARK_RAN': db/changelog/db.changelog-root.xml : Not precondition failed
INFO 38788 --- [main] liquibase : Update command completed successfully.
INFO 38788 --- [main] liquibase.lockservice : Successfully released change log lock

The changesets with onFail="CONTINUE" were attempted again, as every start-up is a new attempt, while the ones with onFail="MARK_RAN" were marked accordingly in the databasechangelog table and will be passed over at the next start-up.
Plain Text
+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|id |author   |filename                                   |dateexecuted              |orderexecuted|exectype|md5sum                            |description                                           |
+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+
|100|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.184239|1            |EXECUTED|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|200|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.193031|2            |EXECUTED|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|300|horatiucd|db/changelog/human_init.xml                |2023-05-26 16:23:17.204184|3            |EXECUTED|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
|101|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.453305|4            |MARK_RAN|8:db8c5fb392dc96efa322da2c326b5eba|createSequence sequenceName=human_seq                 |
|201|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.463021|5            |MARK_RAN|8:ed8e5e7df5edb17ed9a0682b9b640d7f|createTable tableName=human                           |
|301|horatiucd|db/changelog/human_init_idempo_mark_ran.xml|2023-05-29 16:40:26.475153|6            |MARK_RAN|8:a2d6eff5a1e7513e5ab7981763ae532b|addPrimaryKey constraintName=human_pk, tableName=human|
+---+---------+-------------------------------------------+--------------------------+-------------+--------+----------------------------------+------------------------------------------------------+

At the next run of the application, the log will be similar to the one where onFail was set to "CONTINUE". One more observation is worth making at this point: changesets whose preconditions do not fail are executed normally and recorded with exectype = EXECUTED in the databasechangelog table.

Conclusions

This article presented two ways of writing idempotent Liquibase changesets, a practice that allows for more robust and easier-to-maintain applications. This was accomplished by leveraging the changeset preConditions tag inside the changelog files. While both onFail attribute values — CONTINUE and MARK_RAN — may be used depending on the actual operation performed, the latter seems more appropriate for this proof of concept, as it does not attempt to re-run the changesets at every start-up of the application.

Resources

1. Liquibase Documentation
2. Source code for the sample application
3. Idempotence
Microservices architecture has gained popularity in recent years as a way to design complex and scalable applications. In this architecture, applications are divided into small, autonomous services that work together to provide the necessary functionality. Each microservice performs a specific task and communicates with other microservices through APIs. Data management is an essential part of microservices, and there are various patterns that can be used to handle data effectively.

Data Management in Microservices

Microservices architecture is characterized by a collection of small, independent services that are loosely coupled and communicate with each other using APIs. Each service is responsible for performing a specific business function and can be developed, deployed, and scaled independently. In a microservices architecture, data is distributed across multiple services, and each service has its own database or data store. This distribution of data can present challenges in managing data consistency, data redundancy, data access, and data storage.

Data Management Challenges in Microservices

One of the primary challenges in managing data within a microservices architecture is maintaining data consistency across services. Since each service has its own database, it is crucial to ensure that the data in each database is synchronized and consistent. In addition, managing data across services can be complex, since services may use different data models or even different database technologies. Furthermore, as microservices are independently deployable, they may be deployed in different locations, which can further complicate data management.

To address these challenges, various data management patterns have emerged in microservices architecture. These patterns are designed to ensure data consistency, facilitate data exchange between services, and simplify the management of data across services.

Data Consistency

In a microservices architecture, data consistency is a significant challenge because data is distributed across multiple services, and each service has its own database. Maintaining consistency across all these databases requires careful consideration of data consistency patterns. One common pattern used to ensure data consistency in microservices architecture is the saga pattern.

The saga pattern is a distributed transaction pattern that ensures data consistency in a microservices architecture. The pattern is based on the idea of a saga: a sequence of local transactions that are executed in each service. Each local transaction updates the local database and publishes a message to the message broker to indicate that the transaction has been completed. The message contains information that is used by other services to determine whether they can proceed with their transactions. If all the local transactions are successful, the saga is considered successful. Otherwise, the saga is aborted, and compensating transactions are executed to undo the changes made by the local transactions (a compact sketch of this idea follows shortly below).

Data Redundancy

Data redundancy is another challenge in data management in a microservices architecture. Because each service has its own database, data can be duplicated across multiple services, which can lead to inconsistencies, increased storage costs, and decreased system performance. One pattern used to address data redundancy in microservices architecture is event-driven architecture.
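Before looking at event-driven architecture in detail, here is a compact, illustrative sketch of the saga flow described above. It is a minimal in-process sketch, not code from any particular framework: the step names and the SagaStep type are hypothetical, and a real saga would drive the steps and compensations through broker messages rather than direct calls.

Kotlin
// Minimal saga sketch: each step has a local transaction and a compensation.
// All names here are hypothetical; a production saga would be message-driven.
class SagaStep(
    val name: String,
    val action: () -> Unit,        // the local transaction
    val compensation: () -> Unit   // undoes the local transaction
)

fun runSaga(steps: List<SagaStep>) {
    val completed = ArrayDeque<SagaStep>()
    try {
        for (step in steps) {
            step.action()            // e.g., reserve stock, charge payment
            completed.addFirst(step) // remember for reverse-order compensation
        }
    } catch (ex: Exception) {
        // A step failed: compensate the already-completed steps in reverse order.
        completed.forEach { it.compensation() }
        throw ex
    }
}

fun main() {
    runSaga(
        listOf(
            SagaStep("reserveStock", { println("stock reserved") }, { println("stock released") }),
            SagaStep("chargePayment", { println("payment charged") }, { println("payment refunded") }),
        )
    )
}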
In an event-driven architecture, events are used to propagate changes in data across multiple services. When a service updates its database, it publishes an event to the message broker, indicating that the data has changed. Other services that are interested in the data subscribe to the event and update their own databases accordingly. This approach keeps data consistent across all the services and reduces data redundancy.

Data Access

Data access is another challenge in data management in a microservices architecture. Because each service has its own database, it can be difficult to access data across services. One pattern used to address this challenge is the API gateway pattern. In the API gateway pattern, a single entry point is used to access all the services in the system. The API gateway provides a unified interface for accessing data across multiple services: when a client makes a request to the API gateway, the gateway translates the request into calls to the appropriate services and aggregates the responses into a unified response for the client. This simplifies the client's interaction with the system and makes it easier to manage data access across services.

Data Storage

Data storage is another challenge in data management in a microservices architecture, as giving each service its own database can increase storage costs and decrease system performance. One pattern used to address this challenge is the database per service pattern. In this pattern, each service has its own database, used to store data specific to that service. This approach provides isolation between services and reduces the risk of one service's data being read or modified by another.

Data Management Patterns in Microservices

Having discussed the data management challenges in microservices, let's now look at the data management patterns that can be used in a microservices architecture.

Database per Service Pattern

The database per service pattern is a popular approach for managing data in microservices. It provides several benefits, such as better scalability and flexibility, as each service can use the database technology that best suits its needs. Each microservice has its own database, and the database schema is designed to serve the specific needs of that service. This allows each microservice to manage its data independently, without interfering with other microservices, and it suits applications that have a large number of microservices and require a high degree of autonomy.

Scalability is improved because each service can be scaled independently, allowing for more granular control over resource allocation. Fault isolation is also improved, because if a particular service fails, it does not affect the data stored by other services. Deployment is simplified as well, because each service can be deployed independently, without impacting other services.

However, there are also drawbacks. The pattern can lead to data duplication and inconsistency if services are not designed properly, and managing multiple databases can be challenging, since it is difficult to keep all of them up to date and synchronized. The biggest challenge is maintaining consistency: when data is distributed across multiple databases, it can be difficult to ensure that all databases are in sync.
To address this challenge, some organizations use event-driven architectures or distributed transactions to ensure consistency across multiple databases.

Shared Database Pattern

Another approach for managing data in microservices is the shared database pattern. In this pattern, all microservices share a common database, and each microservice has access to the data it needs. Storing all data in a single database simplifies data management and makes it easier to maintain consistency across services, since all services use the same database. Data consistency is improved because data is always up to date and consistent across all services, and data duplication is reduced, eliminating the need for multiple copies of the same data.

However, there are also drawbacks. Scalability can be challenging because all services share the same database, which can create bottlenecks. Fault isolation suffers: if the shared database fails, it affects every service that relies on it. Deployment can also be more complex, because all services need to be updated and tested when changes are made to the shared database schema. The biggest challenge is the tight coupling the pattern creates between services: changes to the database schema or data can impact other services, which makes it difficult to evolve services independently — one of the key benefits of microservices.

Saga Pattern

In the saga pattern, a sequence of transactions is executed across multiple microservices to ensure consistency. If one transaction fails, the entire sequence is rolled back and the system returns to its previous state. This pattern is useful for applications that require distributed transactions, but it can be challenging to implement and manage.

The saga pattern is a way of managing transactions across multiple services: in a microservices architecture, a single business transaction may span several services, and the saga pattern ensures that all services involved either complete successfully or roll back on failure. Each service in the transaction is responsible for executing its portion and then notifying the coordinator of its completion. The coordinator then decides whether to proceed with the next step or to roll back. This approach provides a way to ensure consistency across services and is especially useful when using the database per service pattern.

Event Sourcing Pattern

In the event sourcing pattern, each microservice maintains a log of events that have occurred within its domain. These events are used to reconstruct the state of the system at any point in time. This pattern allows for a high degree of flexibility in data management and provides a reliable audit trail of all changes made to the system.
The event sourcing pattern manages data by storing a sequence of events that describe changes to an application's state. In a microservices architecture, each service is responsible for handling a specific set of operations, and event sourcing provides a way to store the history of changes to the state of a service: each service maintains a log of events describing those changes, and the events can then be replayed to reconstruct the current state. This approach provides several benefits, such as better auditability and scalability, as events can be processed asynchronously.

CQRS Pattern

In the CQRS (Command Query Responsibility Segregation) pattern, one component is responsible for handling commands (write operations) while another is responsible for handling queries (read operations). Separating the concerns of reading and writing data can lead to better scalability and performance. In the CQRS pattern, each service has two separate models: a write model, responsible for the write operations, and a read model, responsible for the read operations. Because the two models are separate, read and write paths can be optimized independently (a brief sketch of this split appears at the end of this article).

API Composition Pattern

In the API composition pattern, multiple microservices are combined to provide a unified API to the client. This allows for a high degree of flexibility in data management, as each microservice can manage its data independently; however, it can be challenging to manage the dependencies between microservices. The closely related API gateway pattern is often used in conjunction with the database per service pattern: a single API gateway provides a unified interface for all services, allowing clients to access data from multiple services through a single API. This provides several benefits. Security is improved because the API gateway can enforce authentication and authorization rules, ensuring that only authorized clients can access data from the various services. Client development is simplified because clients only need to interact with a single API rather than one API per service. Performance is improved because the gateway can cache frequently accessed data, reducing the number of requests that reach the underlying services. However, the API gateway can introduce a single point of failure, since all requests must pass through it, and it can become a performance bottleneck if it is not designed and implemented properly.

Conclusion

In conclusion, data management is a crucial aspect of microservices architecture, and there are various patterns that can be used to handle data effectively.
Each pattern has its strengths and weaknesses, and the choice of pattern depends on the specific requirements of the application. By using these patterns, developers can build scalable and reliable microservices that can handle large volumes of data.
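As a closing illustration of the CQRS split discussed above, here is a minimal sketch. The types and names (PlaceOrder, OrderWriteModel, OrderReadModel) are hypothetical and not tied to any framework; the point is only that commands mutate state through a write model, while queries are served from a separately maintained, denormalized read model.

Kotlin
// Hypothetical CQRS sketch: the write model handles commands,
// the read model serves queries from a denormalized projection.
data class PlaceOrder(val orderId: String, val productId: String, val quantity: Int)
data class OrderSummary(val orderId: String, val totalItems: Int)

class OrderWriteModel {
    private val events = mutableListOf<PlaceOrder>() // stand-in for a transactional store

    fun handle(command: PlaceOrder) {
        require(command.quantity > 0) { "quantity must be positive" }
        events += command // persist, then typically publish an event for projections
    }

    fun eventsFor(orderId: String) = events.filter { it.orderId == orderId }
}

class OrderReadModel {
    private val summaries = mutableMapOf<String, OrderSummary>() // denormalized view

    // Called by a projector subscribed to write-side events.
    fun project(event: PlaceOrder) {
        val current = summaries[event.orderId]?.totalItems ?: 0
        summaries[event.orderId] = OrderSummary(event.orderId, current + event.quantity)
    }

    fun query(orderId: String): OrderSummary? = summaries[orderId]
}

fun main() {
    val write = OrderWriteModel()
    val read = OrderReadModel()
    write.handle(PlaceOrder("o-1", "p-42", 2))
    write.eventsFor("o-1").forEach(read::project) // projector step, simplified to a direct call
    println(read.query("o-1")) // OrderSummary(orderId=o-1, totalItems=2)
}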
The reason why we need a Transactional Outbox is that a service often needs to publish messages as part of a transaction that updates the database. Both the database update and the sending of the message must happen within a transaction; otherwise, if the service doesn't perform these two operations atomically, a failure could leave the system in an inconsistent state.

The GitHub repository with the source code for this article.

In this article, we will implement it using Reactive Spring and Kotlin with Coroutines. Here is a full list of used dependencies: Kotlin with Coroutines, Spring Boot 3, WebFlux, R2DBC, Postgres, MongoDB, Kafka, Grafana, Prometheus, Zipkin, and Micrometer for observability.

The Transactional Outbox pattern solves the problem with the usual implementation, where the transaction updates the database table, then publishes a message to the broker, and finally commits. The problem: if the last step of the transaction fails, the transaction rolls back the database changes, but the event has already been published to the broker. So we need a way to guarantee that the data is both written to the database and published to the broker.

The idea of how we can solve it: in one transaction, save the order to the orders table and, in the same transaction, save the event to the outbox table, then commit. Afterwards, we have to publish the saved events from the outbox table to the broker. There are two ways to do that: using a CDC (change data capture) tool like Debezium, which continuously monitors your databases and lets any of your applications stream every row-level change in the same order they were committed to the database, or using a polling publisher. For this project, we used a polling publisher. I highly recommend Chris Richardson's book Microservices Patterns, where the Transactional Outbox pattern is explained very well.

One more important thing: we have to be ready for cases when the same event is published more than once, so the consumer must be idempotent. Idempotence describes the reliability of messages in a distributed system, specifically the reception of duplicated messages. Because of retries or message broker features, a message sent once can be received multiple times by consumers. A service is idempotent if processing the same event multiple times results in the same state and output as processing that event just a single time; the reception of a duplicated event does not change the application state or behavior. Most of the time, an idempotent service detects these events and ignores them. Idempotence can be implemented using unique identifiers (a small sketch of such a consumer-side check follows below).

So, let's implement it. The business logic of our example microservice is simple: orders with product shop items — two tables for simplicity, plus an outbox table, of course. An outbox table usually has a data field in which serialized events are stored; the most common format is JSON, but it's up to you and the concrete microservice. As the data field we can store only the state changes, or we can simply store the full, last-updated order domain entity every time; state changes take much less space, but again, it's up to you. Other fields in the outbox table usually include event type, timestamp, version, and other metadata. It depends on each concrete implementation, but this is often the required minimum. The version field is used for concurrency control.

All UI interfaces will be available on ports:

- Swagger UI URL
- Grafana UI URL
- Zipkin UI URL
- Kafka UI URL
- Prometheus UI URL
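As a minimal illustration of the idempotent-consumer idea above (not code from this project — the class and the in-memory set are hypothetical), the consumer records each processed event ID and skips duplicates. A real implementation would back this with a processed-events table checked inside the same transaction as the state change:

Kotlin
// Hypothetical idempotent consumer: processes an event at most once,
// keyed by its unique event ID.
class IdempotentConsumer(private val handle: (eventId: String, payload: String) -> Unit) {
    private val processed = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()

    fun onMessage(eventId: String, payload: String) {
        // add() returns false if the ID was already seen — a duplicate, so ignore it.
        if (!processed.add(eventId)) return
        handle(eventId, payload)
    }
}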
The docker-compose file for this article includes Postgres, MongoDB, Zookeeper, Kafka, Kafka UI, Zipkin, Prometheus, and Grafana. For local development, run make local or make develop: the first runs only docker-compose; the second also includes the microservice image.

YAML
version: "3.9"

services:
  microservices_postgresql:
    image: postgres:latest
    container_name: microservices_postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=microservices
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./docker_data/microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zoo1:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka1:
    image: confluentinc/cp-kafka:7.3.0
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
    volumes:
      - "./kafka_data:/kafka"
    networks: [ "microservices" ]

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8086:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19092
    networks: [ "microservices" ]

  zipkin-all-in-one:
    image: openzipkin/zipkin:latest
    restart: always
    ports:
      - "9411:9411"
    networks: [ "microservices" ]

  mongo:
    image: mongo
    restart: always
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: bank_accounts
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: microservices_node_exporter
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: microservices_grafana
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices

(Figure: the Postgres database schema for this project.)

The orders domain REST controller has the following methods:

Kotlin
@RestController
@RequestMapping(path = ["/api/v1/orders"])
class OrderController(private val orderService: OrderService, private val or: ObservationRegistry) {

    @GetMapping
    @Operation(method = "getOrders", summary = "get order with pagination", operationId = "getOrders")
    suspend fun getOrders(
        @RequestParam(name = "page", defaultValue = "0") page: Int,
        @RequestParam(name = "size", defaultValue = "20") size: Int,
    ) = coroutineScopeWithObservation(GET_ORDERS, or) { observation ->
        ResponseEntity.ok()
            .body(orderService.getAllOrders(PageRequest.of(page, size))
                .map { it.toSuccessResponse() }
                .also { response -> observation.highCardinalityKeyValue("response", response.toString()) }
            )
    }

    @GetMapping(path = ["{id}"])
    @Operation(method = "getOrderByID", summary = "get order by id", operationId = "getOrderByID")
    suspend fun getOrderByID(@PathVariable id: String) = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation ->
        ResponseEntity.ok().body(orderService.getOrderWithProductsByID(UUID.fromString(id)).toSuccessResponse())
            .also { response ->
                observation.highCardinalityKeyValue("response", response.toString())
                log.info("getOrderByID response: $response")
            }
    }

    @PostMapping
    @Operation(method = "createOrder", summary = "create new order", operationId = "createOrder")
    suspend fun createOrder(@Valid @RequestBody createOrderDTO: CreateOrderDTO) = coroutineScopeWithObservation(CREATE_ORDER, or) { observation ->
        ResponseEntity.status(HttpStatus.CREATED).body(orderService.createOrder(createOrderDTO.toOrder()).toSuccessResponse())
            .also {
                log.info("created order: $it")
                observation.highCardinalityKeyValue("response", it.toString())
            }
    }

    @PutMapping(path = ["/add/{id}"])
    @Operation(method = "addProductItem", summary = "add to the order product item", operationId = "addProductItem")
    suspend fun addProductItem(
        @PathVariable id: UUID,
        @Valid @RequestBody dto: CreateProductItemDTO
    ) = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation ->
        ResponseEntity.ok().body(orderService.addProductItem(dto.toProductItem(id)))
            .also {
                observation.highCardinalityKeyValue("CreateProductItemDTO", dto.toString())
                observation.highCardinalityKeyValue("id", id.toString())
                log.info("addProductItem id: $id, dto: $dto")
            }
    }

    @PutMapping(path = ["/remove/{orderId}/{productItemId}"])
    @Operation(method = "removeProductItem", summary = "remove product from the order", operationId = "removeProductItem")
    suspend fun removeProductItem(
        @PathVariable orderId: UUID,
        @PathVariable productItemId: UUID
    ) = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation ->
        ResponseEntity.ok().body(orderService.removeProductItem(orderId, productItemId))
            .also {
                observation.highCardinalityKeyValue("productItemId", productItemId.toString())
                observation.highCardinalityKeyValue("orderId", orderId.toString())
                log.info("removeProductItem orderId: $orderId, productItemId: $productItemId")
            }
    }

    @PutMapping(path = ["/pay/{id}"])
    @Operation(method = "payOrder", summary = "pay order", operationId = "payOrder")
    suspend fun payOrder(@PathVariable id: UUID, @Valid @RequestBody dto: PayOrderDTO) = coroutineScopeWithObservation(PAY_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.pay(id, dto.paymentId).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("payOrder result: $it")
            }
    }

    @PutMapping(path = ["/cancel/{id}"])
    @Operation(method = "cancelOrder", summary = "cancel order", operationId = "cancelOrder")
    suspend fun cancelOrder(@PathVariable id: UUID, @Valid @RequestBody dto: CancelOrderDTO) = coroutineScopeWithObservation(CANCEL_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.cancel(id, dto.reason).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("cancelOrder result: $it")
            }
    }

    @PutMapping(path = ["/submit/{id}"])
    @Operation(method = "submitOrder", summary = "submit order", operationId = "submitOrder")
    suspend fun submitOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(SUBMIT_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.submit(id).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("submitOrder result: $it")
            }
    }

    @PutMapping(path = ["/complete/{id}"])
    @Operation(method = "completeOrder", summary = "complete order", operationId = "completeOrder")
    suspend fun completeOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(COMPLETE_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.complete(id).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("completeOrder result: $it")
            }
    }
}

As I mentioned earlier, the main idea of the transactional outbox implementation is that, as the first step, we write to the orders and outbox tables in one transaction and commit it. Additionally — but not required — there is an optimization: in the same methods, after successfully committing the transaction, we can publish the event and delete it from the outbox table right away. If any step of publishing to the broker or deleting from the outbox table fails, that is fine, because we still have the polling producer running as a scheduled process. It is a small optimization and improvement, and it is not mandatory for implementing the outbox pattern; try both variants and choose the best one for your case.

In our case, we are using Kafka, so we have to remember the producer acks setting:

- When acks=0, producers consider messages as "written successfully" the moment the message is sent, without waiting for the broker to accept it at all. If the broker goes offline or an exception happens, we won't know and will lose data, so be careful with this setting and don't use acks=0.
- When acks=1, producers consider messages as "written successfully" when the message is acknowledged by the leader only.
- When acks=all, producers consider messages as "written successfully" when the message is accepted by all in-sync replicas (ISR).
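For illustration, a producer with acks=all might be configured as follows. This is a minimal sketch using the standard Kafka client constants and Spring Kafka factory types, not the exact configuration from this project's repository:

Kotlin
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate

fun kafkaTemplate(): KafkaTemplate<String, String> {
    val props = mapOf<String, Any>(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        // Wait for all in-sync replicas before considering a send successful.
        ProducerConfig.ACKS_CONFIG to "all",
        // Idempotent producer protects against broker-side duplicates on retries.
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
    )
    return KafkaTemplate(DefaultKafkaProducerFactory<String, String>(props))
}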
In the simplified sequence diagram for service layer business logic, steps 5 and 6 are optional and not required optimization because we have polling publisher anyway: The order service implementation: Kotlin interface OrderService { suspend fun createOrder(order: Order): Order suspend fun getOrderByID(id: UUID): Order suspend fun addProductItem(productItem: ProductItem) suspend fun removeProductItem(orderID: UUID, productItemId: UUID) suspend fun pay(id: UUID, paymentId: String): Order suspend fun cancel(id: UUID, reason: String?): Order suspend fun submit(id: UUID): Order suspend fun complete(id: UUID): Order suspend fun getOrderWithProductsByID(id: UUID): Order suspend fun getAllOrders(pageable: Pageable): Page<Order> suspend fun deleteOutboxRecordsWithLock() } Kotlin @Service class OrderServiceImpl( private val orderRepository: OrderRepository, private val productItemRepository: ProductItemRepository, private val outboxRepository: OrderOutboxRepository, private val orderMongoRepository: OrderMongoRepository, private val txOp: TransactionalOperator, private val eventsPublisher: EventsPublisher, private val kafkaTopicsConfiguration: KafkaTopicsConfiguration, private val or: ObservationRegistry, private val outboxEventSerializer: OutboxEventSerializer ) : OrderService { override suspend fun createOrder(order: Order): Order = coroutineScopeWithObservation(CREATE, or) { observation -> txOp.executeAndAwait { orderRepository.insert(order).let { val productItemsEntityList = ProductItemEntity.listOf(order.productsList(), UUID.fromString(it.id)) val insertedItems = productItemRepository.insertAll(productItemsEntityList).toList() it.addProductItems(insertedItems.map { item -> item.toProductItem() }) Pair(it, outboxRepository.save(outboxEventSerializer.orderCreatedEventOf(it))) } }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } override suspend fun addProductItem(productItem: ProductItem): Unit = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation -> txOp.executeAndAwait { val order = orderRepository.findOrderByID(UUID.fromString(productItem.orderId)) order.incVersion() val updatedProductItem = productItemRepository.upsert(productItem) val savedRecord = outboxRepository.save( outboxEventSerializer.productItemAddedEventOf( order, productItem.copy(version = updatedProductItem.version).toEntity() ) ) orderRepository.updateVersion(UUID.fromString(order.id), order.version) .also { result -> log.info("addOrderItem result: $result, version: ${order.version}") } savedRecord }.run { observation.highCardinalityKeyValue("outboxEvent", this.toString()) publishOutboxEvent(this) } } override suspend fun removeProductItem(orderID: UUID, productItemId: UUID): Unit = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation -> txOp.executeAndAwait { if (!productItemRepository.existsById(productItemId)) throw ProductItemNotFoundException(productItemId) val order = orderRepository.findOrderByID(orderID) productItemRepository.deleteById(productItemId) order.incVersion() val savedRecord = outboxRepository.save(outboxEventSerializer.productItemRemovedEventOf(order, productItemId)) orderRepository.updateVersion(UUID.fromString(order.id), order.version) .also { log.info("removeProductItem update order result: $it, version: ${order.version}") } savedRecord }.run { observation.highCardinalityKeyValue("outboxEvent", this.toString()) publishOutboxEvent(this) } } override suspend fun 
pay(id: UUID, paymentId: String): Order = coroutineScopeWithObservation(PAY, or) { observation -> txOp.executeAndAwait { val order = orderRepository.getOrderWithProductItemsByID(id) order.pay(paymentId) val updatedOrder = orderRepository.update(order) Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderPaidEventOf(updatedOrder, paymentId))) }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } override suspend fun cancel(id: UUID, reason: String?): Order = coroutineScopeWithObservation(CANCEL, or) { observation -> txOp.executeAndAwait { val order = orderRepository.findOrderByID(id) order.cancel() val updatedOrder = orderRepository.update(order) Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCancelledEventOf(updatedOrder, reason))) }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } override suspend fun submit(id: UUID): Order = coroutineScopeWithObservation(SUBMIT, or) { observation -> txOp.executeAndAwait { val order = orderRepository.getOrderWithProductItemsByID(id) order.submit() val updatedOrder = orderRepository.update(order) updatedOrder.addProductItems(order.productsList()) Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderSubmittedEventOf(updatedOrder))) }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } override suspend fun complete(id: UUID): Order = coroutineScopeWithObservation(COMPLETE, or) { observation -> txOp.executeAndAwait { val order = orderRepository.findOrderByID(id) order.complete() val updatedOrder = orderRepository.update(order) log.info("order submitted: ${updatedOrder.status} for id: $id") Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCompletedEventOf(updatedOrder))) }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } @Transactional(readOnly = true) override suspend fun getOrderWithProductsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation -> orderRepository.getOrderWithProductItemsByID(id).also { observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation -> orderMongoRepository.getAllOrders(pageable).also { observation.highCardinalityKeyValue("pageResult", it.toString()) } } override suspend fun deleteOutboxRecordsWithLock() = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation -> outboxRepository.deleteOutboxRecordsWithLock { observation.highCardinalityKeyValue("outboxEvent", it.toString()) eventsPublisher.publish(getTopicName(it.eventType), it) } } override suspend fun getOrderByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation -> orderMongoRepository.getByID(id.toString()) .also { log.info("getOrderByID: $it") } .also { observation.highCardinalityKeyValue("order", it.toString()) } } private suspend fun publishOutboxEvent(event: OutboxRecord) = coroutineScopeWithObservation(PUBLISH_OUTBOX_EVENT, or) { observation -> try { log.info("publishing 
outbox event: $event") outboxRepository.deleteOutboxRecordByID(event.eventId!!) { eventsPublisher.publish(getTopicName(event.eventType), event.aggregateId.toString(), event) } log.info("outbox event published and deleted: $event") observation.highCardinalityKeyValue("event", event.toString()) } catch (ex: Exception) { log.error("exception while publishing outbox event: ${ex.localizedMessage}") observation.error(ex) } } } Order and product items Postgres repositories are a combination of CoroutineCrudRepository and custom implementation using DatabaseClient and R2dbcEntityTemplate, supporting optimistic and pessimistic locking, depending on method requirements. Kotlin @Repository interface OrderRepository : CoroutineCrudRepository<OrderEntity, UUID>, OrderBaseRepository @Repository interface OrderBaseRepository { suspend fun getOrderWithProductItemsByID(id: UUID): Order suspend fun updateVersion(id: UUID, newVersion: Long): Long suspend fun findOrderByID(id: UUID): Order suspend fun insert(order: Order): Order suspend fun update(order: Order): Order } @Repository class OrderBaseRepositoryImpl( private val dbClient: DatabaseClient, private val entityTemplate: R2dbcEntityTemplate, private val or: ObservationRegistry ) : OrderBaseRepository { override suspend fun updateVersion(id: UUID, newVersion: Long): Long = coroutineScopeWithObservation(UPDATE_VERSION, or) { observation -> dbClient.sql("UPDATE microservices.orders SET version = (version + 1) WHERE id = :id AND version = :version") .bind(ID, id) .bind(VERSION, newVersion - 1) .fetch() .rowsUpdated() .awaitSingle() .also { log.info("for order with id: $id version updated to $newVersion") } .also { observation.highCardinalityKeyValue("id", id.toString()) observation.highCardinalityKeyValue("newVersion", newVersion.toString()) } } override suspend fun getOrderWithProductItemsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation -> dbClient.sql( """SELECT o.id, o.email, o.status, o.address, o.version, o.payment_id, o.created_at, o.updated_at, |pi.id as productId, pi.price, pi.title, pi.quantity, pi.order_id, pi.version as itemVersion, pi.created_at as itemCreatedAt, pi.updated_at as itemUpdatedAt |FROM microservices.orders o |LEFT JOIN microservices.product_items pi on o.id = pi.order_id |WHERE o.id = :id""".trimMargin() ) .bind(ID, id) .map { row, _ -> Pair(OrderEntity.of(row), ProductItemEntity.of(row)) } .flow() .toList() .let { orderFromList(it) } .also { log.info("getOrderWithProductItemsByID order: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun findOrderByID(id: UUID): Order = coroutineScopeWithObservation(FIND_ORDER_BY_ID, or) { observation -> val query = Query.query(Criteria.where(ID).`is`(id)) entityTemplate.selectOne(query, OrderEntity::class.java).awaitSingleOrNull()?.toOrder() .also { observation.highCardinalityKeyValue("order", it.toString()) } ?: throw OrderNotFoundException(id) } override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation -> entityTemplate.insert(order.toEntity()).awaitSingle().toOrder() .also { log.info("inserted order: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation -> entityTemplate.update(order.toEntity()).awaitSingle().toOrder() .also { log.info("updated order: $it") observation.highCardinalityKeyValue("order", it.toString()) } } } Kotlin 
interface ProductItemBaseRepository { suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity suspend fun insertAll(productItemEntities: List<ProductItemEntity>): List<ProductItemEntity> suspend fun upsert(productItem: ProductItem): ProductItem } @Repository class ProductItemBaseRepositoryImpl( private val entityTemplate: R2dbcEntityTemplate, private val or: ObservationRegistry, ) : ProductItemBaseRepository { override suspend fun upsert(productItem: ProductItem): ProductItem = coroutineScopeWithObservation(UPDATE, or) { observation -> val query = Query.query( Criteria.where("id").`is`(UUID.fromString(productItem.id)) .and("order_id").`is`(UUID.fromString(productItem.orderId)) ) val product = entityTemplate.selectOne(query, ProductItemEntity::class.java).awaitSingleOrNull() if (product != null) { val update = Update .update("quantity", (productItem.quantity + product.quantity)) .set("version", product.version + 1) .set("updated_at", LocalDateTime.now()) val updatedProduct = product.copy(quantity = (productItem.quantity + product.quantity), version = product.version + 1) val updateResult = entityTemplate.update(query, update, ProductItemEntity::class.java).awaitSingle() log.info("updateResult product: $updateResult") log.info("updateResult updatedProduct: $updatedProduct") return@coroutineScopeWithObservation updatedProduct.toProductItem() } entityTemplate.insert(ProductItemEntity.of(productItem)).awaitSingle().toProductItem() .also { productItem -> log.info("saved productItem: $productItem") observation.highCardinalityKeyValue("productItem", productItem.toString()) } } override suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity = coroutineScopeWithObservation(INSERT, or) { observation -> val product = entityTemplate.insert(productItemEntity).awaitSingle() log.info("saved product: $product") observation.highCardinalityKeyValue("product", product.toString()) product } override suspend fun insertAll(productItemEntities: List<ProductItemEntity>) = coroutineScopeWithObservation(INSERT_ALL, or) { observation -> val result = productItemEntities.map { entityTemplate.insert(it) }.map { it.awaitSingle() } log.info("inserted product items: $result") observation.highCardinalityKeyValue("result", result.toString()) result } } The important detail here is handling the case where multiple pod instances process the outbox table in parallel. Our consumers are idempotent, but we still want to avoid publishing the same outbox events more than once. To prevent multiple instances from selecting and publishing the same events, we use FOR UPDATE SKIP LOCKED. Each instance tries to select a batch of outbox events; if another instance has already locked some of these records, the query skips them and selects the next available unlocked rows, and so on.
Kotlin @Repository interface OutboxBaseRepository { suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit) } class OutboxBaseRepositoryImpl( private val dbClient: DatabaseClient, private val txOp: TransactionalOperator, private val or: ObservationRegistry ) : OutboxBaseRepository { override suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_BY_ID, or) { observation -> withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) { txOp.executeAndAwait { callback() dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId") .bind("eventId", id) .fetch() .rowsUpdated() .awaitSingle() .also { log.info("outbox event with id: $it deleted") observation.highCardinalityKeyValue("id", it.toString()) } } } } override suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit) = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation -> withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) { txOp.executeAndAwait { dbClient.sql("SELECT * FROM microservices.outbox_table ORDER BY timestamp ASC LIMIT 10 FOR UPDATE SKIP LOCKED") .map { row, _ -> OutboxRecord.of(row) } .flow() .onEach { log.info("deleting outboxEvent with id: ${it.eventId}") callback(it) dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId") .bind("eventId", it.eventId!!) .fetch() .rowsUpdated() .awaitSingle() log.info("outboxEvent with id: ${it.eventId} published and deleted") observation.highCardinalityKeyValue("eventId", it.eventId.toString()) } .collect() } } } } The polling producer implementation is a scheduled process that publishes and deletes outbox events at a given interval, as described earlier, and uses the same service method: Kotlin @Component @ConditionalOnProperty(prefix = "schedulers", value = ["outbox.enable"], havingValue = "true") class OutboxScheduler(private val orderService: OrderService, private val or: ObservationRegistry) { @Scheduled(initialDelayString = "\${schedulers.outbox.initialDelayMillis}", fixedRateString = "\${schedulers.outbox.fixedRate}") fun publishAndDeleteOutboxRecords() = runBlocking { coroutineScopeWithObservation(PUBLISH_AND_DELETE_OUTBOX_RECORDS, or) { log.debug("starting scheduled outbox table publishing") orderService.deleteOutboxRecordsWithLock() log.debug("completed scheduled outbox table publishing") } } companion object { private val log = LoggerFactory.getLogger(OutboxScheduler::class.java) private const val PUBLISH_AND_DELETE_OUTBOX_RECORDS = "OutboxScheduler.publishAndDeleteOutboxRecords" } } Usually, the transactional outbox is used to guarantee data consistency between microservices; here, for example, consumers in the same microservice process the events and save them to MongoDB. One more important detail: because we process Kafka events in multiple consumer processes, events can arrive out of order. Kafka helps us here with message keys, since it sends messages with the same key to the same partition. But if your broker does not have this feature, you have to handle ordering manually; consider, for example, the case where a consumer tries to process event #6 before events #4 and #5 have been processed.
For this reason, the outbox events carry a domain entity version field, so we can simply compare versions and validate: if our database has order version #3 but we are now processing an event with version #6, we first need to wait for events #4 and #5 and process them. Of course, these details depend on the concrete business logic of each application; the point here is only that this case is possible. One more important detail is retry topics. If we need to retry processing a message, it is better to publish it to a dedicated retry topic and handle the retries there: how many times to retry and any other advanced logic depend on your concrete case. In the example, we have two listeners, one of which processes the retry topic messages: Kotlin @Component class OrderConsumer( private val kafkaTopicsConfiguration: KafkaTopicsConfiguration, private val serializer: Serializer, private val eventsPublisher: EventsPublisher, private val orderEventProcessor: OrderEventProcessor, private val or: ObservationRegistry, ) { @KafkaListener( groupId = "\${kafka.consumer-group-id:order-service-group-id}", topics = [ "\${topics.orderCreated.name}", "\${topics.productAdded.name}", "\${topics.productRemoved.name}", "\${topics.orderPaid.name}", "\${topics.orderCancelled.name}", "\${topics.orderSubmitted.name}", "\${topics.orderCompleted.name}", ], id = "orders-consumer" ) fun process(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>) = runBlocking { coroutineScopeWithObservation(PROCESS, or) { observation -> try { observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord)) processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java)) ack.acknowledge() log.info("committed record: ${getConsumerRecordInfo(consumerRecord)}") } catch (ex: Exception) { observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord)) observation.error(ex) if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) { log.error("ack not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") ack.acknowledge() return@coroutineScopeWithObservation } if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) { publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1) ack.acknowledge() log.warn("ack concurrency write or version exception ${ex.localizedMessage}") return@coroutineScopeWithObservation } publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1) ack.acknowledge() log.error("ack exception while processing record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}", ex) } } } @KafkaListener(groupId = "\${kafka.consumer-group-id:order-service-group-id}", topics = ["\${topics.retryTopic.name}"], id = "orders-retry-consumer") fun processRetry(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>): Unit = runBlocking { coroutineScopeWithObservation(PROCESS_RETRY, or) { observation -> try { log.warn("processing retry topic record >>>>>>>>>>>>> : ${getConsumerRecordInfoWithHeaders(consumerRecord)}") observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord)) processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java)) ack.acknowledge() log.info("committed retry record: ${getConsumerRecordInfo(consumerRecord)}") } catch (ex: Exception)
{ observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord)) observation.error(ex) val currentRetry = String(consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER).value()).toInt() observation.highCardinalityKeyValue("currentRetry", currentRetry.toString()) if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) { publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry) log.warn("ack concurrency write or version exception ${ex.localizedMessage},record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") ack.acknowledge() return@coroutineScopeWithObservation } if (currentRetry > MAX_RETRY_COUNT) { publishRetryTopic(kafkaTopicsConfiguration.deadLetterQueue.name, consumerRecord, currentRetry + 1) ack.acknowledge() log.error("MAX_RETRY_COUNT exceed, send record to DLQ: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") return@coroutineScopeWithObservation } if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) { ack.acknowledge() log.error("commit not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") return@coroutineScopeWithObservation } log.error("exception while processing: ${ex.localizedMessage}, record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry + 1) ack.acknowledge() } } } private suspend fun publishRetryTopic(topic: String, record: ConsumerRecord<String, ByteArray>, retryCount: Int) = coroutineScopeWithObservation(PUBLISH_RETRY_TOPIC, or) { observation -> observation.highCardinalityKeyValue("topic", record.topic()) .highCardinalityKeyValue("key", record.key()) .highCardinalityKeyValue("offset", record.offset().toString()) .highCardinalityKeyValue("value", String(record.value())) .highCardinalityKeyValue("retryCount", retryCount.toString()) record.headers().remove(RETRY_COUNT_HEADER) record.headers().add(RETRY_COUNT_HEADER, retryCount.toString().toByteArray()) mono { publishRetryRecord(topic, record, retryCount) } .retryWhen(Retry.backoff(PUBLISH_RETRY_COUNT, Duration.ofMillis(PUBLISH_RETRY_BACKOFF_DURATION_MILLIS)) .filter { it is SerializationException }) .awaitSingle() } } The role of the orders events processor at this microservice is validating the version of the events and updating MongoDB: Kotlin interface OrderEventProcessor { suspend fun on(orderCreatedEvent: OrderCreatedEvent) suspend fun on(productItemAddedEvent: ProductItemAddedEvent) suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent) suspend fun on(orderPaidEvent: OrderPaidEvent) suspend fun on(orderCancelledEvent: OrderCancelledEvent) suspend fun on(orderSubmittedEvent: OrderSubmittedEvent) suspend fun on(orderCompletedEvent: OrderCompletedEvent) } @Service class OrderEventProcessorImpl( private val orderMongoRepository: OrderMongoRepository, private val or: ObservationRegistry, ) : OrderEventProcessor { override suspend fun on(orderCreatedEvent: OrderCreatedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CREATED_EVENT, or) { observation -> orderMongoRepository.insert(orderCreatedEvent.order).also { log.info("created order: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(productItemAddedEvent: ProductItemAddedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PRODUCT_ADDED_EVENT, or) { observation -> val order = 
orderMongoRepository.getByID(productItemAddedEvent.orderId) validateVersion(order.id, order.version, productItemAddedEvent.version) order.addProductItem(productItemAddedEvent.productItem) order.version = productItemAddedEvent.version orderMongoRepository.update(order).also { log.info("productItemAddedEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PRODUCT_REMOVED_EVENT, or) { observation -> val order = orderMongoRepository.getByID(productItemRemovedEvent.orderId) validateVersion(order.id, order.version, productItemRemovedEvent.version) order.removeProductItem(productItemRemovedEvent.productItemId) order.version = productItemRemovedEvent.version orderMongoRepository.update(order).also { log.info("productItemRemovedEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(orderPaidEvent: OrderPaidEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PAID_EVENT, or) { observation -> val order = orderMongoRepository.getByID(orderPaidEvent.orderId) validateVersion(order.id, order.version, orderPaidEvent.version) order.pay(orderPaidEvent.paymentId) order.version = orderPaidEvent.version orderMongoRepository.update(order).also { log.info("orderPaidEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(orderCancelledEvent: OrderCancelledEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CANCELLED_EVENT, or) { observation -> val order = orderMongoRepository.getByID(orderCancelledEvent.orderId) validateVersion(order.id, order.version, orderCancelledEvent.version) order.cancel() order.version = orderCancelledEvent.version orderMongoRepository.update(order).also { log.info("orderCancelledEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(orderSubmittedEvent: OrderSubmittedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_SUBMITTED_EVENT, or) { observation -> val order = orderMongoRepository.getByID(orderSubmittedEvent.orderId) validateVersion(order.id, order.version, orderSubmittedEvent.version) order.submit() order.version = orderSubmittedEvent.version orderMongoRepository.update(order).also { log.info("orderSubmittedEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(orderCompletedEvent: OrderCompletedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_COMPLETED_EVENT, or) { observation -> val order = orderMongoRepository.getByID(orderCompletedEvent.orderId) validateVersion(order.id, order.version, orderCompletedEvent.version) order.complete() order.version = orderCompletedEvent.version orderMongoRepository.update(order).also { log.info("orderCompletedEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } private fun validateVersion(id: Any, currentDomainVersion: Long, eventVersion: Long) { log.info("validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion") if (currentDomainVersion >= eventVersion) { log.warn("currentDomainVersion >= eventVersion validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion") throw AlreadyProcessedVersionException(id, eventVersion) } if ((currentDomainVersion + 1) < eventVersion) { log.warn("currentDomainVersion + 1) < eventVersion 
validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion") throw InvalidVersionException(eventVersion) } } } The MongoDB repository code is quite simple: Kotlin interface OrderMongoRepository { suspend fun insert(order: Order): Order suspend fun update(order: Order): Order suspend fun getByID(id: String): Order suspend fun getAllOrders(pageable: Pageable): Page<Order> } @Repository class OrderMongoRepositoryImpl( private val mongoTemplate: ReactiveMongoTemplate, private val or: ObservationRegistry, ) : OrderMongoRepository { override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation -> withContext(Dispatchers.IO) { mongoTemplate.insert(OrderDocument.of(order)).awaitSingle().toOrder() .also { log.info("inserted order: $it") } .also { observation.highCardinalityKeyValue("order", it.toString()) } } } override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation -> withContext(Dispatchers.IO) { val query = Query.query(Criteria.where(ID).`is`(order.id).and(VERSION).`is`(order.version - 1)) val update = Update() .set(EMAIL, order.email) .set(ADDRESS, order.address) .set(STATUS, order.status) .set(VERSION, order.version) .set(PAYMENT_ID, order.paymentId) .set(PRODUCT_ITEMS, order.productsList()) val options = FindAndModifyOptions.options().returnNew(true).upsert(false) val updatedOrderDocument = mongoTemplate.findAndModify(query, update, options, OrderDocument::class.java) .awaitSingleOrNull() ?: throw OrderNotFoundException(order.id.toUUID()) observation.highCardinalityKeyValue("order", updatedOrderDocument.toString()) updatedOrderDocument.toOrder().also { orderDocument -> log.info("updated order: $orderDocument") } } } override suspend fun getByID(id: String): Order = coroutineScopeWithObservation(GET_BY_ID, or) { observation -> withContext(Dispatchers.IO) { mongoTemplate.findById(id, OrderDocument::class.java).awaitSingle().toOrder() .also { log.info("found order: $it") } .also { observation.highCardinalityKeyValue("order", it.toString()) } } } override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation -> withContext(Dispatchers.IO) { val query = Query().with(pageable) val data = async { mongoTemplate.find(query, OrderDocument::class.java).collectList().awaitSingle() }.await() val count = async { mongoTemplate.count(Query(), OrderDocument::class.java).awaitSingle() }.await() PageableExecutionUtils.getPage(data.map { it.toOrder() }, pageable) { count } .also { observation.highCardinalityKeyValue("pageResult", it.pageable.toString()) } } } } You can find more details and the source code of the full project in the GitHub repository. In real-world applications, we have to implement many more features, such as Kubernetes health checks, rate limiters, etc. Depending on the project, they can be implemented in different ways; for example, you can use Kubernetes and Istio for some of them. I hope this article is useful and helpful, and I am happy to receive any feedback or questions. Feel free to contact me by email or any messenger :)
Sometimes you need to control access to the data in your databases in a very granular way - much more granular than most databases allow. For instance, you might want some database users to be able to read only the last few digits of some credit card number, or you may need certain columns of certain rows to be readable by certain users only. Or maybe you need to hide some rows from some users under specific circumstances. The data still needs to be stored in the database, we just need to restrict who can see certain parts of that data. This is called data masking, and I've already talked about the two main approaches: static vs. dynamic data masking in a previous article. In this article, I'll show you how to roll your own dynamic data masking solution for Cassandra and Cassandra-compatible databases such as AWS Keyspaces, Azure Cosmos DB, and DataStax DSE, using a couple of off-the-shelf tools. What Cassandra Can Do on Its Own When it comes to hiding data, Cassandra provides table-level GRANT permissions, but nothing more fine-grained than that. Other Cassandra-compatible products, such as DataStax DSE, do provide some row- and column-level security, but even that has significant limitations. To narrow down how people access some tables, most relational databases offer the concept of views. Cassandra has materialized views, which are tables that are derived from other tables. Unfortunately, for materialized views, among many other restrictions, Cassandra requires that the columns must be the naked columns from the base table. This, and the other restrictions, means that materialized views are only tangentially useful for data masking, and cannot cover the vast majority of use cases. You might think you're stuck at this point. The fine folks in the Cassandra team are in fact working on a data masking solution, but that's still some ways away, and in any case, it will be limited. There is another option: using a programmable database proxy to shape the queries and the corresponding result sets. How Would a Proxy Help? The idea is simple: we introduce a programmable proxy between the database clients and the database server(s). We can then define some simple logic in the proxy, which will enforce our data masking policies as the network traffic goes back and forth (through the proxy) between clients and servers: Standing up a database proxy is easy: it's just a Docker container, and we can set up the database connection in just a few clicks. The database clients then connect to the proxy instead of directly to the database. Other than that, the clients and the server will have no idea that they are talking to each other through a proxy. Because the proxy works at the network level, no changes are required to either side, and this works with any Cassandra-compatible implementations such as AWS Keyspaces and Azure CosmosDB. Once the proxy is in place, we can define filters in the proxy to shape the traffic. For data masking, we have three possibilities: Reject invalid queries Rewrite invalid queries Modify result sets Let's take a look at each approach. Rejecting Queries Just because a database client sends you a request doesn't mean that you have to execute it. The proxy can look at a query and decide to reject it if it does not obey our access requirements. 
There are two main ways to do this: Limiting queries to a list of pre-approved queries, possibly depending on the user, the user's IP address, or any other factor Examining the query before it gets executed, and rejecting it if it does not satisfy certain criteria Query Control Enforcing a list of pre-approved queries is called query control, and I have covered that topic in a previous article. It's a simple idea: you record all the queries during a period of time (like during testing), then you only allow those queries after that. Any query that is not on the list gets rejected (or given an empty result set if we want to be more sneaky). This is a solid approach, but it only works for some use cases. For instance, this is not a viable solution if your queries are not 100% predictable. Still, it's a good tool to have in your toolbox. Vetting Queries A more subtle approach consists of examining the queries and determining whether they are acceptable or not. This is of course trickier - people can be very clever - but Cassandra's query language CQL is not as complex as typical SQL languages, making this a practical solution. For instance, we could decide that we don't want certain users to have access to the phones column in our customers table. In that case, we could simply reject any queries on that table that either specify the phones column, or that try to use the * operator to get all columns. This is easily done thanks to Gallium Data's CQL parser service, which can parse any CQL command and tell us exactly what that command does, and which tables/columns are involved. In the proxy, our filter will: Get the CQL from the query or prepared statement Send it to the parser service to analyze it If the CQL refers to the phones column, reject the request Otherwise let the query proceed to Cassandra See the hands-on tutorial for this article for all the details. Rewriting Queries A more flexible approach is to modify incoming queries so that they satisfy our criteria. For instance, let's say we still want to restrict access to the column phones in the table customers. Again, we can use the CQL parser service to determine whether an incoming query uses this column, or uses * to request all columns. If the query does use * to request all columns, the safest thing to do is to reject the query. It would be tempting to think that we can replace the asterisk with the names of the columns, but that is actually quite difficult to do correctly, as illustrated by this perfectly valid query: CQL SELECT /* all */ * FROM credit_card_txs If the query uses the phones column, we can replace it with something that will hide the data as we wish. Let's say we want to hide the phones column completely. You might think that you can rewrite the query from: CQL SELECT id, country, first_name, last_name, phones FROM customers to: CQL SELECT id, country, first_name, last_name, '****' as phones FROM customers That seems reasonable, but sadly, Cassandra does not support this: Shell InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot infer type for term '****' in selection clause (try using a cast to force a type)" Thankfully, there is a slightly ugly workaround: CQL SELECT id, country, first_name, last_name, blobAsText(textAsBlob('****')) as phones FROM customers We could do somewhat better using user-defined functions, if your Cassandra implementation supports them. 
We can thus easily create a filter in the proxy that will rewrite the query to mask the value of the phones column (see the hands-on tutorial linked previously for all the details). Let's test that: Shell cqlsh:demo> SELECT id, country, first_name, last_name, phones FROM customers; id | country | first_name | last_name | phones ----+---------+------------+---------------+--------- 23 | WF | Wanda | Williams | **** 5 | ES | Eric | Edmunds | **** 10 | JP | Juliet | Jefferson | **** 16 | PE | Patricia | Pérez | **** etc... If you need to hide only portions of a column, and your Cassandra implementation does not allow for user-defined functions, your only option is to modify result sets - let's look at that now. Modifying Result Sets For the ultimate in power and flexibility, we can modify result sets on their way back to the database clients: We can modify individual columns in specific rows. We can remove entire rows from result sets. We can also insert new rows in result sets, change the shape of result sets, etc., but that's beyond the scope of this article. Changing a column in a row is usually trivial with a few lines of code in a filter, e.g.: JavaScript let phones = context.row.phones; if (phones && phones.home) { phones.home.phone_number = "####"; } Let's try it out: Shell cqlsh:gallium_demo> SELECT id, country, last_name, phones FROM customers; id | country | last_name | phones ----+---------+---------------+----------------------------------------------------- 23 | WF | Williams | {'home': {country_code: 123, phone_number: '####'} 5 | ES | Edmunds | {'home': {country_code: 55, phone_number: '####'} 16 | PE | Pérez | {'home': {country_code: 116, phone_number: '####'} etc... Notice how much more precise this is: we're not blotting out the entire column, we're only hiding parts of it. Removing a row from a result set is also easy. It can be done either by setting parameters in the filter, or for more complex cases, in filter code, e.g.: JavaScript // Hide customers whose home phone number is in Japan let phones = context.row.phones; if (phones && phones.home && phones.home.country_code === 81) { context.row.remove(); } Again, you can see this in action in the hands-on tutorial for this article. Nothing has changed in the database: we're only affecting the data as it travels back to the Cassandra client. In Summary We've looked at three general techniques for hiding data in Cassandra with a proxy: Rejecting queries that try to access secret data Modifying queries so they do not show secret data Modifying result sets to hide secret data Rejecting queries is a blunt but effective tool. It might be sufficient for many use cases. Modifying queries has the advantage of performance: only one packet (the query) has to be modified, and the rest can work as usual. However, this technique can only work for some types of data masking requirements. Modifying result sets, on the other hand, is slightly more expensive, but it gives you complete control: you can change literally anything in the result set, no matter how fine-grained the required changes are. These techniques are not mutually exclusive: many solutions will involve a combination, perhaps in conjunction with other approaches such as fine-grained security (if available) or the data masking solution that will someday be available in Cassandra. But for complete power and flexibility, you can't beat a programmable database proxy.
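To make the masking logic above concrete outside of a proxy-specific API, here is a minimal Python sketch of the same per-row idea: a function that hides phone numbers and drops rows by country code. The row structure mirrors the JavaScript filter examples above, but the function name and dict layout are illustrative assumptions, not Gallium Data's actual filter API.

Python

from typing import Optional

def mask_row(row: dict) -> Optional[dict]:
    """Hide the home phone number; drop rows whose home phone is in Japan (code 81)."""
    home = (row.get("phones") or {}).get("home")
    if home:
        if home.get("country_code") == 81:
            return None  # remove this row from the result set entirely
        home["phone_number"] = "####"  # blot out only the number, keep the rest
    return row

# Example: apply the filter to a fetched result set (hypothetical rows).
rows = [
    {"id": 23, "last_name": "Williams", "phones": {"home": {"country_code": 123, "phone_number": "555-0101"}}},
    {"id": 7, "last_name": "Tanaka", "phones": {"home": {"country_code": 81, "phone_number": "555-0199"}}},
]
masked = [r for r in (mask_row(row) for row in rows) if r is not None]
print(masked)  # Tanaka's row is removed; Williams' number shows as '####'

Whether you express this in JavaScript inside the proxy or prototype it in Python first, the key design point is the same: the database contents never change; only the bytes traveling back to the client do.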
In today's interconnected world, application users can span multiple countries and continents. Maintaining low latency across distant geographies while dealing with data regulatory requirements can be a challenge. The geo-partitioning feature of distributed SQL databases can help solve that challenge by pinning user data to the required locations. So, let’s explore how you can deploy a geo-partitioned database cluster that complies with data regulations and delivers low latency across multiple regions using YugabyteDB Managed. Deploying a Geo-Partitioned Cluster Using YugabyteDB Managed YugabyteDB is an open-source distributed SQL database built on PostgreSQL. You can deploy a geo-partitioned cluster within minutes using YugabyteDB Managed, the DBaaS version of YugabyteDB. Getting started with a geo-partitioned YugabyteDB Managed cluster is easy. Simply follow the below: Select the Multi-region Deployment option. When creating a dedicated YugabyteDB Managed cluster, choose the “multi-region” option to ensure your data is distributed across multiple regions. Set the Data Distribution Mode to “partitioned." Select the "partition by region" data distribution option so that you can pin data to specific geographical locations. Choose target cloud regions. Place database nodes in the cloud regions of your choice. In this blog, we spread data across two regions - South Carolina (us-east1) and Frankfurt (europe-west3). Once you've set up a geo-partitioned YugabyteDB Managed cluster, you can connect to it and create tables with partitioned data. Create a Geo-Partitioned Table To demonstrate how geo-partitioning improves latency and data regulation compliance, let's take a look at an example Account table. First, create PostgreSQL tablespaces that let you pin data to the YugabyteDB nodes in the USA (usa_tablespace) or in Europe (europe_tablespace): SQL CREATE TABLESPACE usa_tablespace WITH ( replica_placement = '{"num_replicas": 3, "placement_blocks": [ {"cloud":"gcp","region":"us-east1","zone":"us-east1-c","min_num_replicas":1}, {"cloud":"gcp","region":"us-east1","zone":"us-east1-d","min_num_replicas":1}, {"cloud":"gcp","region":"us-east1","zone":"us-east1-b","min_num_replicas":1} ]}' ); CREATE TABLESPACE europe_tablespace WITH ( replica_placement = '{"num_replicas": 3, "placement_blocks": [ {"cloud":"gcp","region":"europe-west3","zone":"europe-west3-a","min_num_replicas":1}, {"cloud":"gcp","region":"europe-west3","zone":"europe-west3-b","min_num_replicas":1}, {"cloud":"gcp","region":"europe-west3","zone":"europe-west3-c","min_num_replicas":1} ]}' ); num_replicas: 3 - Each tablespace requires you to store a copy of data across 3 availability zones within a region. This lets you tolerate zone-level outages in the cloud. 
Second, create the Account table and partition it by the country_code column: SQL CREATE TABLE Account ( id integer NOT NULL, full_name text NOT NULL, email text NOT NULL, phone text NOT NULL, country_code varchar(3) ) PARTITION BY LIST (country_code); Third, define partitioned tables for USA and European records: SQL CREATE TABLE Account_USA PARTITION OF Account (id, full_name, email, phone, country_code, PRIMARY KEY (id, country_code)) FOR VALUES IN ('USA') TABLESPACE usa_tablespace; CREATE TABLE Account_EU PARTITION OF Account (id, full_name, email, phone, country_code, PRIMARY KEY (id, country_code)) FOR VALUES IN ('EU') TABLESPACE europe_tablespace; FOR VALUES IN ('USA') - If the country_code is equal to the ‘USA’, then the record is automatically placed or queried from the Account_USA partition that is stored in the usa_tablespace (the region in South Carolina). FOR VALUES IN ('EU') - Otherwise, if the record belongs to the European Union (country_code is equal to 'EU'), then it’s stored in the Account_EU partition from the europe_tablespace (the region in Frankfurt). Now, let’s examine the read-and-write latency when a user connects from the United States. Latency When Connecting From the United States Let’s open a client connection from Iowa (us-central1) to a database node located in South Carolina (us-east1) and insert a new Account record: SQL INSERT INTO Account (id, full_name, email, phone, country_code) VALUES (1, 'John Smith', 'john@gmail.com', '650-346-1234', 'USA'); As long as the country_code is 'USA', the record will be stored on the database node from South Carolina. The write and read latency will be approximately 30 milliseconds because the client requests need to travel between Iowa and South Carolina. Next, let’s see what happens when we add and query an account with the country_code set to 'EU': SQL INSERT INTO Account (id, full_name, email, phone, country_code) VALUES (2, 'Emma Schmidt', 'emma@gmail.com', '49-346-23-1234', 'EU'); SELECT * FROM Account WHERE id=2 and country_code='EU'; Since this account must be stored in a European data center and must be transferred between the United States and Europe, the latency increases. The latency for the INSERT (230 ms) is higher than for the SELECT (130 ms) because during the INSERT the record is replicated across three availability zones in Frankfurt. The higher latency between the client connection in the USA and the database node in Europe signifies that the geo-partitioned cluster makes you compliant with data regulatory requirements. Even if the client from the USA connects to the US-based database node and writes/reads records of residents from the European Union, those records will always be stored/retrieved from database nodes in Europe. Latency When Connecting From Europe Let’s see how the latency improves if you open a client connection from Frankfurt (europe-west3) to the database node in the same region, and query the European record recently added from the USA: This time the latency is as low as 3 milliseconds (vs. 130 ms when you queried the same record from the USA) because the record is stored in and retrieved from European data centers. Adding and querying another European record also maintains low latency, as long as the data is not replicated to the United States. 
SQL INSERT INTO Account (id, full_name, email, phone, country_code) VALUES (3, 'Otto Weber', 'otto@gmail.com', '49-546-33-0034', 'EU'); SELECT * FROM Account WHERE id=3 and country_code='EU'; When accessing data stored in the same region, latency is significantly reduced. The result is a much better user experience while remaining compliant with data regulatory requirements. Wrap Up Geo-partitioning is an effective way to comply with data regulations and achieve global low latency. By deploying a geo-partitioned cluster using YugabyteDB Managed, it's possible to intelligently distribute data across regions, while maintaining high-performance querying capabilities.
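If you want to verify latency numbers like these yourself, the sketch below times a single read from a client application. It is a minimal example assuming a hypothetical host name, placeholder credentials, and the psycopg2 driver; YugabyteDB's YSQL API is PostgreSQL-compatible, so the standard driver and the default YSQL port 5433 work.

Python

import time
import psycopg2  # YSQL is PostgreSQL-compatible, so the standard driver works

# Hypothetical connection parameters: replace with your cluster's regional endpoint.
conn = psycopg2.connect(host="eu-region.example-cluster.ybdb.io", port=5433,
                        dbname="yugabyte", user="admin", password="secret")
conn.autocommit = True

with conn.cursor() as cur:
    start = time.perf_counter()
    cur.execute("SELECT * FROM Account WHERE id = %s AND country_code = %s", (3, "EU"))
    row = cur.fetchone()
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"row: {row}, latency: {elapsed_ms:.1f} ms")

Running this once from a client in Frankfurt and once from a client in the USA should reproduce the gap described above, since only the European nodes hold the 'EU' partition.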
In a recent survey by Great Expectations, 91% of respondents revealed that data quality issues had some level of impact on their organization. This highlights the critical importance of data quality in data engineering pipelines. Organizations can avoid costly mistakes, make better decisions, and ultimately drive better business outcomes by ensuring that data is accurate, consistent, and reliable. However, 41% of respondents in the survey also reported that lack of tooling was a major contributing factor to data quality issues. Employing data quality management tools in data pipelines can automate various processes required to ensure that the data remains fit for purpose across analytics, data science, and machine learning use cases. They can also assess existing data pipelines, identify quality bottlenecks, and automate various remediation steps. To help organizations find the best tools, this article lists some popular tools for automating data quality checks in data engineering pipelines. Importance of Data Quality Checks in Data Engineering Pipelines Data quality tools are as essential as other data engineering tools, such as integration, warehousing, processing, storage, governance, and security. Here are several reasons why data quality checks are essential in data pipelines: Accuracy: It ensures that the data is accurate and error-free. This is crucial for making informed decisions based on the data. If the data is inaccurate, it can lead to incorrect conclusions and poor business decisions. Completeness: It ensures that all required data is present in the pipeline and the pipeline is free from duplicate data. Incomplete data can result in missing insights, leading to incorrect or incomplete analysis. Consistency: Data quality checks ensure consistency across different sources and pipelines. Inconsistent data can lead to discrepancies in the analysis and affect the overall reliability of the data. Compliance: It ensures the data complies with regulatory requirements and industry standards. Non-compliance can result in legal and financial consequences. Efficiency: Data quality checks help identify and fix data issues early in the pipeline, reducing the time and effort required for downstream processing and analysis. The data quality checks in the ingestion, storage, ETL, and processing layers are usually similar, regardless of the business needs and differing industries. The goal is to ensure that data is not lost or degraded while moving from source to target systems. Why Automate? Here's how automating data testing and data quality checks can enhance the performance of data engineering pipelines: By testing data at every pipeline stage with automation, data engineers can identify and address issues early, preventing errors and data quality issues from being propagated downstream. Automation saves time and reduces the manual effort required to validate data. This, in turn, speeds up the development cycle and enables faster time-to-market. Automation tools can automate repetitive tasks such as data validation, reducing the time and effort required to perform these tasks manually. It increases the efficiency of the data engineering pipeline and allows data engineers to focus on more complex tasks. Data engineers can ensure that their pipelines and storage comply with regulatory and legal requirements and avoid costly penalties by automatically testing for data privacy, security, and compliance issues.
Detecting errors early through automated checks reduces the risk of data processing errors and data quality issues. This saves time, money, and resources that would otherwise be spent on fixing issues downstream. List of Top Tools to Automate Data Quality Check Each data quality management tool has its own set of capabilities and workflows for automation. Most tools include features for data profiling, cleansing, tracking data lineage, and standardizing data. Some may also have parsing and monitoring capabilities or more. Here are some popular tools with their features: 1. Great Expectations Great Expectations provides a flexible way to define, manage, and automate data quality checks in data engineering pipelines. It supports various data sources, including SQL, Pandas, Spark, and more. Key features: Mechanisms for a shared understanding of data. Faster data discovery Integrates with your existing stack. Essential security and governance. Integrates with other data engineering tools such as AWS Glue, Snowflake, BigQuery, etc. Pricing: Open-source Popular companies using it: Moody’s Analytics, Calm, CarNext.com 2. IBM InfoSphere Information Server for Data Quality IBM InfoSphere Information Server for Data Quality offers end-to-end data quality tools for data cleansing, automating source data investigation, data standardization, validation, and more. It also enables you to continuously monitor and analyze data quality to prevent incorrect and inconsistent data. Key features: Designed to be scalable and handle large volumes of data across distributed environments. Offers flexible deployment options. Helps maintain data lineage. Supports various data sources and integration with other IBM data management products. Pricing: Varied pricing Popular companies using it: Toyota, Mastercard, UPS 3. Apache Airflow Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. It provides features like task dependencies, retries, and backfills to automate data engineering pipelines and can be used for performing data quality checks as well. Key features: Modular architecture that can scale to infinity. Defined in Python, which allows for dynamic pipeline generation. Robust integrations with many third-party services, including AWS, Azure, GCP, and other next-gen technologies. Pricing: Open-source Popular companies using it: Airbnb, PayPal, Slack 4. Apache Nifi Apache Nifi provides a visual interface for designing and automating data engineering pipelines. It has built-in processors for performing data quality checks, such as validating data schema, checking for null values, and ensuring data completeness. Key features: Browser-based UI Data provenance Extensible architecture Supports powerful and scalable directed graphs (DAGs) of data routing, transformation, and system mediation logic. Pricing: Open-source Popular companies using it: Adobe, Capital One, The Weather Company 5. Talend Talend is a comprehensive platform that provides data quality solutions for data profiling, cleansing, enrichment, and standardization across your systems. It supports various data sources, including databases, files, and cloud-based platforms. Key features: Intuitive UI ML-powered recommendations to address data quality issues. Real-time capabilities Automates better data Pricing: Varied pricing plans Popular companies using it: Beneva, Air France, Allianz 6. 
Informatica Data Quality Informatica Data Quality is an enterprise-level data quality tool with data profiling, cleansing, and validation features. It also provides other capabilities such as data de-duplication, enrichment, and consolidation. Key features: Reliable data quality powered by AI. Reusability (of rules and accelerators) to save time and resources. Exception management through an automated process. Pricing: IPU (Informatica Processing Unit) pricing Popular companies using it: Lowell, L.A. Care, HSB Conclusion The above is not a definitive list. There are many other popular tools, such as Precisely Trillium, Ataccama One, SAS Data Quality, etc. Choosing the right data engineering tools for a pipeline involves considering several factors. It involves understanding your data pipeline and quality requirements, evaluating available tools, and their automation capabilities, considering cost and ROI, the ability to integrate with your current stack, and testing the tool with your pipeline.
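As a closing illustration of what automated checks from such tools can look like in code, here is a minimal sketch using Great Expectations' classic Pandas API (the first tool above). The file name and column names are placeholders, and the API has evolved across versions, so treat this as a sketch rather than a definitive recipe.

Python

import great_expectations as ge

# Wrap a CSV in a Great Expectations dataset: it behaves like a pandas
# DataFrame but also exposes expect_* methods for declaring quality rules.
df = ge.read_csv("orders.csv")  # hypothetical input file

df.expect_column_values_to_not_be_null("order_id")
df.expect_column_values_to_be_between("quantity", min_value=1, max_value=1000)

# Validate the accumulated expectation suite; in a pipeline, a failed result
# would typically stop the run or route the batch to a quarantine area.
results = df.validate()
print(results.success)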
To get more clarity about ISR (in-sync replicas) in Apache Kafka, we should first carefully examine the replication process in the Kafka broker. In short, replication means having multiple copies of our data spread across multiple brokers. Maintaining identical copies of the data on different brokers makes high availability possible: if one or more brokers in a multi-node Kafka cluster go down or become unreachable, the remaining brokers can still serve requests. For this reason, we must specify how many copies of the data we want to maintain in the multi-node Kafka cluster when creating a topic. This number is termed the replication factor, and it is also why the replication factor cannot be more than one when creating a topic on a single-node Kafka cluster. The number of replicas specified while creating a topic can be changed in the future based on node availability in the cluster. On a single-node Kafka cluster, however, we can have more than one partition in the broker, because each topic can have one or more partitions. Partitions are subdivisions of a topic spread across the brokers in the cluster, and each partition holds the actual data (messages). Internally, each partition is a single log file to which records are written in an append-only fashion. Based on the provided number, the topic is internally split into that many partitions at creation time. Thanks to partitioning, messages can be distributed in parallel among several brokers in the cluster. Kafka scales to accommodate several consumers and producers at once by employing this parallelism technique, which enables linear scaling for both consumers and producers. Even though more partitions in a Kafka cluster provide higher throughput, there are pitfalls too. Briefly, increasing the number of partitions creates more file handles, as each partition maps to a directory in the broker's file system. Having discussed replication and partitions in Apache Kafka, it is now easier to understand ISR. The ISR set is simply the partition replicas that are "in sync" with the leader, where the leader is the replica that all requests from clients and other brokers of Kafka go to. Replicas that are not the leader are termed followers, and a follower that is in sync with the leader is called an ISR (in-sync replica). For example, if we set the topic's replication factor to 3, Kafka will store the topic-partition log in three different places and will only consider a record to be committed once all three of these replicas have verified that they have written the record to disk successfully and have sent an acknowledgment back to the leader. In a multi-broker (multi-node) Kafka cluster (see my earlier article on how a multi-node Kafka cluster can be created), one broker's replica is selected as the leader for a partition, and this leader is responsible for handling all the read and write requests for that partition, while the followers (on other brokers) passively replicate the leader to achieve data consistency. Each partition can have only one leader at a time, which handles all reads and writes of records for that partition. Followers replicate the leader and take over if the leader dies. By leveraging Apache ZooKeeper, Kafka internally tracks the replicas of each broker's partitions, and if the leader of a partition fails (due to an outage of that broker), Kafka chooses one of the ISRs (in-sync replicas) as the new leader.
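To tie the topic, partition, and replication-factor concepts together, here is a small sketch that creates a topic on a three-broker cluster. It is a minimal example assuming a broker reachable at localhost:9092 and the kafka-python client; the topic name and settings are illustrative.

Python

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# 3 partitions for parallelism, 3 copies of each partition for availability;
# min.insync.replicas=2 means a write must be acknowledged by at least
# two in-sync replicas before it is considered committed.
topic = NewTopic(
    name="orders",
    num_partitions=3,
    replication_factor=3,  # would fail on a single-node cluster, as noted above
    topic_configs={"min.insync.replicas": "2"},
)
admin.create_topics(new_topics=[topic])
admin.close()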
When all of the ISRs for a partition have written a record to their logs, the record is said to be "committed," and consumers can only read committed records. The minimum in-sync replica count specifies how many replicas must be in sync for the producer to successfully send records to a partition. Even though a high minimum in-sync replica count gives higher durability, it can have an adverse effect on availability: writes automatically become unavailable whenever fewer than the minimum number of in-sync replicas are alive at publishing time. For example, if we have an operational three-node Kafka cluster with the minimum in-sync replicas configured as three, and one node subsequently goes down or becomes unreachable, the remaining two nodes will not be able to accept any data/messages from producers, because only two in-sync replicas are active/available across the brokers. The third replica, which existed on the dead or unavailable broker, cannot acknowledge to the leader that it is synced with the latest data the way the other two live replicas on the available brokers can (a minimal producer sketch illustrating this failure mode follows below). I hope you have enjoyed this read. Please like and share if you find it valuable.
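Here is the promised sketch of that failure mode, using the kafka-python producer with acks="all"; the broker address and topic are assumptions. When the topic's min.insync.replicas cannot be satisfied because a broker is down, the send fails instead of being silently under-replicated.

Python

from kafka import KafkaProducer
from kafka.errors import KafkaError

# acks='all' makes the leader wait for the full in-sync replica set.
producer = KafkaProducer(bootstrap_servers="localhost:9092", acks="all")

try:
    # With min.insync.replicas=3 and only two replicas alive, this write is
    # rejected (typically with a "not enough replicas" error from the broker).
    producer.send("orders", b"payload").get(timeout=10)
    print("record committed by the full ISR set")
except KafkaError as ex:
    print(f"write rejected: {ex}")
finally:
    producer.close()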
Exploratory Data Analysis (EDA) is an essential step in any data science project, as it allows us to understand the data, detect patterns, and identify potential issues. In this article, we will explore how to use two popular Python libraries, Pandas and Matplotlib, to perform EDA. Pandas is a powerful library for data manipulation and analysis, while Matplotlib is a versatile library for data visualization. We will cover the basics of loading data into a pandas DataFrame, exploring the data using pandas functions, cleaning the data, and finally, visualizing the data using Matplotlib. By the end of this article, you will have a solid understanding of how to use Pandas and Matplotlib to perform EDA in Python. Importing Libraries and Data Importing Libraries To use the pandas and Matplotlib libraries in your Python code, you need to first import them. You can do this using the import statement followed by the name of the library. Python python import pandas as pd import matplotlib.pyplot as plt In this example, we're importing pandas and aliasing it as 'pd', which is a common convention in the data science community. We're also importing matplotlib.pyplot and aliasing it as 'plt'. By importing these libraries, we can use their functions and methods to work with data and create visualizations. Loading Data Once you've imported the necessary libraries, you can load the data into a pandas DataFrame. Pandas provides several methods to load data from various file formats, including CSV, Excel, JSON, and more. The most common method is read_csv, which reads data from a CSV file and returns a DataFrame. Python python# Load data into a pandas DataFrame data = pd.read_csv('path/to/data.csv') In this example, we're loading data from a CSV file located at 'path/to/data.csv' and storing it in a variable called 'data'. You can replace 'path/to/data.csv' with the actual path to your data file. By loading data into a pandas DataFrame, we can easily manipulate and analyze the data using pandas' functions and methods. The DataFrame is a 2-dimensional table-like data structure that allows us to work with data in a structured and organized way. It provides functions for selecting, filtering, grouping, aggregating, and visualizing data. Data Exploration head() and tail() The head() and tail() functions are used to view the first few and last few rows of the data, respectively. By default, these functions display the first/last five rows of the data, but you can specify a different number of rows as an argument. Python python# View the first 5 rows of the data print(data.head()) # View the last 10 rows of the data print(data.tail(10)) info() The info() function provides information about the DataFrame, including the number of rows and columns, the data types of each column, and the number of non-null values. This function is useful for identifying missing values and determining the appropriate data types for each column. Python python# Get information about the data print(data.info()) describe() The describe() function provides summary statistics for numerical columns in the DataFrame, including the count, mean, standard deviation, minimum, maximum, and quartiles. This function is useful for getting a quick overview of the distribution of the data. Python python# Get summary statistics for the data print(data.describe()) value_counts() The value_counts() function is used to count the number of occurrences of each unique value in a column. 
This function is useful for identifying the frequency of specific values in the data. Python python# Count the number of unique values in a column print(data['column_name'].value_counts()) These are just a few examples of panda functions you can use to explore data. There are many other functions you can use depending on your specific data exploration needs, such as isnull() to check for missing values, groupby() to group data by a specific column, corr() to calculate correlation coefficients between columns and more. Data Cleaning isnull() The isnull() function is used to check for missing or null values in the DataFrame. It returns a DataFrame of the same shape as the original, with True values where the data is missing and False values where the data is present. You can use the sum() function to count the number of missing values in each column. Python python# Check for missing values print(data.isnull().sum()) dropna() The dropna() function is used to remove rows or columns with missing or null values. By default, this function removes any row that contains at least one missing value. You can use the subset argument to specify which columns to check for missing values and the how argument to specify whether to drop rows with any missing values or only rows where all values are missing. Python python# Drop rows with missing values data = data.dropna() drop_duplicates() The drop_duplicates() function is used to remove duplicate rows from the DataFrame. By default, this function removes all rows that have the same values in all columns. You can use the subset argument to specify which columns to check for duplicates. Python python# Drop duplicate rows data = data.drop_duplicates() replace() The replace() function is used to replace values in a column with new values. You can specify the old value to replace and the new value to replace it. This function is useful for handling data quality issues such as misspellings or inconsistent formatting. Python python# Replace values in a column data['column_name'] = data['column_name'].replace('old_value', 'new_value') These are just a few examples of pandas functions you can use to clean data. There are many other functions you can use depending on your specific data-cleaning needs, such as fillna() to fill missing values with a specific value or method, astype() to convert data types of columns, clip() to trim outliers and more. Data cleaning plays a crucial role in preparing data for analysis, and automating the process can save time and ensure data quality. In addition to the panda's functions mentioned earlier, automation techniques can be applied to streamline data-cleaning workflows. For instance, you can create reusable functions or pipelines to handle missing values, drop duplicates, and replace values across multiple datasets. Moreover, you can leverage advanced techniques like imputation to fill in missing values intelligently or regular expressions to identify and correct inconsistent formatting. By combining the power of pandas functions with automation strategies, you can efficiently clean and standardize data, improving the reliability and accuracy of your exploratory data analysis (EDA). Data Visualization Data visualization is a critical component of data science, as it allows us to gain insights from data quickly and easily. Matplotlib is a popular Python library for creating a wide range of data visualizations, including scatter plots, line plots, bar charts, histograms, box plots, and more. 
Data Visualization

Data visualization is a critical component of data science, as it allows us to gain insights from data quickly and easily. Matplotlib is a popular Python library for creating a wide range of data visualizations, including scatter plots, line plots, bar charts, histograms, box plots, and more. Here are a few examples of how to create these types of visualizations using Matplotlib.

Scatter Plot

A scatter plot is used to visualize the relationship between two continuous variables. You can create a scatter plot in Matplotlib using the scatter() function.

Python

# Create a scatter plot
plt.scatter(data['column1'], data['column2'])
plt.xlabel('Column 1')
plt.ylabel('Column 2')
plt.show()

In this example, we're creating a scatter plot with column1 on the x-axis and column2 on the y-axis. We're also adding labels to the x-axis and y-axis using the xlabel() and ylabel() functions.

Histogram

A histogram is used to visualize the distribution of a single continuous variable. You can create a histogram in Matplotlib using the hist() function.

Python

# Create a histogram
plt.hist(data['column'], bins=10)
plt.xlabel('Column')
plt.ylabel('Frequency')
plt.show()

In this example, we're creating a histogram of the column variable with 10 bins. We're also adding labels to the x-axis and y-axis using the xlabel() and ylabel() functions.

Box Plot

A box plot is used to visualize the distribution of a single continuous variable and to identify outliers. You can create a box plot in Matplotlib using the boxplot() function.

Python

# Create a box plot
plt.boxplot(data['column'])
plt.ylabel('Column')
plt.show()

In this example, we're creating a box plot of the column variable and adding a label to the y-axis using the ylabel() function.

These are just a few examples of what you can do with Matplotlib for data visualization. There are many other functions and techniques you can use, depending on the specific requirements of your project.

Conclusion

Exploratory data analysis (EDA) is a crucial step in any data science project, and Python provides powerful tools to perform it effectively. In this article, we have learned how to use two popular Python libraries, Pandas and Matplotlib, to load, explore, clean, and visualize data. Pandas provides a flexible and efficient way to manipulate and analyze data, while Matplotlib provides a wide range of options for creating visualizations. By leveraging these two libraries, we can gain insights from data quickly and easily. With the skills and techniques learned in this article, you can start performing EDA on your own datasets and uncover valuable insights that can drive data-driven decision-making.
As technology professionals, we are already aware that our world is increasingly data-driven. This is especially true in the realm of financial markets, where algorithmic trading has become the norm, leveraging complex algorithms to execute trades at speeds and frequencies that far outstrip human capabilities. In a world where milliseconds can mean the difference between profit and loss, algorithmic trading provides an edge by making trading more systematic and less influenced by human emotional biases.

But what if we could take this a step further? What if our trading algorithms could learn from their mistakes, adapt to new market conditions, and continually improve their performance over time? This is where reinforcement learning, a cutting-edge field in artificial intelligence, comes into play.

Reinforcement learning (RL) is an area of machine learning focused on making decisions. It is about learning from interaction with an environment to achieve a goal, often formulated as a game where the RL agent learns to make moves that maximize its total reward. The technology is now being applied to a variety of problems, from self-driving cars to resource allocation in computer networks. Yet reinforcement learning's potential remains largely untapped in the world of algorithmic trading. This is surprising, given that trading is essentially a sequential decision-making problem, which is exactly what reinforcement learning is designed to handle.

In this article, we will delve into how reinforcement learning can enhance algorithmic trading, explore the challenges involved, and discuss the future of this exciting intersection of AI and finance. Whether you're a data scientist interested in applying your skills to financial markets, or a technology enthusiast curious about the practical applications of reinforcement learning, this article has something for you.

Understanding Algorithmic Trading

Algorithmic trading, also known as algo-trading or black-box trading, utilizes complex formulas and high-speed, computer-programmed instructions to execute large orders in financial markets with minimal human intervention. It is a practice that has revolutionized the finance industry and is becoming increasingly prevalent in today's digital age.

At its core, algorithmic trading is about making the trading process more systematic and efficient. It involves the use of sophisticated mathematical models to make lightning-fast decisions about when, how, and what to trade. This ability to execute trades at high speeds and high volumes offers significant advantages, including a reduced risk of manual errors, improved order execution speed, and the ability to backtest trading strategies on historical data. In addition, algorithmic trading can implement complex strategies that would be impossible for humans to execute manually. These strategies can range from statistical arbitrage (exploiting statistical patterns in prices) to mean reversion (capitalizing on price deviations from long-term averages).
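To make the mean-reversion idea concrete, here is a minimal sketch of a z-score-based signal; the lookback window and thresholds are illustrative assumptions, not a production strategy:

Python

import pandas as pd

def mean_reversion_signal(prices, window=20, threshold=1.0):
    """Return +1 (buy), -1 (sell), or 0 (hold) based on a rolling z-score."""
    rolling_mean = prices.rolling(window).mean()
    rolling_std = prices.rolling(window).std()
    z_score = (prices - rolling_mean) / rolling_std
    # Buy when price is well below its recent average, sell when well above
    signal = pd.Series(0, index=prices.index)
    signal[z_score < -threshold] = 1
    signal[z_score > threshold] = -1
    return signal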
An important aspect of algorithmic trading is that it removes emotional human influences from the trading process. Decisions are made based on pre-set rules and models, eliminating the potential for human biases or emotions to interfere with trading decisions. This can lead to more consistent and predictable trading outcomes.

However, as powerful as algorithmic trading is, it is not without its challenges. One of the primary difficulties lies in the development of effective trading algorithms. These algorithms must be robust enough to handle a wide range of market conditions and flexible enough to adapt to changing market dynamics. They also need to manage risk effectively, a task that becomes increasingly challenging as the speed and volume of trading increase.

This is where reinforcement learning can play a critical role. With its ability to learn from experience and adapt its strategies over time, reinforcement learning offers a promising solution to the challenges faced by traditional algorithmic trading strategies. In the next section, we will delve deeper into the principles of reinforcement learning and how they can be applied to algorithmic trading.

The Basics of Reinforcement Learning

Reinforcement learning (RL) is a subfield of artificial intelligence that focuses on decision-making processes. In contrast to other forms of machine learning, reinforcement learning models learn by interacting with their environment and receiving feedback in the form of rewards or penalties.

The fundamental components of a reinforcement learning system are the agent, the environment, states, actions, and rewards. The agent is the decision-maker, the environment is what the agent interacts with, states are the situations the agent finds itself in, actions are what the agent can do, and rewards are the feedback the agent receives after taking an action.

One key concept in reinforcement learning is the tension between exploration and exploitation. The agent needs to balance exploring the environment to discover new information against exploiting the knowledge it already has to maximize rewards. This is known as the exploration-exploitation tradeoff.

Another important concept is the policy: the strategy the agent follows when deciding on an action from a particular state. The goal of reinforcement learning is to find the optimal policy, the one that maximizes the expected cumulative reward over time.

Reinforcement learning has been successfully applied in various fields, from game playing (like the famous AlphaGo) to robotics (for teaching robots new tasks). Its power lies in its ability to learn from trial and error and improve its performance over time.

In the context of algorithmic trading, the financial market can be considered the environment, the trading algorithm the agent, the market conditions the states, the trading decisions (buy, sell, hold) the actions, and the profit or loss from the trades the rewards. Applying reinforcement learning to algorithmic trading means developing trading algorithms that can learn and adapt their trading strategies based on feedback from the market, with the aim of maximizing cumulative profit. However, implementing reinforcement learning in trading comes with its own unique challenges, which we will explore in the following sections.

The Intersection of Algorithmic Trading and Reinforcement Learning

The intersection of algorithmic trading and reinforcement learning represents an exciting frontier in the field of financial technology. At its core, the idea is to create trading algorithms that can learn from past trades and iteratively improve their trading strategies over time. In a typical reinforcement learning setup for algorithmic trading, the agent (the trading algorithm) interacts with the environment (the financial market) by executing trades (actions) based on the current market conditions (state). The result of these trades, in terms of profit or loss, serves as the reward or penalty, guiding the algorithm to adjust its strategy.
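As a purely illustrative sketch of that mapping, consider a toy environment with discretized states and buy/sell/hold actions. Every simplification here (single asset, all-in positions, no transaction costs) is an assumption for illustration, not a recommendation:

Python

import numpy as np

class ToyTradingEnv:
    """A highly simplified trading environment: state = discretized one-step return,
    actions = 0 (hold), 1 (buy), 2 (sell); reward = next-period return if long."""

    def __init__(self, prices):
        self.prices = prices
        self.t = 1
        self.position = 0  # 0 = flat, 1 = long

    def state(self):
        # Discretize the last one-step return into down (0) / flat (1) / up (2)
        ret = self.prices[self.t] / self.prices[self.t - 1] - 1
        return 0 if ret < -0.001 else (2 if ret > 0.001 else 1)

    def step(self, action):
        if action == 1:
            self.position = 1
        elif action == 2:
            self.position = 0
        self.t += 1
        # Reward is the next period's return if we are long, else zero
        reward = self.position * (self.prices[self.t] / self.prices[self.t - 1] - 1)
        done = self.t >= len(self.prices) - 1
        return self.state(), reward, done

# Hypothetical usage: roll the environment forward with random actions
prices = np.cumprod(1 + np.random.normal(0, 0.01, 500)) * 100
env = ToyTradingEnv(prices)
state, done, total = env.state(), False, 0.0
while not done:
    state, reward, done = env.step(np.random.randint(3))
    total += reward
print(f"Total reward from a random policy: {total:.4f}")

An actual learning agent would replace the random action with a policy learned from the rewards, which is exactly what the algorithms discussed below provide.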
One of the key advantages of reinforcement learning in this context is its ability to adapt to changing market conditions. Financial markets are notoriously complex and dynamic, with prices affected by a myriad of factors, from economic indicators to geopolitical events. A trading algorithm that can learn and adapt in real time has a significant advantage over static algorithms.

For example, consider a sudden market downturn. A static trading algorithm might continue executing trades based on its pre-programmed strategy, potentially leading to significant losses. In contrast, a reinforcement learning-based algorithm could recognize the change in market conditions and adapt its strategy accordingly, potentially reducing losses or even taking advantage of the downturn to make profitable trades.

Another advantage of reinforcement learning in trading is its ability to handle high-dimensional data and make decisions based on complex, non-linear relationships. This is especially relevant in today's financial markets, where traders have access to vast amounts of data, from price histories to social media sentiment. For instance, a reinforcement learning algorithm could be trained to take into account not just historical price data, but also other factors such as trading volume, volatility, and even news articles or tweets, to make more informed trading decisions.

Challenges and Solutions of Implementing Reinforcement Learning in Algorithmic Trading

While the potential benefits of using reinforcement learning in algorithmic trading are significant, it's also important to understand the challenges and complexities associated with its implementation.

Overcoming the Curse of Dimensionality

The curse of dimensionality refers to the exponential increase in computational complexity as the number of features (dimensions) in the dataset grows. For a reinforcement learning model in trading, each dimension could represent a market factor or indicator, and the combination of all these factors constitutes the state space, which can become enormous.

One approach to mitigating the curse of dimensionality is feature selection, which involves identifying and selecting the most relevant features for the task at hand. By reducing the number of features, we can effectively shrink the state space, making the learning problem more tractable.

Python

from sklearn.feature_selection import SelectKBest, mutual_info_regression

# Assume X is the feature matrix, and y is the target variable
k = 10  # Number of top features to select
selector = SelectKBest(mutual_info_regression, k=k)
X_reduced = selector.fit_transform(X, y)

Another approach is dimensionality reduction, such as Principal Component Analysis (PCA) or t-distributed Stochastic Neighbor Embedding (t-SNE). These techniques transform the original high-dimensional data into a lower-dimensional space, preserving as much of the important information as possible.

Python

from sklearn.decomposition import PCA

# Assume X is the feature matrix
n_components = 5  # Number of principal components to keep
pca = PCA(n_components=n_components)
X_reduced = pca.fit_transform(X)

Handling Uncertainty and Noise

Financial markets are inherently noisy and unpredictable, with prices influenced by numerous factors. To address this, we can incorporate techniques that manage uncertainty into our reinforcement learning model. For example, Bayesian methods can be used to represent and manipulate uncertainties in the model. Additionally, reinforcement learning algorithms like Q-learning and SARSA, which learn an action-value function, are known to cope well with stochastic environments; their update rules are sketched below.
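For reference, here is a minimal sketch of the two update rules. Q is assumed to be a NumPy array of shape (n_states, n_actions); alpha (learning rate) and gamma (discount factor) are conventional names, not tied to any specific library:

Python

def q_learning_update(Q, s, a, r, s_next, alpha=0.1, gamma=0.99):
    """Off-policy: bootstrap from the best available next action."""
    Q[s, a] += alpha * (r + gamma * Q[s_next].max() - Q[s, a])

def sarsa_update(Q, s, a, r, s_next, a_next, alpha=0.1, gamma=0.99):
    """On-policy: bootstrap from the action the policy actually takes next."""
    Q[s, a] += alpha * (r + gamma * Q[s_next, a_next] - Q[s, a])

The only difference is the bootstrap target: Q-learning assumes the greedy action will be taken next, while SARSA uses the action actually chosen, which tends to produce more conservative estimates in noisy environments.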
Preventing Overfitting

Overfitting happens when a model becomes too specialized to the training data and performs poorly on unseen data. Regularization techniques, such as L1 and L2 regularization, can help prevent overfitting by penalizing overly complex models.

Python

from sklearn.linear_model import Ridge

# Assume X_train and y_train are the training data
alpha = 0.5  # Regularization strength
ridge = Ridge(alpha=alpha)
ridge.fit(X_train, y_train)

Another way to prevent overfitting is through the use of validation sets and cross-validation. By regularly evaluating the model's performance on a separate validation set during the training process, we can keep track of how well the model is generalizing to unseen data.

Python

from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LinearRegression

# Assume X and y are the feature matrix and target variable
model = LinearRegression()
cv_scores = cross_val_score(model, X, y, cv=5)  # 5-fold cross-validation

Balancing Exploration and Exploitation

Striking the right balance between exploration (trying out new actions) and exploitation (sticking to known actions) is a key challenge in reinforcement learning. Several strategies can be used to manage this tradeoff.

One common approach is the epsilon-greedy strategy, where the agent mostly takes the action it currently thinks is best (exploitation), but with a small probability (epsilon) takes a random action (exploration).

Python

import numpy as np

def epsilon_greedy(Q, state, n_actions, epsilon):
    if np.random.random() < epsilon:
        return np.random.randint(n_actions)  # Exploration: choose a random action
    else:
        return np.argmax(Q[state])  # Exploitation: choose the action with the highest Q-value

Another approach is the Upper Confidence Bound (UCB) method, where the agent chooses actions based on an upper bound of the expected reward, encouraging exploration of actions with high potential.

Python

import math
import numpy as np

def ucb_selection(plays, rewards, t):
    n_arms = len(plays)
    ucb_values = [0] * n_arms
    for i in range(n_arms):
        if plays[i] == 0:
            ucb_values[i] = float('inf')  # Always try an arm that has never been played
        else:
            # Average reward plus an exploration bonus that shrinks with more plays
            ucb_values[i] = rewards[i] / plays[i] + math.sqrt(2 * math.log(t) / plays[i])
    return np.argmax(ucb_values)

Future Perspectives

The intersection of reinforcement learning and algorithmic trading is a burgeoning field, and while it's already showing promise, there are several exciting developments on the horizon.

One of the most prominent trends is the increasing use of deep reinforcement learning, which combines the decision-making capabilities of reinforcement learning with the pattern recognition capabilities of deep learning. Deep reinforcement learning has the potential to handle much more complex decision-making tasks, making it especially suited to the intricacies of financial markets.

We can also expect to see more sophisticated reward structures in reinforcement learning models. Current models often use simple reward structures, such as the profit or loss from a trade. However, future models could incorporate more nuanced rewards, taking into account factors such as risk, liquidity, and transaction costs. This would allow for the development of more balanced and sustainable trading strategies.
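As a small illustration of such a reward structure, a risk- and cost-aware reward might look like the following sketch, where the penalty weights are hypothetical tuning parameters:

Python

import numpy as np

def risk_adjusted_reward(pnl, returns_history, trade_size,
                         risk_weight=0.5, cost_per_unit=0.001):
    """Reward = raw profit and loss, minus a volatility penalty and transaction costs."""
    volatility = np.std(returns_history) if len(returns_history) > 1 else 0.0
    transaction_cost = cost_per_unit * abs(trade_size)
    return pnl - risk_weight * volatility - transaction_cost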
Another intriguing prospect is the use of reinforcement learning for portfolio management. Instead of making decisions about individual trades, reinforcement learning could be used to manage a portfolio of assets, deciding what proportion of the portfolio to allocate to each asset in order to maximize returns and manage risk.

In terms of research, there is a lot of ongoing work aimed at overcoming the challenges associated with reinforcement learning in trading. For instance, researchers are exploring methods to manage the exploration-exploitation tradeoff more effectively, to deal with the curse of dimensionality, and to prevent overfitting.

In conclusion, while reinforcement learning in algorithmic trading is still a relatively new field, it holds immense potential. By continuing to explore and develop this technology, we could revolutionize algo-trading, making it more efficient, adaptable, and profitable. As technology professionals, we have the exciting opportunity to be at the forefront of this revolution.
Vector databases are currently all the rage in the tech world, and it isn't just hype. Vector search has become ever more critical due to advances in artificial intelligence that make use of vector embeddings. These embeddings are vector representations of words, sentences, or whole documents, and they capture semantic similarity: semantically close inputs end up near each other under a simple distance metric between vectors. The canonical example comes from word2vec, where combining the vectors for the words "king", "man", and "woman" in the following formula yields a vector very near the embedding of "queen":

king - man + woman ≈ queen

The fact that this works has always seemed amazing to me, and it works even for fairly large documents, provided our embedding space is of sufficiently high dimension. With modern deep learning methods, you can get excellent embeddings of complex documents.

For TerminusDB, we needed a way to leverage these sorts of embeddings for the following tasks that our users have asked for:

- Full-text search
- Entity resolution (finding other documents which are likely the same, for deduplication)
- Similarity search (for related content or for recommender systems)
- Clustering

We decided to prototype using OpenAI's embeddings, but in order to get the rest of the features we needed, we required a vector database. We needed a few unusual features, including the ability to do incremental indexing and the ability to index on the basis of commits, so we know precisely what commit an index applies to. This allows us to put indexing into our CI workflows. A versioned open-source vector database doesn't exist in the wild. So we wrote one!

Writing a Vector Database

A vector database is a store of vectors with the ability to compare any two vectors using some metric. The metric can be a lot of different things, such as Euclidean distance, cosine similarity, taxicab geometry, or really anything that obeys the triangle inequality rules required to define a metric space.

In order to make this fast, you need some sort of indexing structure to quickly find candidates that are already close for comparison. Otherwise, many operations will need to compare with every single thing in the database, every time. There are many approaches to indexing vector spaces, but we went with the HNSW (Hierarchical Navigable Small World) graph (see Malkov and Yashunin). HNSW is easy to understand and provides good performance in both low and high dimensions, so it is flexible. Most importantly, there is a very clear open-source implementation that we found: HNSW for Rust Computer Vision.

Storing the Vectors

Vectors are stored in a domain. This helps to separate different vector stores that do not need to describe the same vectors. For TerminusDB, we have many different commits that all pertain to the same vectors, so it's important that we put them all into the same domain.

Page:      0              1 ...
———————————————————————————————
Vectors:   | 0 [......] | 2 [......] |
           | 1 [......] | 3 [......] |

The vector store is page-based, where each buffer is designed to map cleanly to the operating system's pages but fit our vectors closely. We assign each vector an index, and we can then map from the index to the appropriate page and offset. Inside the HNSW index, we refer to a LoadedVec. This ensures that the page lives in a currently loaded buffer, so we can perform our metric comparisons on the vectors of interest. As soon as the last LoadedVec drops from a buffer, the buffer can be added back into a buffer pool to be used to load a new page.
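The index-to-page mapping is simple arithmetic. Here is a toy Python sketch of the idea; the real store is written in Rust, and the sizes below are illustrative assumptions, not TerminusDB's actual parameters:

Python

PAGE_SIZE = 4096              # bytes, matching a typical OS page
DIMENSION = 256               # vector dimension (assumed)
VECTOR_BYTES = DIMENSION * 4  # float32 components
VECTORS_PER_PAGE = PAGE_SIZE // VECTOR_BYTES

def locate(vector_index):
    """Map a vector index to its (page, byte offset within the page)."""
    page = vector_index // VECTORS_PER_PAGE
    offset = (vector_index % VECTORS_PER_PAGE) * VECTOR_BYTES
    return page, offset

# e.g., locate(3) -> (0, 3072) with the sizes above
print(locate(3))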
Creating a Versioned Index

We build an HNSW structure for each (domain + commit) pair. If starting a new index, we start with an empty HNSW. If starting an incremental index from a previous commit, we load the old HNSW from the previous commit and then begin our indexing operations. What is new versus what is old is all kept in TerminusDB, which knows how to find changes between commits and can submit them to the vector database indexer. The indexer only needs to know the operations it is being asked to perform (i.e., Insert, Delete, Replace).

We maintain the indexes themselves in an LRU pool that allows us to load on demand or use a cache if the index is already in memory. Since we only perform destructive operations at commits, this caching is always coherent.

When we save the index, we serialize the structure with the raw vector index as a stand-in for the LoadedVec, which helps to keep the index small. In the future, we would like to use some of the tricks we have learned in TerminusDB to keep layers of an index around, so new layers can be added without each incremental index having to write a duplicate when serializing. However, the indexes have proved small enough compared to the vectors we are storing that it has not mattered much.

NOTE: While we currently do incremental indexing, we have yet to implement the delete and replace operations (there are only so many hours in a week!). I've read the literature on HNSW, and deletion does not yet seem to be well described there. We have a design for the delete and replace operations that we think will work well with HNSW, and we wanted to explain it in case any technical people have ideas:

- If we are in an upper layer of the HNSW, simply ignore the deletion. It should not matter much, as most vectors are not in upper layers, and the ones that are, are only used for navigation.
- If we are in the zero layer but not in a layer above, delete the node from the index, while trying to re-link all neighbors of the deleted node according to closeness.
- If we are in the zero layer but also in a layer above, mark the node as deleted, and use it for navigation but do not store it in the candidate pool.

Finding Embeddings

We use OpenAI to define our embeddings. After an indexing request is made to TerminusDB, we feed each of the documents to OpenAI, which returns lists of float vectors as JSON.
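For readers who want to try this step in isolation, here is a minimal sketch of fetching embeddings from OpenAI's REST API; the model name and the use of the requests library are assumptions for illustration (our actual indexer is written in Rust):

Python

import os
import requests

def embed(texts, model="text-embedding-ada-002"):
    """Request embeddings for a batch of strings; returns a list of float vectors."""
    response = requests.post(
        "https://api.openai.com/v1/embeddings",
        headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"},
        json={"model": model, "input": texts},
    )
    response.raise_for_status()
    return [item["embedding"] for item in response.json()["data"]]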
It turns out that the embeddings are quite sensitive to context. We tried initially just submitting TerminusDB JSON documents, and the results were not fantastic. However, we found that if we define a GraphQL query + Handlebars template pair, we can create very high-quality embeddings. For People in Star Wars, this pair, which is defined in our schema, looks like this:

JSON

{
  "embedding": {
    "query": "query($id: ID){ People(id : $id) { birth_year, created, desc, edited, eye_color, gender, hair_colors, height, homeworld { label }, label, mass, skin_colors, species { label }, url } }",
    "template": "The person's name is {{label}}.{{#if desc}} They are described with the following synopsis: {{#each desc}} *{{this}} {{/each}}.{{/if}}{{#if gender}} Their gender is {{gender}}.{{/if}}{{#if hair_colors}} They have the following hair colours: {{hair_colors}}.{{/if}}{{#if mass}} They have a mass of {{mass}}.{{/if}}{{#if skin_colors}} Their skin colours are {{skin_colors}}.{{/if}}{{#if species}} Their species is {{species.label}}.{{/if}}{{#if homeworld}} Their homeworld is {{homeworld.label}}.{{/if}}"
  }
}

The meaning of each field in the People object is rendered as text, which helps OpenAI understand what we mean and provides much better semantics. Ultimately it would be nice if we could guess these sentences from a combination of our schema documentation and the schema structure, which is probably also possible using AI chat! But for now, this works brilliantly and does not require much technical sophistication.

Indexing Star Wars

So what happens when we actually run this thing? Well, we tried it out on our Star Wars data product to see what would happen. First, we fire off an index request, and our indexer obtains the information from TerminusDB:

curl 'localhost:8080/index?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars'

This returns a task-id, which we can use to poll an endpoint for completion. The index file and vector files for the domain admin/star_wars and the commit o2uq7k1mrun1vp4urktmw55962vlpto come out as admin%2Fstar_wars@o2uq7k1mrun1vp4urktmw55962vlpto.hnsw and admin%2Fstar_wars.vecs.

We can now ask the semantic index server about our documents at the specified commit:

curl 'localhost:8080/search?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars' -d "Who are the squid people"

We get back a number of results as JSON, which look like this:

JSON

[{"id":"terminusdb:///star-wars/Species/8","distance":0.09396297}, ...]

But what is the embedding string we used to produce this result? This is how the text was rendered for the Species/8 id:

"The species name is Mon Calamari. They have the following hair colours: none. Their skin colours are red, blue, brown, magenta. They speak the Mon Calamarian language."

Amazing! Notice that it never says squid anywhere! There is some pretty amazing work being done by our embeddings here. Let's have another go:

curl 'localhost:8080/search?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars' -d "Wise old man"

"The person's name is Yoda. They are described with the following synopsis: Yoda is a fictional character in the Star Wars franchise created by George Lucas, first appearing in the 1980 film The Empire Strikes Back. In the original films, he trains Luke Skywalker to fight against the Galactic Empire. In the prequel films, he serves as the Grand Master of the Jedi Order and as a high-ranking general of Clone Troopers in the Clone Wars. Following his death in Return of the Jedi at the age of 900, Yoda was the oldest living character in the Star Wars franchise in canon, until the introduction of Maz Kanata in Star Wars: The Force Awakens. Their gender is male. They have the following hair colours: white. They have a mass of 17. Their skin colours are green."

Incredible!
While we do say "oldest" in the text, we don't say "wise" or "man"! I hope you can see how this could be helpful in getting high-quality semantic indexing of your data.

Conclusion

We have also added endpoints to find neighboring documents and to find duplicates by searching the entire corpus. The latter was used on some benchmarks and has performed admirably. We hope to show the results of these experiments here soon.

While there are really great vector databases out there in the wild, such as Pinecone, we want a sidecar that integrates well with TerminusDB and that can be used by less technical users who care primarily about content and are not going to spin up their own vector database. We are really excited about the potential of VectorLink and would love people to have a look at what we have so far! Please forgive us a bit for the relatively sparse error handling. We're working furiously on it!