Practical queueing using SQL (Part 3): Clustering and parallelism

Part 1: Rationale and general design
Part 2: Do it simply using Spring Boot and JPA
Part 3: Clustering and parallelism


In previous posts we mentioned that only one consuming job should be running in the whole system, but we didn’t worry about it because we ran the whole application in a single-node setup. Frequently, though, we want to run our applications in multi-node setups for better availability, so we need to make some changes to ensure that the consuming job is active on only a single node.

Also, single-threaded consumption can prove too slow for our needs, so it would be nice to utilize all those threads we have at our disposal to increase consumption throughput.

Leader-elected consumption

Until now, we simply started our consuming job in the initialization method of QueueConsumer (@PostConstruct) and closed it in the destroy method (@PreDestroy). Now we would like to start this job only if the node is elected as the cluster leader. Leadership can be granted at any moment during run-time, not just during startup. Likewise, a node can lose leadership at any moment (when it is stopped, when the database connection fails, and so on), so we have to stop the consuming job in such scenarios.

There are a couple of options available in Java land for implementing the leader election mechanism, but we will use a simple one that uses a database table for cluster node coordination: Spring Integration’s LockRegistryLeaderInitiator configured with JdbcLockRegistry.

As always, I like the idea of reusing an existing piece of technology (the SQL database here) to realize a new feature. To be more precise, we are introducing new tech here too, but in the form of additional libraries and not as a new external service, and I consider remote dependencies a far heavier addition in most cases.

So, let’s add 2 extra dependencies to our build script:

implementation 'org.springframework.integration:spring-integration-core'
implementation 'org.springframework.integration:spring-integration-jdbc'

And add a few more Spring beans from these new libraries to the application configuration:

public class SmsQueueingApplication {
// ...

	@Bean
	public DefaultLockRepository lockRepository(DataSource dataSource) {
		return new DefaultLockRepository(dataSource);
	}

	@Bean
	public JdbcLockRegistry lockRegistry(LockRepository lockRepository) {
		return new JdbcLockRegistry(lockRepository);
	}

	@Bean
	public LockRegistryLeaderInitiator leaderInitiator(LockRegistry lockRegistry) {
		return new LockRegistryLeaderInitiator(lockRegistry);
	}
}

We also need to add one more table to our database schema. We do that in Spring Boot’s src/main/resources/custom-schema.sql file, which gets executed on each application start. Here’s the DDL statement for this table, taken from the Spring Integration JDBC library:

CREATE TABLE IF NOT EXISTS INT_LOCK  (
  LOCK_KEY CHAR(36) NOT NULL,
  REGION VARCHAR(100) NOT NULL,
  CLIENT_ID CHAR(36),
  CREATED_DATE TIMESTAMP NOT NULL,
  constraint LOCK_PK primary key (LOCK_KEY, REGION)
);

So now that we have everything in place, Spring will emit OnGrantedEvent / OnRevokedEvent events when leadership is granted / revoked. We utilize this in our QueueConsumer class to start / stop our consuming job:

@EventListener(OnGrantedEvent.class)
public void onGrantedEvent() {
	logger.info("Granted leadership");

	startProcessingTask();
}

@EventListener(OnRevokedEvent.class)
public void onRevokedEvent() {
	logger.info("Revoked leadership");

	stopProcessingTask();
}
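
The bodies of startProcessingTask() and stopProcessingTask() are not shown in this post. Here is a minimal sketch of how they might look, assuming the consuming job is a task scheduled on a ScheduledExecutorService. The class name LeaderControlledJob and the isRunning() and shutdown() helpers are hypothetical and are not taken from the repository. The point is only that both methods should be safe to call repeatedly, since leadership can be granted and revoked many times during the life of a node:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the start/stop part of QueueConsumer; not the repository code.
class LeaderControlledJob {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable pollCommand;
    private final long pollingPeriodInMillis;
    private ScheduledFuture<?> processingTask; // null while this node is not the leader

    LeaderControlledJob(Runnable pollCommand, long pollingPeriodInMillis) {
        this.pollCommand = pollCommand;
        this.pollingPeriodInMillis = pollingPeriodInMillis;
    }

    // Called when leadership is granted; a repeated grant must not start a second job.
    synchronized void startProcessingTask() {
        if (processingTask == null) {
            processingTask = scheduler.scheduleWithFixedDelay(
                    pollCommand, 0, pollingPeriodInMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Called when leadership is revoked; cancel(false) lets a poll that is already running finish.
    synchronized void stopProcessingTask() {
        if (processingTask != null) {
            processingTask.cancel(false);
            processingTask = null;
        }
    }

    synchronized boolean isRunning() {
        return processingTask != null;
    }

    // Releases the scheduler thread, e.g. from a @PreDestroy method.
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

With something like this in place, the two event listeners above only need to delegate to startProcessingTask() and stopProcessingTask().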

Trying it out

You can see this mechanism in action by downloading the whole code from this branch:

https://bitbucket.org/ag04/smsqueueing/src/clustered-single-consumer/

Start multiple application processes and observe the effect. Upon start, each node will execute the SMS production code, but only the leader node will consume the queue (the others will remain passive). Naturally, the leader will initially be the first node that was started, but if you stop this node, you can observe how leadership is transferred to one of the other running nodes, which results in the consuming job starting on the new leader.

Simple parallel consumption

As promised, we will now show you how to utilize all those threads you have on your machine to improve consumption throughput by working in parallel.

There is one way to implement parallelism which is always my first take due to its extreme simplicity. The idea is to have the original consuming job thread only fetch pending queue items from the database table, and to leave their processing to a new multi-thread executor whose worker threads provide the better throughput. Of course, the original polling job must block until all items have been processed (successfully or not) by this new executor. So basically, we’re now taking items from one persistent queue (the database table) and putting them into another, in-memory queue (the multi-thread executor).

QueueConsumer creates this new “multi-thread processing” Executor in its constructor and later uses it in new item processing code:

public QueueConsumer(
		QueueConsumerModule<?> queueConsumerModule,
		RetryPolicy retryPolicy,
		PlatformTransactionManager transactionManager,
		int polledItemsLimit,
		long pollingPeriodInSecs,
		int processingThreadCount
) {
  // ...
	this.processingExecutorService = Executors.newFixedThreadPool(processingThreadCount);
}
  
public void processQueuedItems() {
  try {
    LocalDateTime now = LocalDateTime.now();
    List<?> itemIds = this.queueConsumerModule.findItemIdsWhereQueueingNextAttemptTimeIsBefore(now, itemsPollSize);

    if (!itemIds.isEmpty()) {
      logger.info("Fetched {} pending queued items", itemIds.size());
      List<Callable<Object>> itemProcessingCallables = constructItemProcessingCallables(itemIds);
      this.processingExecutorService.invokeAll(itemProcessingCallables);
    }
  } catch (Throwable th) {
    logger.error("Error while fetching queued items: " + th.getMessage(), th);
  }
}

private List<Callable<Object>> constructItemProcessingCallables(List<?> itemIds) {
  return itemIds.stream()
      .map(itemId -> Executors.callable(() -> processItemAndHandleErrorIfRaised(itemId)))
      .collect(Collectors.toList());
}

We have a new processingThreadCount argument in the constructor which defines the number of processing threads. The key line is the call to ExecutorService.invokeAll(…), which blocks until all submitted processing tasks are completed, so the next poll won’t happen until every item has been processed.
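
To see this blocking behaviour in isolation, here is a small self-contained sketch that uses only the JDK. The class InvokeAllDemo, its processBatch method and the simulated 20 ms processing delay are made up for illustration and are not part of the project code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class InvokeAllDemo {

    // Processes every item on a fixed pool and returns how many items
    // were finished by the time invokeAll returned.
    static int processBatch(List<Integer> itemIds, int processingThreadCount) {
        ExecutorService executor = Executors.newFixedThreadPool(processingThreadCount);
        AtomicInteger processed = new AtomicInteger();
        try {
            List<Callable<Object>> callables = new ArrayList<>();
            for (Integer itemId : itemIds) {
                Runnable processItem = () -> {
                    simulateSlowProcessing();
                    processed.incrementAndGet();
                };
                callables.add(Executors.callable(processItem));
            }
            executor.invokeAll(callables); // blocks until every callable has completed
            return processed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return processed.get();
        } finally {
            executor.shutdown();
        }
    }

    // Stand-in for the real item processing (e.g. sending an SMS).
    private static void simulateSlowProcessing() {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int processed = processBatch(List.of(1, 2, 3, 4, 5, 6, 7), 5);
        System.out.println("Processed before invokeAll returned: " + processed);
    }
}
```

Unless the calling thread is interrupted, the returned count equals the batch size, which is exactly the guarantee the polling job relies on.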

The code for this simple parallelism is available at:

https://bitbucket.org/ag04/smsqueueing/src/simple-parallel-consumer/

Build the code, start the app, and you will notice in the log output that SMS sending is now executed by multiple threads (5 in this example), while the original polling job thread only fetches the messages from the database.

A minor issue with this implementation is that when the number of unprocessed messages left in the polled batch falls below the number of worker threads in the executor, some workers sit idle until the last message is processed, but I reckon this is usually not a biggie.

Multiple independent queues

As mentioned, the previous implementation of parallel processing is really my favorite way to increase throughput, but sometimes you don’t want the processing of some messages to depend on other ones. In other words, we would like to have multiple queues that are consumed fully independently of each other.

We will still have a single database table to persist all queued items, but item records will be treated fully separately depending on which “queue” (a better term would be “partition” or “group”) the item belongs to. Not only will they be processed by a separate thread as in the previous example, but this same “partition consuming thread” will also poll only the pending items belonging to its partition.

Now, this raises a question: what is the criterion for message partitioning (grouping)?

Small fixed set of partitioning values

In our SMS example, let’s say that the source address (the “fromAddress” property in SmsMessage) is one of a few SMS campaign short codes, meaning there is a small fixed number of them, and we want messages from one source to be queued independently from messages from other source addresses. Of course, it’s important that the number of such message sources is not too big, because we will start a separate consumer thread for each partition, and each new thread adds load to the system.

Naturally, QueueConsumer should somehow be provided with the complete set of available “fromAddress” values, say via its constructor. One more thing that is affected is QueueConsumerModule, due to the introduction of a new “fromAddress” filter argument when fetching pending items:

public interface QueueConsumerModule<ID> {
	List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(String fromAddress, LocalDateTime time, int limit);
  // ...
}

Also, don’t forget to replace the old “next_attempt_time” index with one that also includes the new “fromAddress” column, since we will be querying the table using both columns.

Now this generic interface is not so generic anymore because it contains a domain-specific argument. That’s why I said in the beginning not to rush into building a generic mini-framework for everything: it can be more of a burden than just copy-pasting on a per-case basis and tailoring the code to your needs. Or to quote a popular line by Sandi Metz from her RailsConf 2014 talk:

duplication is far cheaper than the wrong abstraction

For the sake of brevity, I won’t provide the complete code for this domain-specific queue partitioning based on fromAddress, but almost the same kind of logic is present in the next section, so you can easily take it from there and adapt it.
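
Still, to give a rough idea, here is a minimal JDK-only sketch of one polling task per source address. Everything in it is an assumption made for illustration: the PendingItemFinder interface merely mimics the shape of the fetch method shown earlier, and the class and method names, Long item IDs and millisecond polling period are hypothetical. It is not the code from the repository:

```java
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class PerSourcePollingSketch {

    // Hypothetical stand-in for the fetch method with the extra fromAddress filter.
    interface PendingItemFinder {
        List<Long> findPendingItemIds(String fromAddress, LocalDateTime time, int limit);
    }

    // Schedules one independent polling task (and thread) per known source address.
    static ScheduledExecutorService startPollingTasks(
            Set<String> fromAddresses, PendingItemFinder finder,
            Consumer<Long> itemProcessor, int limit, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(fromAddresses.size());
        for (String fromAddress : fromAddresses) {
            Runnable command = () -> finder
                    .findPendingItemIds(fromAddress, LocalDateTime.now(), limit)
                    .forEach(itemProcessor);
            scheduler.scheduleWithFixedDelay(command, 0, periodMillis, TimeUnit.MILLISECONDS);
        }
        return scheduler;
    }

    // Demo helper: runs the tasks against an empty in-memory "queue" and
    // returns the set of sources that were polled at least once.
    static Set<String> pollEachSourceOnce(Set<String> fromAddresses) {
        Set<String> polled = ConcurrentHashMap.newKeySet();
        CountDownLatch allPolled = new CountDownLatch(fromAddresses.size());
        PendingItemFinder finder = (fromAddress, time, limit) -> {
            if (polled.add(fromAddress)) {
                allPolled.countDown();
            }
            return List.of(); // nothing pending in this demo
        };
        ScheduledExecutorService scheduler =
                startPollingTasks(fromAddresses, finder, itemId -> { }, 100, 10);
        try {
            allPolled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println(pollEachSourceOnce(Set.of("1111", "2222")));
    }
}
```

Because each source gets its own scheduled task, a slow batch for one short code does not delay polling for the others.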

Big set of partitioning values

Say we want to have independent queueing for SMS messages that have the same “toAddress” value. It’s clear that we cannot start a separate consumer job for each of these addresses, because the destinations are unknown in advance and their number is practically unlimited. But we still want SMS messages with the same destination to always end up in the same queue.

This can be achieved by dividing the potentially infinite space of possible destinations into a limited set of partitions. One laughably simple way to do that is called “hash-mod” partitioning: we convert the partitioning value (the destination address) into a number by calculating its hash, and then take that hash modulo a fixed number of partitions:

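// hashCode() can be negative, and % would then yield a negative partition; floorMod always returns 0..partitionCount-1
int partition = Math.floorMod(toAddress.hashCode(), this.partitionCount);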

In practical terms, we don’t know which destination address will end up in which partition, but we can be sure it will always be the same partition. Because of this “random” grouping, some partitions will hold more messages than others, but with a large number of partitioning values in play (here, SMS destination addresses) the spread will be fairly even, meaning the partitions will be well balanced.

Of course, we need to have this partition available for SQL querying, so we will store it as an additional field when constructing our queued entity (SmsMessage):
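
Here is a small self-contained sketch of hash-mod partitioning that you can run to inspect the spread yourself. The class HashModPartitioner and the synthetic phone numbers are made up for illustration. It uses Math.floorMod rather than the plain % operator because String.hashCode() may return a negative number, and a negative partition would never match any of the partition indexes 0..partitionCount-1:

```java
import java.util.Arrays;

class HashModPartitioner {

    // Maps a destination address to a partition in the range 0..partitionCount-1.
    static int partitionOf(String toAddress, int partitionCount) {
        return Math.floorMod(toAddress.hashCode(), partitionCount);
    }

    // Counts how many of addressCount synthetic destination addresses land in each partition.
    static int[] distribution(int addressCount, int partitionCount) {
        int[] counts = new int[partitionCount];
        for (int i = 0; i < addressCount; i++) {
            String toAddress = "+38591" + String.format("%07d", i); // made-up numbers
            counts[partitionOf(toAddress, partitionCount)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // The same address always maps to the same partition.
        System.out.println(partitionOf("+385911234567", 4));
        System.out.println(partitionOf("+385911234567", 4));

        // Messages per partition for 100,000 synthetic addresses and 4 partitions.
        System.out.println(Arrays.toString(distribution(100_000, 4)));
    }
}
```

Note that changing partitionCount later reassigns most addresses to different partitions, so with this scheme the value is best treated as fixed once messages are being queued.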

@Entity
@Table(name = "sms_message", indexes = @Index(name = "idx_sms_msg_queue_polling_fields", columnList = "next_attempt_time,partition"))
public class SmsMessage {
  
// ...
  
  @Column(name = "partition")
  private int partition;
  
// ...
  
}

Notice how our index definition changed to also include the “partition” column, alongside “next_attempt_time”.

As in the previous case with a small number of partitioning values, we need to include this new field in the SQL query that filters pending items:

public interface QueueConsumerModule<ID> {
	List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(int partition, LocalDateTime time, int limit);

// ...
}

And now we come to QueueConsumer, which starts a preconfigured set of consumer jobs (threads), one for each partition:

public QueueConsumer(
    QueueConsumerModule<?> queueConsumerModule,
    RetryPolicy retryPolicy,
    PlatformTransactionManager transactionManager,
    int polledItemsLimit,
    long pollingPeriodInSecs,
    int partitionCount
) {
// ...
  this.scheduledExecutorService = Executors.newScheduledThreadPool(partitionCount);
}

// ...

private void startProcessingTasks() {
  logger.info("Starting {} queue polling tasks with delay of {} secs", partitionCount, pollingPeriodInSecs);

  Set<ScheduledFuture<?>> tasks = new HashSet<>();
  for (int i = 0; i < partitionCount; i++) {
    final int partition = i; // lambda requires 'final' variable, and loop requires it to be incremented, so we have to do it this way
    Runnable command = () -> processQueuedItems(partition);
    ScheduledFuture<?> task = this.scheduledExecutorService.scheduleWithFixedDelay(command, pollingPeriodInSecs, pollingPeriodInSecs, TimeUnit.SECONDS);
    tasks.add(task);
  }

  this.processingTasks = tasks;
}

This version is available at:

https://bitbucket.org/ag04/smsqueueing/src/master/

NOTE: the code for this implementation requires a database change compared to the previous sections, so you should drop and recreate the DB before trying it.

As before, build it and see it in action. Notice how the individual consuming threads, each belonging to its own partition, fetch and process their own pending messages.


And that’s it, for now at least. I hope you enjoyed this mini-series and will find these simple SQL-table-based queue designs valuable.

