
zeromq


9
Sep 11

ØMQ and Google Protocol Buffers

Using the ZeroMQ API, we can both queue up and dispatch / route Google Protobuf messages with X lines of code, where X approaches zero. Well, it is ZeroMQ after all.

Google Protocol Buffers Side


Say we have a “Trade” message that is described by Google protobufs as:

message TradeMessage {
 
    required string messageType = 1;
 
    required int32 maxFloor = 2;
    required int32 qty = 3;
    required int32 accountType = 4;
    required int32 encodedTextLen = 5;
    ... ...
}

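Let’s assume that our “messageType” is always 2 bytes long. Google Protocol Buffers will then encode it as a sequence of bytes, where the first byte is the field’s tag (10, which is field number 1 combined with wire type 2, “length-delimited”), the second byte is the field length (2), and the rest is the actual UTF-8 byte sequence that represents the message type. This relies on “messageType” being field 1 and on the encoder writing fields in field number order, which the Java implementation does in practice. Let’s make “TR” the message type for “Trade” messages.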

Once a Google protobuf “Trade” message is generated, it will start with the message type in the following format:

byte [] messageType = { 10, 2, 84, 82 };

Where ’84’ and ’82’ are ASCII for ‘T’ and ‘R’.
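To see where the leading '10' comes from, here is a minimal sketch that rebuilds those four bytes by hand. "ProtoPrefix" and "stringFieldPrefix" are hypothetical names made up for this illustration (they are not part of the protobuf library), and the sketch assumes a field number of at most 15 and a value shorter than 128 bytes, so that both the tag and the length fit into one byte each:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of protobuf or ZeroMQ.
final class ProtoPrefix {

    // Protobuf wire type 2 = length-delimited (strings, bytes, embedded messages).
    static final int WIRE_TYPE_LENGTH_DELIMITED = 2;

    // Builds the bytes a short string field is encoded as:
    // one tag byte, one length byte, then the UTF-8 payload.
    // Assumption: fieldNumber <= 15 and payload < 128 bytes,
    // so neither the tag nor the length needs a multi-byte varint.
    static byte[] stringFieldPrefix(int fieldNumber, String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        if (fieldNumber < 1 || fieldNumber > 15 || payload.length > 127) {
            throw new IllegalArgumentException("would need multi-byte varints");
        }
        byte[] prefix = new byte[2 + payload.length];
        prefix[0] = (byte) ((fieldNumber << 3) | WIRE_TYPE_LENGTH_DELIMITED);
        prefix[1] = (byte) payload.length;
        System.arraycopy(payload, 0, prefix, 2, payload.length);
        return prefix;
    }

    public static void main(String[] args) {
        // field number 1 ("messageType") with the value "TR"
        System.out.println(Arrays.toString(stringFieldPrefix(1, "TR")));
        // prints [10, 2, 84, 82]
    }
}
```

The tag is simply ( 1 << 3 ) | 2 = 10, which is why any message whose first field is a 2 byte “messageType” starts with 10, 2.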

Now let’s say we have some kind of “TradeGenerator” ( just for testing purposes, to simulate the actual feed / load ) that will produce Google Protobuf encoded “Trade” messages:

public static Trade.TradeMessage nextTrade() {

    return
        Trade.TradeMessage.newBuilder()
                      .setMessageType( "TR" )
                      .setAccountType( 42 )
                         ... ... ...
}

Note that it sets the message type to “TR” as we agreed upon.

ØMQ Side


Sending “Trade” messages with ØMQ is as simple as drinking a cup of coffee in the morning:

ZMQ.Context context = ZMQ.context( 1 );
ZMQ.Socket publisher = context.socket( ZMQ.PUB );
publisher.bind( "tcp://*:5556" );
 
// creating a static trade => encoding a trade message ONCE for this example
Trade.TradeMessage trade = TradeGenerator.nextTrade();
 
while ( true ) {
    publisher.send( trade.toByteArray(), 0 );
}

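Consuming messages is as simple as eating a bagel with that coffee. The interesting part (call it “the kicker”) is that we can subscribe to the “TR” message type using nothing but the ZeroMQ API: a SUB socket filters incoming messages by byte prefix, so subscribing to the first 4 bytes of an encoded “Trade” message is enough: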

ZMQ.Context context = ZMQ.context( 1 );
ZMQ.Socket subscriber = context.socket( ZMQ.SUB );
subscriber.connect( "tcp://localhost:5556" );
 
// subscribe to a Trade message type => Google Proto '10' ( field 1, wire type 2 )
//                                      Google Proto '2'  ( length 2 bytes )
//                                             ASCII '84' = "T"
//                                             ASCII '82' = "R"
 
byte [] messageType = { 10, 2, 84, 82 };
subscriber.subscribe( messageType );
 
for ( int i = 0; i < NUMBER_OF_MESSAGES; i++ ) {
 
    byte[] rawTrade = subscriber.recv( 0 );
 
    try {
        Trade.TradeMessage trade = Trade.TradeMessage.parseFrom( rawTrade );
        assert ( trade.getAccountType() == 42 );
    }
    catch ( InvalidProtocolBufferException pbe ) {
        throw new RuntimeException( pbe );
    }
}

Now all the “TR” messages, and only messages that start with those 4 bytes, will actually go to this subscriber.
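The filtering rule itself is easy to picture. The sketch below is only a model of the prefix matching that ZeroMQ documents for SUB sockets, not ZeroMQ’s actual implementation; “PrefixFilter”, “startsWith” and the “QT” message type are hypothetical names used for illustration:

```java
// Hypothetical model of SUB socket prefix filtering; illustration only.
final class PrefixFilter {

    // Returns true when the message begins with every byte of the prefix.
    // An empty prefix matches every message.
    static boolean startsWith(byte[] message, byte[] prefix) {
        if (message.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (message[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] subscription = { 10, 2, 84, 82 };      // "TR"

        // first bytes of an encoded "TR" message: prefix, then field 2 ( maxFloor = 7 )
        byte[] trade = { 10, 2, 84, 82, 16, 7 };
        // same layout with a made-up "QT" message type
        byte[] quote = { 10, 2, 81, 84, 16, 7 };

        System.out.println(startsWith(trade, subscription)); // prints true
        System.out.println(startsWith(quote, subscription)); // prints false
    }
}
```

Because the match is on raw bytes, the subscriber never has to parse a message it is not interested in.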

NOTE: Alternatively, you can use a “Union” Google Protocol Buffers technique (or extensions) in order to encode all different message types: here is how.