RabbitMq - First Hops


Get it, "Hops", 'cause "Rabbit". They "hop". HAH! ...

I've started writing some code to explore an idea I've had bubbling around for... I think close to two years now. It's undergone a number of iterations, and discussions with a few people.

It's finally reached a point where I can start exploring the idea with code and see how it wants to be built. I say "wants" the same way I do in other posts where I talk about what the code wants. Gotta listen.

I started TDD'ing one of the base classes and got blocked. I'm getting a bit better about listening to the code: there was resistance to what I was doing, so I stopped until I found a better way.
I explained this block to a colleague and his question was, and I get to feel dumb, "What are you trying to test?" ... I fumbled and bumbled and realized that I was writing tests for the implementation I expected to write.
I realized the mistake, deleted everything I had, and started over. The newer tests flow better.
I see this as an example of the stance I've taken with TDD, one that is explicitly called out for XP in a number of places: it's the mentality. It's the approach you take that produces the benefit XP "provides". The practices are there to help you stay on that path.
I like to phrase this for TDD as, "The tests are the least important part of TDD".

Note: I'm going to be light on the main project code, as it's not the point here, and I want to get it into a real state with real things before I show that.

RabbitMq

Once I got the correct viewpoint on how to write a test... (I don't expect that struggle to ever really go away) I was able to get into a pretty good flow and get three tests passing... and then I realized I didn't really know how RabbitMq worked. I couldn't really "fake/mock" the RabbitMq behavior with TDD. Not that it can't be done; I couldn't do it.

I just don't know enough about RabbitMq to fake it.

Set Up

Docker is currently unavailable on my Windows box due to VMWare (I'm ignoring that I could run a Linux VM... or a Windows VM...), which has me running Docker on my Mac.
I used the RabbitMq image provided on Docker Hub. There are apparently a number of ways to run it. Since I'm doing the development work for this project in C# on my Windows box and running Docker on my Mac, I had to do a little more than the documentation seems to suggest. Or I'm bad at reading - always a reasonable option.

The command I ended up using is largely C&P from the documentation.

docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
A part that tripped me up for a while is that some of the other resources, and the earlier docker commands in the documentation, have just rabbitmq:3 in the docker run command. As far as I could tell, that image doesn't include the management UI. If you know what to do with RabbitMq, you may not need it. I know nothing; I wanted it.
The next part I had trouble with is that the documentation assumes you're running on localhost. This was actually two complications.
When I tried to connect to the port EVERYTHING said I needed to connect to, 5672, it wouldn't. Actively refused. My lack of experience with Docker shows with this stumbling block. The fix is the -p 5672:5672 in the above docker command, which maps a port on the host system to the port inside the container (Docker calls this publishing a port). Without it, the container's port 5672 isn't reachable from my Windows box.
The 8080:15672 is the same kind of port mapping; it just changes the port the management console is accessed through, from 15672 to 8080. I didn't need to change it, but the doc I was following said to do that to make it accessible to other computers, so I did, and it did. Having done the connection port (5672), I now know I could have kept the original port.
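
When a connection is "actively refused", it helps to check whether the port accepts TCP connections at all before blaming credentials or the client library. Here is a minimal sketch of that check; it knows nothing about RabbitMq, and the PortProbe class and CanConnect method are names I made up for illustration.

```csharp
using System;
using System.Net.Sockets;

public static class PortProbe
{
    // Returns true when a plain TCP connection to host:port succeeds within the timeout.
    public static bool CanConnect(string host, int port, int timeoutMs = 1000)
    {
        try
        {
            using (var client = new TcpClient())
            {
                // Wait(timeout) returns false on timeout and throws if the connect failed.
                return client.ConnectAsync(host, port).Wait(timeoutMs);
            }
        }
        catch (AggregateException)
        {
            // A refused or unreachable connection surfaces as a faulted task.
            return false;
        }
        catch (SocketException)
        {
            return false;
        }
    }
}
```

With the container started without -p 5672:5672, I'd expect PortProbe.CanConnect("192.168.1.200", 5672) to come back false from another machine, and true once the port is published.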

The next part gave me a bit of a headache, though the error was pretty clear about it. The default name and password for the management portal are guest/guest, and most things just kinda assume that'll work without specifying account credentials. By default, though, the guest account is only allowed to connect from localhost. I was running non-localhost, so I had to add a new account and use those credentials. It's small, but that's an additional step I had to do.
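
The credentials for the new account end up in the amqp:// URI the tests connect with. A small sketch of putting that URI together, assuming the same user:password@host:port/ shape used in the tests in this post; the AmqpUri helper is hypothetical, not part of the RabbitMq client. The user name and password are escaped so a character like @ or / in a password doesn't break the URI.

```csharp
using System;

public static class AmqpUri
{
    // Builds "amqp://user:password@host:port/" with the credentials percent-encoded.
    public static string Build(string user, string password, string host, int port = 5672)
    {
        return "amqp://"
            + Uri.EscapeDataString(user) + ":"
            + Uri.EscapeDataString(password)
            + "@" + host + ":" + port + "/";
    }
}
```

For example, AmqpUri.Build("testguy", "supersekrit", "192.168.1.200") produces the same string used in the test below.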

I was using unit tests to do the connection testing with RabbitMq - it's a nice way to do it. No boilerplate, just write what you want to test.

Testing

The guide I used to start writing code to connect is the C# one on the RabbitMq site. They have one for many languages. It looks like the C# one is a little out of date, with some small, easily understood inconsistencies... maybe I'll submit a pull request for the website... or be lazy.

I was just working to get a publish happening. No concern about consuming.

Following the documentation, I had a test that looked a lot like this:

[TestMethod]
public void Publishes(){
    //Set up the connection
    ConnectionFactory factory = new ConnectionFactory();
    factory.Uri = "amqp://testguy:supersekrit@192.168.1.200:5672/";
    IConnection conn = factory.CreateConnection();

    //Set up the Exchange and Queue
    IModel model = conn.CreateModel();
    string exchangeName = "someExchange";
    string queueName = "someQueue";
    string routingKey = "ToSomeQueue";
    //Topic, to match the later samples; redeclaring an existing exchange with a different type fails
    model.ExchangeDeclare(exchangeName, ExchangeType.Topic);
    model.QueueDeclare(queueName, false, false, false, null);
    model.QueueBind(queueName, exchangeName, routingKey, null);

    //Super Basic Message Publish
    byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
    model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);

    //Close the connection
    conn.Close();
}

I say, "A lot like" because it's undergone a bit of refactoring with all the other tests I have in place.
Once this test was in place, I used the management console to validate that I was able to publish messages to the correct exchange and queue.

My Scenarios

At this point, I consumed messages with manual acknowledgement, then with auto acknowledgement. The process for this is covered in the "Pull API" documentation. I'll have these examples in code at the end, but the docs are fantastic for getting these working (once publish works).

Part of what I'm looking to do for this project is have messages pushed to the clients. I don't want to have to poll for new ones.
So I set up a test to consume by push, which is clearly demonstrated in the "Push API" documentation.

The next piece I wanted to do is get a Direct Reply in place. Direct reply-to is discussed in the relevant documentation. It seems to get some mention as well in the C# examples for point-to-point messaging, but I didn't get a lot out of that.
I was able to find a few samples around the net on setting up direct reply, but they all feel a bit... verbose. More focused on structuring for a solution. I don't want a solution, I want a unit test.
After a little bit of struggling, I did get one working.

Again, this is mostly what I have. I'll put my actual code in at the end. It has common functionality refactored out, which I'm putting back in for these examples.

[TestMethod]
public void DirectReply()
{
    //This is my control for multithreaded tests
    CountdownEvent latch = new CountdownEvent(1);

    //Set up the connection
    ConnectionFactory factory = new ConnectionFactory();
    factory.Uri = "amqp://testguy:supersekrit@192.168.1.200:5672/";
    IConnection conn = factory.CreateConnection();

    //Set up the Exchange and Queue
    IModel model = conn.CreateModel();
    string exchangeName = "someExchange";
    string queueName = "someQueue";
    string routingKey = "ToSomeQueue";
    model.ExchangeDeclare(exchangeName, ExchangeType.Topic);
    model.QueueDeclare(queueName, false, false, false, null);
    model.QueueBind(queueName, exchangeName, routingKey, null);

    //Client - Configure the Direct Reply Consumer
    EventingBasicConsumer consumer = new EventingBasicConsumer(model);
    consumer.Received += (ch, basicDeliverEventArgs) =>
    {
        string contentConsumer = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
        contentConsumer.Should().Be("Hello! Love Worker!");
        latch.Signal();//This unblocks the test
    };

    //This registers the "direct reply to" consumer.
    // * amq.rabbitmq.reply-to is the required queue name
    // * it's a special pseudo-queue for direct replies
    // * it has to be consumed with auto ack (the 'true')
    model.BasicConsume("amq.rabbitmq.reply-to", true, consumer);

    //Client - Send an initial message that will be replied to
    // * this has to go out on the same channel (model) as the consumer above
    byte[] messageBodyBytes = Encoding.UTF8.GetBytes("Hello, world!");
    IBasicProperties props = model.CreateBasicProperties();
    //This informs rabbitmq that replies should be routed straight
    // * back to our above 'consumer'; no real queue gets declared
    props.ReplyTo = "amq.rabbitmq.reply-to";
    //Send the message
    model.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);

    //Worker - Consume and Reply
    BasicGetResult result = model.BasicGet(queueName, true);
    string content = Encoding.UTF8.GetString(result.Body);
    content.Should().Be("Hello, world!");
    //Once we've received the message, send a direct reply-to
    // * The exchange needs to be an empty string (the default exchange)
    // * The ReplyTo field is a generated name that maps back to
    // * the above consumer.
    model.BasicPublish("", result.BasicProperties.ReplyTo, null, Encoding.UTF8.GetBytes("Hello! Love Worker!"));

    //This gives us some waiting for the round trips to happen
    latch.Wait(1000).Should().BeTrue();

    //Close the connection
    conn.Close();
}

I also like to retype them to be self-contained, as I can add commentary to explain things a bit better than I might have in my exploration code.
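
The latch in that test deserves a note of its own. The Received handler runs on a different thread than the test, so the test has to block until the handler signals or a timeout passes. Here is the same pattern with RabbitMq taken out, as a rough sketch; LatchDemo and WaitForCallback are names invented for this example, and a Task with a sleep stands in for the broker round trip.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LatchDemo
{
    // Simulates a callback that fires on another thread after workDelayMs,
    // and reports whether it fired before timeoutMs ran out.
    public static bool WaitForCallback(int workDelayMs, int timeoutMs)
    {
        var latch = new CountdownEvent(1);

        // Stand-in for the consumer callback running on a client thread.
        Task worker = Task.Run(() =>
        {
            Thread.Sleep(workDelayMs);
            latch.Signal(); // count goes from 1 to 0, releasing the waiter
        });

        // Same call the tests make: true if signalled in time, false on timeout.
        bool signalled = latch.Wait(timeoutMs);

        // Let the background work finish before disposing the latch.
        worker.Wait();
        latch.Dispose();
        return signalled;
    }
}
```

In the tests, a false from Wait is what turns "the reply never arrived" into a failed assertion instead of a test that hangs forever.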

I currently feel that, for my purposes, I have everything I need to start re-building my base components effectively using RabbitMq.

While I'd like to do them as unit tests, I think I'll have them as more functional tests initially, just to ensure I'm using RabbitMq correctly. As can be seen in the samples, RabbitMq uses interfaces for everything, so I should be able to refactor into unit tests without much trouble, assuming I IoC correctly.
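
To make the IoC idea a bit more concrete, here is one possible shape for it. This is a sketch under my own assumptions, not the project's code: IMessagePublisher, FakePublisher and Greeter are hypothetical names. The real implementation of the interface would presumably wrap a call like IModel.BasicPublish, while a unit test swaps in the in-memory fake and never touches a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical seam between my code and RabbitMq.
public interface IMessagePublisher
{
    void Publish(string routingKey, byte[] body);
}

// In-memory stand-in for unit tests; records what would have been sent.
public sealed class FakePublisher : IMessagePublisher
{
    public List<KeyValuePair<string, string>> Sent { get; } =
        new List<KeyValuePair<string, string>>();

    public void Publish(string routingKey, byte[] body)
    {
        Sent.Add(new KeyValuePair<string, string>(routingKey, Encoding.UTF8.GetString(body)));
    }
}

// Hypothetical component under test; it only knows about the interface.
public sealed class Greeter
{
    private readonly IMessagePublisher _publisher;

    public Greeter(IMessagePublisher publisher)
    {
        _publisher = publisher;
    }

    public void SayHello()
    {
        _publisher.Publish("Food", Encoding.UTF8.GetBytes("Hello, world!"));
    }
}
```

A unit test then builds new Greeter(new FakePublisher()), calls SayHello(), and asserts on the Sent list. The functional tests against the real broker stay around to prove the wrapper itself talks to RabbitMq correctly.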

This is my first real foray into RabbitMq. I learned a few things, but mostly it's nice to have something running I can experiment against.

THE ACTUAL CODE!!!

For the full code sample, here's my test class as it sits at this point, showing my few tests:

using System.Text;
using System.Threading;
using FluentAssertions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace JobBoardAi
{
    [TestClass]
    public class RabbitMqPlayground
    {
        private const string QueueName = "FoodQueue";
        private const string ExchangeName = "testing.topic";
        private const string RoutingKey = "Food";
        private IConnection _connection;
        private IModel _model;

        [TestInitialize]
        public void Setup()
        {
            _connection = new ConnectionFactory { Uri = "amqp://sample:test@192.168.1.15:5672/" }.CreateConnection();
            _model = _connection.CreateModel();
        }

        [TestCleanup]
        public void TearDown()
        {
            _connection.Close();
        }

        [TestMethod]
        public void Publishes()
        {
            _model.ExchangeDeclare(ExchangeName, ExchangeType.Topic);
            _model.QueueDeclare(QueueName, false, false, false, null);
            _model.QueueBind(QueueName, ExchangeName, RoutingKey, null);

            byte[] messageBodyBytes = Encoding.UTF8.GetBytes("Hello, world!");
            _model.BasicPublish(ExchangeName, RoutingKey, null, messageBodyBytes);

        }

        [TestMethod]
        public void ConsumeWithManualAck()
        {
            _model.ExchangeDeclare(ExchangeName, ExchangeType.Topic);
            _model.QueueDeclare(QueueName, false, false, false, null);
            _model.QueueBind(QueueName, ExchangeName, RoutingKey, null);

            BasicGetResult result = _model.BasicGet(QueueName, false);
            _model.BasicAck(result.DeliveryTag, false);
            string content = Encoding.UTF8.GetString(result.Body);
            content.Should().Be("Hello, world!");


        }

        [TestMethod]
        public void ConsumeWithAutoAck()
        {
            _model.ExchangeDeclare(ExchangeName, ExchangeType.Topic);
            _model.QueueDeclare(QueueName, false, false, false, null);
            _model.QueueBind(QueueName, ExchangeName, RoutingKey, null);

            BasicGetResult result = _model.BasicGet(QueueName, true);
            string content = Encoding.UTF8.GetString(result.Body);
            content.Should().Be("Hello, world!");

        }


        [TestMethod]
        public void ConsumeByPush()
        {

            CountdownEvent latch = new CountdownEvent(1);

            EventingBasicConsumer consumer = new EventingBasicConsumer(_model);
            consumer.Received += (ch, basicDeliverEventArgs) =>
            {

                string content = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
                content.Should().Be("Hello, world!");
                _model.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
                latch.Signal();
            };

            _model.BasicConsume(QueueName, false, consumer);

            Publishes();//Use Common Functionality

            latch.Wait(1000).Should().BeTrue();
        }

        [TestMethod]
        public void DirectReply()
        {
            CountdownEvent latch = new CountdownEvent(1);

            _model.ExchangeDeclare(ExchangeName, ExchangeType.Topic);
            _model.QueueDeclare(QueueName, false, false, false, null);
            _model.QueueBind(QueueName, ExchangeName, RoutingKey, null);


            //Client
            EventingBasicConsumer consumer = new EventingBasicConsumer(_model);
            consumer.Received += (ch, basicDeliverEventArgs) =>
            {

                string contentConsumer = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
                contentConsumer.Should().Be("Hello! Love Worker!");
                latch.Signal();
            };

            _model.BasicConsume("amq.rabbitmq.reply-to", true, consumer);

            byte[] messageBodyBytes = Encoding.UTF8.GetBytes("Hello, world!");
            IBasicProperties props = _model.CreateBasicProperties();
            props.ReplyTo = "amq.rabbitmq.reply-to";
            _model.BasicPublish(ExchangeName, RoutingKey, props, messageBodyBytes);

            //Worker
            BasicGetResult result = _model.BasicGet(QueueName, true);
            string content = Encoding.UTF8.GetString(result.Body);
            content.Should().Be("Hello, world!");
            _model.BasicPublish("", result.BasicProperties.ReplyTo, null, Encoding.UTF8.GetBytes("Hello! Love Worker!"));

            latch.Wait(5000).Should().BeTrue();
        }
    }
}