
Practical queueing using SQL (Part 2): Do it simply using Spring Boot and JPA


Part 1: Rationale and general design
Part 2: Do it simply using Spring Boot and JPA
Part 3: Clustering and parallelism


Enough talking: let's see how to implement the queueing mechanism described in the previous post. We are going to do it in Java, and for this little demo we picked a combination of Spring Boot, JPA and Gradle, since they make for a fairly quick setup.

I am not a fan of premature abstraction, because it frequently gets in the way more than it helps. Still, for the sake of easier explanation, I will abstract the whole queueing mechanism into a separate package (or two), and the rest of the app simply acts as a user of this "queueing library". This way one can easily see what is application-specific and what is general queueing code.

We are going to demonstrate this queueing mechanism on a dummy SMS sending application, but first let's introduce the queueing code.

The code and build/run instructions are available at the end of this post.

Queueing library

The whole "library" lives under the com.ag04.jpaqueue package.

Since the entities that are to be processed in a queueing fashion are application-specific, we extract only their queueing state as a JPA Embeddable object (some code omitted for brevity):

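import java.time.LocalDateTime;
import javax.persistence.Embeddable;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;

@Embeddable
public class QueueingState {

	public enum Status {
		NOT_ATTEMPTED,
		ERROR,
		SUCCESS
	}

	@Enumerated(EnumType.STRING) // stored as text ('ERROR', ...), as the SQL queries later in this post expect
	private Status status;
	private LocalDateTime nextAttemptTime; // null when the item is not pending
	private int attemptCount;
	private LocalDateTime lastAttemptTime;
	private String lastAttemptErrorMessage;

	public QueueingState() {
		this.status = Status.NOT_ATTEMPTED;
		this.attemptCount = 0;
	}

	// ... scheduleNextAttempt() and other methods omitted
}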

Queue producer

As you can guess, you just embed this QueueingState object into the entity of your choice to prepare it for queueing. There is also one important public method in the QueueingState class, the one you have to call when pushing the item entity into the queue:

public void scheduleNextAttempt(LocalDateTime nextAttemptTime);

So, prior to saving the queued entity, make sure to call this method with the desired processing time, thus populating the next_attempt_time column. Usually this is the current time, which means the item will get processed as soon as possible, but it can also be in the future, which effectively gives you delayed/scheduled processing.

So, pushing to the queue would typically look something like this:

SomeEntity item = new SomeEntity(....);
item.getQueueingState().scheduleNextAttempt(LocalDateTime.now());
save(item);
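To make the lifecycle concrete, here is a plain-Java sketch (no JPA annotations) of how the queueing fields might change over an item's life. Only scheduleNextAttempt comes from the post; registerSuccess, registerFailure and isDue are hypothetical helpers, and treating a null nextAttemptTime as "not pending" is an assumption that matches the SQL queries at the end of this post.

```java
import java.time.LocalDateTime;

// Plain-Java sketch of the queueing-state lifecycle (JPA annotations omitted).
// Only scheduleNextAttempt() is named in the post; registerSuccess() and
// registerFailure() are hypothetical helpers used for illustration.
class QueueingStateSketch {
    enum Status { NOT_ATTEMPTED, ERROR, SUCCESS }

    private Status status = Status.NOT_ATTEMPTED;
    private LocalDateTime nextAttemptTime;   // assumption: null means "not pending"
    private int attemptCount = 0;
    private LocalDateTime lastAttemptTime;
    private String lastAttemptErrorMessage;

    // Enqueue: now() for immediate processing, a future time for delayed processing.
    void scheduleNextAttempt(LocalDateTime nextAttemptTime) {
        this.nextAttemptTime = nextAttemptTime;
    }

    // Successful attempt: record it and take the item out of the queue.
    void registerSuccess(LocalDateTime attemptTime) {
        recordAttempt(attemptTime);
        status = Status.SUCCESS;
        lastAttemptErrorMessage = null;
        nextAttemptTime = null;
    }

    // Failed attempt: record the error; a null retryTime means "no more retries".
    void registerFailure(LocalDateTime attemptTime, String errorMessage, LocalDateTime retryTime) {
        recordAttempt(attemptTime);
        status = Status.ERROR;
        lastAttemptErrorMessage = errorMessage;
        nextAttemptTime = retryTime;
    }

    private void recordAttempt(LocalDateTime attemptTime) {
        attemptCount++;
        lastAttemptTime = attemptTime;
    }

    // Scheduled for some attempt, now or in the future.
    boolean isPending() { return nextAttemptTime != null; }

    // What the consumer polls for: next attempt time before the given time.
    boolean isDue(LocalDateTime now) { return nextAttemptTime != null && nextAttemptTime.isBefore(now); }

    Status getStatus() { return status; }
    int getAttemptCount() { return attemptCount; }
    LocalDateTime getNextAttemptTime() { return nextAttemptTime; }
    LocalDateTime getLastAttemptTime() { return lastAttemptTime; }
    String getLastAttemptErrorMessage() { return lastAttemptErrorMessage; }
}
```

Scheduling with a future time, e.g. scheduleNextAttempt(LocalDateTime.now().plusMinutes(5)), keeps the item pending but not yet due, so the consumer simply skips it until that time passes.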

Queue consumer

Now comes the most complex part: the consumer code. It is located in a single class, com.ag04.jpaqueue.QueueConsumer.

Looking at its constructor, you can see it is configured with an item batch size and a polling period (in seconds). The constructor also requires a few other dependencies:

  • QueueConsumerModule: application-specific logic required for item consumption
  • RetryPolicy: a strategy for handling retries in case of processing failures; a couple of implementations are available in the com.ag04.jpaqueue.retry package
  • PlatformTransactionManager: the Spring-provided bean for managing transactions, needed because the consumer opens some internal transactions programmatically. This is quite rare in the Spring world nowadays; the usual approach would be to use @Transactional on an additional bean class, but that would require splitting the code into more classes, so I considered the programmatic approach simpler for this demo.

Arguably the most interesting of these is QueueConsumerModule, which looks like this:

public interface QueueConsumerModule<ID> {
   List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(LocalDateTime time, int limit);
   Optional<QueueingState> getQueueingStateForItem(ID itemId);
   Optional<QueueingState> processItem(ID itemId);
}

So, these methods should be implemented by the concrete application in order to provide:

  • the IDs of a limited number of pending items whose next attempt time is before the given time
  • the QueueingState instance for the specified item entity (an entity can contain multiple QueueingState embeddables when there are multiple queued processings for it); the return value is an Optional because the item may be missing from the DB for some external reason
  • the processing logic for the specified item entity, which returns the item's QueueingState on success; the return value is again an Optional, for the same reason
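To illustrate this contract, here is a hypothetical in-memory implementation, which could be handy for unit-testing a consumer without a database. The QueueingState class below is a minimal stand-in holding only what this sketch needs, not the library's class, and ordering the due items by next attempt time is an assumption about how a real query would behave.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Interface as shown in the post.
interface QueueConsumerModule<ID> {
    List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(LocalDateTime time, int limit);
    Optional<QueueingState> getQueueingStateForItem(ID itemId);
    Optional<QueueingState> processItem(ID itemId);
}

// Minimal stand-in for the library's QueueingState (only what this sketch needs).
class QueueingState {
    LocalDateTime nextAttemptTime;
    String status = "NOT_ATTEMPTED";
}

// Hypothetical in-memory module, e.g. for testing a consumer without a database.
class InMemoryQueueConsumerModule implements QueueConsumerModule<Long> {
    private final Map<Long, QueueingState> items = new HashMap<>();
    private final List<Long> processed = new ArrayList<>();

    void enqueue(Long id, LocalDateTime when) {
        QueueingState state = new QueueingState();
        state.nextAttemptTime = when;
        items.put(id, state);
    }

    void delete(Long id) { items.remove(id); } // simulates an external removal

    @Override
    public List<Long> findItemIdsWhereQueueingNextAttemptTimeIsBefore(LocalDateTime time, int limit) {
        // Same idea as: WHERE next_attempt_time < :time ORDER BY next_attempt_time LIMIT :limit
        return items.entrySet().stream()
                .filter(e -> e.getValue().nextAttemptTime != null && e.getValue().nextAttemptTime.isBefore(time))
                .sorted(Comparator.comparing((Map.Entry<Long, QueueingState> e) -> e.getValue().nextAttemptTime))
                .limit(limit)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    @Override
    public Optional<QueueingState> getQueueingStateForItem(Long itemId) {
        return Optional.ofNullable(items.get(itemId));
    }

    @Override
    public Optional<QueueingState> processItem(Long itemId) {
        QueueingState state = items.get(itemId);
        if (state == null) {
            return Optional.empty(); // item vanished in the meantime
        }
        processed.add(itemId);       // the real work (e.g. sending an SMS) would go here
        state.status = "SUCCESS";
        state.nextAttemptTime = null; // no longer pending
        return Optional.of(state);
    }

    List<Long> processed() { return processed; }
}
```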

Naturally, we need a scheduler to execute the processing job periodically, and as explained in the previous post, we must ensure there are no parallel executions of that processing task, so we simply use a single-thread executor and schedule the processing task upon application start:

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// ...
private void startProcessingTask() {
   logger.info("Starting queue processing task with delay of {} secs", this.pollingPeriodInSecs);
   Runnable command = this::processQueuedItems;
   this.processingTask = this.scheduledExecutorService.scheduleWithFixedDelay(command, pollingPeriodInSecs, pollingPeriodInSecs, TimeUnit.SECONDS);
}
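The no-overlap guarantee can be checked with a small self-contained demo. This is a sketch with hypothetical names and timings scaled down to milliseconds; the job deliberately takes longer than the delay, and the demo records how many runs were ever active at the same time.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Shows that a single-thread executor + scheduleWithFixedDelay never runs the job
// concurrently with itself: the next run is scheduled only after the previous one
// finishes, plus the delay. Timings are scaled down to milliseconds for the demo.
class NonOverlappingPollerDemo {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger maxConcurrent = new AtomicInteger();
    private final AtomicInteger runs = new AtomicInteger();

    // Simulated polling job that takes longer (30 ms) than the delay between runs (10 ms).
    private void pollOnce() {
        int concurrent = running.incrementAndGet();
        maxConcurrent.accumulateAndGet(concurrent, Math::max);
        try {
            Thread.sleep(30);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.decrementAndGet();
            runs.incrementAndGet();
        }
    }

    // Lets the poller run for roughly the given time, stops it, and returns peak concurrency.
    int runFor(long millis) {
        executor.scheduleWithFixedDelay(this::pollOnce, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown(); // cancels future runs; a run in progress may finish
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxConcurrent.get();
    }

    int runs() { return runs.get(); }
}
```

With a single thread, even scheduleAtFixedRate would not overlap runs (its Javadoc says late executions start late but never concurrently); scheduleWithFixedDelay additionally guarantees a full pause between the end of one batch and the start of the next. Also keep in mind that if any run throws, the executor suppresses all subsequent runs, which is why the consumption method below wraps its body in catch (Throwable).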

The consumption logic lives in these methods:

public void processQueuedItems() {
	try {
		LocalDateTime now = LocalDateTime.now();
		List<?> itemIds = this.queueConsumerModule.findItemIdsWhereQueueingNextAttemptTimeIsBefore(now, itemsPollSize);

		if (!itemIds.isEmpty()) {
			logger.info("Fetched {} pending queued items", itemIds.size());
			for (Object itemId : itemIds) {
				processItemAndHandleErrorIfRaised(itemId);
			}
		}
	} catch (Throwable th) {
		logger.error("Error while fetching queued items: " + th.getMessage(), th);
	}
}

private void processItemAndHandleErrorIfRaised(Object itemId) {
	try {
		executeUnderTransaction(() -> processItem(itemId));
	} catch (Throwable error) {
		executeUnderTransaction(() -> registerProcessingFailure(itemId, error));
	}
}

So the consumer first polls for pending items and then processes each of them in its own separate transaction. If an error is raised during processing, we handle it in a new transaction. This separate transaction for error handling is required because, in some cases, JPA's PersistenceContext cannot be reused for error handling: if the exception has already rolled back some nested transaction in the processing logic, the surrounding transaction is marked for rollback and the failure could not be recorded in it.
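The per-item isolation can be illustrated without Spring in a toy sketch. ToyDb and its inTransaction method are hypothetical stand-ins for TransactionTemplate: writes are buffered and applied only if the callback returns normally, which models the commit/rollback outcome but none of the real transaction machinery.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy "database": writes made inside inTransaction() are applied only if the
// callback completes without throwing (a stand-in for TransactionTemplate).
class ToyDb {
    final Map<Long, String> committed = new HashMap<>();

    void inTransaction(Consumer<Map<Long, String>> work) {
        Map<Long, String> pending = new HashMap<>();
        work.accept(pending);      // if this throws, pending writes are discarded ("rollback")
        committed.putAll(pending); // "commit"
    }
}

// Mirrors the shape of processItemAndHandleErrorIfRaised: one transaction per item,
// and a fresh transaction for recording a failure.
class IsolatedBatchProcessor {
    private final ToyDb db;

    IsolatedBatchProcessor(ToyDb db) { this.db = db; }

    void processAll(List<Long> itemIds, Set<Long> failingIds) {
        for (Long id : itemIds) {
            try {
                db.inTransaction(tx -> {
                    tx.put(id, "SENDING"); // partial write, lost if the item fails
                    if (failingIds.contains(id)) {
                        throw new IllegalStateException("send failed for " + id);
                    }
                    tx.put(id, "SUCCESS");
                });
            } catch (RuntimeException error) {
                // New transaction: the failed one's writes are gone, the failure is recorded
                db.inTransaction(tx -> tx.put(id, "ERROR: " + error.getMessage()));
            }
        }
    }
}
```

In this toy run, item 2's partial SENDING write disappears with its failed transaction, the failure is recorded in a fresh one, and items 1 and 3 are unaffected.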

The consumer starts its processing job in its initialization method, which in Spring is usually annotated with @PostConstruct, and stops it in its destroy method, annotated with @PreDestroy. We will see in the next post that this needs to change to make our app cluster-friendly.
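Here is a sketch of such a start/stop pair in plain Java, so it runs without Spring. The class name and the timeout handling are assumptions; the post only says that the job is started in a @PostConstruct method and stopped in a @PreDestroy method.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Start/stop lifecycle for a polling consumer. In a Spring bean, start() would be
// the @PostConstruct method and stop() the @PreDestroy method.
class PollingLifecycle {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Runnable pollTask;
    private final long periodMillis;
    private ScheduledFuture<?> processingTask;

    PollingLifecycle(Runnable pollTask, long periodMillis) {
        this.pollTask = pollTask;
        this.periodMillis = periodMillis;
    }

    void start() {
        processingTask = executor.scheduleWithFixedDelay(pollTask, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Stops scheduling new runs, waits for a running batch to finish, and forces
    // shutdown (interrupting the worker) if it does not finish within the timeout.
    boolean stop(long timeoutMillis) {
        if (processingTask != null) {
            processingTask.cancel(false); // false: do not interrupt a batch in progress
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return executor.isTerminated();
    }
}
```

Waiting for the current batch before forcing shutdown lets an in-flight item finish its transaction instead of being interrupted mid-way on a normal application stop.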

Demo app — SMS sending

OK, let's get down to business. Now we are going to set up a simple Spring Boot app that enqueues a number of SMS messages and "sends" them using the previously described queueing library. The application code is located under the com.ag04.smsqueueing package(s), and the starter class is com.ag04.smsqueueing.SmsQueueingApplication (annotated with @SpringBootApplication).

By SMS "sending" I mean just logging the SMS content to the console, but this dummy sender also introduces a short delay to simulate a real-world case, and triggers occasional exceptions to show how error handling works (in fact, errors are triggered quite often). The sender implementation is located in the com.ag04.smsqueueing.sender.SmsSenderImpl class.

Queued entity — SMS message

Our queued item entity is the SmsMessage class, stored in the SMS_MESSAGE table in the database.

import javax.persistence.*;

@Entity
@Table(name = "sms_message", indexes = @Index(name = "idx_sms_msg_queue_polling_fields", columnList = "next_attempt_time"))
public class SmsMessage {
	@Id
	@Column(name = "id", nullable = false)
	@GeneratedValue
	private Long id;

	@Column(name = "uid", nullable = false, unique = true)
	private String uid; // app-assigned unique ID

	@Column(name = "from_address", nullable = false)
	private String fromAddress;

	@Column(name = "to_address", nullable = false)
	private String toAddress;

	@Column(name = "text", nullable = false)
	private String text;

	// Embedded queueing state: its fields become columns of sms_message
	private QueueingState sendingState = new QueueingState();

	// ...
}

As you can see, this entity has only a few specific fields (fromAddress, toAddress, text…) and also includes the QueueingState embeddable described previously. Note that we also used JPA's @Index annotation to index the next_attempt_time column, which is very important for efficient polling, since every poll filters on that column.

SMS production

Upon start, the application enqueues a batch of SMS messages. This can be seen in the com.ag04.smsqueueing.MainApplicationRunner#run method.

SMS consumption

We instantiate QueueConsumer within com.ag04.smsqueueing.SmsQueueingApplication#smsSendingQueueConsumer method:

@Bean
public QueueConsumer smsSendingQueueConsumer(SmsSendingQueueConsumerModule smsSendingQueueConsumerModule, PlatformTransactionManager transactionManager) {
	// At most 3 attempts, with a fixed 1-minute delay between them
	RetryPolicy retryPolicy = new LimitedRetryPolicy(3, new FixedDelayRetryPolicy(Duration.ofMinutes(1)));
	// Batch size: 100 items; polling period: 10 seconds
	return new QueueConsumer(smsSendingQueueConsumerModule, retryPolicy, transactionManager, 100, 10);
}

You can see how we configured this instance with a batch size of 100 and a polling period of 10 seconds. Also, the RetryPolicy we configured will try to process an SMS message at most 3 times (LimitedRetryPolicy), and each retry will be delayed from the previous one by 1 minute (FixedDelayRetryPolicy). ExponentialDelayRetryPolicy is another interesting RetryPolicy in the package; it delays each subsequent retry by an exponentially increasing duration.
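The way LimitedRetryPolicy wraps FixedDelayRetryPolicy suggests a decorator design. Below is a sketch of how such policies could be written; the RetryPolicySketch interface, its nextAttemptTime signature and the class names are assumptions, and the real classes in com.ag04.jpaqueue.retry may look different. Here attemptCount is the number of attempts made so far, including the one that just failed.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Optional;

// Hypothetical retry-policy contract: given the attempts made so far and the time of
// the last one, return the next attempt time, or empty to give up.
interface RetryPolicySketch {
    Optional<LocalDateTime> nextAttemptTime(int attemptCount, LocalDateTime lastAttemptTime);
}

// Every retry waits the same fixed delay after the previous attempt.
class FixedDelayRetrySketch implements RetryPolicySketch {
    private final Duration delay;

    FixedDelayRetrySketch(Duration delay) { this.delay = delay; }

    public Optional<LocalDateTime> nextAttemptTime(int attemptCount, LocalDateTime lastAttemptTime) {
        return Optional.of(lastAttemptTime.plus(delay));
    }
}

// Delay doubles with each attempt: initialDelay * 2^(attemptCount - 1).
class ExponentialDelayRetrySketch implements RetryPolicySketch {
    private final Duration initialDelay;

    ExponentialDelayRetrySketch(Duration initialDelay) { this.initialDelay = initialDelay; }

    public Optional<LocalDateTime> nextAttemptTime(int attemptCount, LocalDateTime lastAttemptTime) {
        int exponent = Math.min(Math.max(attemptCount - 1, 0), 30); // cap to avoid overflow
        return Optional.of(lastAttemptTime.plus(initialDelay.multipliedBy(1L << exponent)));
    }
}

// Decorator: gives up once maxAttempts attempts have been made, else delegates.
class LimitedRetrySketch implements RetryPolicySketch {
    private final int maxAttempts;
    private final RetryPolicySketch delegate;

    LimitedRetrySketch(int maxAttempts, RetryPolicySketch delegate) {
        this.maxAttempts = maxAttempts;
        this.delegate = delegate;
    }

    public Optional<LocalDateTime> nextAttemptTime(int attemptCount, LocalDateTime lastAttemptTime) {
        return attemptCount >= maxAttempts
                ? Optional.empty()
                : delegate.nextAttemptTime(attemptCount, lastAttemptTime);
    }
}
```

With new LimitedRetrySketch(3, new FixedDelayRetrySketch(Duration.ofMinutes(1))), a failure at 07:45:12 schedules a retry one minute later, matching the retry log line shown further below, while a third failed attempt yields no next attempt.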

Our implementation of QueueConsumerModule is SmsSendingQueueConsumerModule, which uses a Spring Data JPA repository to access SmsMessage in the database. You will notice that the implementation of processItem(id) is basically just a call to SmsSender. Also, our findItemIdsWhereQueueingNextAttemptTimeIsBefore(time, limit) doesn't use SmsMessageRepository, because Spring Data JPA currently doesn't offer parameterized result limiting together with a simple projection (ID selection), so we ended up using plain JPA via EntityManager.

Building and running

The project code can be downloaded from Bitbucket at:

https://bitbucket.org/ag04/smsqueueing/src/single-threaded-consumer/

You can download the sources with the following git command, which checks out the single-threaded-consumer revision used in this post:

git clone -b single-threaded-consumer https://bitbucket.org/ag04/smsqueueing.git

Gradle was picked as the build tool, and as you can see in build.gradle, only 3 dependencies are required for this demo project:

dependencies {
   implementation 'org.springframework.boot:spring-boot-starter'
   implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
   runtimeOnly 'org.postgresql:postgresql'
}

To build the project, just execute the following command:

./gradlew clean build

Now we have built the app package (build/libs/smsqueueing-1.0.jar), but prior to running it, we need to make sure we have some place to store our data.

Database setup

We decided to use a PostgreSQL database, but any other SQL database would work as well, since we don't use any DB-specific features here.

You have to create an appropriate database ("smsqueueing") and a user (also "smsqueueing"). For PostgreSQL, this psql script can come in handy:

shell> psql postgres
CREATE USER smsqueueing WITH
LOGIN
CONNECTION LIMIT -1
PASSWORD 'smsqueueing';
CREATE DATABASE smsqueueing
WITH
OWNER = smsqueueing
ENCODING = 'UTF8'
CONNECTION LIMIT = -1;

The DB connection is configured the standard Spring Boot way, in application.properties:

spring.datasource.url=jdbc:postgresql://localhost:5432/smsqueueing
spring.datasource.username=smsqueueing
spring.datasource.password=smsqueueing

If not already present, all necessary database objects (tables, sequences etc…) will get created upon application start, so you don’t have to worry about it.

Running

Finally we can run the app via:

java -jar ./build/libs/smsqueueing-1.0.jar

Of course, if you are going to start it from your favorite IDE, just import it as a Gradle project and start it via the main application class, com.ag04.smsqueueing.SmsQueueingApplication.

When you start it, you will first see log lines about SMS production, something like:

2019-05-10 07:41:25.619  INFO 8074 --- [           main] c.a.s.producer.SmsProducerImpl           : Producing SMS: fromAddress=80111, toAddress=385913344599, text=Hello, this is text generated at 2019-05-10T07:41:25.618, sendTime=2019-05-10T07:41:25.618
2019-05-10 07:41:25.622  INFO 8074 --- [           main] c.a.s.producer.SmsProducerImpl           : Producing SMS: fromAddress=80444, toAddress=385913344606, text=Hello, this is text generated at 2019-05-10T07:41:25.622, sendTime=2019-05-10T07:41:25.622
...

Then, each periodic execution of the consumption task that polls some pending items logs their count:

2019-05-10 07:45:11.512  INFO 8415 --- [pool-1-thread-1] com.ag04.jpaqueue.QueueConsumer          : Fetched 100 pending queued items

After that, it logs individual item processing lines, such as these for successful processing:

2019-05-10 07:45:11.518  INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl  : Sending SMS: SmsMessage[id=311, uid='e2a334af-4398-420a-b493-e5a05549fcc9', fromAddress='80444', toAddress='385913344829', text='Hello, this is text generated at 2019-05-10T07:40:36.360', sendingState=QueueingState[status=NOT_ATTEMPTED, nextAttemptTime=2019-05-10T07:40:36.360, attemptCount=0, lastAttemptTime=null, lastAttemptErrorMessage='null']]
2019-05-10 07:45:11.605  INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl  : Sending SMS: SmsMessage[id=312, uid='dda97da8-af60-4eca-87d7-c406bf1c3161', fromAddress='80555', toAddress='385913344533', text='Hello, this is text generated at 2019-05-10T07:40:36.363', sendingState=QueueingState[status=NOT_ATTEMPTED, nextAttemptTime=2019-05-10T07:40:36.363, attemptCount=0, lastAttemptTime=null, lastAttemptErrorMessage='null']]
2019-05-10 07:45:11.699  INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl  : Sending SMS: SmsMessage[id=313, uid='628b630e-813c-45c6-b889-f0f037d3ec3c', fromAddress='80333', toAddress='385913344634', text='Hello, this is text generated at 2019-05-10T07:40:36.367', sendingState=QueueingState[status=NOT_ATTEMPTED, nextAttemptTime=2019-05-10T07:40:36.367, attemptCount=0, lastAttemptTime=null, lastAttemptErrorMessage='null']]

And something like this for an unsuccessful one (these will be frequent in our demo):

2019-05-10 07:45:12.160 ERROR 8415 --- [pool-1-thread-1] com.ag04.jpaqueue.QueueConsumer          : Error while processing item by ID 316: Wrong time picked for send

java.lang.IllegalStateException: Wrong time picked for send
	at com.ag04.smsqueueing.sender.SmsSenderImpl.send(SmsSenderImpl.java:23) ~[classes/:na]
	at com.ag04.smsqueueing.SmsSendingQueueConsumerModule.processItem(SmsSendingQueueConsumerModule.java:50) ~[classes/:na]
	at com.ag04.smsqueueing.SmsSendingQueueConsumerModule.processItem(SmsSendingQueueConsumerModule.java:16) ~[classes/:na]
	at com.ag04.jpaqueue.QueueConsumer.processItem(QueueConsumer.java:123) [classes/:na]
	at com.ag04.jpaqueue.QueueConsumer.lambda$processItemAndHandleErrorIfRaised$0(QueueConsumer.java:107) [classes/:na]
	at com.ag04.jpaqueue.QueueConsumer$1.doInTransactionWithoutResult(QueueConsumer.java:117) ~[classes/:na]
	at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
	at com.ag04.jpaqueue.QueueConsumer.executeUnderTransaction(QueueConsumer.java:114) [classes/:na]
	at com.ag04.jpaqueue.QueueConsumer.processItemAndHandleErrorIfRaised(QueueConsumer.java:107) [classes/:na]
	at com.ag04.jpaqueue.QueueConsumer.processQueuedItems(QueueConsumer.java:97) [classes/:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_201]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_201]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_201]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_201]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_201]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_201]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]
Caused by: java.lang.RuntimeException: I'm evil, that's the root cause!
	... 18 common frames omitted

2019-05-10 07:45:12.163  INFO 8415 --- [pool-1-thread-1] com.ag04.jpaqueue.QueueConsumer          : Retry for item by ID 316 scheduled for time: 2019-05-10T07:46:12.163

Notice the last line, which reports the time of the next attempt if a retry has been scheduled (there is none if this was the last allowed attempt).

A quick look at the database (the SMS_MESSAGE table) while the app is running shows all the relevant fields.

Of course, you can try restarting the service, which will add a new bunch of SMS messages to the queue; naturally, any old ones still in a pending state will continue to get processed according to their scheduled attempt time.


Now, with this extremely simple queueing setup you can do all sorts of things, such as the following, and much, much more:

  • count the pending items:
    SELECT COUNT(*) FROM sms_message WHERE next_attempt_time IS NOT NULL
  • count the pending items grouped by a domain-specific field (e.g. from_address):
    SELECT from_address, COUNT(*) FROM sms_message WHERE next_attempt_time IS NOT NULL GROUP BY from_address
  • re-schedule all failed items that have reached their retry limit, to be processed as soon as possible:
    UPDATE sms_message SET next_attempt_time = CURRENT_TIMESTAMP WHERE status = 'ERROR' AND next_attempt_time IS NULL
  • remove pending items that have failed too many times (say, more than 50):
    DELETE FROM sms_message WHERE next_attempt_time IS NOT NULL AND attempt_count > 50
  • remove a specific invalid item that ended up in the queue due to a bug:
    DELETE FROM sms_message WHERE id = 34254642991
  • display failure descriptions for items whose last processing attempt was after a specified time:
    SELECT last_attempt_error_message FROM sms_message WHERE status = 'ERROR' AND last_attempt_time > to_timestamp('20190523', 'YYYYMMDD')

… and anything else that comes to your mind.

So tell me: how easy would it be to implement all of those features with a setup built around a popular queueing library or server?

To be continued …

And that's it. In our next post, we are going to see how to ensure that only a single consumption job runs in a clustered environment, and how to add parallelism to improve throughput.
