Mega Code Archive

 
Categories / C# / Network
 

Server Pool

//Copyright (c) 2007-2008 Henrik Schröder, Oliver Kofoed Pedersen //Permission is hereby granted, free of charge, to any person //obtaining a copy of this software and associated documentation //files (the "Software"), to deal in the Software without //restriction, including without limitation the rights to use, //copy, modify, merge, publish, distribute, sublicense, and/or sell //copies of the Software, and to permit persons to whom the //Software is furnished to do so, subject to the following //conditions: //The above copyright notice and this permission notice shall be //included in all copies or substantial portions of the Software. //THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, //EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES //OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND //NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT //HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, //WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING //FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR //OTHER DEALINGS IN THE SOFTWARE. using System.Net.Sockets; using System.Net; using System.IO; using System; using System.Threading; using System.Collections.Generic; using System.Text; namespace Apollo.Common.Cache {     /// <summary>     /// The PooledSocket class encapsulates a socket connection to a specified memcached server.     /// It contains a buffered stream for communication, and methods for sending and retrieving     /// data from the memcached server, as well as general memcached error checking.     /// </summary>     internal class PooledSocket : IDisposable     {         private SocketPool socketPool;         private Socket socket;         private Stream stream;         public readonly DateTime Created;         public PooledSocket(SocketPool socketPool, IPEndPoint endPoint, int sendReceiveTimeout)         {             this.socketPool = socketPool;             Created = DateTime.Now;             //Set up the socket.             socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);             socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, sendReceiveTimeout);             socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, sendReceiveTimeout);             socket.ReceiveTimeout = sendReceiveTimeout;             socket.SendTimeout = sendReceiveTimeout;             //Do not use Nagle's Algorithm             socket.NoDelay = true;             //Establish connection             socket.Connect(endPoint);             //Wraps two layers of streams around the socket for communication.             stream = new BufferedStream(new NetworkStream(socket, false));         }         /// <summary>         /// Disposing of a PooledSocket object in any way causes it to be returned to its SocketPool.         /// </summary>         public void Dispose()         {             socketPool.Return(this);         }         /// <summary>         /// This method closes the underlying stream and socket.         /// </summary>         public void Close()         {             if (stream != null)             {                 try { stream.Close(); }                 catch (Exception e)                 {                     Console.WriteLine("Error closing stream: " + socketPool.Host);                 }                 stream = null;             }             if (socket != null)             {                 try { socket.Shutdown(SocketShutdown.Both); }                 catch (Exception e)                 {                     Console.WriteLine("Error shutting down socket: " + socketPool.Host);                 }                 try { socket.Close(); }                 catch (Exception e)                 {                     Console.WriteLine("Error closing socket: " + socketPool.Host);                 }                 socket = null;             }         }         /// <summary>         /// Checks if the underlying socket and stream is connected and available.         /// </summary>         public bool IsAlive         {             get { return socket != null && socket.Connected && stream.CanRead; }         }         /// <summary>         /// Writes a string to the socket encoded in UTF8 format.         /// </summary>         public void Write(string str)         {             Write(Encoding.UTF8.GetBytes(str));         }         /// <summary>         /// Writes an array of bytes to the socket and flushes the stream.         /// </summary>         public void Write(byte[] bytes)         {             stream.Write(bytes, 0, bytes.Length);             stream.Flush();         }         /// <summary>         /// Reads from the socket until the sequence '\r\n' is encountered,          /// and returns everything up to but not including that sequence as a UTF8-encoded string         /// </summary>         public string ReadLine()         {             MemoryStream buffer = new MemoryStream();             int b;             bool gotReturn = false;             while ((b = stream.ReadByte()) != -1)             {                 if (gotReturn)                 {                     if (b == 10)                     {                         break;                     }                     else                     {                         buffer.WriteByte(13);                         gotReturn = false;                     }                 }                 if (b == 13)                 {                     gotReturn = true;                 }                 else                 {                     buffer.WriteByte((byte)b);                 }             }             return Encoding.UTF8.GetString(buffer.GetBuffer());         }         /// <summary>         /// Reads a response line from the socket, checks for general memcached errors, and returns the line.         /// If an error is encountered, this method will throw an exception.         /// </summary>         public string ReadResponse()         {             string response = ReadLine();             if (String.IsNullOrEmpty(response))             {                 throw new Exception("Received empty response.");             }             if (response.StartsWith("ERROR")                 || response.StartsWith("CLIENT_ERROR")                 || response.StartsWith("SERVER_ERROR"))             {                 throw new Exception("Server returned " + response);             }             return response;         }         /// <summary>         /// Fills the given byte array with data from the socket.         /// </summary>         public void Read(byte[] bytes)         {             if (bytes == null)             {                 return;             }             int readBytes = 0;             while (readBytes < bytes.Length)             {                 readBytes += stream.Read(bytes, readBytes, (bytes.Length - readBytes));             }         }         /// <summary>         /// Reads from the socket until the sequence '\r\n' is encountered.         /// </summary>         public void SkipUntilEndOfLine()         {             int b;             bool gotReturn = false;             while ((b = stream.ReadByte()) != -1)             {                 if (gotReturn)                 {                     if (b == 10)                     {                         break;                     }                     else                     {                         gotReturn = false;                     }                 }                 if (b == 13)                 {                     gotReturn = true;                 }             }         }         /// <summary>         /// Resets this PooledSocket by making sure the incoming buffer of the socket is empty.         /// If there was any leftover data, this method return true.         /// </summary>         public bool Reset()         {             if (socket.Available > 0)             {                 byte[] b = new byte[socket.Available];                 Read(b);                 return true;             }             return false;         }     }     internal delegate T UseSocket<T>(PooledSocket socket);     internal delegate void UseSocket(PooledSocket socket);     /// <summary>     /// The ServerPool encapsulates a collection of memcached servers and the associated SocketPool objects.     /// This class contains the server-selection logic, and contains methods for executing a block of code on      /// a socket from the server corresponding to a given key.     /// </summary>     internal class ServerPool     {         //Expose the socket pools.         private SocketPool[] hostList;         internal SocketPool[] HostList { get { return hostList; } }         private Dictionary<uint, SocketPool> hostDictionary;         private uint[] hostKeys;         //Internal configuration properties         private int sendReceiveTimeout = 2000;         private uint maxPoolSize = 10;         private uint minPoolSize = 5;         private TimeSpan socketRecycleAge = TimeSpan.FromMinutes(30);         internal int SendReceiveTimeout { get { return sendReceiveTimeout; } set { sendReceiveTimeout = value; } }         internal uint MaxPoolSize { get { return maxPoolSize; } set { maxPoolSize = value; } }         internal uint MinPoolSize { get { return minPoolSize; } set { minPoolSize = value; } }         internal TimeSpan SocketRecycleAge { get { return socketRecycleAge; } set { socketRecycleAge = value; } }         /// <summary>         /// Internal constructor. This method takes the array of hosts and sets up an internal list of socketpools.         /// </summary>         internal ServerPool(string[] hosts)         {             hostDictionary = new Dictionary<uint, SocketPool>();             List<SocketPool> pools = new List<SocketPool>();             List<uint> keys = new List<uint>();             foreach (string host in hosts)             {                 //Create pool                 SocketPool pool = new SocketPool(this, host.Trim());                 //Create 250 keys for this pool, store each key in the hostDictionary, as well as in the list of keys.                 for (int i = 0; i < 250; i++)                 {                     uint key = (uint)i;                     if (!hostDictionary.ContainsKey(key))                     {                         hostDictionary[key] = pool;                         keys.Add(key);                     }                 }                 pools.Add(pool);             }             //Hostlist should contain the list of all pools that has been created.             hostList = pools.ToArray();             //Hostkeys should contain the list of all key for all pools that have been created.             //This array forms the server key continuum that we use to lookup which server a             //given item key hash should be assigned to.             keys.Sort();             hostKeys = keys.ToArray();         }         /// <summary>         /// Given an item key hash, this method returns the serverpool which is closest on the server key continuum.         /// </summary>         internal SocketPool GetSocketPool(uint hash)         {             //Quick return if we only have one host.             if (hostList.Length == 1)             {                 return hostList[0];             }             //New "ketama" host selection.             int i = Array.BinarySearch(hostKeys, hash);             //If not exact match...             if (i < 0)             {                 //Get the index of the first item bigger than the one searched for.                 i = ~i;                 //If i is bigger than the last index, it was bigger than the last item = use the first item.                 if (i >= hostKeys.Length)                 {                     i = 0;                 }             }             return hostDictionary[hostKeys[i]];         }         internal SocketPool GetSocketPool(string host)         {             return Array.Find(HostList, delegate(SocketPool socketPool) { return socketPool.Host == host; });         }         /// <summary>         /// This method executes the given delegate on a socket from the server that corresponds to the given hash.         /// If anything causes an error, the given defaultValue will be returned instead.         /// This method takes care of disposing the socket properly once the delegate has executed.         /// </summary>         internal T Execute<T>(uint hash, T defaultValue, UseSocket<T> use)         {             return Execute(GetSocketPool(hash), defaultValue, use);         }         internal T Execute<T>(SocketPool pool, T defaultValue, UseSocket<T> use)         {             PooledSocket sock = null;             try             {                 //Acquire a socket                 sock = pool.Acquire();                 //Use the socket as a parameter to the delegate and return its result.                 if (sock != null)                 {                     return use(sock);                 }             }             catch (Exception e)             {                 Console.WriteLine("Error in Execute<T>: " + pool.Host);                 //Socket is probably broken                 if (sock != null)                 {                     sock.Close();                 }             }             finally             {                 if (sock != null)                 {                     sock.Dispose();                 }             }             return defaultValue;         }         internal void Execute(SocketPool pool, UseSocket use)         {             PooledSocket sock = null;             try             {                 //Acquire a socket                 sock = pool.Acquire();                 //Use the socket as a parameter to the delegate and return its result.                 if (sock != null)                 {                     use(sock);                 }             }             catch (Exception e)             {                 Console.WriteLine("Error in Execute: " + pool.Host);                 //Socket is probably broken                 if (sock != null)                 {                     sock.Close();                 }             }             finally             {                 if (sock != null)                 {                     sock.Dispose();                 }             }         }         /// <summary>         /// This method executes the given delegate on all servers.         /// </summary>         internal void ExecuteAll(UseSocket use)         {             foreach (SocketPool socketPool in hostList)             {                 Execute(socketPool, use);             }         }     }     /// <summary>     /// The SocketPool encapsulates the list of PooledSockets against one specific host, and contains methods for      /// acquiring or returning PooledSockets.     /// </summary>     internal class SocketPool     {         /// <summary>         /// If the host stops responding, we mark it as dead for this amount of seconds,          /// and we double this for each consecutive failed retry. If the host comes alive         /// again, we reset this to 1 again.         /// </summary>         private int deadEndPointSecondsUntilRetry = 1;         private const int maxDeadEndPointSecondsUntilRetry = 60 * 10; //10 minutes         private ServerPool owner;         private IPEndPoint endPoint;         private Queue<PooledSocket> queue;         //Debug variables and properties         private int newsockets = 0;         private int failednewsockets = 0;         private int reusedsockets = 0;         private int deadsocketsinpool = 0;         private int deadsocketsonreturn = 0;         private int dirtysocketsonreturn = 0;         private int acquired = 0;         public int NewSockets { get { return newsockets; } }         public int FailedNewSockets { get { return failednewsockets; } }         public int ReusedSockets { get { return reusedsockets; } }         public int DeadSocketsInPool { get { return deadsocketsinpool; } }         public int DeadSocketsOnReturn { get { return deadsocketsonreturn; } }         public int DirtySocketsOnReturn { get { return dirtysocketsonreturn; } }         public int Acquired { get { return acquired; } }         public int Poolsize { get { return queue.Count; } }         //Public variables and properties         public readonly string Host;         private bool isEndPointDead = false;         public bool IsEndPointDead { get { return isEndPointDead; } }         private DateTime deadEndPointRetryTime;         public DateTime DeadEndPointRetryTime { get { return deadEndPointRetryTime; } }         internal SocketPool(ServerPool owner, string host)         {             Host = host;             this.owner = owner;             endPoint = getEndPoint(host);             queue = new Queue<PooledSocket>();         }         /// <summary>         /// This method parses the given string into an IPEndPoint.         /// If the string is malformed in some way, or if the host cannot be resolved, this method will throw an exception.         /// </summary>         private static IPEndPoint getEndPoint(string host)         {             //Parse port, default to 11211.             int port = 11211;             if (host.Contains(":"))             {                 string[] split = host.Split(new char[] { ':' });                 if (!Int32.TryParse(split[1], out port))                 {                     throw new ArgumentException("Unable to parse host: " + host);                 }                 host = split[0];             }             //Parse host string.             IPAddress address;             if (IPAddress.TryParse(host, out address))             {                 //host string successfully resolved as an IP address.             }             else             {                 //See if we can resolve it as a hostname                 try                 {                     address = Dns.GetHostEntry(host).AddressList[0];                 }                 catch (Exception e)                 {                     Console.WriteLine("Unable to resolve host: " + host);                     return null;                 }             }             return new IPEndPoint(address, port);         }         /// <summary>         /// Gets a socket from the pool.         /// If there are no free sockets, a new one will be created. If something goes         /// wrong while creating the new socket, this pool's endpoint will be marked as dead         /// and all subsequent calls to this method will return null until the retry interval         /// has passed.         /// </summary>         internal PooledSocket Acquire()         {             //Do we have free sockets in the pool?             //if so - return the first working one.             //if not - create a new one.             Interlocked.Increment(ref acquired);             lock (queue)             {                 while (queue.Count > 0)                 {                     PooledSocket socket = queue.Dequeue();                     if (socket != null && socket.IsAlive)                     {                         Interlocked.Increment(ref reusedsockets);                         return socket;                     }                     Interlocked.Increment(ref deadsocketsinpool);                 }             }             Interlocked.Increment(ref newsockets);             //If we know the endpoint is dead, check if it is time for a retry, otherwise return null.             if (isEndPointDead)             {                 if (DateTime.Now > deadEndPointRetryTime)                 {                     //Retry                     isEndPointDead = false;                 }                 else                 {                     //Still dead                     return null;                 }             }             //Try to create a new socket. On failure, mark endpoint as dead and return null.             try             {                 PooledSocket socket = new PooledSocket(this, endPoint, owner.SendReceiveTimeout);                 //Reset retry timer on success.                 deadEndPointSecondsUntilRetry = 1;                 return socket;             }             catch (Exception e)             {                 Interlocked.Increment(ref failednewsockets);                 Console.WriteLine("Error connecting to: " + endPoint.Address);                 //Mark endpoint as dead                 isEndPointDead = true;                 //Retry in 2 minutes                 deadEndPointRetryTime = DateTime.Now.AddSeconds(deadEndPointSecondsUntilRetry);                 if (deadEndPointSecondsUntilRetry < maxDeadEndPointSecondsUntilRetry)                 {                     deadEndPointSecondsUntilRetry = deadEndPointSecondsUntilRetry * 2; //Double retry interval until next time                 }                 return null;             }         }         /// <summary>         /// Returns a socket to the pool.         /// If the socket is dead, it will be destroyed.         /// If there are more than MaxPoolSize sockets in the pool, it will be destroyed.         /// If there are less than MinPoolSize sockets in the pool, it will always be put back.         /// If there are something inbetween those values, the age of the socket is checked.          /// If it is older than the SocketRrecycleAge, it is destroyed, otherwise it will be          /// put back in the pool.         /// </summary>         internal void Return(PooledSocket socket)         {             //If the socket is dead, destroy it.             if (!socket.IsAlive)             {                 Interlocked.Increment(ref deadsocketsonreturn);                 socket.Close();             }             else             {                 //Clean up socket                 if (socket.Reset())                 {                     Interlocked.Increment(ref dirtysocketsonreturn);                 }                 //Check pool size.                 if (queue.Count >= owner.MaxPoolSize)                 {                     //If the pool is full, destroy the socket.                     socket.Close();                 }                 else if (queue.Count > owner.MinPoolSize && DateTime.Now - socket.Created > owner.SocketRecycleAge)                 {                     //If we have more than the minimum amount of sockets, but less than the max, and the socket is older than the recycle age, we destroy it.                     socket.Close();                 }                 else                 {                     //Put the socket back in the pool.                     lock (queue)                     {                         queue.Enqueue(socket);                     }                 }             }         }     } }