In-queue reader

An in-queue reader reads messages from one or more in-queues, and passes them to a delegate for further processing.

Instance creation

InQueueReader inQueueReader = new InQueueReader(keyPair, delegate);
var inQueueReader = new cn.InQueueReader(keyPair, delegate);
my $inQueueReader = CN::InQueueReader->new($keyPair, $delegate);

Creates an in-queue reader for reading messages addressed to the key pair.

Reading in-queues

inQueueReader.read(accountWithKey);
inQueueReader.read(accountWithKey);
$inQueueReader->read($accountWithKey);

Starts reading the in-queue of the given account with key, unless a read operation for that account is already ongoing.

A read operation first lists the in-queue. If listing fails, onInQueueListFailed is called on the delegate, and the read operation stops immediately.

The reader then processes each message. Up to approximately 4 messages are processed in parallel.
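The bounded parallelism described above can be sketched with a fixed-size thread pool. This is only an illustration of the idea, not the reader's actual implementation: the class BoundedProcessor, its methods, and the one-minute timeout are hypothetical names and values chosen for this example.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper, not part of the in-queue reader API.
final class BoundedProcessor {
    private final int limit;
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    BoundedProcessor(int limit) {
        this.limit = limit;
    }

    // Runs the handler for every entry, with at most `limit` handlers active at once.
    // Blocks until all entries are handled; returns false on timeout or interruption.
    <T> boolean processAll(List<T> entries, Consumer<? super T> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(limit);
        for (T entry : entries) {
            pool.execute(() -> {
                // Track how many handlers run concurrently, and the highest value seen.
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    handler.accept(entry);
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Highest number of handlers that were observed running at the same time.
    int peakParallelism() {
        return peak.get();
    }
}
```

With new BoundedProcessor(4), a list of 20 entries is fully processed while the observed parallelism never exceeds 4.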

For each message, the envelope is retrieved from the store and parsed. If the envelope is complete, onInQueueVerifyStore is called on the delegate to resolve the sender's store. If the delegate returns null (undef in Perl), the message is treated as invalid.

Otherwise, the sender's public key is retrieved from the store, the signature is verified, the AES key decrypted, and the content retrieved.

Once everything is ready, onInQueueMessage is called for further processing.
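The per-message flow above (retrieve and parse the envelope, resolve the sender's store, retrieve the public key, verify the signature, decrypt the AES key, retrieve the content) can be modeled as a chain of steps that stops at the first step that does not succeed. The sketch below is a simplified model under that assumption. StepResult, Outcome and MessagePipeline are hypothetical names, and the mapping to delegate callbacks in the comments is an interpretation of this page, not the library's code.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical result of a single step (e.g. "verify signature").
enum StepResult { OK, INVALID, STORE_ERROR }

// Hypothetical overall outcome for one message.
enum Outcome { MESSAGE, INVALID, FAILED }

final class MessagePipeline {
    // Runs the steps in order and stops at the first step that does not succeed.
    static Outcome run(List<Supplier<StepResult>> steps) {
        for (Supplier<StepResult> step : steps) {
            StepResult result = step.get();
            if (result == StepResult.INVALID) {
                return Outcome.INVALID; // would correspond to onInQueueMessageInvalid
            }
            if (result == StepResult.STORE_ERROR) {
                return Outcome.FAILED; // would correspond to onInQueueMessageFailed
            }
        }
        return Outcome.MESSAGE; // would correspond to onInQueueMessage
    }
}
```

In this model, a delegate that refuses the sender's store is simply a step that yields INVALID, so the later steps never run for that message.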

When a read operation starts, onInQueueReadingStarts is called on the delegate, indicating the store. Similarly, onInQueueReadingDone is called when the read operation is done.

boolean isReading = inQueueReader.isReading(store);
var isReading = inQueueReader.isReading(store);
my $isReading = $inQueueReader->isReading($store);

Indicates whether a read operation for the given store is ongoing.
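The rule that a read is only started when none is ongoing for the same account, together with the isReading query, can be illustrated with a concurrent set of active keys. This is a minimal sketch of the idea only. ReadGuard and its methods are hypothetical, and plain strings stand in for accounts and stores.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the in-queue reader API.
final class ReadGuard {
    // Keys of the accounts whose in-queue is currently being read.
    private final Set<String> ongoing = ConcurrentHashMap.newKeySet();

    // Returns true if a read may start now, false if one is already ongoing.
    boolean tryStart(String accountKey) {
        return ongoing.add(accountKey);
    }

    // Marks the read operation for this account as finished.
    void finish(String accountKey) {
        ongoing.remove(accountKey);
    }

    // Mirrors the idea behind isReading(...).
    boolean isReading(String accountKey) {
        return ongoing.contains(accountKey);
    }
}
```

A second tryStart for the same key returns false until finish has been called, which is the behaviour described for read above.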

Implementing a delegate

		@Override
		public boolean onInQueueVerifyStore(HashReference source, Record envelope, String storeUrl) {
			// Allow HTTP stores
			return storeUrl.startsWith("http://") || storeUrl.startsWith("https://");
		}

		@Override
		public void onInQueueMessage(ReceivedMessage message) {
			// Process the message, and call one of the following:
			//   message.processed();
			//   message.remove();
			//   message.failed(error);
			//   message.postpone(Duration.MINUTE * 10);
			message.processed();
		}

		@Override
		public void onInQueueMessageInvalid(HashReference source, String reason) {
			// An invalid message was found. Such messages are deleted.
			// This could be shown on the user interface, or logged.
		}

		@Override
		public void onInQueueMessageFailed(HashReference source, String error) {
			// A message could not be retrieved due to a store error.
			// Such messages are put on hold, and will be tried later.
			// This could be shown on the user interface, or logged.
		}

		@Override
		public void onInQueueListFailed(AccountWithKey accountWithKey, String error) {
			// Listing failed. This could be shown on the user interface.
		}

		@Override
		public void onInQueueReadingDone(AccountWithKey accountWithKey) {
			// All messages have been read. This could be shown on the user interface.
		}