This article describes how to use ZeroMQ for RPC calls to internal services. HTTP is the canonical choice for public-facing services. But for RPC between internal services in systems composed of many small parts, you're probably better off using ZeroMQ instead of HTTP.
In summary, the benefits ZeroMQ offers over HTTP are:
- Multiple concurrent RPC calls over the same TCP connection. With HTTP keep-alive, you're limited to sequential calls on a connection. This is a big one.
- No manual connection management. This isn't impossible to do with HTTP, and is mostly a library problem, but AFAIK few libraries exist that let you do this right with HTTP.
- Supports multiple server processes responding to requests. No HTTP load balancer needed.
- No manual retry handling. You can stop the server, do an RPC, then start the server again, and still get a response - the message is queued instead of delivered immediately.
Let's dive in!
A very quick Clojure interop tutorial
Clojure's Java interop might not be intuitive at first glance, so here's a quick explanation.
;; Quick Clojure tutorial, with Java equivalents shown as comments

;; Java method call on a string instance. Returns "A STRING"
(.toUpperCase "a string")
;; "a string".toUpperCase()

;; Java method call with one argument. Returns byte[].
(.getBytes "a string" "UTF8")
;; "a string".getBytes("UTF8")

;; Calls the constructor and creates a new instance of Point
(Point. 15 21)
;; new Point(15, 21)

;; Create a new thread and start it
(.start (Thread. (fn [] ...)))
;; Thread t = new Thread(aClojureFunctionHere);
;; t.start();
A ZeroMQ tutorial
ZeroMQ is a bit too cleverly named. It's not a message queue: there's no server or broker you need to download and run. In fact, there are no centralized components in ZeroMQ at all. So in a way, ZeroMQ is an apt name: there's zero message queue in ZeroMQ. So what is it, then?
ZeroMQ is a networking library. You create sockets of different types (REQ, REP, DEALER, ROUTER, PUB, SUB, ...), connect these sockets to each other in various interesting ways, and send messages to these sockets. The type of socket determines how the messages are routed and delivered.
How to connect the sockets, and how to use them for RPC instead of HTTP, is what this blog post covers.
Don't think of ZeroMQ as an alternative to ActiveMQ or RabbitMQ; think of ZeroMQ as a nuclear-powered mutant ninja networking and sockets library.
Iteration 1: requests and replies, no concurrency
This first example is something you would never do in production code, since it has no concurrency. But it demonstrates some important ZeroMQ concepts in a small and isolated snippet, on which we will improve iteratively until it does what we want.
We will create two sockets. One is the REQ (request) socket. ZeroMQ sockets have different APIs depending on their type. If you try to send two messages in a row from a REQ socket, you'll get an error. You have to send, then wait for a reply, before you can send again.
The other socket is a REP (reply) socket. It has a similar but mirrored API. If the first thing you do on a REP socket is to send data, you'll get an error. You have to start by waiting for a message, and only when you've received one can you send a reply. Then you have to wait for another message.
This API makes it easy to do RPC, as ZeroMQ will handle all the details for us. All we need to do is wait for a request and send a reply on REP, and send a request and wait for a response on REQ.
Evidently, a single ZeroMQ REQ or REP socket has no concurrency. You can only do one request or one reply at a time. We'll get into concurrency later.
;; Very small REQ/REP setup, to demonstrate the basics.
;; One request at a time, no concurrency.
;; Assuming the jzmq Java bindings (org.zeromq package) are on the classpath.
(import '[org.zeromq ZMQ])

(.start
 (Thread.
  (fn []
    (let [sock (.socket (ZMQ/context 1) ZMQ/REP)]
      (.bind sock "tcp://127.0.0.1:1337")
      (while true
        ;; Block until we receive a message
        (let [req (.recv sock 0)]
          ;; req is a byte[]. Do whatever you want with it!
          ;; We echo the req back to the client.
          (.send sock (.getBytes (str (String. req) " - echoed!")) ZMQ/NOBLOCK)))))))
(.start
 (Thread.
  (fn []
    (let [sock (.socket (ZMQ/context 1) ZMQ/REQ)]
      (.connect sock "tcp://127.0.0.1:1337")
      (dotimes [n 5]
        ;; Perform a request
        (.send sock (.getBytes (str "Hello, " n)) 0)
        ;; Block until we receive a response
        (let [res (.recv sock 0)]
          ;; res is a byte[] containing whatever the REP socket replied with.
          (println (String. res))))))))
There are actually many lessons to learn in this small example.
- Look, no broker! Again, ZeroMQ is not a traditional message queue, it's a socket library. (But with some message queue-like semantics, read on!)
- We're assuming that the messages are strings in our JVM's default encoding. But to ZeroMQ they are just bytes.
- The order of bind/connect does not matter. Woot! If it did matter, we would have had to make sure we didn't connect REQ until after we bound REP, but we don't (the order in which the threads start is undefined). For ZeroMQ sockets, the actual connection is hidden from us. If we send a message and we're not connected yet, the message is queued locally on the REQ socket until it manages to connect. Try it! Put a (Thread/sleep 1000) before the bind and notice how it still works (see the sketch after this list).
- A single ZeroMQ socket can not be used concurrently from different threads. In fact, the limitation isn't really about threads, but about state. As explained above, we can't do two sends in a row on REQ, that'll just throw an error. So technically you could use a single socket from multiple threads if you coordinate the send/receive ordering between them. But read on to figure out an alternative to that shared mutable state hell.
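To see the queuing in action, here's the REP side from the example above with a one second sleep added before the bind - a sketch built directly on the snippet above, nothing else changes. The REQ side keeps sending, and its messages simply sit in its local queue until the bind happens.
;; Same REP thread as before, but we sleep before binding. The requests
;; sent in the meantime are queued locally on the REQ socket.
(.start
 (Thread.
  (fn []
    (let [sock (.socket (ZMQ/context 1) ZMQ/REP)]
      (Thread/sleep 1000)
      (.bind sock "tcp://127.0.0.1:1337")
      (while true
        (let [req (.recv sock 0)]
          (.send sock (.getBytes (str (String. req) " - echoed!")) ZMQ/NOBLOCK)))))))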
Iteration 2: Adding concurrent replies
We'll keep our request code for now, but we'll replace the single thread that does one reply at a time.
ZeroMQ allows for in-process message passing, and this is what we'll use to achieve concurrent replies. We'll create multiple threads with one REP socket per thread, and use a new thread with a new socket to connect these into one blob. There will be no cross-thread state; the only "state" we pass between threads will be ZeroMQ messages.
To achieve this, we create a socket with the type DEALER. We bind this to the "inproc://" protocol, and connect the actual REP sockets to this DEALER. When the DEALER socket receives a message, it passes it on to one of the REP sockets, keeping track of which REP sockets are busy. When a REP socket replies, it replies to the DEALER, which just forwards the message as-is. This setup gives us one top-level socket that forwards messages on behalf of a multitude of REP sockets.
In all the examples in this post we will be doing concurrent requests with multiple REQ sockets over one single TCP connection. But if we want to support multiple TCP connections to our server, which we almost certainly do, we need a ROUTER in front of our DEALER. For example, we might have multiple systems connecting to one service, requiring a separate TCP connection per client process. Or we might want to connect to the server ourselves for maintenance RPC calls.
The ROUTER will assign an internal ID to each socket that connects to it, and immediately forward each incoming message with metadata attached containing that internal socket ID. When it gets a message back, it immediately forwards it to the right connected REQ socket, based on the socket ID in the metadata.
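You normally never touch that metadata yourself - the ZMQQueue in the code below shuffles it around for us - but as a rough sketch (assuming the jzmq bindings used throughout this post), this is approximately what a ROUTER hands you if you read its frames by hand: an identity frame first, then the payload frames, and a reply has to start with that same identity frame so the ROUTER knows where to send it.
;; Sketch only, not part of the setup below. Reads one request from a
;; ROUTER frame by frame and echoes the payload back to its sender.
(let [ctx (ZMQ/context 1)
      sock (.socket ctx ZMQ/ROUTER)]
  (.bind sock "tcp://127.0.0.1:1338")
  (let [sock-id (.recv sock 0)               ;; identity frame added by ROUTER
        frames  (loop [acc []]
                  (let [frame (.recv sock 0)]
                    (if (.hasReceiveMore sock)
                      (recur (conj acc frame))
                      (conj acc frame))))]
    ;; Reply: identity frame first, then the payload frames.
    (.send sock sock-id ZMQ/SNDMORE)
    (doseq [frame (butlast frames)]
      (.send sock frame ZMQ/SNDMORE))
    (.send sock (last frames) 0)))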
;; ZMQQueue also lives in the org.zeromq package (jzmq).
(import '[org.zeromq ZMQQueue])

(defn my-response-handler
  "Takes a req (bytes), returns the response (also bytes)."
  [req]
  (.getBytes (str (String. req) " - echoed!")))

(let [ctx (ZMQ/context 1)
      worker-url "inproc://responders"
      router-socket (.socket ctx ZMQ/ROUTER)
      dealer-socket (.socket ctx ZMQ/DEALER)]
  (.bind router-socket "tcp://127.0.0.1:1337")
  (.bind dealer-socket worker-url)
  ;; We can now respond to 10 requests in parallel
  (dotimes [n 10]
    (.start
     (Thread.
      (fn []
        (let [sock (.socket ctx ZMQ/REP)]
          ;; We reply to the DEALER
          (.connect sock worker-url)
          (while true
            ;; Same API as before - receive message, then reply.
            (let [req (.recv sock 0)]
              (.send sock (my-response-handler req) ZMQ/NOBLOCK))))))))
  (.start
   (Thread.
    ;; Forwards messages from router to dealer and vice versa.
    (fn [] (.run (ZMQQueue. ctx router-socket dealer-socket))))))
You can replace the REP thread in the previous example with this code, and it will still work. Except now, we have ten threads responding to requests.
A few things to note:
- The ZMQQueue is a built-in convenience class that passes all messages from one socket to the other and vice versa. In our setup, that means that all messages from the ROUTER are sent to the DEALER, and all messages from the DEALER are sent to the ROUTER, without us having to write the code to make that happen.
- Invoking .run on the ZMQQueue will block the current thread, which is why we have a separate thread for it.
- The order of bind/connect matters for the "inproc://" protocol. This is only true for inproc; "tcp://", as we saw above, can connect first and then bind (see the sketch after this list). For inproc, you don't risk the other end going down or not being up yet, and you control the flow of inproc binds and connects yourself, so the ordering requirement is more of an inconvenience than an essential problem. It also means that connecting inproc is very cheap - it doesn't have to check if the connection is up, do retries, etc., so inproc is much more efficient than TCP, and not only because it skips TCP handshakes, checksums, and so on.
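A minimal sketch of that ordering rule (assuming the jzmq bindings and the ZeroMQ 2.x/3.x releases this post was written against): connecting to a tcp:// endpoint nobody has bound yet is fine, while connecting to an unbound inproc:// endpoint fails immediately.
(let [ctx (ZMQ/context 1)]
  ;; Fine: nothing is listening yet, the connect just queues locally and
  ;; ZeroMQ keeps trying in the background.
  (.connect (.socket ctx ZMQ/REQ) "tcp://127.0.0.1:9999")
  ;; Throws: an inproc endpoint must be bound before anyone connects to it.
  (.connect (.socket ctx ZMQ/REQ) "inproc://not-bound-yet"))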
Iteration 3: Adding concurrent requests
We now have multiple threads doing replies. It's time to replace the single REQ socket with something more clever that lets us achieve concurrency.
As you might have guessed, we could just create a gazillion REQ sockets. This is bad, though, as each REQ socket requires its own TCP connection. We could have a pool of REQ sockets available so that we don't have to pay the cost of a new TCP connection every time we do a request. But ZeroMQ has a better solution where it handles all the cruft for us.
We won't be getting away from the fact that REQ has a blocking API. I'm going to assume you're in an environment where blocking requests are OK. My use cases have always been Java HTTP servlet responders that need to call out to internal services via ZeroMQ. I'm not sure whether ZeroMQ has async APIs.
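That said, if you want to put an upper bound on how long a request can block, one option is to poll the socket before receiving. This is only a sketch under some assumptions: it uses the jzmq poller API, and the poll timeout is in microseconds on ZeroMQ 2.x but milliseconds on 3.x, so adjust for your version. Also note that a REQ socket that timed out can't simply be reused for another send; close it and grab a fresh one.
;; Sketch: recv with a timeout. Returns nil if nothing arrived in time.
(defn recv-with-timeout
  [ctx sock timeout-ms]
  (let [poller (.poller ctx 1)]
    (.register poller sock org.zeromq.ZMQ$Poller/POLLIN)
    ;; Microseconds here, assuming ZeroMQ 2.x - use plain ms on 3.x.
    (.poll poller (* timeout-ms 1000))
    (when (.pollin poller 0)
      (.recv sock 0))))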
OK, now for the actual code.
(defn connect
  [server-url]
  (let [ctx (ZMQ/context 1)
        worker-url (str "inproc://" (java.util.UUID/randomUUID))
        queue-thread (Thread.
                      (fn []
                        (let [client-sock (.socket ctx ZMQ/DEALER)
                              worker-sock (.socket ctx ZMQ/ROUTER)]
                          (.connect client-sock server-url)
                          (.bind worker-sock worker-url)
                          (.run (ZMQQueue. ctx client-sock worker-sock)))))]
    (.start queue-thread)
    {:ctx ctx
     :worker-url worker-url
     :queue-thread queue-thread}))
(defn disconnect
  "Useful for tests etc. Pass the map returned by `connect` above."
  [connection]
  (.interrupt (get connection :queue-thread))
  (.term (get connection :ctx)))

(defn with-req-sock
  "Takes the connection and a higher order function that is passed a new REQ
  socket. When this function returns, the REQ socket is destroyed."
  [connection handler]
  (let [socket (.socket (get connection :ctx) ZMQ/REQ)]
    (.connect socket (get connection :worker-url))
    (try
      (handler socket)
      (finally (.close socket)))))
;; Usage
(def connection (connect "tcp://127.0.0.1:1337"))

(dotimes [n 5]
  (.start
   (Thread.
    (fn []
      (with-req-sock connection
        (fn [sock]
          (.send sock (.getBytes (str "Hello, " n)) 0)
          (let [res (.recv sock 0)]
            (println (String. res)))))))))
(connect server-url) creates a new connection. We create a single connection for our process, and pass it around whenever we want to make requests. We create a new REQ socket per request, and use "inproc://" to send messages to the ROUTER. The ROUTER then forwards the message to the DEALER (via ZMQQueue as before). The DEALER connects via TCP to our actual server.
A few things to note:
- The output should be interleaved in your terminal if you run this code. That's because both the requests and the replies happen concurrently.
- Again, no state is shared between threads other than the inproc URL; all we really do is send ZeroMQ messages between them.
- As already noted, we could use REQ directly against the server, but that would mean a full TCP connection per request. In this setup, our DEALER is the sole TCP connection; everything else is inproc. We end up with concurrent requests running over the same TCP connection. The DEALER doesn't really care about the types of messages, so it can send 5 requests, get 2 replies, send 2 more requests, and so on. The ROUTER socket takes care of sending the messages to the correct place, based on the internal socket IDs.
- As the code shows, we can now create multiple REQ sockets in separate threads and perform requests. Typically, you create a new REQ socket for each incoming request, using with-req-sock.
Implementing actual RPC
So far, all you've learned is how to use ZeroMQ to send messages back and forth. We've seen that we can send any number of requests and replies concurrently on one TCP connection. We can start a client, do a request, then start the server, and everything will work, even though we don't have a broker. We don't have to manage connections and there's no manual retry handling.
But what about actually doing RPC? So far we've been sending bytes and echoing them. We want a way to invoke multiple procedures, and also send data with our procedure calls. What should we send to make this useful?
Let's just mimic HTTP! I like the method, path and body semantics, so let's keep that. Similar to HTTP, we'll always respond, but sometimes with error codes - there's no "null" response. Since ZeroMQ likes bytes, we can use SMILE as the data format, which is JSON-like in that it knows how to encode maps, lists, sets and strings into something that can be decoded later. It's a so-called "binary" format, so it can also carry raw bytes, and there won't be any string encoding issues.
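As a quick illustration of what SMILE gives us, here's a round-trip with the Cheshire library (the same library the request code below uses): a Clojure map in, a byte array out, and back again.
;; generate-smile returns a byte[]; parse-smile turns it back into Clojure
;; data. The `true` argument turns the map keys into keywords.
(require '[cheshire.core])

(let [encoded (cheshire.core/generate-smile {:method "GET" :path "/users" :body nil})]
  (cheshire.core/parse-smile encoded true))
;; => {:method "GET", :path "/users", :body nil}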
Performing requests
Here's how I perform a request to list all the users in my system:
(defn do-rpc
  ([sock method path] (do-rpc sock method path nil))
  ([sock method path body]
   (let [msg (cheshire.core/generate-smile
              {:method method :path path :body body})]
     (.send sock msg 0)
     (let [res (.recv sock 0)]
       (cheshire.core/parse-smile res true)))))

(with-req-sock connection
  (fn [sock]
    (println (do-rpc sock "GET" "/users"))))
This sends a request with a "json" map that contains the keys method, path and body. If we do this now, we'll just get a weird response back, since our server still just echoes the request. So let's get our hands dirty and implement an actual RPC request handler!
Responding to requests
Since we're very HTTP-like, implementing a rudimentary request handler only takes a few small steps.
We're going to re-implement my-response-handler from above into something that reads the SMILE encoded requests and responds with SMILE encoded responses.
(defn respond
  [req]
  (if (and (= (get req :method) "GET") (= (get req :path) "/users"))
    {:status 200 :body [{:id 1 :name "August"}]}))

;; This function used to echo. We now parse and generate SMILE.
(defn my-response-handler
  [raw-req]
  (try
    (let [req (cheshire.core/parse-smile raw-req true)]
      (if-let [response (respond req)]
        (cheshire.core/generate-smile response)
        (cheshire.core/generate-smile {:status 404})))
    (catch Exception e
      (cheshire.core/generate-smile {:status 500}))))
If respond returns non-nil, we assume it's a valid response and SMILE encode it. If respond returns nil, we return status 404. Just like HTTP, because why not! If any of this throws an exception, we respond with status 500. Again, just like HTTP.
While this is slightly more interesting than an echo server, we don't want to manually do RPC routing with if tests. We want to utilize some kind of library to be able to set up different request handlers with as little repetition as possible.
Take a deep breath. Sit down. Are you comfortable? This is going to be awesome.
Using existing HTTP routing libraries
Our requests are very HTTP-like, with a method and a path, which is really all we need in order to do traditional HTTP routing on, well, method and path.
So let's just use an existing HTTP routing library for that!
We'll be using Compojure, which handles Ring requests. Ring is the most commonly used HTTP stack for Clojure, and it has a very simple API. You give Ring a function. This function takes one argument, the request, represented as an immutable map. The function should return a map, representing the response. That's it!
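In other words, a Ring handler is just a function from a request map to a response map. A minimal, standalone illustration (not hooked up to any server):
;; Request map in, response map out. That's the whole contract.
(defn hello-handler
  [request]
  {:status 200
   :headers {"Content-Type" "text/plain"}
   :body (str "You asked for " (:uri request))})

(hello-handler {:request-method :get :uri "/users"})
;; => {:status 200, :headers {"Content-Type" "text/plain"}, :body "You asked for /users"}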
The only thing we need to do is to turn our ZeroMQ based map into something that Compojure understands. In Ring, the key is called :request-method instead of just :method, for example.
;; Assumes Compojure is on the classpath: (use 'compojure.core)
(defroutes app
  (GET "/users" []
    {:status 200 :body [{:id 1 :name "August"}]})
  (GET "/users/:user-id" [user-id]
    ;; Route params are strings, so we compare against "1"
    (if (= "1" user-id)
      {:status 200 :body {:id 1 :name "August"}})))

(defn respond
  [req]
  (app {:request-method (keyword (.toLowerCase (get req :method)))
        :uri (get req :path)
        :body (get req :body)}))
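And that's all the glue we need. Calling respond with the kind of map our do-rpc client sends now goes through Compojure's routing:
;; Matches the first route; Compojure hands back our response map
;; (it may fill in default keys such as :headers).
(respond {:method "GET" :path "/users"})

;; No route matches, so the routes return nil, and my-response-handler
;; from earlier turns that into {:status 404}.
(respond {:method "GET" :path "/unknown"})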
Words cannot describe how awesome this is. With just a couple of lines of code, we're invoking an HTTP stack from something that isn't HTTP at all.
We're done!
Wow, you made it this far! Or perhaps you just skimmed and got to this part. I'm proud of you either way.
We now have a solid way of doing RPC calls over just one TCP connection, with no manual connection management, and with queuing of requests if the server is down. We have a sane way of implementing our server to respond to different procedures. This is all we really wanted, so let's celebrate and perform remote procedure calls without having to deal with boring details such as connection management.
And I think this post demonstrates the power of "one data structure and a hundred functions". Just by creating a data structure that looks identical to the ones the Ring stack creates, we can re-use an entire suite of HTTP tools already available to us.
Good luck!
Multiple servers
I've never actually done this myself, so I don't have battle-tested code to show you; I'll leave the finer implementation details as an exercise for the reader. The concepts are clear, though. Since we've seen that it's really easy to compose REQ/REP and DEALER/ROUTER, all it should take is another node that sits between the REQ clients and the REP servers and routes and deals requests to multiple servers. Alternatively, just bind the DEALER we already have on the server to a TCP address and connect a bunch of REP sockets to it directly - but be aware that this would mean one TCP connection per REP socket.
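For what it's worth, the intermediary node would look roughly like this - an untested sketch, with made-up ports, following the same ROUTER/DEALER/ZMQQueue pattern we've used all along. Each server process would then connect (rather than bind) its socket to the backend address.
;; Untested sketch of a standalone broker node. Clients connect to the
;; frontend, server processes connect to the backend, and ZMQQueue
;; shovels messages between the two.
(let [ctx (ZMQ/context 1)
      frontend (.socket ctx ZMQ/ROUTER)   ;; clients connect here
      backend  (.socket ctx ZMQ/DEALER)]  ;; servers connect here
  (.bind frontend "tcp://0.0.0.0:1337")
  (.bind backend "tcp://0.0.0.0:1338")
  (.run (ZMQQueue. ctx frontend backend)))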
Authentication
I've only used ZeroMQ for internal services that are behind firewalls, so I haven't needed to add any form of auth. My advice for authentication would be to mimic HTTP: allow full access to the service itself, but in the routing, have a precondition that checks whether an auth token of some sort is present in the request. If the request fails to authenticate, respond with status 403 or something along those lines.
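A minimal sketch of that precondition, written as a Ring-style wrapper around a handler like respond. The :auth-token key and the valid-token? check are made up for illustration - use whatever scheme fits your system.
;; Short-circuits with a 403 when the (hypothetical) :auth-token key in
;; the request doesn't pass the check; otherwise calls through.
(defn wrap-auth
  [handler valid-token?]
  (fn [req]
    (if (valid-token? (get req :auth-token))
      (handler req)
      {:status 403 :body "Missing or invalid auth token"})))

;; Usage: a set works fine as the predicate
(def authed-respond (wrap-auth respond #{"my-secret-token"}))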
Work is underway in ZeroMQ 3 for providing encrypted transports, too, so you don't have to manually encrypt your messages to be MITM safe if you're dispatching messages across a network you don't trust.
Some notes on the Clojure code
I have this crazy idea that I should make the code as readable as possible to people who are unfamiliar with Clojure in general. So I do some non-idiomatic stuff. All I've probably achieved with this is alienating the people who will actually read the article - Clojure programmers. Oh well.
We're doing a lot of Java interop, and in my code I like to use the (doto) macro. Instead of this:
(let [ctx (ZMQ/context 1)
      router-socket (.socket ctx ZMQ/ROUTER)
      dealer-socket (.socket ctx ZMQ/DEALER)]
  (.bind router-socket "tcp://127.0.0.1:1337")
  (.bind dealer-socket "inproc://responders")
  ;; ...
  )
We could have written this:
(let [ctx (ZMQ/context 1)
      router-socket (doto (.socket ctx ZMQ/ROUTER) (.bind "tcp://127.0.0.1:1337"))
      dealer-socket (doto (.socket ctx ZMQ/DEALER) (.bind "inproc://responders"))]
  ;; ...
  )
Which means we could have created far fewer names in that code, and created our ROUTER/DEALER pair and the queue like so:
(let [ctx (ZMQ/context 1)
      queue (ZMQQueue.
             ctx
             (doto (.socket ctx ZMQ/ROUTER) (.bind "tcp://127.0.0.1:1337"))
             (doto (.socket ctx ZMQ/DEALER) (.bind "inproc://responders")))]
  ;; ...
  (.start (Thread. (fn [] (.run queue)))))
Or, with even fewer names, we could have inlined the entire thing:
(let [ctx (ZMQ/context 1)]
  ;; ...
  (.start
   (Thread.
    (fn []
      (doto
        (ZMQQueue.
         ctx
         (doto (.socket ctx ZMQ/ROUTER) (.bind "tcp://127.0.0.1:1337"))
         (doto (.socket ctx ZMQ/DEALER) (.bind "inproc://responders")))
        (.run))))))
doto takes a Java object as its first argument, and all the following arguments are assumed to be lists (in Clojure, a list is a function call); doto inserts the instance as the 2nd element of each list, i.e. as the first argument to each function. So while the code looks like we call bind on a string, it's actually called on the instance passed as the first argument - the socket instances created by calling socket on the ctx. This is convenient for creating an instance of something, calling a bunch of side-effecting methods on it, and returning that instance, without having to create a named variable for it.
I've also chosen to use (get my-map :a-key) instead of the more idiomatic (:a-key my-map). It might be confusing to people unfamiliar with Clojure that keywords are also callable functions, which do map lookups when the first argument is a map.
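For completeness, both forms do the same thing:
;; Both of these return "bar"
(get {:a-key "bar"} :a-key)
(:a-key {:a-key "bar"})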