I’ve been continuing work on my Rust actor library, which macro-magically turns synchronous structs into asynchronous actors. (Note that all examples use branches of that project, not master.)

For example, here’s a struct that stores a closure and, when its ‘complete’ method is called, executes that closure.


pub struct CompletionHandler<F>
    where
        F: Fn() + Send + Sync + 'static,
{
    self_ref: CompletionHandlerActor,
    system: SystemActor,
    f: F,
}

#[derive_actor]
impl<F> CompletionHandler<F>
    where
        F: Fn() + Send + Sync + 'static,
{
    pub fn complete(&self) {
        (self.f)();
    }

}

Note the #[derive_actor] bit. This generates (some bits omitted) something like this:


pub struct CompletionHandlerActor {
    sender: ::channel::Sender<CompletionHandlerSystemMessage>,
    ref_count: ::std::sync::Arc<()>,
    pub id: ::std::sync::Arc<String>,
}

impl CompletionHandlerActor {
    pub fn complete(&self) {
        let msg = CompletionHandlerMessage::CompleteVariant {};
        let msg = CompletionHandlerSystemMessage::Inner(msg);
        let _ = self.sender.send(msg);
    }
}
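To make the generated plumbing concrete, here’s a minimal sketch of what the handle/loop pair amounts to, using std::sync::mpsc and a plain OS thread. The names are assumed; the real library uses its own channel type and adds ref counting, ids, and error handling.

```rust
use std::sync::mpsc;
use std::thread;

// The message enum the macro would generate for `complete`.
enum CompletionHandlerMessage {
    CompleteVariant,
}

// The wrapped synchronous struct (simplified: no self_ref/system here).
struct CompletionHandler<F: Fn() + Send + 'static> {
    f: F,
}

// Spawn a background thread that owns the receiver and drives the struct;
// the returned sender is the essence of the generated actor handle.
fn spawn_actor<F: Fn() + Send + 'static>(
    handler: CompletionHandler<F>,
) -> mpsc::Sender<CompletionHandlerMessage> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for msg in rx {
            match msg {
                CompletionHandlerMessage::CompleteVariant => (handler.f)(),
            }
        }
    });
    tx
}
```

Calling `complete()` on the handle is then just constructing the message variant and sending it down the channel, as in the generated code above.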

I didn’t have a good sense of how this would actually work in practice. I’ve used a similar approach elsewhere, but without the macro, and I diverged from the patterns the macro enforces (a single queue per actor, for example). My last actor-based project also grew fairly large in raw lines of code due to all of the ‘actor’ boilerplate everywhere - I was very curious to see whether the macro helped here.

Spam Detection

I set out to build a simple program. In my RustConf talk, Suchin and I brought up where Rust was particularly strong, and not much has changed since then: Rust is great for feature extraction and cleaning, not as great for investigation, with (unmet) potential for modelling. This meant I’d need some non-Rust code. Since actors model everything as services, they felt well suited to a project that would involve multiple languages.

The basic premise was to use machine learning, starting with NLP, to determine whether a given email is malicious.

You can see the code for it here (note that it is not complete; every prediction is hardcoded to ‘false’).

Actor Construction

As I built the service I developed the derive_actor library further. One of the first changes was to how actors are constructed: any StructNameActor::new(args) now requires a closure that is used to initialize the underlying struct.

Here’s how you initialize a PythonModelActor - an actor that interfaces with a background Python service.

    let python_model =
        move |self_ref, system| PythonModel::new(self_ref, system, "./model_service/service/prediction_service.py".into());
    let python_model_actor = PythonModelActor::new(python_model, system.clone(), Duration::from_secs(30));

We create our python_model, which is a closure that takes a PythonModelActor and a SystemActor and returns a PythonModel. PythonModelActor::new uses this closure to construct the PythonModel and route messages to it.

With this Dependency Injection by Construction, starting up actors/services ends up looking a lot like this:

    let prediction_cache =
        move |self_ref, system| PredictionCache::new(self_ref, system);
    let prediction_cache = PredictionCacheActor::new(prediction_cache, system.clone(), Duration::from_secs(30));

    let mail_parser =
        move |self_ref, system| MailParser::new(self_ref, system);
    let mail_parser = MailParserActor::new(mail_parser, system.clone(), Duration::from_secs(30));

    let sentiment_analyzer =
        move |self_ref, system| SentimentAnalyzer::new(self_ref, system);
    let sentiment_analyzer = SentimentAnalyzerActor::new(sentiment_analyzer, system.clone(), Duration::from_secs(30));

    let python_model =
        move |self_ref, system| PythonModel::new(self_ref, system, "./model_service/service/prediction_service.py".into());
    let python_model = PythonModelActor::new(python_model, system.clone(), Duration::from_secs(30));

    let model =
        move |self_ref, system| Model::new(self_ref, system, python_model.clone());
    let model = ModelActor::new(model, system.clone(), Duration::from_secs(30));

    let extractor =
        move |self_ref, system|
            FeatureExtractionManager::new(mail_parser.clone(), sentiment_analyzer.clone(), self_ref, system);
    let extractor = FeatureExtractionManagerActor::new(extractor, system.clone(), Duration::from_secs(30));

    let service =
        move |self_ref, system| SpamDetectionService::new(
            prediction_cache.clone(),
            extractor.clone(),
            model.clone(),
            self_ref,
            system
        );

    SpamDetectionServiceActor::new(service, system.clone(), Duration::from_secs(30))

It almost makes me miss Spring.

Panic Handling

I also realized that I wanted to handle panics. A typical Rust service, such as one built on Hyper, would likely handle panics at the service’s top level. But since actors are their own little services, they need to be able to handle panics individually. So every struct that derives an actor must now implement an on_error method. Error handling is still very much something I’m trying to work through.

Here’s an example of on_error being used in my SentimentAnalyzerActor.


    fn on_error<T>(&mut self,
                   err: Box<std::any::Any + Send>,
                   msg: SentimentAnalyzerMessage,
                   t: Arc<T>)
        where T: Fn(SentimentAnalyzerActor, SystemActor) -> SentimentAnalyzer + Send + Sync + 'static
    {
        match msg {
            SentimentAnalyzerMessage::AnalyzeVariant{
                phrase, res
            } => {
                res(Err(
                    ErrorKind::UnrecoverableError(
                        "An unexpected error occurred in sentiment analyzer".into()).into())
                );

            },
        };
    }

I wasn’t really sure what information would be useful in an on_error method so I just went with as much as I could.

  • err - This is the value that was produced by the panic
  • msg - This is the message that caused the panic
  • t - This is the closure that was originally used to construct the SentimentAnalyzer

In this case all I do is send ‘res’ (a callback) an UnrecoverableError message, which will instruct the service not to retry processing for this message. I could also inspect err to see if it’s an error I could recover from, but this suited me fine.

I’ve considered whether actors should even continue to live if an error has occurred. So far, they do - the actor’s loop will continue.

It also means that all message types must be Clone. This is usually trivial to implement, since everything tends to be Arc’d, but it’s worth keeping in mind: every message is cloned, whether you hit an error or not. This kiiiinda sucks, because it means handling errors impacts the performance of non-erroneous messages. But I’ve avoided thinking about it, so it isn’t a problem yet.
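A sketch of why Clone is needed (assumed names, with the callback simplified to Fn(bool)): the dispatch loop clones each message before handling it, so the original can still be handed to on_error if the handler panics.

```rust
use std::sync::Arc;

// The generated message enum must derive Clone so the loop can keep a
// backup copy of each message for error handling.
#[derive(Clone)]
enum SentimentAnalyzerMessage {
    AnalyzeVariant {
        phrase: String,
        res: Arc<dyn Fn(bool) + Send + Sync>,
    },
}

// Dispatch one message; on panic, return the cloned message,
// which is what `on_error` would receive.
fn dispatch(msg: SentimentAnalyzerMessage) -> Result<(), SentimentAnalyzerMessage> {
    let backup = msg.clone();
    std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| match msg {
        SentimentAnalyzerMessage::AnalyzeVariant { phrase, res } => {
            res(!phrase.is_empty());
        }
    }))
    .map_err(|_| backup)
}
```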

To ensure that my service works in the presence of unrecoverable failures I’ve started adding these to actors:

type SentimentResponse = Arc<Fn(Result<Analysis>) + Send + Sync + 'static>;

#[derive_actor]
impl SentimentAnalyzer {
    pub fn analyze(&self, phrase: String, res: SentimentResponse) {

        random_panic!(10);
        random_latency!(10, 20);
        let analysis = analyze(phrase);

        res(Ok(analysis));
    }
}

random_panic! is a 1 in n chance to panic. random_latency! is a 1 in n chance to sleep for k milliseconds.
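The macros themselves aren’t shown, so here’s a hypothetical sketch of how such fault-injection macros might look. The clock-based one_in coin flip is a stand-in to avoid a rand dependency; the real macros may differ.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// A crude 1-in-n coin flip based on the system clock's nanoseconds,
// standing in for a proper RNG.
fn one_in(n: u128) -> bool {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos() % n == 0
}

// Panic with probability 1 in n.
macro_rules! random_panic {
    ($n:expr) => {
        if one_in($n) {
            panic!("injected failure (1 in {})", $n);
        }
    };
}

// Sleep for k milliseconds with probability 1 in n.
macro_rules! random_latency {
    ($n:expr, $k:expr) => {
        if one_in($n) {
            std::thread::sleep(std::time::Duration::from_millis($k));
        }
    };
}
```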

You’ll notice I type-aliased the SentimentResponse - this is partly due to it being a large type, but mostly due to my macro being unable to handle types with multiple segments, e.g. a `Box<T>` resolves to a bare `Box` in generated code. I haven’t felt the need to fix this yet because it’s actually forced me to avoid vague types like `Box` and write things like:

type EmailBytes = Vec<u8>;

This comes in particularly handy when I need to export that type and change it later.

Assume Failure

One of the nice things about actors is that they can be treated as entirely separate services, even moved to another process or system. This boundary forces you to assume that the actor may fail: I may not get a response back, or I may get an error back.

SentimentAnalyzer.analyze(..) can never fail, bugs aside - there’s no IO going on, and the crate I use doesn’t expose a Result. Still, my SentimentAnalyzerActor’s SentimentResponse contains a Result. I found that using Result for all actor callbacks made life easier later, as code changed and suddenly there were error conditions. If I wanted to move SentimentAnalyzer to another process, none of the code that interacts with it would have to change.

Separate Processes

I mentioned earlier that I’d have a Python service doing some of the machine learning work. The code is trivial using Flask:

@app.route('/predict/<string:csv_features>')
def predict(csv_features):
    print(csv_features)
    csv_features = StringIO(csv_features)
    features: pd.DataFrame = pd.read_csv(csv_features, names=['a','b','c'])

    p = forest.predict(features)
    print(p)
    return str(forest.predict(features)[0])


@app.route('/health_check')
def health_check():
    return "UP"

This simple API gets an Actor mirror on the Rust side, in the PythonModel struct.

pub struct PythonModel {
    self_ref: ModelActor,
    system: SystemActor,
    python: Child,
    client: Client,
    port: u16,
    path: PathBuf
}

#[derive_actor]
impl PythonModel {
    pub fn predict(&mut self, features: Features, res: Prediction) {
        println!("python pred");
        if let Err(e) = self.client.get(&format!("http://127.0.0.1:{}/predict/1,2,3", self.port)).send() {
            res(Err(ErrorKind::RecoverableError(
                format!("Failed to predict {}", e).into())
                .into()));
        } else {
            res(Ok(false));
        }
    }
}

The code is clearly incomplete as I only check for errors, otherwise returning a hardcoded prediction. But looking at the code it’s very simple to model the separate Python process as an actor in the Rust code. This is a big win because I’m far more comfortable with Pandas and sklearn than anything in Rust.

Callbacks

The callback aspect of actors is clearly prevalent in the codebase. Because actors are strongly typed, if ActorA sends ActorB a message, ActorB does not know how to respond. ActorA could implement an interface and send itself to ActorB, but generic actors aren’t very pretty right now, and it seems silly to have to implement ‘HandlesXResponse’ and ‘HandlesYResponse’ 1000 times. Instead, callbacks serve as a translation mechanism for sending messages to where they need to go.

#[derive_actor]
impl SpamDetectionService {
    pub fn predict_with_cache(&mut self, email: EmailBytes, res: PredictionResult) {
        let self_ref = self.self_ref.clone();
        let res = res.clone();
        let email = email.clone();

        let hash = SpamDetectionService::hash_email(email.clone());

        self.prediction_cache
            .get(hash, std::sync::Arc::new(move |cache_res| {
                match cache_res {
                    Ok(Some(hit)) => {
                        println!("pred cache hit");
                        res(Ok(hit));
                    }
                    _ => self_ref.clone().predict(email.clone(), res.clone())
                };
            }));
    }

    pub fn predict(&self, email: EmailBytes, res: PredictionResult) {
        let self_ref = self.self_ref.clone();
        let model = self.model.clone();

        self.extractor.extract(email, std::sync::Arc::new(move |features| {
            match features {
                Ok(data) => {
                    model.clone().predict(data, res.clone())
                }
                Err(e) => {
                    res(Err(e));
                }
            };
        }));
    }
    // ...
}

Here’s an example of using callbacks to route a message. The SpamDetectionService queries the PredictionCacheActor and then routes its response. If we get a hit, we short-circuit the computation, sending res the immediate result.

Otherwise, we use a reference to our SpamDetectionServiceActor (self_ref) and send it a message to perform the prediction.

Closures can get a bit large. I tried to avoid any logic in them, only using them as ‘routes’ or translations between actors. If I had a nested closure I made an effort to refactor.

Here’s an example of a larger closure, though as you can see its only goal is to route a message:

        worker.predict(work, Arc::new(move |p| {
            match p {
                Ok(p) => {
                    completion_handler.success();
                }
                Err(ref e) => {
                    match *e.kind() {
                        ErrorKind::RecoverableError(ref e) => {
                            completion_handler
                                .retry(Arc::new(ErrorKind::RecoverableError(e.to_owned().into())));
                        }
                        ErrorKind::UnrecoverableError(ref e) => {
                            completion_handler
                                .abort(Arc::new(ErrorKind::UnrecoverableError(e.to_owned().into())));
                        }
                        ErrorKind::Msg(ref e) => {
                            completion_handler
                                .retry(Arc::new(e.as_str().into()));
                        }
                        _ => {
                            completion_handler
                                .retry(Arc::new("An unknown error occurred".into()));
                        }
                    }
                }
            };
            res(p);
        })
        );

I found it fairly simple to keep closures small and simple. If they grew too large I realized quickly that I had some other issue with the structure of my codebase.

Work Stealing, Back Pressure

One thing I ran into early on was that I was pushing data to the service faster than it could handle it. I found a few patterns for dealing with that, mostly based on work stealing and completion handlers. Whatever produces work for the SpamDetectionServiceActor also provides a CompletionHandlerActor. This actor is very simple:

pub struct CompletionHandler<F>
    where F: Fn(CompletionStatus) + Send + Sync + 'static
{
    self_ref: CompletionHandlerActor,
    system: SystemActor,
    f: F,
    tries: usize
}

#[derive(Debug, Clone)]
pub enum CompletionStatus {
    /// Processed successfully
    Success,
    /// A transient error occurred
    Retry(CloneableError, usize),
    /// An unrecoverable Error occurred
    Abort(CloneableError)
}

#[derive_actor]
impl<F> CompletionHandler<F>
    where F: Fn(CompletionStatus) + Send + Sync + 'static
{
    pub fn success(&self) {
        (self.f)(CompletionStatus::Success);
    }

    pub fn retry(&self, e: CloneableError) {
        (self.f)(CompletionStatus::Retry(e, self.tries + 1));
    }

    pub fn abort(&self, e: CloneableError) {
        (self.f)(CompletionStatus::Abort(e));
    }
}

When work is completed, the CompletionHandler forwards the result to the producer. The producer can then determine whether to schedule new work for the available actor, retry the last message, or give up.
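Sketching the producer side of that decision (NextStep, MAX_TRIES, and the String error payload are my assumptions, not the library’s API; String stands in for CloneableError to keep the example self-contained):

```rust
// Mirrors the CompletionStatus enum above, with String in place of
// CloneableError so this compiles standalone.
#[derive(Debug, Clone)]
pub enum CompletionStatus {
    Success,
    Retry(String, usize),
    Abort(String),
}

const MAX_TRIES: usize = 3;

// What the producer should do next with this unit of work.
#[derive(Debug, PartialEq)]
enum NextStep {
    ScheduleNext,
    Resend,
    Drop,
}

fn on_completion(status: CompletionStatus) -> NextStep {
    match status {
        // Worker is free again: hand it fresh work.
        CompletionStatus::Success => NextStep::ScheduleNext,
        // Transient failure: retry unless the budget is exhausted.
        CompletionStatus::Retry(_, tries) if tries < MAX_TRIES => NextStep::Resend,
        CompletionStatus::Retry(_, _) => NextStep::Drop,
        // Unrecoverable: give up on this message.
        CompletionStatus::Abort(_) => NextStep::Drop,
    }
}
```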

Type Errors

There are definitely some sore spots when using this library.

First, and probably most painful, is fixing type errors. When I get a type error, the message comes from nowhere:

error[E0618]: expected function, found `()`
  --> src/main.rs:45:1
   |
45 | #[derive_actor]
   | ^^^^^^^^^^^^^^^

Not great. A combination of cargo-expand, disabling the macro, and walking away from the computer a lot is what I found most effective for fixing errors given this limitation.

IDE Support

There’s also no IntelliJ completion for actors. If I write some_actor.f, IntelliJ won’t help me complete it to some_actor.foo(). I found that first writing the code with a non-actor version, then just adding Actor to the end of the type name where it’s used, actually worked well - but that’s hardly a smooth experience.

Scheduling

I’ve referenced the SystemActor a few times. On some branches of the code, that actor is the scheduler; it’s used to spawn actors using Rust’s futures crate. I have a basic proof of concept working, but it’s not nearly ready. This means that all of my actors are currently backed by OS threads. I haven’t had any issues with this - performance is fine, and I can handle thousands of emails in seconds, even querying out to a Python service, with hardcoded latency and failure conditions in some actors.

But it’s not ideal.

Overall

I’ve really enjoyed working on the service. Modelling individual components as actors has allowed me to refactor, parallelize, and ensure stability in the code. It’s not ideal, and it’s not production-ready, but I’m having fun, and I’m getting a lot of work done on the service.




Published 09 October 2017