Using ZeroMQ API, we can both: queue up and dispatch / route Google Protobuf messages with X lines of code, where X approaches to zero.. Well, it is ZeroMQ after all.
Google Protocol Buffers Side
Say we have a “Trade” message that is described by Google protobufs as:
message TradeMessage { required string messageType = 1; required int32 maxFloor = 2; required int32 qty = 3; required int32 accountType = 4; required int32 encodedTextLen = 5; ... ... } |
Let’s assume that our “messageType” is always 2 bytes long. Then Google Protocol Buffers will encode it as a sequence of bytes, where first two bytes will determine protobuf’s field type (10) and field lenght (2), and the rest will be the actual UTF-8 byte sequence that would represent a message type. Let’s make “TR” a message type for “Trade” messages.
Once a Google protobuf “Trade” message is generated it will start with a message type in a following format:
byte [] messageType = { 10, 2, 84, 82 }; |
Where ’84’ and ’82’ are ASCII for ‘T’ and ‘R’.
Now let’s say we have a some kind of “TradeGenerator” ( just for testing purposes to simulate the actual feed / load ) that will produce Google Protobuf encoded “Trade” messages:
public static Trade.TradeMessage nextTrade() { return Trade.TradeMessage.newBuilder() .setMessageType( "TR" ) .setAccountType( 42 ) ... ... ... } |
Note that it sets the message type to “TR” as we agreed upon.
ØMQ Side
Sending “Trade” messages with ØMQ is as simple as drinking a cup of coffee in the morning:
ZMQ.Context context = ZMQ.context( 1 ); ZMQ.Socket publisher = context.socket( ZMQ.PUB ); publisher.bind( "tcp://*:5556" ); // creating a static trade => encoding a trade message ONCE for this example Trade.TradeMessage trade = TradeGenerator.nextTrade(); while ( true ) { publisher.send( trade.toByteArray(), 0 ); } |
Consuming messages is as simple as eating a bagel with that coffee. The interesting part (call it “the kicker”) is that we can actually subscribe to a “TR” message type (first 4 bytes) using just ZeroMQ API:
ZMQ.Context context = ZMQ.context( 1 ); ZMQ.Socket subscriber = context.socket( ZMQ.SUB ); subscriber.connect( "tcp://localhost:5556" ); // subscribe to a Trade message type => Google Proto '10' ( type 2 ) // Google Proto '2' ( length 2 bytes ) // ASCII '84' = "T" // ASCII '82' = "R" byte [] messageType = { 10, 2, 84, 82 }; subscriber.subscribe( messageType ); for ( int i = 0; i < NUMBER_OF_MESSAGES; i++ ) { byte[] rawTrade = subscriber.recv( 0 ); try { Trade.TradeMessage trade = Trade.TradeMessage.parseFrom( rawTrade ); assert ( trade.getAccountType() == 42 ); } catch ( InvalidProtocolBufferException pbe ) { throw new RuntimeException( pbe ); } } |
Now all the “TR” messages will actually go to this subscriber.
NOTE: Alternatively, you can use a “Union” Google Protocol Buffers technique (or extensions) in order to encode all different message types: here is how.