Patterns for Command Query Responsibility Segregation (CQRS) Read Models

By Adam McQuistan in CQRS  11/27/2022

In this article I review the concepts of Command Query Responsibility Segregation (CQRS) emphasizing the value and flexibility that this software architecture provides through independent optimization of writing and reading data. I present a set of patterns that are commonly used to implement CQRS along with code samples to provide practical examples of each pattern.

Introducing CQRS

CQRS is a software architecture pattern that draws a distinction between requests to modify state, referred to as commands, and requests to retrieve data, referred to as queries. A driving benefit of CQRS is a clear separation of concerns, enabling purpose-driven designs and implementations that focus on doing one thing very well, reading or writing data, and it often lends itself to independent scalability.

On the left side of the diagram I'm showing commands coming into the application in the form of HTTP POST requests with a JSON body containing the data required to initiate change. This data is parsed, validated, and persisted by a section of the application code often referred to as the write model. This command-handling write model is optimized to enforce business rules and constraints while writing data in the most efficient manner for a given application and use case. Data is then sourced from the write datastore, synchronized, and reshaped into a format suitable for efficient data retrieval. This synchronization and reshaping of data may take place within the application or in a process that is external to it, as I'll discuss in this article. On the right side of the diagram the focus shifts to data retrieval, the query side, which is initiated by an HTTP GET request and handled by a distinct section of code engineered to efficiently retrieve the requested data, often referred to as the read model.

It's worth calling out that commands and queries need not be HTTP oriented; they could just as easily be messages flowing into the application via a message broker, as is the case in event-driven architectures that utilize a combination of HTTP and message-based communication.
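To make the command/query split concrete before diving into the patterns, here is a minimal plain-Java sketch (all names are hypothetical, not from the demo application) in which the write model handles commands and a separate read model answers queries from its own projection:

```java
import java.util.HashMap;
import java.util.Map;

public class CqrsSketch {

    // commands express an intent to change state
    record UpdatePrice(long productId, double price) {}

    // the write model validates and persists commands
    static class PriceWriteModel {
        private final Map<Long, Double> store = new HashMap<>();

        void handle(UpdatePrice cmd) {
            // business rules and constraints would be enforced here
            store.put(cmd.productId(), cmd.price());
        }

        Map<Long, Double> snapshot() {
            return Map.copyOf(store);
        }
    }

    // the read model answers queries from its own projection
    static class PriceReadModel {
        private final Map<Long, Double> projection;

        PriceReadModel(Map<Long, Double> projection) {
            this.projection = projection;
        }

        Double priceFor(long productId) {
            return projection.get(productId);
        }
    }
}
```

In practice each side would be backed by its own persistence, and the projection would be synchronized asynchronously, which is exactly what the patterns below explore.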

Overview of Demo Application and Use Case

To provide a practical understanding of the concepts discussed in the article I will work through a contrived product pricing use case as articulated in the following user story.

As a product owner, I want to update pricing for a product, specifying the product, price, and start datetime, so that marketing campaigns can be automated and sales systems can retrieve the current product price along with information on how long the current price will be offered.

The following constraints and business rules are to be satisfied.

1) Current product prices without a future price remain active until a new future price is issued

GIVEN a product needs a sales price

WHEN a product ID, price and start date are issued

THEN the product's price can be retrieved by product ID to present the price along with an end time of null, indicating the current price will be held until a new price is issued.

2) Future product prices are to be specified with a start date later than that of the current product price

GIVEN there is a current product price that started in the past

WHEN a new price is supplied with a future start date

THEN the current price is retrieved and presented along with the date and time when the current price ends, equal to the start of the next earliest price.
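The two rules above can be expressed as a small function over pricing entries; this is a hedged sketch with hypothetical types, not code from the demo application:

```java
import java.time.OffsetDateTime;
import java.util.Comparator;
import java.util.List;

public class PriceRules {

    record Pricing(long productId, double price, OffsetDateTime start) {}

    record Price(double price, OffsetDateTime end) {}

    // the current price is the latest entry whose start is not in the
    // future; its end is the start of the next scheduled price, or null
    static Price currentPrice(List<Pricing> pricings, OffsetDateTime now) {
        var sorted = pricings.stream()
                .sorted(Comparator.comparing(Pricing::start))
                .toList();

        Pricing current = null;
        OffsetDateTime end = null;
        for (Pricing p : sorted) {
            if (!p.start().isAfter(now)) {
                current = p;
            } else {
                end = p.start();
                break;
            }
        }
        return current == null ? null : new Price(current.price(), end);
    }
}
```

The end of the current price is the start of the next scheduled price, or null when no future price exists.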

Pattern 1: Separate Code Model but Same Logical and Physical Datastore

This first pattern is a shallow dip into some aspects of CQRS where data changes are initiated through commands and state changes are persisted to a single datastore and model in the form of a product_pricing table in a PostgreSQL database.

A Spring Boot based Java example can be found in GitHub in branch 01-same-datastore-and-source.

This pattern represents a minimal implementation of CQRS in that it utilizes a read-specific code model, which presents data to requesters in the form that best suits consumption, along with a separate model for validating and persisting data efficiently. The write model is designed to be efficient because of its append-only nature, which also has the added benefit of maintaining a history of all pricing changes, conceptually forming an event store. However, as shown in the code snippet below, data retrieval is both complex and not terribly efficient: it does a lookup on a non-indexed field (product_id), retrieving multiple rows per request in order to determine the end date of the current price as the start of the next price, if available.

@Service
public class ProductPricingService {

    private ProductPricingRepo pricingRepo;

    // ... omitting other code for brevity

    public ProductPriceEntity fetchProductPrice(long productId) {
        ProductPriceEntity currPrice = null;
        ProductPricingEntity nextPrice = null;
        var now = OffsetDateTime.now();

        // prices arrive ordered newest start first, so the first entry
        // with a start in the past is the current price and the entry
        // seen just before it (if any) is the next scheduled price
        var prices = pricingRepo.findByProductIdOrderByStartDesc(productId);
        for (ProductPricingEntity price : prices) {
            if (price.getStart().isBefore(now)) {
                currPrice = new ProductPriceEntity();
                currPrice.setProductId(price.getProductId());
                currPrice.setPrice(price.getPrice());
                break;
            }
            nextPrice = price;
        }

        // guard against products that only have future prices so far
        if (currPrice != null && nextPrice != null) {
            currPrice.setEnd(nextPrice.getStart());
        }

        return currPrice;
    }
}

A concern exists here that this particular application (web service, microservice, monolith module, etc.) may degrade under moderate to high workloads due to complex data retrieval and database contention, with concurrent writes and reads blocking each other.

A possible option for extending this pattern with minimal software change is to supplement the primary datastore with a read replica, providing the capability to offload read request workload from the primary writable database. This would require that the application be updated to source data appropriately.
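As a rough illustration of that update, reads and writes can be routed through a tiny abstraction; this is a sketch with hypothetical names, and the suppliers stand in for whatever connection or DataSource handles the application actually uses:

```java
import java.util.function.Supplier;

// routes queries to a replica while commands stay on the primary
public class ReadWriteRouter<C> {

    private final Supplier<C> primary;
    private final Supplier<C> replica;

    public ReadWriteRouter(Supplier<C> primary, Supplier<C> replica) {
        this.primary = primary;
        this.replica = replica;
    }

    // writes must go to the authoritative datastore
    public C forWrite() {
        return primary.get();
    }

    // reads can tolerate replica lag, so they are offloaded
    public C forRead() {
        return replica.get();
    }
}
```

In a Spring application the same idea is usually realized with two configured DataSources, with repositories on the query side wired to the replica.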

Pattern 2: Separate Code Model and Read Datastore via Materialized View with PostgreSQL

This next pattern is a nice approach to providing a read-optimized datastore and model without introducing new infrastructure; instead the architecture stays within the comfort of a SQL-based implementation. Here a new physical data model is created, known as a materialized view, which is a persisted cache of a complex query that can be indexed and refreshed either periodically (i.e., on a schedule) or in response to events via triggers. With the introduction of a read-optimized materialized view, a corresponding code model can query it directly, simplifying code and providing significant read performance improvements as long as the underlying database system has sufficient computational resources.

A Spring Boot based Java example can be found in GitHub in branch 02-pg-materialized-view.

In this example the new read-specific code is the price model used to query the product price materialized view (indirectly through a repository layer), alleviating the need to calculate the end date of a price in application code.

@Service
public class ProductPriceService {

    private ProductPriceRepo productPriceRepo;

    public ProductPriceService(ProductPriceRepo productPriceRepo) {
        this.productPriceRepo = productPriceRepo;
    }

    public ProductPriceEntity fetchProductPrice(long productId) {
        // orElseThrow makes the empty case explicit rather than a bare get()
        return productPriceRepo.findById(productId).orElseThrow();
    }
}

The logic to calculate the end date of a price still exists, now represented in the materialized view, and depending on the SQL capabilities of the engineering team implementing the solution this may or may not be an improvement in complexity. If you are in need of a refresher on SQL window functions, sometimes referred to as analytical functions, have a pass over my article How to Use Window Functions in SQL.

CREATE MATERIALIZED VIEW product_price
AS
  SELECT y.product_id AS product_id, y.price AS price, y.end AS end
  FROM (
      SELECT
        x.product_id AS product_id,
        x.price AS price,
        x.end AS end,
        ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY x.end) AS row_num
      FROM (
          SELECT
            pp.product_id AS product_id,
            pp.price AS price,
            LEAD(start) OVER (PARTITION BY pp.product_id ORDER BY pp.start) AS end
           FROM product_pricing pp
           ORDER BY product_id, start
       ) x
       WHERE x.end IS NULL OR x.end > now()
   ) y
   WHERE y.row_num = 1
WITH DATA;

CREATE UNIQUE INDEX product_price_idx ON product_price(product_id);

However, we have a new problem as soon as we execute these statements: even though we now have a direct, indexed access pattern to a very simple read-optimized data source, we have introduced a staleness issue. When a product price updates via a new pricing entry, the materialized view becomes an inaccurate reflection of the product's price until it is refreshed. This staleness problem can be addressed either by periodically refreshing the materialized view according to some schedule (i.e., hourly) or in response to some event such as an insert, update, or delete on the source product_pricing table. For this example I've chosen to implement a custom SQL function to refresh the materialized view that is fired by a trigger when a new row is inserted into the product_pricing table.

CREATE FUNCTION refresh_product_price()
  RETURNS TRIGGER LANGUAGE plpgsql
  AS $$
    BEGIN
      -- a plain refresh is used here because REFRESH ... CONCURRENTLY
      -- cannot run inside a transaction block, which a trigger always is
      REFRESH MATERIALIZED VIEW product_price;
      RETURN NULL;
    END $$;

CREATE TRIGGER refresh_product_price_after_insert
  AFTER INSERT
  ON product_pricing
  FOR EACH STATEMENT
  EXECUTE PROCEDURE refresh_product_price();

Of course, there are no free lunches with this solution either. This implementation of the CQRS read model pattern will remain performant only as long as the underlying query is relatively efficient and/or the materialized view is refreshed relatively infrequently. If the source product_pricing table contains a lot of products with actively fluctuating prices then the database is likely to degrade, and that degradation will trickle into the application and user experience.
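If trigger-driven refreshes become too frequent, the schedule-based alternative mentioned earlier can be sketched in plain Java; the names here are hypothetical, and the Runnable passed in would execute the refresh statement over JDBC:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledViewRefresher {

    // the statement the scheduled task would execute over JDBC
    static final String REFRESH_SQL =
            "REFRESH MATERIALIZED VIEW CONCURRENTLY product_price";

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // the refresh action is injected so the scheduling stays testable
    public void start(Runnable refresh, long periodSeconds) {
        scheduler.scheduleAtFixedRate(refresh, periodSeconds,
                periodSeconds, TimeUnit.SECONDS);
    }

    // run a single refresh immediately (useful for tests or manual kicks)
    public static void runOnce(Runnable refresh) {
        refresh.run();
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```

Because a scheduled refresh runs outside any trigger's transaction, the CONCURRENTLY option is available here, trading some staleness between runs for non-blocking reads during the refresh.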

Like the last pattern, we have the option to scale the database horizontally using a read replica and then perform a slight refactor of the code to read from it.

Pattern 3: Separate Code Model and Key/Value Redis Datastore Sourced from Events Published to Kafka 

This next pattern pulls some new infrastructure components into the architecture, namely Apache Kafka and Redis. Apache Kafka is utilized as an event broker, providing the capability to decouple command-driven state changes from the generation of an optimized read model, which is backed by the fast key/value in-memory Redis cache serving up information requested through queries.

A Spring Boot based Java example can be found in GitHub in branch 03-kafka-redis-cache-events.

In this pattern, when new pricing entries are saved via the write model, a state change event is published to a Kafka topic.

@Service
public class ProductPricingService {

    @Value("${kafka-topics.pricing}")
    private String pricingTopic;

    private ProductPricingRepo pricingRepo;
    private KafkaTemplate<Long, ProductPricingEntity> kafkaTemplate;

    public ProductPricingService(
            ProductPricingRepo pricingRepo,
            KafkaTemplate<Long, ProductPricingEntity> kafkaTemplate) {
      
        this.pricingRepo = pricingRepo;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void savePricingUpdate(UpdatePriceCommand cmd) {
        var pricing = new ProductPricingEntity(
                cmd.getProductId(),
                cmd.getPrice(),
                cmd.getStart()
        );
        pricingRepo.save(pricing);
        kafkaTemplate.send(pricingTopic, pricing.getProductId(), pricing);
        kafkaTemplate.flush();
    }
}

On the other end of the Kafka topic is a consumer which handles these state change events, aggregating and pushing the read-optimized model to the Redis datastore.

Here is a snippet of the Kafka consumer, implemented as a Spring for Kafka listener, handling these pricing state events.

@SpringBootApplication
@DependsOn("jedisConnectionFactory")
public class PricingServiceApplication {
	static final Logger logger = LoggerFactory.getLogger(PricingServiceApplication.class);

	@Autowired
	private ProductPriceService priceService;

	public static void main(String[] args) {
		SpringApplication.run(PricingServiceApplication.class, args);
	}

	@KafkaListener(topics = "#{'${kafka-topics.pricing}'.split(' ')}", containerFactory = "productPricingListenerFactory")
	public void handle(ProductPricingEntity pricing) {
		priceService.updatePriceFromLatestPricing(pricing);
	}
}

The listener simply calls a service method to update the cache based off the new pricing.

@Service
public class ProductPriceService {

    private ProductPriceRepo productPriceRepo;
    private ProductPricingRepo productPricingRepo;

    public ProductPriceService(ProductPriceRepo productPriceRepo, ProductPricingRepo productPricingRepo) {
        this.productPriceRepo = productPriceRepo;
        this.productPricingRepo  = productPricingRepo;
    }
    
    // ... other code omitted for brevity

    public ProductPriceEntity updatePriceFromLatestPricing(
                ProductPricingEntity pricing) {

        ProductPriceEntity currPrice = fetchPriceFromPricing(pricing.getProductId());
        if (currPrice.getEnd() == null) {
            currPrice.setEnd(pricing.getStart());
        }

        productPriceRepo.save(currPrice);
        return currPrice;
    }
}

This solution provides the greatest flexibility and a large capacity for scaling, but it is also considerably more complex to implement than the previous patterns. The read datastore, backed by a key/value NoSQL caching technology like Redis, is going to be limited by the amount of memory available to hold the data, whereby high memory pressure will inevitably result in some number of pricing entities being evicted from the cache to make room for new ones. Upon a read request (i.e., a query), if the cache doesn't have the requested product price it must be sourced from the write model and persisted to the cache. Similarly, if multiple future prices are added for a given product then logic needs to be implemented on the read side to refresh itself in the event that the current product price's end datetime has expired (i.e., occurs before the time the query was issued).

@Service
public class ProductPriceService {

    private ProductPriceRepo productPriceRepo;
    private ProductPricingRepo productPricingRepo;

    public ProductPriceService(
             ProductPriceRepo productPriceRepo,
             ProductPricingRepo productPricingRepo) {
           
        this.productPriceRepo = productPriceRepo;
        this.productPricingRepo  = productPricingRepo;
    }
    
    // ... other code omitted for brevity

    public ProductPriceEntity fetchProductPrice(long productId) {
        var id = Long.toString(productId);
        return productPriceRepo.findById(id).orElseGet(() -> {
            var price = fetchPriceFromPricing(productId);
            return productPriceRepo.save(price);
        });
    }
}
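The self-refresh logic for expired prices described above could look something like the following sketch (hypothetical types; the function argument stands in for sourcing a fresh price from the write model):

```java
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class ExpiringPriceCache {

    record Price(double price, OffsetDateTime end) {}

    private final Map<Long, Price> cache = new ConcurrentHashMap<>();
    private final Function<Long, Price> sourceFromWriteModel;

    public ExpiringPriceCache(Function<Long, Price> sourceFromWriteModel) {
        this.sourceFromWriteModel = sourceFromWriteModel;
    }

    public Price fetch(long productId, OffsetDateTime now) {
        Price cached = cache.get(productId);
        // re-source on a miss or when the cached price has already ended
        if (cached == null || (cached.end() != null && cached.end().isBefore(now))) {
            cached = sourceFromWriteModel.apply(productId);
            cache.put(productId, cached);
        }
        return cached;
    }
}
```

The same check can live in the service's fetch path so a stale Redis entry is transparently replaced on the next query.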

Of course, we could use another technology to build the read model's datastore using essentially the same logic and pattern. By this I mean that in place of Redis I could have just as easily kept the implementation in a relational datastore like PostgreSQL or used a completely different technology like MongoDB or Cassandra.

Conclusion

In this article I presented a few different patterns for optimizing the read model of a CQRS-based architecture.

As always, I thank you for reading and please feel free to ask questions or critique in the comments section below.
