Recently we had to solve a problem at work: we wanted to use AWS SNS topics for communication between some of our services, but we also needed to selectively delay the visibility/processing of specific messages.

In order to get delayed messages while still using SNS, we developed a simple microservice that listens to an SQS queue, grabs messages, and publishes them to the appropriate topic. Because SQS supports per-message delays, producers can set an appropriate delay on each message and we get exactly what we need.
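For example, a producer that wants a message hidden for five minutes can set a per-message delay when sending it to the queue. A sketch using rusoto's SQS types, matching the synchronous client style used later in this post (the client and payload variables here are hypothetical):

// Sketch: the producer publishes to the delay queue with a per-message delay.
// SQS caps delay_seconds at 900 (15 minutes); longer delays would require
// re-enqueueing the message.
let req = SendMessageRequest {
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789/delay-queue".to_owned(),
    message_body: serialized_event, // hypothetical payload variable
    delay_seconds: Some(300),       // stay hidden for 5 minutes
    ..Default::default()
};

sqs_client.send_message(&req).expect("failed to enqueue delayed message");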

The service works fairly well, but I like excuses to write Rust, so I came up with an idealized version and started rewriting.

Just so you aren't disappointed at the end: no, this service is not in production.

Here is the Rust code for the project in its current state.

Service Goals

  • Never lose messages, avoid duplicating messages.

  • Attempt to use bulk APIs where possible to cut down on cost, and to improve performance, since this program will spend most of its time in IO.

  • Process as many messages as possible, as quickly as possible

  • Remain stable under unexpected conditions

And ideally the code should be reasonable and clean enough that, if I were to ever try to get my company to deploy it, my coworkers would have an easy time understanding how it works. I care less about that part, though, since I am unlikely to push this to production.

Actors In Rust

I decided early on that I would structure my code using actors. The work is mostly IO bound, and I wanted to understand what patterns I would end up with while writing actor-oriented Rust code.

I took some code from my actor crate and used its patterns as a starting point.

Essentially, every structure with some business logic had an actor 'wrapper' that would just pass messages along to it. The API was typed and made concurrency trivial.

So we would have our business logic structure + impl: (Note that the code below is slightly abridged)


#[derive(Clone)]
pub struct MessageDeleter<SQ>
    where SQ: Sqs + Send + Sync + 'static,
{
    sqs_client: Arc<SQ>,
    queue_url: String,
}

impl<SQ> MessageDeleter<SQ>
    where SQ: Sqs + Send + Sync + 'static,
{
    pub fn new(sqs_client: Arc<SQ>, queue_url: String) -> MessageDeleter<SQ> {
        MessageDeleter {
            sqs_client,
            queue_url
        }
    }

    pub fn delete_messages(&self, receipts: Vec<(String, Instant)>) {
        let msg_count = receipts.len();
        println!("Deleting {} messages", msg_count);

        let mut receipt_init_map = HashMap::new();

        for (receipt, time) in receipts {
            receipt_init_map.insert(receipt, time);
        }

        let entries = receipt_init_map.keys().map(|r| {
            DeleteMessageBatchRequestEntry {
                id: format!("{}", uuid::Uuid::new_v4()),
                receipt_handle: r.to_owned()
            }
        }).collect();

        let req = DeleteMessageBatchRequest {
            entries,
            queue_url: self.queue_url.clone()
        };
        match self.sqs_client.delete_message_batch(&req) {
            Ok(res) => {
                unimplemented!()
            },
            Err(e) => {
                println!("Failed to delete {} messages {}", msg_count, e);
            }
        }
    }
}

A message type for communicating with it:

pub enum MessageDeleterMessage {
    DeleteMessages {
        receipts: Vec<(String, Instant)>,
    },
}

A ‘route_msg’ function to destructure the message and pass it to the proper method:

    pub fn route_msg(&self, msg: MessageDeleterMessage) {
        match msg {
            MessageDeleterMessage::DeleteMessages { receipts } => self.delete_messages(receipts)
        }
    }

And then the actor interface, which packs function arguments into messages and passes them along.


#[derive(Clone)]
pub struct MessageDeleterActor {
    sender: Sender<MessageDeleterMessage>,
    receiver: Receiver<MessageDeleterMessage>,
    id: String,
}

impl MessageDeleterActor {
    pub fn new<SQ>(actor: MessageDeleter<SQ>) -> MessageDeleterActor
        where SQ: Sqs + Send + Sync + 'static,
    {
        let (sender, receiver) = unbounded();
        let id = uuid::Uuid::new_v4().to_string();
        let recvr = receiver.clone();

        thread::spawn(
            move || {
                loop {
                    if recvr.len() > 10 {
                        println!("MessageDeleterActor queue len {}", recvr.len());
                    }
                    match recvr.recv_timeout(Duration::from_secs(60)) {
                        Ok(msg) => {
                            actor.route_msg(msg);
                            continue
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            break
                        }
                        Err(RecvTimeoutError::Timeout) => {
                            continue
                        }
                    }
                }
            });

        MessageDeleterActor {
            sender,
            receiver,
            id,
        }
    }
    
    pub fn delete_messages(&self, receipts: Vec<(String, Instant)>) {
        self.sender.send(
            MessageDeleterMessage::DeleteMessages { receipts }
        ).unwrap();
    }
}

And you have a pretty sweet, typed, async interface to your actor.

Still, there's a lot of boilerplate involved, which is why my actor library attempts to generate most of the above with a macro. Unfortunately, that library isn't in a usable state right now (it breaks too often), so I used it to generate the boilerplate via 'cargo expand' and copy/pasted the output over, making fixes where necessary.

At one point my actors used 'fibers' from the fibers crate, but I ran into performance and stability issues. I really wanted my actors to be cheap, so I attempted to use the futures crate, but I just couldn't get it working.

Work Stealing Actors

In order to provide work-stealing/pooled actors, I added a constructor for pooled actors called 'from_queue'. Essentially, this function acts exactly like 'new' but takes a queue as an argument, so many actors can share the same queue.

Then a single ‘Broker’ actor would feed messages into that queue.

#[derive(Clone)]
pub struct MessageDeleterBroker
{
    workers: Vec<MessageDeleterActor>,
    sender: Sender<MessageDeleterMessage>,
    id: String
}

impl MessageDeleterBroker
{
    pub fn new<T, F, SQ>(new: F,
                         worker_count: usize,
                         max_queue_depth: T)
                         -> MessageDeleterBroker
        where F: Fn(MessageDeleterActor) -> MessageDeleter<SQ>,
              T: Into<Option<usize>>,
              SQ: Sqs + Send + Sync + 'static,
    {
        let id = uuid::Uuid::new_v4().to_string();

        let (sender, receiver) = max_queue_depth.into().map_or(unbounded(), channel);

        let workers = (0..worker_count)
            .map(|_| MessageDeleterActor::from_queue(&new, sender.clone(), receiver.clone()))
            .collect();

        MessageDeleterBroker {
            workers,
            sender,
            id
        }
    }

    pub fn delete_messages(&self, receipts: Vec<(String, Instant)>) {
        self.sender.send(
            MessageDeleterMessage::DeleteMessages { receipts }
        ).unwrap();
    }
}

The broker just generates a bunch of MessageDeleterActor structs sharing a single queue. It then provides the same interface as its workers.
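The from_queue constructor itself isn't shown in this post. Based on 'new' above, a plausible sketch of it (inside impl MessageDeleterActor; my actual generated code may differ):

    pub fn from_queue<SQ, F>(new: &F,
                             sender: Sender<MessageDeleterMessage>,
                             receiver: Receiver<MessageDeleterMessage>)
                             -> MessageDeleterActor
        where SQ: Sqs + Send + Sync + 'static,
              F: Fn(MessageDeleterActor) -> MessageDeleter<SQ>,
    {
        // Build the handle first so the business logic can be constructed
        // with a reference back to its own mailbox.
        let handle = MessageDeleterActor {
            sender,
            receiver: receiver.clone(),
            id: uuid::Uuid::new_v4().to_string(),
        };

        let actor = new(handle.clone());

        // Same receive loop as `new`, but over the shared queue: whichever
        // worker grabs a message first processes it (work stealing).
        thread::spawn(move || {
            loop {
                match receiver.recv_timeout(Duration::from_secs(60)) {
                    Ok(msg) => actor.route_msg(msg),
                    Err(RecvTimeoutError::Disconnected) => break,
                    Err(RecvTimeoutError::Timeout) => continue,
                }
            }
        });

        handle
    }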

Buffering Actors

Lastly, I needed a way to buffer deletes. To do this I stuck the broker behind a buffer actor. The buffer actor receives delete requests, stores them, and flushes its buffer when it hits maximum capacity. A separate 'BufferFlusherActor' (sketched further down) periodically sends flush commands, ensuring that messages get deleted in a timely manner even under low load.

pub struct MessageDeleteBuffer {
    deleter_broker: MessageDeleterBroker,
    buffer: ArrayVec<[(String, Instant); 10]>,
    flush_period: Duration
}

impl MessageDeleteBuffer {
    pub fn new(deleter_broker: MessageDeleterBroker, flush_period: u8) -> MessageDeleteBuffer
    {
        MessageDeleteBuffer {
            deleter_broker: deleter_broker,
            buffer: ArrayVec::new(),
            flush_period: Duration::from_secs(flush_period as u64)
        }
    }

    pub fn delete_message(&mut self, receipt: String, init_time: Instant) {
        if self.buffer.is_full() {
            println!("MessageDeleteBuffer buffer full. Flushing.");
            self.flush();
        }

        self.buffer.push((receipt, init_time));
    }

    pub fn flush(&mut self) {
        self.deleter_broker.delete_messages(Vec::from(self.buffer.as_ref()));
        self.buffer.clear();
    }

    pub fn on_timeout(&mut self) {
        if !self.buffer.is_empty() {
            println!("MessageDeleteBuffer timeout. Flushing {} messages.", self.buffer.len());
            self.flush();
        }
    }
}

pub enum MessageDeleteBufferMessage {
    Delete {
        receipt: String,
        init_time: Instant
    },
    Flush {},
    OnTimeout {},
}

#[derive(Clone)]
pub struct MessageDeleteBufferActor {
    sender: Sender<MessageDeleteBufferMessage>,
    receiver: Receiver<MessageDeleteBufferMessage>,
    id: String,
}

impl MessageDeleteBufferActor {
    pub fn new(mut actor: MessageDeleteBuffer) -> MessageDeleteBufferActor
    {
        let (sender, receiver) = unbounded();
        let id = uuid::Uuid::new_v4().to_string();
        let recvr = receiver.clone();
        thread::spawn(
            move || {
                loop {
                    if recvr.len() > 10 {
                        println!("MessageDeleteBufferActor queue len {}", recvr.len());
                    }
                    match recvr.recv_timeout(Duration::from_secs(60)) {
                        Ok(msg) => {
                            actor.route_msg(msg);
                            continue
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            break
                        }
                        Err(RecvTimeoutError::Timeout) => {
                            println!("MessageDeleteBufferActor Haven't received a message in 10 seconds");
                            continue
                        }
                    }
                }
            });

        MessageDeleteBufferActor {
            sender,
            receiver,
            id,
        }
    }

    pub fn delete_message(&self, receipt: String, init_time: Instant) {
        let msg = MessageDeleteBufferMessage::Delete {
            receipt,
            init_time
        };
        self.sender.send(msg).expect("All receivers have died.");
    }

    pub fn flush(&self) {
        let msg = MessageDeleteBufferMessage::Flush {};
        self.sender.send(msg).expect("All receivers have died.");
    }

    pub fn on_timeout(&self) {
        let msg = MessageDeleteBufferMessage::OnTimeout {};
        self.sender.send(msg).expect("All receivers have died.");
    }
}

impl MessageDeleteBuffer
{
    pub fn route_msg(&mut self, msg: MessageDeleteBufferMessage) {
        match msg {
            MessageDeleteBufferMessage::Delete {
                receipt,
                init_time
            } => {
                self.delete_message(receipt, init_time)
            }
            MessageDeleteBufferMessage::Flush {} => self.flush(),
            MessageDeleteBufferMessage::OnTimeout {} => self.on_timeout(),
        };
    }
}
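
The BufferFlusherActor isn't shown here; the idea is just a timer loop. A minimal sketch, assuming it only needs a handle to the buffer actor and a period (the names here are illustrative):

pub struct BufferFlusher {
    buffer: MessageDeleteBufferActor,
    period: Duration,
}

impl BufferFlusher {
    pub fn new(buffer: MessageDeleteBufferActor, period: Duration) -> BufferFlusher {
        BufferFlusher { buffer, period }
    }

    // Periodically nudge the buffer so messages get deleted promptly
    // even when load is too low to fill the buffer.
    pub fn start(self) {
        thread::spawn(move || {
            loop {
                thread::sleep(self.period);
                self.buffer.on_timeout();
            }
        });
    }
}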

In my tests under load, the buffer almost always flushes a full 10 messages. There's room to improve the buffering strategy, such as resetting the flusher's timer whenever we flush due to a full buffer, but this is already a massive improvement over using the non-bulk API.

The rest of the code uses very similar patterns. Some other particularly interesting pieces are the MessageStateManager, an actor that maintains a message's state in SQS (Invisible, Deleted, etc.), and the AutoScalingActor, which emits messages to brokers letting them know when to scale up or down.
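For illustration, the scaling messages look something like this (the shape is illustrative, not the actual API):

// Illustrative shape of the AutoScalingActor's output: brokers receive
// these and grow or shrink their worker pools accordingly.
pub enum ScaleMessage {
    ScaleUp { additional_workers: usize },
    ScaleDown { workers_to_remove: usize },
}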

The Good Stuff

The design has worked very well in my tests, and Rust facilitated that design. With some more work I would have a lot of confidence in the correctness and stability of the code.

The actual processing of data is fast, and because of Rust's explicit nature, if I ever need to make it faster I can easily audit for low-hanging fruit, such as unnecessary .clone() calls.

There's room for optimization around message passing. Rust's strong type system should eventually let me implement zero-copy message passing.
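For example, receipt handles are currently passed between actors as owned Strings; something like Arc<String> would make those handoffs copy-free. A sketch of the idea, not what the code does today:

use std::sync::Arc;

// Cloning an Arc bumps a reference count; the receipt's bytes are
// allocated once and shared by every actor that needs them.
let receipt = Arc::new(String::from("AQEB...example-receipt-handle..."));
let for_deleter = receipt.clone();       // cheap: pointer + refcount
let for_state_manager = receipt.clone(); // the underlying bytes are never copied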

Serde is awesome. I used it for deserialization and it was a breeze.

Actors in Rust actually worked pretty well; I'm happy with how the code looks and feels.

The community was very helpful. I got a lot of help from a lot of people, usually within minutes of asking for help.

Problems

One problem I ran into was enforcing the ordering of events. Initially I didn't guarantee that message deletes happened before the cancellation of visibility timeouts, which led to errors when trying to extend the timeout of an already-deleted message.

That's where the MessageStateManager came in: when a message is done being processed, you tell the state manager, and it enforces the ordering of events. This is all based on causal ordering - the guarantee that if I place message A on a queue, and then message B, A will arrive before B. As long as I handled visibility timeouts and deletes via the same queue (or series of queues), I could guarantee the ordering of events.
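A sketch of the idea (names are illustrative, not the real MessageStateManager API): both operations flow through one actor's queue, so their relative order is preserved, and a delete permanently wins.

use std::collections::HashSet;

pub enum MessageStateMessage {
    ExtendVisibility { receipt: String, seconds: u64 },
    Delete { receipt: String },
}

pub struct StateManager {
    deleted: HashSet<String>, // receipts we have already committed to deleting
}

impl StateManager {
    pub fn route_msg(&mut self, msg: MessageStateMessage) {
        match msg {
            MessageStateMessage::ExtendVisibility { receipt, .. } => {
                if self.deleted.contains(&receipt) {
                    return; // never extend a message we've already deleted
                }
                // ... call ChangeMessageVisibility here ...
            }
            MessageStateMessage::Delete { receipt } => {
                self.deleted.insert(receipt);
                // ... hand the receipt off to the delete buffer here ...
            }
        }
    }
}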

This is simply a pattern I had to internalize, not really any fault of Rust. I have considered how to avoid this issue, and it likely means writing a spec for the service beforehand and then using types to enforce it - there are at least a few types that could be broken out into session types.
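A sketch of the typestate flavor of this idea (illustrative, not code from the service): once a message's handle is consumed by delete, the compiler rejects any later attempt to extend its visibility.

// Illustrative typestate sketch: `delete` consumes the handle, so code that
// tries to extend visibility after deleting no longer compiles.
pub struct InFlightMessage {
    receipt: String,
}

pub struct DeletedMessage;

impl InFlightMessage {
    pub fn extend_visibility(&self, _seconds: u64) {
        // ... call ChangeMessageVisibility with self.receipt ...
    }

    pub fn delete(self) -> DeletedMessage {
        // ... enqueue the delete; `self` is consumed here ...
        DeletedMessage
    }
}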

I also ran into a few problems with the AWS SDK for Rust, rusoto. While I'm super happy to see the progress it's made over the course of its development, it still has a few issues that need sorting out before I could use it in production.

In particular, the clients don’t support timeouts or automatic retry policies. So sometimes calls to the services will take 20+ seconds and then time out. I actually created a macro to provide timeouts, but it’s a hack and I’d really want native support.

macro_rules! timeout_ms {
    ($pool:expr, $closure:expr, $dur:expr, $timer:expr) => {
        {
            // A future that fails after $dur milliseconds
            let timeout = $timer.sleep(Duration::from_millis($dur))
                .then(|_| Err(()));
            // Run the closure on the thread pool
            let value = $pool.spawn_fn($closure);
            // Race the two futures; whichever resolves first wins
            let value_or_timeout = timeout.select(value).map(|(win, _)| win);
            value_or_timeout.wait()
        }
    };
}

This macro just takes a function and executes it or times out. The problem is that, as far as I am aware, it isn’t able to actually stop the execution of the function after it’s timed out. So I could time out, and then the call could succeed later. This is definitely not acceptable for an SNS publish, as we want to avoid double publishing as much as possible.
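Usage looks roughly like this (the pool and timer names are hypothetical; a futures_cpupool::CpuPool and a tokio_timer::Timer respectively, with the closure's error type lined up with the timeout's):

// Hypothetical usage: race the SQS call against a 500ms timer.
let result = timeout_ms!(
    pool,
    move || sqs_client.delete_message_batch(&req).map_err(|_| ()),
    500,
    timer
);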

This is honestly the number one blocker - and thankfully there's progress being made. In particular, hyper 0.11 support in rusoto will allow some of these features to be built.

Ultimately, however, I was quite happy with the experience. I'm going to continue to improve the application, at least in part because it's been so much fun coming up with new patterns in Rust, particularly with actors.

I'm surprised by how close Rust is to being production-ready for my use case. With some work on rusoto, and ideally better async support in general, I could start making the case to productionize this.


