4.3. The Chat Server
The chat server is implemented as a console program. The functionality is divided into two primary classes: ConnectionListener, which accepts incoming client connections, hands them over to a ServerConnectionManager instance, and continues to listen for more connections; and ServerConnectionManager, which manages and processes messages for each connected client and then routes messages between clients.
Listing 5 shows the ConnectionListener class that you use in the server program to listen and accept incoming connections from clients.
Listing 5. ConnectionListener class in ConnectionListener.cs
using System;
using System.Net;
using System.Net.Sockets;
namespace Recipe7_5.ChatBroker
{
internal class ConnectionListener
{
//the socket used for listening to incoming connections
Socket ListenerSocket { get; set; }
SocketAsyncEventArgs sockEvtArgs = null;
//new server connection manager
ServerConnectionManager ConnManager = new ServerConnectionManager();
//run the connection listener
internal void Run(int Port)
{
//create a new IP endpoint at the specific port,
//and on any available IP address
IPEndPoint ListenerEndPoint = new IPEndPoint(IPAddress.Any, Port);
//create the listener socket
ListenerSocket = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
//bind to the endpoint
ListenerSocket.Bind(ListenerEndPoint);
//listen with a backlog of 20
ListenerSocket.Listen(20);
Console.WriteLine("Waiting for incoming connection ...");
//start accepting connections
AcceptIncoming();
}
//accept incoming connections
internal void AcceptIncoming()
{
//pass in the server connection manager
sockEvtArgs = new SocketAsyncEventArgs { UserToken = ConnManager };
sockEvtArgs.Completed += new EventHandler<SocketAsyncEventArgs>(
delegate(object Sender, SocketAsyncEventArgs e)
{
Console.WriteLine("Accepted connection..." +
"Assigning to Connection Manager...." +
"Waiting for more connections...");
//pass the connected socket to the server connection manager
ConnManager.Manage(e.AcceptSocket);
//keep listening
AcceptIncoming();
});
//accept an incoming connection
ListenerSocket.AcceptAsync(sockEvtArgs);
}
}
}
|
The ConnectionListener class is instantiated and launched by calling its Run() method from the server program's Main() method. In Run(), you create an IPEndpoint using the port number passed in as a command-line argument. Specifying IPAddress.Any as the IPAddress
parameter allows the listener to listen on all available IP addresses
on the machine, which is especially handy on machines that have multiple
active network connections. You then bind the socket to the endpoint
and start listening by calling Socket.Listen(). The parameter to Listen()
specifies the size of the backlog of incoming connections that the
runtime maintains for you while you process them one at a time. Finally,
you call AcceptIncoming().
The AcceptIncoming() method uses Socket.AcceptAsync() on the listener socket to asynchronously accept an incoming connection. In the Completed handler of SocketAsyncEventArgs, the connected client socket is available in the SocketAsyncEventArgs.AcceptSocket property. You pass this socket on to an instance of the ServerConnectionManager type through its Manage() method. You then continue to accept more incoming connections.
The ServerConnectionManager type is used to manage all connected client sockets. You also define a Participant type to represent a specific connected client and its communications. Listing 6 shows the code for these two classes.
Listing 6. Implementation for ServerConnectionManager and participant types in MessageProcessing.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
namespace Recipe7_5.ChatBroker
{
internal class ServerConnectionManager
{
//list of participants
private List<Participant> _Participants = new List<Participant>();
internal List<Participant> Participants
{
get { return _Participants; }
}
//accept and manage a client socket
internal void Manage(Socket socket)
{
//create a new Participant around the client socket
Participant p = new Participant { ClientSocket = socket, Parent = this };
//add it to the list
_Participants.Add(p);
//start up the participant
p.StartUp();
}
//broadcast a message from a participant to all other participants
internal void Broadcast(string From, MessageWrapper Message)
{
//get a list of all participants other than the one sending the message
List<Participant> targets = (from p in Participants
where p.Name != From
select p).ToList();
//iterate and add to the Send queue for each
foreach (Participant p in targets)
{
lock (p.QueueSyncRoot)
{
p.SendQueue.Enqueue(Message);
}
}
}
//send a message to a specific participant
internal void Send(string To, MessageWrapper Message)
{
//get the Participant from the list
Participant target = (from p in Participants
where p.Name == To
select p).ToList()[0];
//add to the send queue for the participant
lock (target.QueueSyncRoot)
{
target.SendQueue.Enqueue(Message);
}
}
}
internal class Participant
{
//lock target
internal object QueueSyncRoot = new object();
//name as specified at the client
internal string Name { get; set; }
//the connected client socket
internal Socket ClientSocket { get; set; }
//a reference back to the ServerConnectionManager instance
internal ServerConnectionManager Parent { get; set; }
//are we currently receiving a message from this participant?
bool Receiving = false;
//are we currently sending a message to this participant?
bool Sending = false;
//a queue to hold messages being sent to this participant
private Queue<MessageWrapper> _SendQueue = new Queue<MessageWrapper>();
internal Queue<MessageWrapper> SendQueue
{
get { return _SendQueue; }
set { _SendQueue = value; }
}
//check to see if there are messages in the queue
private int HasMessage()
{
lock (QueueSyncRoot)
{
return SendQueue.Count;
}
}
//start the participant up
internal void StartUp()
{
//create the receiver thread
Thread thdParticipantReceiver = new Thread(new ThreadStart(
//thread start delegate
delegate
{
//loop while the socket is valid
while (ClientSocket != null)
{
//if there is no data available OR
//we are currently receiving, continue
if (ClientSocket.Available <= 0 || Receiving) continue;
//set receiving to true
Receiving = true;
//begin to receive the next message
ReceiveMessage();
}
}));
//set thread to background
thdParticipantReceiver.IsBackground = true;
//start receiver thread
thdParticipantReceiver.Start();
//create the sender thread
Thread thdParticipantSender = new Thread(new ThreadStart(
//thread start delegate
delegate
{
//loop while the socket is valid
while (ClientSocket != null)
{
//if there are no messages to be sent OR
//we are currently sending, continue
if (HasMessage() == 0 || Sending) continue;
//set sending to true
Sending = true;
//begin sending
SendMessage();
}
}));
//set thread to background
thdParticipantSender.IsBackground = true;
//start sender thread
thdParticipantSender.Start();
}
//receive a message
private void ReceiveMessage()
{
SocketAsyncEventArgs sockEvtArgs = new SocketAsyncEventArgs();
//allocate a buffer as large as the available data
sockEvtArgs.SetBuffer(
new byte[ClientSocket.Available], 0, ClientSocket.Available);
sockEvtArgs.Completed += new EventHandler<SocketAsyncEventArgs>(
//completion handler
delegate(object sender, SocketAsyncEventArgs e)
{
//process the message
ProcessMessage(e.Buffer);
//done receiving, thread loop will look for next
Receiving = false;
});
//start receiving
ClientSocket.ReceiveAsync(sockEvtArgs);
}
internal void ProcessMessage(byte[] Message)
{
//deserialize message
MessageWrapper mw = MessageWrapper.DeserializeMessage(Message);
//if text message
if (mw.Message is TextMessage)
{
//send it to the target participant
Parent.Send((mw.Message as TextMessage).To, mw);
}
//if it is a ConnectionDisconnectionRequest
else if (mw.Message is ConnectionDisconnectionRequest)
{
ConnectionDisconnectionRequest connDisconnReq =
mw.Message as ConnectionDisconnectionRequest;
//if connecting
if (connDisconnReq.Connect)
{
this.Name = connDisconnReq.From;
//broadcast to everyone else
Parent.Broadcast(this.Name, new MessageWrapper
{
Message = new ConnectionDisconnectionNotification
{
Participant = this.Name,
Connect = true
}
});
//send the list of all participants other than
//the one connecting to the connecting client
Parent.Send(this.Name, new MessageWrapper
{
Message = new ConnectionReply
{
Participants =
(from part in Parent.Participants
where part.Name != this.Name
select part.Name).ToList()
}
});
}
else //disconnecting
{
//remove from the participants list
Parent.Participants.Remove(this);
//close socket
this.ClientSocket.Close();
//reset
this.ClientSocket = null;
//broadcast to everyone else
Parent.Broadcast(this.Name, new MessageWrapper
{
Message = new ConnectionDisconnectionNotification
{
Participant = this.Name,
Connect = false
}
});
}
}
//chat end
else if (mw.Message is ChatEndNotification)
{
//send it to the other participant
Parent.Send((mw.Message as ChatEndNotification).To, mw);
}
}
//send a message
private void SendMessage()
{
MessageWrapper mw = null;
//dequeue a message from the send queue
lock (QueueSyncRoot)
{
mw = SendQueue.Dequeue();
}
SocketAsyncEventArgs sockEvtArgs =
new SocketAsyncEventArgs { UserToken = mw };
//serialize and pack into the send buffer
MemoryStream SerializedMessage =
MessageWrapper.SerializeMessage(mw);
sockEvtArgs.SetBuffer(
SerializedMessage.GetBuffer(), 0, (int)SerializedMessage.Length);
sockEvtArgs.Completed += new EventHandler<SocketAsyncEventArgs>(
//completion handler
delegate(object sender, SocketAsyncEventArgs e)
{
//not sending anymore
Sending = false;
});
//begin send
ClientSocket.SendAsync(sockEvtArgs);
}
}
}
|
An instance of a Participant is created and stored in a list when the ServerConnectionManager receives a connected client socket through the Manage() method. The Participant.Startup()
method starts two background threads—one each for receiving and sending
messages, each of which continue as long as the client socket for that Participant is valid.
The receive thread calls the ReceiveMessage() method, provided that there is data to be read (as determined by the Socket.Available property) and that the Receiving boolean flag is set to false. The flag is set to true prior to calling ReceiveMessage() and is reset after ReceiveMessage() returns so that the socket is always ready to receive the next message as soon as it arrives.
ReceiveMessage() uses the ProcessMessage() method to process and act on a received message. ProcessMessage()
is structured similarly to the one in the Silverlight client in that it
deserializes a message and looks at the type of the contained Body property to determine the course of action. For messages that are intended to be delivered to other participants, ProcessMessage delivers it to that participant either through ServerConnectionManager.Broadcast(), which delivers a message to all participants except for the one sending it, or by ServerConnectionManager.Send(),
which delivers it to a single targeted participant. Delivery of a
message in this case is achieved by adding the message to a send queue
of type Queue<MessageWrapper> defined in each participant.
The send thread continously checks the Sending flag (used similarly to the Receiving flag) and the presence of messages in the queue of the owning participant using Participant.HasMessage(). When a message is found, SendMessage() is called, which then serializes the message and sends it out through the participant's socket.