Tuesday, January 13, 2009

Integrating sipX with ejabberd

I recently completed integrating our sipX based voip platform with our ejabberd XMPP server, so that users can see when others are on the phone or not. There are alot of similar integrations that people have done with Asterisk using their AMI api, but I haven't found anything similar for sipX yet, so we rolled our own for now. While, it's not terribly exciting, here's a screenshot of what it looks like when someone is on the phone:


The solution I came up with involves 3 parts. First, I setup a clustered RabbitMQ server (an open source implementation of AMQP). I plan on using it to facilitate a loosely coupled, event driven architecture for integrating multiple open source
applications. I'm pretty happy with RabbitMQ thus far - about the only complaint I have is that they don't have any message tracing capabilities right now (version 1.5.0) which made it more difficult to debug my client side code. I'm also hoping that sometime soon we start seeing debian packages for python/perl amqp libraries. For now, I'm using Net::Stomp and the RabbitMQ stomp adapter which seemed like the most stable, easily deployed client side solution.

On the XMPP server side, I created an erlang module that acts as a message consumer. Each virtual host in our ejabberd server listens on a separate queue for presence messages generated by the sipX side and sends out XMPP presence updates to online sessions.

After getting the RabbitMQ erlang client library installed, here's the code I used to connect and setup my consumer:
Connection = amqp_connection:start(Uname, Pwd, "mq.nvizn.com"),
Channel = amqp_connection:open_channel(Connection),
Qname = list_to_binary("/" ++ Host ++ "/presence/phone"),
Q = lib_amqp:declare_queue(Channel, Qname),
lib_amqp:bind_queue(Channel, <<"">>, Q, Qname),
lib_amqp:subscribe(Channel, Q, self(), false),

Then I created a handle_info function that looks like this:

handle_info({ {'basic.deliver', DeliveryTag, _, _, _, _ },
{content, ClassId, Properties, PropertiesBin,
[Payload]} = Info}, State) ->

%% Message processing here, then send out the XMPP presence update...,
BroadcastPresence = fun({U, S, R}) ->
Dest = jlib:make_jid(U, S, R),
ejabberd_router:route(FromJID, Dest, Presence)
end,
Sessions = ejabberd_sm:get_vh_session_list(State#state.host),
lists:foreach(BroadcastPresence, Sessions),
Now on the sipX side, things are a bit more ugly, and when I have more time later, I'd like to rework this end. For now, I created a PL/pgSQL AFTER trigger on SIPXCDR.call_state_events table that handles new call state events ('S' and 'E' event_types to be specific). This trigger inserts new rows into a new cse_summary table I created for every call, one for when the call is setup and one for call termination and it does this for each internal user. If the call involves two internal folks, you end up with 4 rows, if on the other hand, one side is external, you end up with only 2 rows. This trigger also looks up the XMPP jid for the extension and records that in the generated cse_summary rows.

When a row is created in the cse_summary table, a separate
PL/Perl AFTER trigger uses Net::Stomp to generate a call state
event message for the RabbitMQ cluster.

Here's what the PL/Perl trigger looks like:
my $stomp = Net::Stomp->new({hostname=>'mq.nvizn.com',port=>'61613'});
$stomp->connect({login=>$uid, passcode=>$pwd});

my $msg = sprintf("%s,%s,%s", $domain,
$_TD->{"new"}{"event_type"}, $_TD->{"new"}{"jid"});

$stomp->send({destination=>"/$domain/presence/phone", body=>($msg)});
$stomp->disconnect;
Now, I'm just creating some debian packages and RPMs (for the sipX side), documenting how it works, and thinking about our next integration.