Cache synchronization using jOOQ and PostgreSQL functions

Introduction

In this article, we are going to see how we can achieve cache synchronization with the help of jOOQ and PostgreSQL functions.

By using Change Data Capture, we can track how table records change over time and synchronize the application-level cache entries that were built from the table records in question.

Domain Model

Let’s assume we are building a question-and-answer website similar to Stack Overflow. The largest and the most important tables in our database are the question and answer tables, which look as follows:

Question and Answer tables

Because our application has a lot of users, we want to store the most viewed questions and answers in an application-level cache like Redis.

The cache entry key is the question identifier, and the value is going to be a Question record that contains a List of Answer records, as illustrated by the following diagram:

Question and Answer records
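As a rough sketch, the two cached records could be modeled as Java records like the ones below. The component names and order are inferred from the constructor calls and accessors used later in this article (id(), body(), answers()), while the Java types (Long, Integer, Boolean, LocalDateTime), the QaRecords wrapper class, and the sample() helper are assumptions made for illustration.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

final class QaRecords {

    // One answer of a question. The component types are assumptions.
    record Answer(Long id, String body, Integer score, Boolean accepted,
                  LocalDateTime createdOn, LocalDateTime updatedOn) {}

    // The cached aggregate: a question plus a mutable list of its answers.
    record Question(Long id, String title, String body, Integer score,
                    LocalDateTime createdOn, LocalDateTime updatedOn,
                    List<Answer> answers) {}

    // Hypothetical helper that builds a small hierarchy for illustration.
    static Question sample() {
        LocalDateTime now = LocalDateTime.now();
        Question question = new Question(
            1L,
            "How to call jOOQ stored procedures?",
            "I have a PostgreSQL stored procedure and I'd like to call it from jOOQ.",
            1, now, now,
            new ArrayList<>()
        );
        question.answers().add(
            new Answer(1L, "Check the jOOQ docs.", 10, true, now, now)
        );
        return question;
    }

    public static void main(String[] args) {
        Question question = sample();
        // The cache key would be question.id(), the value the whole record.
        System.out.println(
            question.id() + " -> " + question.answers().size() + " answer(s)"
        );
    }
}
```

The answers list is kept mutable here because the hierarchy is assembled row by row from a flat result set.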

Tracking record changes using a PostgreSQL function

To extract the Question and Answer records that need to be synchronized with the cache, we are going to use the following get_updated_questions_and_answers PostgreSQL function:

CREATE OR REPLACE FUNCTION get_updated_questions_and_answers()
RETURNS TABLE(
    question_id bigint, question_title varchar(250), 
    question_body text, question_score integer, 
    question_created_on timestamp, question_updated_on timestamp,
    answer_id bigint, answer_body text, 
    answer_accepted boolean, answer_score integer, 
    answer_created_on timestamp, answer_updated_on timestamp
)
LANGUAGE plpgsql
AS $$
DECLARE
    previous_snapshot_timestamp timestamp;
    max_snapshot_timestamp timestamp;
    result_set_record record;
BEGIN
    previous_snapshot_timestamp = (
        SELECT
            updated_on
        FROM
            cache_snapshot
        WHERE
            region = 'QA'
        FOR NO KEY UPDATE
    );
    IF previous_snapshot_timestamp is null THEN
        INSERT INTO cache_snapshot(
            region,
            updated_on
        )
        VALUES (
            'QA',
            to_timestamp(0)
        );

        previous_snapshot_timestamp = to_timestamp(0);
    END IF;

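    -- Start from the previous snapshot so that a call that finds no
    -- changes does not reset cache_snapshot back to the epoch.
    max_snapshot_timestamp = previous_snapshot_timestamp;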
    FOR result_set_record IN(
        SELECT
            q1.id as question_id, q1.title as question_title,
            q1.body as question_body,q1.score as question_score,
            q1.created_on as question_created_on, q1.updated_on as question_updated_on,
            a1.id as answer_id, a1.body as answer_body,
            a1.accepted as answer_accepted, a1.score as answer_score,
            a1.created_on as answer_created_on, a1.updated_on as answer_updated_on
        FROM
            question q1
        LEFT JOIN
            answer a1 on q1.id = a1.question_id
        WHERE
            q1.id IN (
                SELECT q2.id
                FROM question q2
                WHERE
                    q2.updated_on > previous_snapshot_timestamp
            ) OR
            q1.id IN (
                SELECT a2.question_id
                FROM answer a2
                WHERE
                    a2.updated_on > previous_snapshot_timestamp
            )
        ORDER BY
            question_created_on, answer_created_on
    ) LOOP
        IF result_set_record.question_updated_on > max_snapshot_timestamp THEN
            max_snapshot_timestamp = result_set_record.question_updated_on;
        END IF;

        IF result_set_record.answer_updated_on > max_snapshot_timestamp THEN
            max_snapshot_timestamp = result_set_record.answer_updated_on;
        END IF;

        question_id = result_set_record.question_id;
        question_title = result_set_record.question_title;
        question_body = result_set_record.question_body;
        question_score = result_set_record.question_score;
        question_created_on = result_set_record.question_created_on;
        question_updated_on = result_set_record.question_updated_on;
        answer_id = result_set_record.answer_id;
        answer_body = result_set_record.answer_body;
        answer_accepted = result_set_record.answer_accepted;
        answer_score = result_set_record.answer_score;
        answer_created_on = result_set_record.answer_created_on;
        answer_updated_on = result_set_record.answer_updated_on;
        RETURN NEXT;
    END LOOP;

    UPDATE
        cache_snapshot
    SET updated_on = max_snapshot_timestamp
    WHERE
        region = 'QA';
END
$$

The get_updated_questions_and_answers function works as follows:

  • First, it reads the previous_snapshot_timestamp from the cache_snapshot table, which records the latest updated_on value among the questions and answers that we previously synchronized with the cache. If no snapshot row exists yet for the QA region, one is inserted with the epoch timestamp.
  • Second, it fetches every question, along with all of its answer records, for which either the question itself or at least one of its answers was modified after the previous_snapshot_timestamp.
  • Afterward, it iterates over the question and answer records and calculates the max_snapshot_timestamp, which is saved in the cache_snapshot table and becomes the previous_snapshot_timestamp the next time we call the get_updated_questions_and_answers function.
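The snapshot logic described above can be sketched in plain Java, without a database. This is only an in-memory illustration under stated assumptions: the SnapshotWatermark class, its Row record, and the changedSince method are hypothetical names, the rows are flat rather than a question and answer hierarchy, and the running maximum starts at the previous snapshot so that a call that finds no changes leaves the snapshot where it was.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

final class SnapshotWatermark {

    // Hypothetical stand-in for a table row: an id and its updated_on value.
    record Row(long id, LocalDateTime updatedOn) {}

    // Plays the role of cache_snapshot.updated_on; starts at the epoch.
    private LocalDateTime previousSnapshot = LocalDateTime.of(1970, 1, 1, 0, 0);

    // Returns the rows modified after the previous snapshot and then
    // advances the snapshot to the latest updated_on value that was seen.
    List<Row> changedSince(List<Row> table) {
        List<Row> changed = new ArrayList<>();
        LocalDateTime maxSnapshot = previousSnapshot;
        for (Row row : table) {
            if (row.updatedOn().isAfter(previousSnapshot)) {
                changed.add(row);
                if (row.updatedOn().isAfter(maxSnapshot)) {
                    maxSnapshot = row.updatedOn();
                }
            }
        }
        previousSnapshot = maxSnapshot;
        return changed;
    }

    public static void main(String[] args) {
        SnapshotWatermark tracker = new SnapshotWatermark();
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, LocalDateTime.now().minusSeconds(1)));

        System.out.println("First call:  " + tracker.changedSince(table).size());
        System.out.println("Second call: " + tracker.changedSince(table).size());
    }
}
```

Unlike this sketch, the PostgreSQL function also locks the snapshot row with FOR NO KEY UPDATE and persists the new value in the same transaction, which is what allows a rollback to restore the previous snapshot.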

If the get_updated_questions_and_answers function is called from a @Transactional context that also executes the cache update, then a cache update failure will roll back the transaction, and the cache_snapshot table will be reverted to its previous consistent state.

Calling the TABLE-valued function using jOOQ

As I explained in this article, jOOQ provides the best way to call database stored procedures and functions from Java.

By using the code generator, jOOQ creates a GetUpdatedQuestionsAndAnswers utility that allows us to call the get_updated_questions_and_answers PostgreSQL function.

First, we will statically import the GET_UPDATED_QUESTIONS_AND_ANSWERS table reference declared by the GetUpdatedQuestionsAndAnswers utility:

import static com.vladmihalcea.book.hpjp.jooq.pgsql.schema.crud.tables
    .GetUpdatedQuestionsAndAnswers.GET_UPDATED_QUESTIONS_AND_ANSWERS;

Afterward, we can call the get_updated_questions_and_answers PostgreSQL function like this:

Result<GetUpdatedQuestionsAndAnswersRecord> records = sql
    .selectFrom(GET_UPDATED_QUESTIONS_AND_ANSWERS.call())
    .fetch();

The GetUpdatedQuestionsAndAnswersRecord contains the type-safe TABLE result set that is returned by the get_updated_questions_and_answers PostgreSQL function.

From the GetUpdatedQuestionsAndAnswersRecord, we can create the Question and Answer hierarchy to be stored in the cache.

This can be encapsulated in the getUpdatedQuestionsAndAnswers method using a custom Java Collector:

public List<Question> getUpdatedQuestionsAndAnswers() {
    return doInJOOQ(sql -> {
        return sql
            .selectFrom(GET_UPDATED_QUESTIONS_AND_ANSWERS.call())
            .collect(
                Collectors.collectingAndThen(
                    Collectors.toMap(
                        GetUpdatedQuestionsAndAnswersRecord::getQuestionId,
                        record -> {
                            Question question = new Question(
                                record.getQuestionId(),
                                record.getQuestionTitle(),
                                record.getQuestionBody(),
                                record.getQuestionScore(),
                                record.getQuestionCreatedOn(),
                                record.getQuestionUpdatedOn(),
                                new ArrayList<>()
                            );

                            Long answerId = record.getAnswerId();
                            if (answerId != null) {
                                question.answers().add(
                                    new Answer(
                                        answerId,
                                        record.getAnswerBody(),
                                        record.getAnswerScore(),
                                        record.getAnswerAccepted(),
                                        record.getAnswerCreatedOn(),
                                        record.getAnswerUpdatedOn()
                                    )
                                );
                            }

                            return question;
                        },
                        (Question existing, Question replacement) -> {
                            existing.answers().addAll(
                                replacement.answers()
                            );
                            return existing;
                        },
                        LinkedHashMap::new
                    ),
                    (Function<Map<Long, Question>, List<Question>>) map -> 
                        new ArrayList<>(map.values())
                )
            );
    });
}
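To see the grouping technique in isolation, here is a minimal, jOOQ-free sketch that folds flat rows into a parent and child hierarchy with the same Collectors.toMap merge-function approach. The FlatRowsToHierarchy class and its simplified Row, Question, and Answer records are hypothetical and carry fewer fields than the real records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class FlatRowsToHierarchy {

    // Hypothetical flat row standing in for the generated jOOQ record.
    // answerId is null when the LEFT JOIN found no matching answer.
    record Row(Long questionId, String title, Long answerId, String answerBody) {}

    record Answer(Long id, String body) {}

    record Question(Long id, String title, List<Answer> answers) {}

    static List<Question> toQuestions(List<Row> rows) {
        Map<Long, Question> questionsById = rows.stream().collect(
            Collectors.toMap(
                Row::questionId,
                // Every row becomes a Question holding zero or one Answer.
                row -> {
                    Question question = new Question(
                        row.questionId(), row.title(), new ArrayList<>()
                    );
                    if (row.answerId() != null) {
                        question.answers().add(
                            new Answer(row.answerId(), row.answerBody())
                        );
                    }
                    return question;
                },
                // Rows sharing a question id are merged into the first one.
                (Question existing, Question replacement) -> {
                    existing.answers().addAll(replacement.answers());
                    return existing;
                },
                // LinkedHashMap preserves the order of the incoming rows.
                LinkedHashMap::new
            )
        );
        return new ArrayList<>(questionsById.values());
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row(1L, "First question", 1L, "First answer"),
            new Row(1L, "First question", 2L, "Second answer"),
            new Row(2L, "Second question", null, null)
        );
        toQuestions(rows).forEach(question -> System.out.println(
            question.id() + " has " + question.answers().size() + " answer(s)"
        ));
    }
}
```

Using a LinkedHashMap matters because the result set arrives already sorted by the ORDER BY clause of the function, and a plain HashMap would not keep that order.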

Testing time

When inserting a parent question row with two associated answer child records:

LocalDateTime timestamp = LocalDateTime.now().minusSeconds(1);

sql
.insertInto(QUESTION)
.columns(
    QUESTION.ID,
    QUESTION.TITLE,
    QUESTION.BODY,
    QUESTION.SCORE,
    QUESTION.CREATED_ON,
    QUESTION.UPDATED_ON
)
.values(
    1L,
    "How to call jOOQ stored procedures?",
    "I have a PostgreSQL stored procedure and I'd like to call it from jOOQ.",
    1,
    timestamp,
    timestamp
)
.execute();

sql
.insertInto(ANSWER)
.columns(
    ANSWER.ID,
    ANSWER.QUESTION_ID,
    ANSWER.BODY,
    ANSWER.SCORE,
    ANSWER.ACCEPTED,
    ANSWER.CREATED_ON,
    ANSWER.UPDATED_ON
)
.values(
    1L,
    1L,
    "Checkout the [jOOQ docs]" +
    "(https://www.jooq.org/doc/latest/manual/sql-execution/stored-procedures/).",
    10,
    true,
    timestamp,
    timestamp
)
.values(
    2L,
    1L,
    "Checkout [this article]" +
    "(https://vladmihalcea.com/jooq-facts-sql-functions-made-easy/).",
    5,
    false,
    timestamp,
    timestamp
)
.execute();

We can see that the getUpdatedQuestionsAndAnswers method returns one Question with two Answer entries that match exactly the Question hierarchy we have just created:

List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(2, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());

When inserting a new Answer into our hierarchy:

sql
.insertInto(ANSWER)
.columns(
    ANSWER.ID,
    ANSWER.QUESTION_ID,
    ANSWER.BODY
)
.values(
    3L,
    1L,
    "Checkout this [video from Toon Koppelaars]" +
    "(https://www.youtube.com/watch?v=8jiJDflpw4Y)."
)
.execute();

We can see that now the Question record returned by the getUpdatedQuestionsAndAnswers method will contain three Answer child elements:

List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(3, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());
assertEquals(3, answers.get(2).id().intValue());

When updating the answer table row that we have just created:

sql
.update(ANSWER)
.set(
    ANSWER.BODY,
    "Checkout this [YouTube video from Toon Koppelaars]" +
    "(https://www.youtube.com/watch?v=8jiJDflpw4Y)."
)
.where(ANSWER.ID.eq(3L))
.execute();

The getUpdatedQuestionsAndAnswers method will return the updated snapshot of our Question and Answer hierarchy:

List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());
Question question = questions.get(0);
assertEquals(1, question.id().intValue());

List<Answer> answers = question.answers();
assertEquals(3, answers.size());
assertEquals(1, answers.get(0).id().intValue());
assertEquals(2, answers.get(1).id().intValue());

Answer latestAnswer = answers.get(2);
assertEquals(3, latestAnswer.id().intValue());
assertEquals(
    "Checkout this [YouTube video from Toon Koppelaars]" +
    "(https://www.youtube.com/watch?v=8jiJDflpw4Y).", 
    latestAnswer.body()
);

If we decide to insert a new Question:

sql
.insertInto(QUESTION)
.columns(
    QUESTION.ID,
    QUESTION.TITLE,
    QUESTION.BODY
)
.values(
    2L,
    "How to use the jOOQ MULTISET operator?",
    "I want to know how I can use the jOOQ MULTISET operator."
)
.execute();

The getUpdatedQuestionsAndAnswers method will capture this change and return the newly created Question that we can store in the cache:

List<Question> questions = getUpdatedQuestionsAndAnswers();

assertEquals(1, questions.size());

Question question = questions.get(0);
assertEquals(2, question.id().intValue());
assertTrue(question.answers().isEmpty());
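Storing the returned entries could look roughly like the sketch below. It uses an in-memory map as a stand-in for the application-level cache, so the QuestionCacheSync class, its simplified Question record, and the apply method are assumptions for illustration rather than a real Redis client.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class QuestionCacheSync {

    // Simplified aggregate; the real Question record carries more fields.
    record Question(Long id, String title, List<String> answerBodies) {}

    // In-memory stand-in for the application-level cache, keyed by question id.
    private final Map<Long, Question> cache = new ConcurrentHashMap<>();

    // Each changed question arrives with its full list of answers,
    // so the existing cache entry can simply be overwritten.
    void apply(List<Question> updatedQuestions) {
        for (Question question : updatedQuestions) {
            cache.put(question.id(), question);
        }
    }

    Question get(Long questionId) {
        return cache.get(questionId);
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        QuestionCacheSync sync = new QuestionCacheSync();
        sync.apply(List.of(
            new Question(1L, "How to call jOOQ stored procedures?", List.of("First answer"))
        ));
        // A later synchronization replaces the whole entry for question 1.
        sync.apply(List.of(
            new Question(1L, "How to call jOOQ stored procedures?",
                List.of("First answer", "Second answer"))
        ));
        System.out.println(sync.get(1L).answerBodies().size() + " cached answer(s)");
    }
}
```

Replacing whole entries works here because the function returns a question together with all of its answers whenever anything in that hierarchy has changed.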

Cool, right?

Conclusion

While caching data is easy, synchronizing the cache with the database is the difficult part.

By using jOOQ to call a PostgreSQL TABLE-valued function that fetches the cacheable aggregates, we can simplify this task, because the result set contains only the aggregates that have changed since the last time we executed the cache synchronization.

This research was funded by Data Geekery GmbH and conducted in accordance with the blog ethics policy.

While the article was written independently and reflects entirely my opinions and conclusions, the amount of work involved in making this article happen was compensated by Data Geekery.

4 Comments on “Cache synchronization using jOOQ and PostgreSQL functions”

  1. Very interesting! Single question that is implementation specific – why did you subtract one second from the ‘timestamp’?

    • I’m glad you liked it.

      The reason why the INSERT statement uses a TIMESTAMP that’s one second early is that we want to emulate the case when something has been created before we actually call the database function.
