- * @version 2.0
- */
-package com.alisoft.xplatform.asf.cache.memcached.client;
-
-
-import java.io.*;
-
-public class ContextObjectInputStream extends ObjectInputStream {
-
- ClassLoader mLoader;
-
- public ContextObjectInputStream( InputStream in, ClassLoader loader ) throws IOException, SecurityException {
- super( in );
- mLoader = loader;
- }
-
- @SuppressWarnings("unchecked")
- protected Class resolveClass( ObjectStreamClass v ) throws IOException, ClassNotFoundException {
- if ( mLoader == null )
- return super.resolveClass( v );
- else
- return Class.forName( v.getName(), true, mLoader );
- }
-}
diff --git a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/ErrorHandler.java b/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/ErrorHandler.java
deleted file mode 100644
index 7762bf1..0000000
--- a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/ErrorHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.alisoft.xplatform.asf.cache.memcached.client;
-
-
-public interface ErrorHandler
-{
- /**
- * Called for errors thrown during initialization.
- */
- public void handleErrorOnInit( final MemCachedClient client ,
- final Throwable error );
-
- /**
- * Called for errors thrown during {@link MemCachedClient#get(String)} and related methods.
- */
- public void handleErrorOnGet( final MemCachedClient client ,
- final Throwable error ,
- final String cacheKey );
-
- /**
- * Called for errors thrown during {@link MemCachedClient#getMulti(String)} and related methods.
- */
- public void handleErrorOnGet( final MemCachedClient client ,
- final Throwable error ,
- final String[] cacheKeys );
-
- /**
- * Called for errors thrown during {@link MemCachedClient#set(String,Object)} and related methods.
- */
- public void handleErrorOnSet( final MemCachedClient client ,
- final Throwable error ,
- final String cacheKey );
-
- /**
- * Called for errors thrown during {@link MemCachedClient#delete(String)} and related methods.
- */
- public void handleErrorOnDelete( final MemCachedClient client ,
- final Throwable error ,
- final String cacheKey );
-
- /**
- * Called for errors thrown during {@link MemCachedClient#flushAll()} and related methods.
- */
- public void handleErrorOnFlush( final MemCachedClient client ,
- final Throwable error );
-
- /**
- * Called for errors thrown during {@link MemCachedClient#stats()} and related methods.
- */
- public void handleErrorOnStats( final MemCachedClient client ,
- final Throwable error );
-
-}
diff --git a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/LineInputStream.java b/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/LineInputStream.java
deleted file mode 100644
index e24eaea..0000000
--- a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/LineInputStream.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.alisoft.xplatform.asf.cache.memcached.client;
-
-import java.io.IOException;
-
-public interface LineInputStream
-{
- /**
- * Read everything up to the next end-of-line. Does
- * not include the end of line, though it is consumed
- * from the input.
- * @return All next up to the next end of line.
- */
- public String readLine() throws IOException;
-
- /**
- * Read everything up to and including the end of line.
- */
- public void clearEOL() throws IOException;
-
- /**
- * Read some bytes.
- * @param buf The buffer into which read.
- * @return The number of bytes actually read, or -1 if none could be read.
- */
- public int read( byte[] buf ) throws IOException;
-
-}
diff --git a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/MemCachedClient.java b/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/MemCachedClient.java
deleted file mode 100644
index d411a83..0000000
--- a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/MemCachedClient.java
+++ /dev/null
@@ -1,2503 +0,0 @@
-/**
- *
- */
-package com.alisoft.xplatform.asf.cache.memcached.client;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.zip.*;
-import java.nio.*;
-import java.nio.channels.*;
-import java.io.*;
-import java.net.URLEncoder;
-import org.apache.log4j.Logger;
-import com.alisoft.xplatform.asf.cache.memcached.MemcachedException;
-
-
-
-
-
-
-/**
- * This is a Java client for the memcached server available from
- * http://www.danga.com/memcached/.
- *
- * Supports setting, adding, replacing, deleting compressed/uncompressed and
- * serialized (can be stored as string if object is native class) objects to memcached.
- *
- * Now pulls SockIO objects from SockIOPool, which is a connection pool. The server failover
- * has also been moved into the SockIOPool class.
- * This pool needs to be initialized prior to the client working. See javadocs from SockIOPool.
- *
- * Some examples of use follow.
- * To create cache client object and set params:
- *
- * MemCachedClient mc = new MemCachedClient();
- *
- * // compression is enabled by default
- * mc.setCompressEnable(true);
- *
- * // set compression threshhold to 4 KB (default: 15 KB)
- * mc.setCompressThreshold(4096);
- *
- * // turn on storing primitive types as a string representation
- * // Should not do this in most cases.
- * mc.setPrimitiveAsString(true);
- *
- * To store an object:
- *
- * MemCachedClient mc = new MemCachedClient();
- * String key = "cacheKey1";
- * Object value = SomeClass.getObject();
- * mc.set(key, value);
- *
- * To store an object using a custom server hashCode:
- *
- * MemCachedClient mc = new MemCachedClient();
- * String key = "cacheKey1";
- * Object value = SomeClass.getObject();
- * Integer hash = new Integer(45);
- * mc.set(key, value, hash);
- *
- * The set method shown above will always set the object in the cache.
- * The add and replace methods do the same, but with a slight difference.
- *
- * - add -- will store the object only if the server does not have an entry for this key
- * - replace -- will store the object only if the server already has an entry for this key
- *
- * To delete a cache entry:
- *
- * MemCachedClient mc = new MemCachedClient();
- * String key = "cacheKey1";
- * mc.delete(key);
- *
- * To delete a cache entry using a custom hash code:
- *
- * MemCachedClient mc = new MemCachedClient();
- * String key = "cacheKey1";
- * Integer hash = new Integer(45);
- * mc.delete(key, hashCode);
- *
- * To store a counter and then increment or decrement that counter:
- *
- * MemCachedClient mc = new MemCachedClient();
- * String key = "counterKey";
- * mc.storeCounter(key, new Integer(100));
- * System.out.println("counter after adding 1: " mc.incr(key));
- * System.out.println("counter after adding 5: " mc.incr(key, 5));
- * System.out.println("counter after subtracting 4: " mc.decr(key, 4));
- * System.out.println("counter after subtracting 1: " mc.decr(key));
- *
- * To store a counter and then increment or decrement that counter with custom hash:
- *
- * MemCachedClient mc = new MemCachedClient();
- * String key = "counterKey";
- * Integer hash = new Integer(45);
- * mc.storeCounter(key, new Integer(100), hash);
- * System.out.println("counter after adding 1: " mc.incr(key, 1, hash));
- * System.out.println("counter after adding 5: " mc.incr(key, 5, hash));
- * System.out.println("counter after subtracting 4: " mc.decr(key, 4, hash));
- * System.out.println("counter after subtracting 1: " mc.decr(key, 1, hash));
- *
- * To retrieve an object from the cache:
- *
- * MemCachedClient mc = new MemCachedClient();
- * String key = "key";
- * Object value = mc.get(key);
- *
- * To retrieve an object from the cache with custom hash:
- *
- * MemCachedClient mc = new MemCachedClient();
- * String key = "key";
- * Integer hash = new Integer(45);
- * Object value = mc.get(key, hash);
- *
- * To retrieve an multiple objects from the cache
- *
- * MemCachedClient mc = new MemCachedClient();
- * String[] keys = { "key", "key1", "key2" };
- * Map<Object> values = mc.getMulti(keys);
- *
- * To retrieve an multiple objects from the cache with custom hashing
- *
- * MemCachedClient mc = new MemCachedClient();
- * String[] keys = { "key", "key1", "key2" };
- * Integer[] hashes = { new Integer(45), new Integer(32), new Integer(44) };
- * Map<Object> values = mc.getMulti(keys, hashes);
- *
- * To flush all items in server(s)
- *
- * MemCachedClient mc = new MemCachedClient();
- * mc.flushAll();
- *
- * To get stats from server(s)
- *
- * MemCachedClient mc = new MemCachedClient();
- * Map stats = mc.stats();
- *
- */
-public class MemCachedClient {
-
- // logger
- private static Logger log =
- Logger.getLogger( MemCachedClient.class.getName() );
-
- // return codes
- private static final String VALUE = "VALUE"; // start of value line from server
- private static final String STATS = "STAT"; // start of stats line from server
- private static final String ITEM = "ITEM"; // start of item line from server
- private static final String DELETED = "DELETED"; // successful deletion
- private static final String NOTFOUND = "NOT_FOUND"; // record not found for delete or incr/decr
- private static final String STORED = "STORED"; // successful store of data
- private static final String NOTSTORED = "NOT_STORED"; // data not stored
- private static final String OK = "OK"; // success
- private static final String END = "END"; // end of data from server
-
- private static final String ERROR = "ERROR"; // invalid command name from client
- private static final String CLIENT_ERROR = "CLIENT_ERROR"; // client error in input line - invalid protocol
- private static final String SERVER_ERROR = "SERVER_ERROR"; // server error
-
- private static final byte[] B_END = "END\r\n".getBytes();
- @SuppressWarnings("unused")
- private static final byte[] B_NOTFOUND = "NOT_FOUND\r\n".getBytes();
- @SuppressWarnings("unused")
- private static final byte[] B_DELETED = "DELETED\r\r".getBytes();
- @SuppressWarnings("unused")
- private static final byte[] B_STORED = "STORED\r\r".getBytes();
-
- private static final byte[] B_RETURN = "\r\n".getBytes();
-
- // default compression threshold
- private static final int COMPRESS_THRESH = 30720;
-
- // values for cache flags
- public static final int MARKER_BYTE = 1;
- public static final int MARKER_BOOLEAN = 8192;
- public static final int MARKER_INTEGER = 4;
- public static final int MARKER_LONG = 16384;
- public static final int MARKER_CHARACTER = 16;
- public static final int MARKER_STRING = 32;
- public static final int MARKER_STRINGBUFFER = 64;
- public static final int MARKER_FLOAT = 128;
- public static final int MARKER_SHORT = 256;
- public static final int MARKER_DOUBLE = 512;
- public static final int MARKER_DATE = 1024;
- public static final int MARKER_STRINGBUILDER = 2048;
- public static final int MARKER_BYTEARR = 4096;
- public static final int F_COMPRESSED = 2;
- public static final int F_SERIALIZED = 8;
-
-
- // flags
- private boolean sanitizeKeys;
- private boolean primitiveAsString;
- private boolean compressEnable;
- private long compressThreshold;
- private String defaultEncoding;
-
- // pool instance
- private SockIOPool pool;
-
- // which pool to use
- private String poolName;
-
- // optional passed in classloader
- private ClassLoader classLoader;
-
- // optional error handler
- private ErrorHandler errorHandler;
-
- /**
- * Чʻ汾Ϣ
- */
- private Map localCache;
-
-
-
- /**
- * Creates a new instance of MemCachedClient.
- */
- public MemCachedClient() {
- init();
- }
-
- /**
- * Creates a new instance of MemCachedClient
- * accepting a passed in pool name.
- *
- * @param poolName name of SockIOPool
- */
- public MemCachedClient( String poolName ) {
- this.poolName = poolName;
- init();
- }
-
- /**
- * Creates a new instance of MemCacheClient but
- * acceptes a passed in ClassLoader.
- *
- * @param classLoader ClassLoader object.
- */
- public MemCachedClient( ClassLoader classLoader ) {
- this.classLoader = classLoader;
- init();
- }
-
- /**
- * Creates a new instance of MemCacheClient but
- * acceptes a passed in ClassLoader and a passed
- * in ErrorHandler.
- *
- * @param classLoader ClassLoader object.
- * @param errorHandler ErrorHandler object.
- */
- public MemCachedClient( ClassLoader classLoader, ErrorHandler errorHandler ) {
- this.classLoader = classLoader;
- this.errorHandler = errorHandler;
- init();
- }
-
- /**
- * Creates a new instance of MemCacheClient but
- * acceptes a passed in ClassLoader, ErrorHandler,
- * and SockIOPool name.
- *
- * @param classLoader ClassLoader object.
- * @param errorHandler ErrorHandler object.
- * @param poolName SockIOPool name
- */
- public MemCachedClient( ClassLoader classLoader, ErrorHandler errorHandler, String poolName ) {
- this.classLoader = classLoader;
- this.errorHandler = errorHandler;
- this.poolName = poolName;
- init();
- }
-
- /**
- * Initializes client object to defaults.
- *
- * This enables compression and sets compression threshhold to 15 KB.
- */
- private void init() {
- this.sanitizeKeys = true;
- this.primitiveAsString = false;
- this.compressEnable = true;
- this.compressThreshold = COMPRESS_THRESH;
- this.defaultEncoding = "UTF-8";
- this.poolName = ( this.poolName == null ) ? "default" : this.poolName;
-
- localCache = new ConcurrentHashMap();
-
-
- // get a pool instance to work with for the life of this instance
- this.pool = SockIOPool.getInstance( poolName );
- }
-
- /**
- * Sets an optional ClassLoader to be used for
- * serialization.
- *
- * @param classLoader
- */
- public void setClassLoader( ClassLoader classLoader ) {
- this.classLoader = classLoader;
- }
-
- /**
- * Sets an optional ErrorHandler.
- *
- * @param errorHandler
- */
- public void setErrorHandler( ErrorHandler errorHandler ) {
- this.errorHandler = errorHandler;
- }
-
- /**
- * Enables/disables sanitizing keys by URLEncoding.
- *
- * @param sanitizeKeys if true, then URLEncode all keys
- */
- public void setSanitizeKeys( boolean sanitizeKeys ) {
- this.sanitizeKeys = sanitizeKeys;
- }
-
- /**
- * Enables storing primitive types as their String values.
- *
- * @param primitiveAsString if true, then store all primitives as their string value.
- */
- public void setPrimitiveAsString( boolean primitiveAsString ) {
- this.primitiveAsString = primitiveAsString;
- }
-
- /**
- * Sets default String encoding when storing primitives as Strings.
- * Default is UTF-8.
- *
- * @param defaultEncoding
- */
- public void setDefaultEncoding( String defaultEncoding ) {
- this.defaultEncoding = defaultEncoding;
- }
-
- /**
- * Enable storing compressed data, provided it meets the threshold requirements.
- *
- * If enabled, data will be stored in compressed form if it is
- * longer than the threshold length set with setCompressThreshold(int)
- *
- * The default is that compression is enabled.
- *
- * Even if compression is disabled, compressed data will be automatically
- * decompressed.
- *
- * @param compressEnable true
to enable compression, false
to disable compression
- */
- public void setCompressEnable( boolean compressEnable ) {
- this.compressEnable = compressEnable;
- }
-
- /**
- * Sets the required length for data to be considered for compression.
- *
- * If the length of the data to be stored is not equal or larger than this value, it will
- * not be compressed.
- *
- * This defaults to 15 KB.
- *
- * @param compressThreshold required length of data to consider compression
- */
- public void setCompressThreshold( long compressThreshold ) {
- this.compressThreshold = compressThreshold;
- }
-
- /**
- * Checks to see if key exists in cache.
- *
- * @param key the key to look for
- * @return true if key found in cache, false if not (or if cache is down)
- */
- public boolean keyExists( String key ) {
- return ( this.get( key, null, true ) != null );
- }
-
- /**
- * Deletes an object from cache given cache key.
- *
- * @param key the key to be removed
- * @return true
, if the data was deleted successfully
- */
- public boolean delete( String key ) {
- return delete( key, null, null );
- }
-
- /**
- * Deletes an object from cache given cache key and expiration date.
- *
- * @param key the key to be removed
- * @param expiry when to expire the record.
- * @return true
, if the data was deleted successfully
- */
- public boolean delete( String key, Date expiry ) {
- return delete( key, null, expiry );
- }
-
- /**
- * Deletes an object from cache given cache key, a delete time, and an optional hashcode.
- *
- * The item is immediately made non retrievable.
- * Keep in mind {@link #add(String, Object) add} and {@link #replace(String, Object) replace}
- * will fail when used with the same key will fail, until the server reaches the
- * specified time. However, {@link #set(String, Object) set} will succeed,
- * and the new value will not be deleted.
- *
- * @param key the key to be removed
- * @param hashCode if not null, then the int hashcode to use
- * @param expiry when to expire the record.
- * @return true
, if the data was deleted successfully
- */
- public boolean delete( String key, Integer hashCode, Date expiry ) {
-
- if ( key == null ) {
- log.error( "null value for key passed to delete()" );
- return false;
- }
-
- try {
- key = sanitizeKey( key );
- }
- catch ( UnsupportedEncodingException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnDelete( this, e, key );
-
- log.error( "failed to sanitize your key!", e );
- return false;
- }
-
- // get SockIO obj from hash or from key
- SockIOPool.SockIO sock = pool.getSock( key, hashCode );
-
- // return false if unable to get SockIO obj
- if ( sock == null ) {
- if ( errorHandler != null )
- errorHandler.handleErrorOnDelete( this, new IOException( "no socket to server available" ), key );
- return false;
- }
-
- // build command
- StringBuilder command = new StringBuilder( "delete " ).append( key );
- if ( expiry != null )
- command.append(" ").append(expiry.getTime() / 1000 );
-
- command.append( "\r\n" );
-
- try {
- sock.write( command.toString().getBytes() );
- sock.flush();
-
- // if we get appropriate response back, then we return true
- String line = sock.readLine();
- if ( DELETED.equals( line ) ) {
-
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("++++ deletion of key: ").append(key).append(" from cache was a success").toString() );
-
- // return sock to pool and bail here
- sock.close();
- sock = null;
- return true;
- }
- else if ( NOTFOUND.equals( line ) ) {
-
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("++++ deletion of key: ").append(key).append(" from cache failed as the key was not found").toString());
- }
- else {
- log.error( new StringBuilder().append("++++ error deleting key: ").append(key).toString());
- log.error( new StringBuilder().append("++++ server response: ").append(line).toString());
- }
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnDelete( this, e, key );
-
- // exception thrown
- log.error( "++++ exception thrown while writing bytes to server on delete" );
- log.error( e.getMessage(), e );
-
- try {
- sock.trueClose();
- }
- catch ( IOException ioe ) {
- log.error( new StringBuilder().append("++++ failed to close socket : ").append(sock.toString()).toString() );
- }
-
- sock = null;
- }
-
- if ( sock != null ) {
- sock.close();
- sock = null;
- }
-
- return false;
- }
-
- /**
- * Stores data on the server; only the key and the value are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @return true, if the data was successfully stored
- */
- public boolean set( String key, Object value ) {
- return set( "set", key, value, null, null, primitiveAsString );
- }
-
- /**
- * Stores data on the server; only the key and the value are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @param hashCode if not null, then the int hashcode to use
- * @return true, if the data was successfully stored
- */
- public boolean set( String key, Object value, Integer hashCode ) {
- return set( "set", key, value, null, hashCode, primitiveAsString );
- }
-
- /**
- * Stores data on the server; the key, value, and an expiration time are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @param expiry when to expire the record
- * @return true, if the data was successfully stored
- */
- public boolean set( String key, Object value, Date expiry ) {
- return set( "set", key, value, expiry, null, primitiveAsString );
- }
-
- /**
- * Stores data on the server; the key, value, and an expiration time are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @param expiry when to expire the record
- * @param hashCode if not null, then the int hashcode to use
- * @return true, if the data was successfully stored
- */
- public boolean set( String key, Object value, Date expiry, Integer hashCode ) {
- return set( "set", key, value, expiry, hashCode, primitiveAsString );
- }
-
- /**
- * Adds data to the server; only the key and the value are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @return true, if the data was successfully stored
- */
- public boolean add( String key, Object value ) {
- return set( "add", key, value, null, null, primitiveAsString );
- }
-
- /**
- * Adds data to the server; the key, value, and an optional hashcode are passed in.
- *
- * @param key key to store data under
- * @param value value to store
- * @param hashCode if not null, then the int hashcode to use
- * @return true, if the data was successfully stored
- */
- public boolean add( String key, Object value, Integer hashCode ) {
- return set( "add", key, value, null, hashCode, primitiveAsString );
- }
-
- /**
- * Adds data to the server; the key, value, and an expiration time are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @param expiry when to expire the record
- * @return true, if the data was successfully stored
- */
- public boolean add( String key, Object value, Date expiry ) {
- return set( "add", key, value, expiry, null, primitiveAsString );
- }
-
- /**
- * Adds data to the server; the key, value, and an expiration time are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @param expiry when to expire the record
- * @param hashCode if not null, then the int hashcode to use
- * @return true, if the data was successfully stored
- */
- public boolean add( String key, Object value, Date expiry, Integer hashCode ) {
- return set( "add", key, value, expiry, hashCode, primitiveAsString );
- }
-
- /**
- * Updates data on the server; only the key and the value are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @return true, if the data was successfully stored
- */
- public boolean replace( String key, Object value ) {
- return set( "replace", key, value, null, null, primitiveAsString );
- }
-
- /**
- * Updates data on the server; only the key and the value and an optional hash are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @param hashCode if not null, then the int hashcode to use
- * @return true, if the data was successfully stored
- */
- public boolean replace( String key, Object value, Integer hashCode ) {
- return set( "replace", key, value, null, hashCode, primitiveAsString );
- }
-
- /**
- * Updates data on the server; the key, value, and an expiration time are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @param expiry when to expire the record
- * @return true, if the data was successfully stored
- */
- public boolean replace( String key, Object value, Date expiry ) {
- return set( "replace", key, value, expiry, null, primitiveAsString );
- }
-
- /**
- * Updates data on the server; the key, value, and an expiration time are specified.
- *
- * @param key key to store data under
- * @param value value to store
- * @param expiry when to expire the record
- * @param hashCode if not null, then the int hashcode to use
- * @return true, if the data was successfully stored
- */
- public boolean replace( String key, Object value, Date expiry, Integer hashCode ) {
- return set( "replace", key, value, expiry, hashCode, primitiveAsString );
- }
-
- /**
- * Stores data to cache.
- *
- * If data does not already exist for this key on the server, or if the key is being
- * deleted, the specified value will not be stored.
- * The server will automatically delete the value when the expiration time has been reached.
- *
- * If compression is enabled, and the data is longer than the compression threshold
- * the data will be stored in compressed form.
- *
- * As of the current release, all objects stored will use java serialization.
- *
- * @param cmdname action to take (set, add, replace)
- * @param key key to store cache under
- * @param value object to cache
- * @param expiry expiration
- * @param hashCode if not null, then the int hashcode to use
- * @param asString store this object as a string?
- * @return true/false indicating success
- */
- private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) {
-
- if ( cmdname == null || cmdname.trim().equals( "" ) || key == null ) {
- log.error( "key is null or cmd is null/empty for set()" );
- return false;
- }
-
- try {
- key = sanitizeKey( key );
- }
- catch ( UnsupportedEncodingException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
-
- log.error( "failed to sanitize your key!", e );
- return false;
- }
-
- if ( value == null ) {
- log.error( "trying to store a null value to cache" );
- return false;
- }
-
- // get SockIO obj
- SockIOPool.SockIO sock = pool.getSock( key, hashCode );
-
- if ( sock == null ) {
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key );
- return false;
- }
-
- if ( expiry == null )
- expiry = new Date(0);
-
- // store flags
- int flags = 0;
-
- // byte array to hold data
- byte[] val;
-
- if ( NativeHandler.isHandled( value ) ) {
-
- if ( asString ) {
- // useful for sharing data between java and non-java
- // and also for storing ints for the increment method
- try {
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("++++ storing data as a string for key: ")
- .append(key).append(" for class: ").append(value.getClass().getName()).toString() );
-
- val = value.toString().getBytes( defaultEncoding );
- }
- catch ( UnsupportedEncodingException ue ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, ue, key );
-
- log.error( new StringBuilder().append("invalid encoding type used: ").append(defaultEncoding).toString(), ue );
- sock.close();
- sock = null;
- return false;
- }
- }
- else {
- try {
- if (log.isInfoEnabled())
- log.info( "Storing with native handler..." );
-
- flags |= NativeHandler.getMarkerFlag( value );
- val = NativeHandler.encode( value );
- }
- catch ( Exception e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
-
- log.error( "Failed to native handle obj", e );
-
- sock.close();
- sock = null;
- return false;
- }
- }
- }
- else {
- // always serialize for non-primitive types
- try {
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("++++ serializing for key: ")
- .append(key).append(" for class: ").append(value.getClass().getName()).toString() );
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- (new ObjectOutputStream( bos )).writeObject( value );
-
- val = bos.toByteArray();
- flags |= F_SERIALIZED;
-
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
-
- // if we fail to serialize, then
- // we bail
- log.error( "failed to serialize obj", e );
- log.error( value.toString() );
-
- // return socket to pool and bail
- sock.close();
- sock = null;
- return false;
- }
- }
-
- // now try to compress if we want to
- // and if the length is over the threshold
- if ( compressEnable && val.length > compressThreshold ) {
-
- try {
- if (log.isInfoEnabled())
- {
- log.info( "++++ trying to compress data" );
- log.info( new StringBuilder().append("++++ size prior to compression: ")
- .append(val.length).toString());
- }
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream( val.length );
- GZIPOutputStream gos = new GZIPOutputStream( bos );
- gos.write( val, 0, val.length );
- gos.finish();
-
- // store it and set compression flag
- val = bos.toByteArray();
- flags |= F_COMPRESSED;
-
- if (log.isInfoEnabled())
- log.info(new StringBuilder().append("++++ compression succeeded, size after: ").append(val.length).toString() );
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
-
- log.error( new StringBuilder().append("IOException while compressing stream: ")
- .append(e.getMessage()).toString());
- log.error( "storing data uncompressed" );
- }
- }
-
- // now write the data to the cache server
- try {
- String cmd = new StringBuilder().append(cmdname).append(" ")
- .append(key).append(" ").append(flags).append(" ")
- .append(expiry.getTime() / 1000).append(" ")
- .append(val.length).append("\r\n").toString();
-
- sock.write( cmd.getBytes() );
-
- sock.write( val );
- sock.write(B_RETURN);
- sock.flush();
-
- // get result code
- String line = sock.readLine();
-
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("++++ memcache cmd (result code): ").append(cmd)
- .append(" (").append(line).append(")").toString() );
-
- if ( STORED.equals( line ) ) {
-
- if (log.isInfoEnabled())
- log.info(new StringBuilder().append("++++ data successfully stored for key: ").append(key).toString() );
-
- sock.close();
- sock = null;
- return true;
- }
- else if ( NOTSTORED.equals( line ) )
- {
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("++++ data not stored in cache for key: ").append(key).toString() );
- }
- else {
-
- log.error( new StringBuilder().append("++++ error storing data in cache for key: ")
- .append(key).append(" -- length: ").append(val.length).toString() );
- log.error( new StringBuilder().append("++++ server response: ").append(line).toString() );
- }
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
-
- // exception thrown
- log.error( "++++ exception thrown while writing bytes to server on set" );
- log.error( e.getMessage(), e );
-
- try {
- sock.trueClose();
- }
- catch ( IOException ioe ) {
- log.error( new StringBuilder().append("++++ failed to close socket : ").append(sock.toString()).toString() );
- }
-
- sock = null;
- }
-
- if ( sock != null ) {
- sock.close();
- sock = null;
- }
-
- return false;
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- /**
- * Store a counter to memcached given a key
- *
- * @param key cache key
- * @param counter number to store
- * @return true/false indicating success
- */
- public boolean storeCounter( String key, long counter ) {
- return set( "set", key, new Long( counter ), null, null, true );
- }
-
- /**
- * Store a counter to memcached given a key
- *
- * @param key cache key
- * @param counter number to store
- * @return true/false indicating success
- */
- public boolean storeCounter( String key, Long counter ) {
- return set( "set", key, counter, null, null, true );
- }
-
- /**
- * Store a counter to memcached given a key
- *
- * @param key cache key
- * @param counter number to store
- * @param hashCode if not null, then the int hashcode to use
- * @return true/false indicating success
- */
- public boolean storeCounter( String key, Long counter, Integer hashCode ) {
- return set( "set", key, counter, null, hashCode, true );
- }
-
- /**
- * Returns value in counter at given key as long.
- *
- * @param key cache ket
- * @return counter value or -1 if not found
- */
- public long getCounter( String key ) {
- return getCounter( key, null );
- }
-
- /**
- * Returns value in counter at given key as long.
- *
- * @param key cache ket
- * @param hashCode if not null, then the int hashcode to use
- * @return counter value or -1 if not found
- */
- public long getCounter( String key, Integer hashCode ) {
-
- if ( key == null ) {
- log.error( "null key for getCounter()" );
- return -1;
- }
-
- long counter = -1;
- try
- {
- String value = (String)get( key, hashCode, true);
-
- if (value != null && !value.equals(""))
- counter = Long.parseLong(value.trim());
- }
- catch ( Exception ex ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, ex, key );
-
- // not found or error getting out
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("Failed to parse Long value for key: ").append(key).toString() );
-
- throw new MemcachedException(ex);
- }
-
- return counter;
- }
-
- /**
- * Thread safe way to initialize and increment a counter.
- *
- * @param key key where the data is stored
- * @return value of incrementer
- */
- public long addOrIncr( String key ) {
- return addOrIncr( key, 0, null );
- }
-
- /**
- * Thread safe way to initialize and increment a counter.
- *
- * @param key key where the data is stored
- * @param inc value to set or increment by
- * @return value of incrementer
- */
- public long addOrIncr( String key, long inc ) {
- return addOrIncr( key, inc, null );
- }
-
- /**
- * Thread safe way to initialize and increment a counter.
- *
- * @param key key where the data is stored
- * @param inc value to set or increment by
- * @param hashCode if not null, then the int hashcode to use
- * @return value of incrementer
- */
- public long addOrIncr( String key, long inc, Integer hashCode ) {
-
- boolean isExist = false;
-
- //жǷ
- if (localCache.get(key) != null)
- {
- try
- {
- if (System.currentTimeMillis() - (Long)localCache.get(key) < 5 *1000)
- isExist = true;
- }
- catch(Exception ex){/* do nothing */}
- }
-
- localCache.put(key, System.currentTimeMillis());
-
- if (isExist)
- {
-
- long result = incrdecr( "incr", key, inc, hashCode );
-
- if (result != -1)
- {
- return result;
- }
- else
- {
- set( "add", key, new Long( inc ), null, hashCode, true );
- return inc;
- }
- }
- else
- {
- boolean ret = set( "add", key, new Long( inc ), null, hashCode, true );
-
- if ( ret ) {
- return inc;
- }
- else {
- return incrdecr( "incr", key, inc, hashCode );
- }
- }
-
- }
-
- /**
- * Thread safe way to initialize and decrement a counter.
- *
- * @param key key where the data is stored
- * @return value of incrementer
- */
- public long addOrDecr( String key ) {
- return addOrDecr( key, 0, null );
- }
-
- /**
- * Thread safe way to initialize and decrement a counter.
- *
- * @param key key where the data is stored
- * @param inc value to set or increment by
- * @return value of incrementer
- */
- public long addOrDecr( String key, long inc ) {
- return addOrDecr( key, inc, null );
- }
-
- /**
- * Thread safe way to initialize and decrement a counter.
- *
- * @param key key where the data is stored
- * @param inc value to set or increment by
- * @param hashCode if not null, then the int hashcode to use
- * @return value of incrementer
- */
- public long addOrDecr( String key, long inc, Integer hashCode ) {
-
- boolean isExist = false;
-
- //жǷ
- if (localCache.get(key) != null)
- {
- try
- {
- if (System.currentTimeMillis() - (Long)localCache.get(key) < 5 *1000)
- isExist = true;
- }
- catch(Exception ex){/* do nothing */}
- }
-
- localCache.put(key, System.currentTimeMillis());
-
- if (isExist)
- {
- long result = incrdecr( "decr", key, inc, hashCode );
-
- if (result != -1)
- {
- return result;
- }
- else
- {
- set( "add", key, new Long( inc ), null, hashCode, true );
- return inc;
- }
-
- }
- else
- {
- boolean ret = set( "add", key, new Long( inc ), null, hashCode, true );
-
- if ( ret ) {
- return inc;
- }
- else {
- return incrdecr( "decr", key, inc, hashCode );
- }
- }
- }
-
- /**
- * Increment the value at the specified key by 1, and then return it.
- *
- * @param key key where the data is stored
- * @return -1, if the key is not found, the value after incrementing otherwise
- */
- public long incr( String key ) {
- return incrdecr( "incr", key, 1, null );
- }
-
- /**
- * Increment the value at the specified key by passed in val.
- *
- * @param key key where the data is stored
- * @param inc how much to increment by
- * @return -1, if the key is not found, the value after incrementing otherwise
- */
- public long incr( String key, long inc ) {
- return incrdecr( "incr", key, inc, null );
- }
-
- /**
- * Increment the value at the specified key by the specified increment, and then return it.
- *
- * @param key key where the data is stored
- * @param inc how much to increment by
- * @param hashCode if not null, then the int hashcode to use
- * @return -1, if the key is not found, the value after incrementing otherwise
- */
- public long incr( String key, long inc, Integer hashCode ) {
- return incrdecr( "incr", key, inc, hashCode );
- }
-
- /**
- * Decrement the value at the specified key by 1, and then return it.
- *
- * @param key key where the data is stored
- * @return -1, if the key is not found, the value after incrementing otherwise
- */
- public long decr( String key ) {
- return incrdecr( "decr", key, 1, null );
- }
-
- /**
- * Decrement the value at the specified key by passed in value, and then return it.
- *
- * @param key key where the data is stored
- * @param inc how much to increment by
- * @return -1, if the key is not found, the value after incrementing otherwise
- */
- public long decr( String key, long inc ) {
- return incrdecr( "decr", key, inc, null );
- }
-
- /**
- * Decrement the value at the specified key by the specified increment, and then return it.
- *
- * @param key key where the data is stored
- * @param inc how much to increment by
- * @param hashCode if not null, then the int hashcode to use
- * @return -1, if the key is not found, the value after incrementing otherwise
- */
- public long decr( String key, long inc, Integer hashCode ) {
- return incrdecr( "decr", key, inc, hashCode );
- }
-
- /**
- * Increments/decrements the value at the specified key by inc.
- *
- * Note that the server uses a 32-bit unsigned integer, and checks for
- * underflow. In the event of underflow, the result will be zero. Because
- * Java lacks unsigned types, the value is returned as a 64-bit integer.
- * The server will only decrement a value if it already exists;
- * if a value is not found, -1 will be returned.
- *
- * @param cmdname increment/decrement
- * @param key cache key
- * @param inc amount to incr or decr
- * @param hashCode if not null, then the int hashcode to use
- * @return new value or -1 if not exist
- */
- private long incrdecr( String cmdname, String key, long inc, Integer hashCode ) {
-
- if ( key == null ) {
- log.error( "null key for incrdecr()" );
- return -1;
- }
-
- try {
- key = sanitizeKey( key );
- }
- catch ( UnsupportedEncodingException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error( "failed to sanitize your key!", e );
- return -1;
- }
-
- // get SockIO obj for given cache key
- SockIOPool.SockIO sock = pool.getSock( key, hashCode );
-
- if ( sock == null ) {
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key );
- return -1;
- }
-
- try {
- String cmd = new StringBuilder().append(cmdname).append(" ")
- .append(key).append(" ").append(inc).append("\r\n").toString();
-
- if ( log.isDebugEnabled() )
- log.debug( "++++ memcache incr/decr command: " + cmd );
-
- sock.write( cmd.getBytes() );
- sock.flush();
-
- // get result back
- String line = sock.readLine();
-
- if ( line.matches( "\\d+" ) ) {
-
- // return sock to pool and return result
- sock.close();
- try {
- return Long.parseLong( line );
- }
- catch ( Exception ex ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, ex, key );
-
- log.error( new StringBuilder().append("Failed to parse Long value for key: ")
- .append(key).toString() );
- }
-
- }
- else if ( NOTFOUND.equals( line ) ) {
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("++++ key not found to incr/decr for key: ").append(key).toString() );
- }
- else {
- log.error( new StringBuilder().append("++++ error incr/decr key: ").append(key).toString() );
- log.error( new StringBuilder().append("++++ server response: ").append(line).toString() );
- }
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- // exception thrown
- log.error( "++++ exception thrown while writing bytes to server on incr/decr" );
- log.error( e.getMessage(), e );
-
- try {
- sock.trueClose();
- }
- catch ( IOException ioe ) {
- log.error( new StringBuilder().append("++++ failed to close socket : ").append(sock.toString()).toString() );
- }
-
- sock = null;
- }
-
- if ( sock != null ) {
- sock.close();
- sock = null;
- }
-
- return -1;
- }
-
- /**
- * Retrieve a key from the server, using a specific hash.
- *
- * If the data was compressed or serialized when compressed, it will automatically
- * be decompressed or serialized, as appropriate. (Inclusive or)
- *
- * Non-serialized data will be returned as a string, so explicit conversion to
- * numeric types will be necessary, if desired
- *
- * @param key key where data is stored
- * @return the object that was previously stored, or null if it was not previously stored
- */
- public Object get( String key ) {
- return get( key, null, false );
- }
-
- /**
- * Retrieve a key from the server, using a specific hash.
- *
- * If the data was compressed or serialized when compressed, it will automatically
- * be decompressed or serialized, as appropriate. (Inclusive or)
- *
- * Non-serialized data will be returned as a string, so explicit conversion to
- * numeric types will be necessary, if desired
- *
- * @param key key where data is stored
- * @param hashCode if not null, then the int hashcode to use
- * @return the object that was previously stored, or null if it was not previously stored
- */
- public Object get( String key, Integer hashCode ) {
- return get( key, hashCode, false );
- }
-
- /**
- * Retrieve a key from the server, using a specific hash.
- *
- * If the data was compressed or serialized when compressed, it will automatically
- * be decompressed or serialized, as appropriate. (Inclusive or)
- *
- * Non-serialized data will be returned as a string, so explicit conversion to
- * numeric types will be necessary, if desired
- *
- * @param key key where data is stored
- * @param hashCode if not null, then the int hashcode to use
- * @param asString if true, then return string val
- * @return the object that was previously stored, or null if it was not previously stored
- */
- public Object get( String key, Integer hashCode, boolean asString ) {
-
- if ( key == null ) {
- log.error( "key is null for get()" );
- return null;
- }
-
- try {
- key = sanitizeKey( key );
- }
- catch ( UnsupportedEncodingException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error( "failed to sanitize your key!", e );
-
- throw new MemcachedException(e);
- }
-
- // get SockIO obj using cache key
- SockIOPool.SockIO sock = pool.getSock( key, hashCode );
-
- if ( sock == null ) {
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, new IOException( "no socket to server available" ), key );
- throw new MemcachedException("sock is null");
- }
-
- // ready object
- Object o = null;
-
- try {
- String cmd = new StringBuilder("get ").append(key).append("\r\n").toString();
-
- if ( log.isDebugEnabled() )
- log.debug("++++ memcache get command: " + cmd);
-
- sock.write( cmd.getBytes() );
- sock.flush();
-
- while ( true ) {
- String line = sock.readLine();
-
- if (line != null && line.startsWith("\r\n"))
- line = line.substring(2);
-
- if ( log.isDebugEnabled() )
- log.debug( new StringBuilder().append("++++ line: ")
- .append(line).toString() );
-
- if ( line.startsWith( VALUE ) ) {
-
- String res = line.substring(0,line.lastIndexOf(" "));
- int flag = Integer.parseInt(res.substring(res.lastIndexOf(" ")+1));
- int length = Integer.parseInt(line.substring(line.lastIndexOf(" ")+1));
-
-// String[] info = line.split(" ");
-// int flag = Integer.parseInt( info[2] );
-// int length = Integer.parseInt( info[3] );
-
- if ( log.isDebugEnabled() ) {
- log.debug( new StringBuilder().append("++++ key: ").append(key).toString() );
- log.debug( new StringBuilder().append("++++ flags: ").append(flag).toString() );
- log.debug( new StringBuilder().append("++++ length: ").append(length));
- }
-
- // read obj into buffer
- byte[] buf = sock.readBytes(length);
-
-
- if ( (flag & F_COMPRESSED) == F_COMPRESSED ) {
- try {
- // read the input stream, and write to a byte array output stream since
- // we have to read into a byte array, but we don't know how large it
- // will need to be, and we don't want to resize it a bunch
- GZIPInputStream gzi = new GZIPInputStream( new ByteArrayInputStream( buf ) );
- ByteArrayOutputStream bos = new ByteArrayOutputStream( buf.length );
-
- int count;
- byte[] tmp = new byte[2048];
- while ( (count = gzi.read(tmp)) != -1 ) {
- bos.write( tmp, 0, count );
- }
-
- // store uncompressed back to buffer
- buf = bos.toByteArray();
- gzi.close();
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error( new StringBuilder().append("++++ IOException thrown while trying to uncompress input stream for key: ")
- .append(key).toString() );
- log.error( e.getMessage(), e );
- throw new NestedIOException( "++++ IOException thrown while trying to uncompress input stream for key: " + key, e );
- }
- }
-
- // we can only take out serialized objects
- if ( ( flag & F_SERIALIZED ) != F_SERIALIZED ) {
- if ( primitiveAsString || asString ) {
- // pulling out string value
- if (log.isInfoEnabled())
- log.info( "++++ retrieving object and stuffing into a string." );
-
- o = new String( buf, defaultEncoding );
- }
- else {
- // decoding object
- try {
- o = NativeHandler.decode( buf, flag );
- }
- catch ( Exception e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error( new StringBuilder().append("++++ Exception thrown while trying to deserialize for key: ")
- .append(key).toString(), e );
- throw new NestedIOException( e );
- }
- }
- }
- else {
- // deserialize if the data is serialized
- ContextObjectInputStream ois =
- new ContextObjectInputStream( new ByteArrayInputStream( buf ), classLoader );
- try {
- o = ois.readObject();
-
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("++++ deserializing ").append(o.getClass()).toString() );
-
-
-
-
-
-
- }
- catch ( ClassNotFoundException e ) {
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error( new StringBuilder().append("++++ ClassNotFoundException thrown while trying to deserialize for key: ")
- .append(key).toString(), e );
- throw new NestedIOException( "+++ failed while trying to deserialize for key: " + key, e );
- }
- }
- }
- else if ( END.equals( line ) ) {
- if ( log.isDebugEnabled() )
- log.debug( "++++ finished reading from cache server" );
- break;
- }
- }
-
- sock.close();
- sock = null;
- return o;
- }
- catch ( IOException e )
- {
-
-// if (o != null)
-// {
-// log.error(new StringBuilder().append("++++ exception thrown while trying to get object from cache for key: ")
-// .append(key).toString(),e);
-// return o;
-// }
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- // exception thrown
- log.error(new StringBuilder()
- .append("++++ exception thrown while trying to get object from cache for key: ").append(key).toString() );
- log.error( e.getMessage(), e );
-
- try {
- sock.trueClose();
- }
- catch ( IOException ioe ) {
- log.error(new StringBuilder().append("++++ failed to close socket : ").append(sock.toString()).toString() );
- }
- sock = null;
-
- throw new MemcachedException(e);
- }
- finally
- {
- if ( sock != null )
- sock.close();
- }
-
- }
-
- /**
- * Retrieve multiple objects from the memcache.
- *
- * This is recommended over repeated calls to {@link #get(String) get()}, since it
- * is more efficient.
- *
- * @param keys String array of keys to retrieve
- * @return Object array ordered in same order as key array containing results
- */
- public Object[] getMultiArray( String[] keys ) {
- return getMultiArray( keys, null, false );
- }
-
- /**
- * Retrieve multiple objects from the memcache.
- *
- * This is recommended over repeated calls to {@link #get(String) get()}, since it
- * is more efficient.
- *
- * @param keys String array of keys to retrieve
- * @param hashCodes if not null, then the Integer array of hashCodes
- * @return Object array ordered in same order as key array containing results
- */
- public Object[] getMultiArray( String[] keys, Integer[] hashCodes ) {
- return getMultiArray( keys, hashCodes, false );
- }
-
- /**
- * Retrieve multiple objects from the memcache.
- *
- * This is recommended over repeated calls to {@link #get(String) get()}, since it
- * is more efficient.
- *
- * @param keys String array of keys to retrieve
- * @param hashCodes if not null, then the Integer array of hashCodes
- * @param asString if true, retrieve string vals
- * @return Object array ordered in same order as key array containing results
- */
- public Object[] getMultiArray( String[] keys, Integer[] hashCodes, boolean asString ) {
-
- Map data = getMulti( keys, hashCodes, asString );
-
- if ( data == null )
- return null;
-
- Object[] res = new Object[ keys.length ];
- for ( int i = 0; i < keys.length; i++ ) {
- res[i] = data.get( keys[i] );
- }
-
- return res;
- }
-
- /**
- * Retrieve multiple objects from the memcache.
- *
- * This is recommended over repeated calls to {@link #get(String) get()}, since it
- * is more efficient.
- *
- * @param keys String array of keys to retrieve
- * @return a hashmap with entries for each key is found by the server,
- * keys that are not found are not entered into the hashmap, but attempting to
- * retrieve them from the hashmap gives you null.
- */
- public Map getMulti( String[] keys ) {
- return getMulti( keys, null, false );
- }
-
- /**
- * Retrieve multiple keys from the memcache.
- *
- * This is recommended over repeated calls to {@link #get(String) get()}, since it
- * is more efficient.
- *
- * @param keys keys to retrieve
- * @param hashCodes if not null, then the Integer array of hashCodes
- * @return a hashmap with entries for each key is found by the server,
- * keys that are not found are not entered into the hashmap, but attempting to
- * retrieve them from the hashmap gives you null.
- */
- public Map getMulti( String[] keys, Integer[] hashCodes ) {
- return getMulti( keys, hashCodes, false );
- }
-
- /**
- * Retrieve multiple keys from the memcache.
- *
- * This is recommended over repeated calls to {@link #get(String) get()}, since it
- * is more efficient.
- *
- * @param keys keys to retrieve
- * @param hashCodes if not null, then the Integer array of hashCodes
- * @param asString if true then retrieve using String val
- * @return a hashmap with entries for each key is found by the server,
- * keys that are not found are not entered into the hashmap, but attempting to
- * retrieve them from the hashmap gives you null.
- */
- public Map getMulti( String[] keys, Integer[] hashCodes, boolean asString ) {
-
- if ( keys == null || keys.length == 0 ) {
- log.error( "missing keys for getMulti()" );
- return null;
- }
-
- Map cmdMap =
- new HashMap();
-
- for ( int i = 0; i < keys.length; ++i ) {
-
- String key = keys[i];
- if ( key == null ) {
- log.error( "null key, so skipping" );
- continue;
- }
-
- Integer hash = null;
- if ( hashCodes != null && hashCodes.length > i )
- hash = hashCodes[ i ];
-
- String cleanKey = key;
- try {
- cleanKey = sanitizeKey( key );
- }
- catch ( UnsupportedEncodingException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error( "failed to sanitize your key!", e );
- continue;
- }
-
- // get SockIO obj from cache key
- SockIOPool.SockIO sock = pool.getSock( cleanKey, hash );
-
- if ( sock == null ) {
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, new IOException( "no socket to server available" ), key );
- continue;
- }
-
- // store in map and list if not already
- if ( !cmdMap.containsKey( sock.getHost() ) )
- cmdMap.put( sock.getHost(), new StringBuilder( "get" ) );
-
- cmdMap.get( sock.getHost() ).append( " " + cleanKey );
-
- // return to pool
- sock.close();
- }
-
- if (log.isInfoEnabled())
- log.info( new StringBuilder().append("multi get socket count : ").append(cmdMap.size()).toString() );
-
- // now query memcache
- Map ret =
- new HashMap( keys.length );
-
- // now use new NIO implementation
- (new NIOLoader( this )).doMulti( asString, cmdMap, keys, ret );
-
- // fix the return array in case we had to rewrite any of the keys
- for ( String key : keys ) {
-
- String cleanKey = key;
- try {
- cleanKey = sanitizeKey( key );
- }
- catch ( UnsupportedEncodingException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error( "failed to sanitize your key!", e );
- continue;
- }
-
- if ( ! key.equals( cleanKey ) && ret.containsKey( cleanKey ) ) {
- ret.put( key, ret.get( cleanKey ) );
- ret.remove( cleanKey );
- }
-
- // backfill missing keys w/ null value
- if ( ! ret.containsKey( key ) )
- ret.put( key, null );
- }
-
- if ( log.isDebugEnabled() )
- log.debug( "++++ memcache: got back " + ret.size() + " results" );
- return ret;
- }
-
- /**
- * This method loads the data from cache into a Map.
- *
- * Pass a SockIO object which is ready to receive data and a HashMap
- * to store the results.
- *
- * @param sock socket waiting to pass back data
- * @param hm hashmap to store data into
- * @param asString if true, and if we are using NativehHandler, return string val
- * @throws IOException if io exception happens while reading from socket
- */
- private void loadMulti( LineInputStream input, Map hm, boolean asString ) throws IOException {
-
- while ( true ) {
- String line = input.readLine();
- if ( log.isDebugEnabled() )
- log.debug( "++++ line: " + line );
-
- if ( line.startsWith( VALUE ) ) {
- String[] info = line.split(" ");
- String key = info[1];
- int flag = Integer.parseInt( info[2] );
- int length = Integer.parseInt( info[3] );
-
- if ( log.isDebugEnabled() ) {
- log.debug( "++++ key: " + key );
- log.debug( "++++ flags: " + flag );
- log.debug( "++++ length: " + length );
- }
-
- // read obj into buffer
- byte[] buf = new byte[length];
- input.read( buf );
- input.clearEOL();
-
- // ready object
- Object o;
-
- // check for compression
- if ( (flag & F_COMPRESSED) == F_COMPRESSED ) {
- try {
- // read the input stream, and write to a byte array output stream since
- // we have to read into a byte array, but we don't know how large it
- // will need to be, and we don't want to resize it a bunch
- GZIPInputStream gzi = new GZIPInputStream( new ByteArrayInputStream( buf ) );
- ByteArrayOutputStream bos = new ByteArrayOutputStream( buf.length );
-
- int count;
- byte[] tmp = new byte[2048];
- while ( (count = gzi.read(tmp)) != -1 ) {
- bos.write( tmp, 0, count );
- }
-
- // store uncompressed back to buffer
- buf = bos.toByteArray();
- gzi.close();
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error(new StringBuilder().append("++++ IOException thrown while trying to uncompress input stream for key: ").append(key).toString() );
- log.error( e.getMessage(), e );
- throw new NestedIOException( "++++ IOException thrown while trying to uncompress input stream for key: " + key, e );
- }
- }
-
- // we can only take out serialized objects
- if ( ( flag & F_SERIALIZED ) != F_SERIALIZED ) {
- if ( primitiveAsString || asString ) {
- // pulling out string value
- if (log.isInfoEnabled())
- log.info( "++++ retrieving object and stuffing into a string." );
-
- o = new String( buf, defaultEncoding );
- }
- else {
- // decoding object
- try {
- o = NativeHandler.decode( buf, flag );
- }
- catch ( Exception e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error(new StringBuilder().append("++++ Exception thrown while trying to deserialize for key: ")
- .append(key).toString(), e );
- throw new NestedIOException( e );
- }
- }
- }
- else {
- // deserialize if the data is serialized
- ContextObjectInputStream ois =
- new ContextObjectInputStream( new ByteArrayInputStream( buf ), classLoader );
- try {
- o = ois.readObject();
-
- if (log.isInfoEnabled())
- log.info(new StringBuilder().append("++++ deserializing ").append(o.getClass()).toString() );
- }
- catch ( ClassNotFoundException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this, e, key );
-
- log.error(new StringBuilder().append("++++ ClassNotFoundException thrown while trying to deserialize for key: ")
- .append(key).toString(), e );
- throw new NestedIOException( "+++ failed while trying to deserialize for key: " + key, e );
- }
- }
-
- // store the object into the cache
- hm.put( key, o );
- }
- else if ( END.equals( line ) ) {
- if ( log.isDebugEnabled() )
- log.debug( "++++ finished reading from cache server" );
- break;
- }
- }
- }
-
- private String sanitizeKey( String key ) throws UnsupportedEncodingException {
- return ( sanitizeKeys ) ? URLEncoder.encode( key, "UTF-8" ) : key;
- }
-
- /**
- * Invalidates the entire cache.
- *
- * Will return true only if succeeds in clearing all servers.
- *
- * @return success true/false
- */
- public boolean flushAll() {
- return flushAll( null );
- }
-
- /**
- * Invalidates the entire cache.
- *
- * Will return true only if succeeds in clearing all servers.
- * If pass in null, then will try to flush all servers.
- *
- * @param servers optional array of host(s) to flush (host:port)
- * @return success true/false
- */
- public boolean flushAll( String[] servers ) {
-
- // get SockIOPool instance
- // return false if unable to get SockIO obj
- if ( pool == null ) {
- log.error( "++++ unable to get SockIOPool instance" );
- return false;
- }
-
- // get all servers and iterate over them
- servers = ( servers == null )
- ? pool.getServers()
- : servers;
-
- // if no servers, then return early
- if ( servers == null || servers.length <= 0 ) {
- log.error( "++++ no servers to flush" );
- return false;
- }
-
- boolean success = true;
-
- for ( int i = 0; i < servers.length; i++ ) {
-
- SockIOPool.SockIO sock = pool.getConnection( servers[i] );
- if ( sock == null ) {
- log.error( "++++ unable to get connection to : " + servers[i] );
- success = false;
- if ( errorHandler != null )
- errorHandler.handleErrorOnFlush( this, new IOException( "no socket to server available" ) );
- continue;
- }
-
- // build command
- String command = "flush_all\r\n";
-
- try {
- sock.write( command.getBytes() );
- sock.flush();
-
- // if we get appropriate response back, then we return true
- String line = sock.readLine();
- success = ( OK.equals( line ) )
- ? success && true
- : false;
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnFlush( this, e );
-
- // exception thrown
- log.error( "++++ exception thrown while writing bytes to server on flushAll" );
- log.error( e.getMessage(), e );
-
- try {
- sock.trueClose();
- }
- catch ( IOException ioe ) {
- log.error( "++++ failed to close socket : " + sock.toString() );
- }
-
- success = false;
- sock = null;
- }
-
- if ( sock != null ) {
- sock.close();
- sock = null;
- }
- }
-
- return success;
- }
-
- /**
- * Retrieves stats for all servers.
- *
- * Returns a map keyed on the servername.
- * The value is another map which contains stats
- * with stat name as key and value as value.
- *
- * @return Stats map
- */
- public Map stats() {
- return stats( null );
- }
-
- /**
- * Retrieves stats for passed in servers (or all servers).
- *
- * Returns a map keyed on the servername.
- * The value is another map which contains stats
- * with stat name as key and value as value.
- *
- * @param servers string array of servers to retrieve stats from, or all if this is null
- * @return Stats map
- */
- public Map stats( String[] servers ) {
- return stats( servers, "stats\r\n", STATS );
- }
-
- /**
- * Retrieves stats items for all servers.
- *
- * Returns a map keyed on the servername.
- * The value is another map which contains item stats
- * with itemname:number:field as key and value as value.
- *
- * @return Stats map
- */
- public Map statsItems() {
- return statsItems( null );
- }
-
- /**
- * Retrieves stats for passed in servers (or all servers).
- *
- * Returns a map keyed on the servername.
- * The value is another map which contains item stats
- * with itemname:number:field as key and value as value.
- *
- * @param servers string array of servers to retrieve stats from, or all if this is null
- * @return Stats map
- */
- public Map statsItems( String[] servers ) {
- return stats( servers, "stats items\r\n", STATS );
- }
-
- /**
- * Retrieves stats items for all servers.
- *
- * Returns a map keyed on the servername.
- * The value is another map which contains slabs stats
- * with slabnumber:field as key and value as value.
- *
- * @return Stats map
- */
- public Map statsSlabs() {
- return statsSlabs( null );
- }
-
- /**
- * Retrieves stats for passed in servers (or all servers).
- *
- * Returns a map keyed on the servername.
- * The value is another map which contains slabs stats
- * with slabnumber:field as key and value as value.
- *
- * @param servers string array of servers to retrieve stats from, or all if this is null
- * @return Stats map
- */
- public Map statsSlabs( String[] servers ) {
- return stats( servers, "stats slabs\r\n", STATS );
- }
-
- /**
- * Retrieves items cachedump for all servers.
- *
- * Returns a map keyed on the servername.
- * The value is another map which contains cachedump stats
- * with the cachekey as key and byte size and unix timestamp as value.
- *
- * @param slabNumber the item number of the cache dump
- * @return Stats map
- */
- public Map statsCacheDump( int slabNumber, int limit ) {
- return statsCacheDump( null, slabNumber, limit );
- }
-
- /**
- * Retrieves stats for passed in servers (or all servers).
- *
- * Returns a map keyed on the servername.
- * The value is another map which contains cachedump stats
- * with the cachekey as key and byte size and unix timestamp as value.
- *
- * @param servers string array of servers to retrieve stats from, or all if this is null
- * @param slabNumber the item number of the cache dump
- * @return Stats map
- */
- public Map statsCacheDump( String[] servers, int slabNumber, int limit ) {
- return stats( servers, String.format( "stats cachedump %d %d\r\n", slabNumber, limit ), ITEM );
- }
-
- private Map stats( String[] servers, String command, String lineStart ) {
-
- if ( command == null || command.trim().equals( "" ) ) {
- log.error( "++++ invalid / missing command for stats()" );
- return null;
- }
-
- // get all servers and iterate over them
- servers = (servers == null)
- ? pool.getServers()
- : servers;
-
- // if no servers, then return early
- if ( servers == null || servers.length <= 0 ) {
- log.error( "++++ no servers to check stats" );
- return null;
- }
-
- // array of stats Maps
- Map statsMaps =
- new HashMap();
-
- for ( int i = 0; i < servers.length; i++ ) {
-
- SockIOPool.SockIO sock = pool.getConnection( servers[i] );
- if ( sock == null ) {
- log.error( "++++ unable to get connection to : " + servers[i] );
- if ( errorHandler != null )
- errorHandler.handleErrorOnStats( this, new IOException( "no socket to server available" ) );
- continue;
- }
-
- // build command
- try {
- sock.write( command.getBytes() );
- sock.flush();
-
- // map to hold key value pairs
- Map stats = new HashMap();
-
- // loop over results
- while ( true ) {
- String line = sock.readLine();
- if ( log.isDebugEnabled() )
- log.debug( "++++ line: " + line );
-
- if ( line.startsWith( lineStart ) ) {
- String[] info = line.split( " ", 3 );
- String key = info[1];
- String value = info[2];
-
- if ( log.isDebugEnabled() ) {
- log.debug( "++++ key : " + key );
- log.debug( "++++ value: " + value );
- }
-
- stats.put( key, value );
- }
- else if ( END.equals( line ) ) {
- // finish when we get end from server
- if ( log.isDebugEnabled() )
- log.debug( "++++ finished reading from cache server" );
- break;
- }
- else if ( line.startsWith( ERROR ) || line.startsWith( CLIENT_ERROR ) || line.startsWith( SERVER_ERROR ) ) {
- log.error( "++++ failed to query stats" );
- log.error( "++++ server response: " + line );
- break;
- }
-
- statsMaps.put( servers[i], stats );
- }
- }
- catch ( IOException e ) {
-
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnStats( this, e );
-
- // exception thrown
- log.error( "++++ exception thrown while writing bytes to server on stats" );
- log.error( e.getMessage(), e );
-
- try {
- sock.trueClose();
- }
- catch ( IOException ioe ) {
- log.error( "++++ failed to close socket : " + sock.toString() );
- }
-
- sock = null;
- }
-
- if ( sock != null ) {
- sock.close();
- sock = null;
- }
- }
-
- return statsMaps;
- }
-
- protected final class NIOLoader {
- protected Selector selector;
- protected int numConns = 0;
- protected MemCachedClient mc;
- protected Connection[] conns;
-
- public NIOLoader( MemCachedClient mc ) {
- this.mc = mc;
- }
-
- private final class Connection {
-
- public List incoming = new ArrayList();
- public ByteBuffer outgoing;
- public SockIOPool.SockIO sock;
- public SocketChannel channel;
- private boolean isDone = false;
-
- public Connection( SockIOPool.SockIO sock, StringBuilder request ) throws IOException {
- if ( log.isDebugEnabled() )
- log.debug( "setting up connection to "+sock.getHost() );
-
- this.sock = sock;
- outgoing = ByteBuffer.wrap( request.append( "\r\n" ).toString().getBytes() );
-
- channel = sock.getChannel();
- if ( channel == null )
- throw new IOException( "dead connection to: " + sock.getHost() );
-
- channel.configureBlocking( false );
- channel.register( selector, SelectionKey.OP_WRITE, this );
- }
-
- public void close() {
- try {
- if ( isDone ) {
- // turn off non-blocking IO and return to pool
- if ( log.isDebugEnabled() )
- log.debug( "++++ gracefully closing connection to "+sock.getHost() );
-
- channel.configureBlocking( true );
- sock.close();
- return;
- }
- }
- catch ( IOException e ) {
- log.warn( "++++ memcache: unexpected error closing normally" );
- }
-
- try {
- if ( log.isDebugEnabled() )
- log.debug("forcefully closing connection to "+sock.getHost());
-
- channel.close();
- sock.trueClose();
- }
- catch ( IOException ignoreMe ) { }
- }
-
- public boolean isDone() {
- // if we know we're done, just say so
- if ( isDone )
- return true;
-
- // else find out the hard way
- int strPos = B_END.length-1;
-
- int bi = incoming.size() - 1;
- while ( bi >= 0 && strPos >= 0 ) {
- ByteBuffer buf = incoming.get( bi );
- int pos = buf.position()-1;
- while ( pos >= 0 && strPos >= 0 ) {
- if ( buf.get( pos-- ) != B_END[strPos--] )
- return false;
- }
-
- bi--;
- }
-
- isDone = strPos < 0;
- return isDone;
- }
-
- public ByteBuffer getBuffer() {
- int last = incoming.size()-1;
- if ( last >= 0 && incoming.get( last ).hasRemaining() ) {
- return incoming.get( last );
- }
- else {
- ByteBuffer newBuf = ByteBuffer.allocate( 8192 );
- incoming.add( newBuf );
- return newBuf;
- }
- }
-
- public String toString() {
- return new StringBuilder().append("Connection to ").append(sock.getHost())
- .append(" with ").append(incoming.size()).append(" bufs; done is ").append(isDone).toString();
- }
- }
-
- public void doMulti( boolean asString, Map sockKeys, String[] keys, Map ret ) {
-
- long timeRemaining = 0;
- try {
- selector = Selector.open();
-
- // get the sockets, flip them to non-blocking, and set up data
- // structures
- conns = new Connection[sockKeys.keySet().size()];
- numConns = 0;
- for ( Iterator i = sockKeys.keySet().iterator(); i.hasNext(); ) {
- // get SockIO obj from hostname
- String host = i.next();
-
- SockIOPool.SockIO sock = pool.getConnection( host );
-
- if ( sock == null ) {
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( this.mc, new IOException( "no socket to server available" ), keys );
- return;
- }
-
- conns[numConns++] = new Connection( sock, sockKeys.get( host ) );
- }
-
- // the main select loop; ends when
- // 1) we've received data from all the servers, or
- // 2) we time out
- long startTime = System.currentTimeMillis();
-
- long timeout = pool.getMaxBusy();
- timeRemaining = timeout;
-
- while ( numConns > 0 && timeRemaining > 0 ) {
- int n = selector.select( Math.min( timeout, 5000 ) );
- if ( n > 0 ) {
- // we've got some activity; handle it
- Iterator it = selector.selectedKeys().iterator();
- while ( it.hasNext() ) {
- SelectionKey key = it.next();
- it.remove();
- handleKey( key );
- }
- }
- else {
- // timeout likely... better check
- // TODO: This seems like a problem area that we need to figure out how to handle.
- log.error( "selector timed out waiting for activity" );
- }
-
- timeRemaining = timeout - (System.currentTimeMillis() - startTime);
- }
- }
- catch ( IOException e ) {
- // errors can happen just about anywhere above, from
- // connection setup to any of the mechanics
- handleError( e, keys );
- return;
- }
- finally {
- if ( log.isDebugEnabled() )
- log.debug( "Disconnecting; numConns=" + numConns + " timeRemaining=" + timeRemaining );
-
- // run through our conns and either return them to the pool
- // or forcibly close them
- try {
- selector.close();
- }
- catch ( IOException ignoreMe ) { }
-
- for ( Connection c : conns ) {
- if ( c != null )
- c.close();
- }
- }
-
- // Done! Build the list of results and return them. If we get
- // here by a timeout, then some of the connections are probably
- // not done. But we'll return what we've got...
- for ( Connection c : conns ) {
- try {
- if ( c.incoming.size() > 0 && c.isDone() )
- loadMulti( new ByteBufArrayInputStream( c.incoming ), ret, asString );
- }
- catch ( Exception e ) {
- // shouldn't happen; we have all the data already
- log.warn( "Caught the aforementioned exception on "+c );
- }
- }
- }
-
- private void handleError( Throwable e, String[] keys ) {
- // if we have an errorHandler, use its hook
- if ( errorHandler != null )
- errorHandler.handleErrorOnGet( MemCachedClient.this, e, keys );
-
- // exception thrown
- log.error( "++++ exception thrown while getting from cache on getMulti" );
- log.error( e.getMessage() );
- }
-
- private void handleKey( SelectionKey key ) throws IOException {
- if ( log.isDebugEnabled() )
- log.debug( "handling selector op " + key.readyOps() + " for key " + key );
-
- if ( key.isReadable() )
- readResponse( key );
- else if ( key.isWritable() )
- writeRequest( key );
- }
-
- public void writeRequest( SelectionKey key ) throws IOException {
- ByteBuffer buf = ((Connection) key.attachment()).outgoing;
- SocketChannel sc = (SocketChannel)key.channel();
-
- if ( buf.hasRemaining() ) {
- if ( log.isDebugEnabled() )
- log.debug( "writing " + buf.remaining() + "B to " + ((SocketChannel) key.channel()).socket().getInetAddress() );
-
- sc.write( buf );
- }
-
- if ( !buf.hasRemaining() ) {
- if ( log.isDebugEnabled() )
- log.debug( "switching to read mode for server " + ((SocketChannel)key.channel()).socket().getInetAddress() );
-
- key.interestOps( SelectionKey.OP_READ );
- }
- }
-
- public void readResponse( SelectionKey key ) throws IOException {
- Connection conn = (Connection)key.attachment();
- ByteBuffer buf = conn.getBuffer();
- int count = conn.channel.read( buf );
- if ( count > 0 ) {
- if ( log.isDebugEnabled() )
- log.debug( "read " + count + " from " + conn.channel.socket().getInetAddress() );
-
- if ( conn.isDone() ) {
- if ( log.isDebugEnabled() )
- log.debug( "connection done to " + conn.channel.socket().getInetAddress() );
-
- key.cancel();
- numConns--;
- return;
- }
- }
- }
- }
-}
\ No newline at end of file
diff --git a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/NativeHandler.java b/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/NativeHandler.java
deleted file mode 100644
index cc4d2ff..0000000
--- a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/NativeHandler.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/**
- * MemCached Java client
- * Copyright (c) 2007 Greg Whalin
- * All rights reserved.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the BSD license
- *
- * This library is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE.
- *
- * You should have received a copy of the BSD License along with this
- * library.
- *
- * @author Greg Whalin
- * @version 2.0
- */
-package com.alisoft.xplatform.asf.cache.memcached.client;
-
-import java.util.Date;
-
-/**
- * Handle encoding standard Java types directly which can result in significant
- * memory savings:
- *
- * Currently the Memcached driver for Java supports the setSerialize() option.
- * This can increase performance in some situations but has a few issues:
- *
- * Code that performs class casting will throw ClassCastExceptions when
- * setSerialize is enabled. For example:
- *
- * mc.set( "foo", new Integer( 1 ) ); Integer output = (Integer)mc.get("foo");
- *
- * Will work just file when setSerialize is true but when its false will just throw
- * a ClassCastException.
- *
- * Also internally it doesn't support Boolean and since toString is called wastes a
- * lot of memory and causes additional performance issue. For example an Integer
- * can take anywhere from 1 byte to 10 bytes.
- *
- * Due to the way the memcached slab allocator works it seems like a LOT of wasted
- * memory to store primitive types as serialized objects (from a performance and
- * memory perspective). In our applications we have millions of small objects and
- * wasted memory would become a big problem.
- *
- * For example a Serialized Boolean takes 47 bytes which means it will fit into the
- * 64byte LRU. Using 1 byte means it will fit into the 8 byte LRU thus saving 8x
- * the memory. This also saves the CPU performance since we don't have to
- * serialize bytes back and forth and we can compute the byte[] value directly.
- *
- * One problem would be when the user calls get() because doing so would require
- * the app to know the type of the object stored as a bytearray inside memcached
- * (since the user will probably cast).
- *
- * If we assume the basic types are interned we could use the first byte as the
- * type with the remaining bytes as the value. Then on get() we could read the
- * first byte to determine the type and then construct the correct object for it.
- * This would prevent the ClassCastException I talked about above.
- *
- * We could remove the setSerialize() option and just assume that standard VM types
- * are always internd in this manner.
- *
- * mc.set( "foo", new Boolean.TRUE ); Boolean b = (Boolean)mc.get( "foo" );
- *
- * And the type casts would work because internally we would create a new Boolean
- * to return back to the client.
- *
- * This would reduce memory footprint and allow for a virtual implementation of the
- * Externalizable interface which is much faster than Serialzation.
- *
- * Currently the memory improvements would be:
- *
- * java.lang.Boolean - 8x performance improvement (now just two bytes)
- * java.lang.Integer - 16x performance improvement (now just 5 bytes)
- *
- * Most of the other primitive types would benefit from this optimization.
- * java.lang.Character being another obvious example.
- *
- * I know it seems like I'm being really picky here but for our application I'd
- * save 1G of memory right off the bat. We'd go down from 1.152G of memory used
- * down to 144M of memory used which is much better IMO.
- *
- * http://java.sun.com/docs/books/tutorial/native1.1/integrating/types.html
- *
- * @author Kevin A. Burton
- * @author Greg Whalin
- */
-public class NativeHandler {
-
- /**
- * Detemine of object can be natively serialized by this class.
- *
- * @param value Object to test.
- * @return true/false
- */
- public static boolean isHandled( Object value ) {
-
- return (
- value instanceof Byte ||
- value instanceof Boolean ||
- value instanceof Integer ||
- value instanceof Long ||
- value instanceof Character ||
- value instanceof String ||
- value instanceof StringBuffer ||
- value instanceof Float ||
- value instanceof Short ||
- value instanceof Double ||
- value instanceof Date ||
- value instanceof StringBuilder ||
- value instanceof byte[]
- )
- ? true
- : false;
- }
-
- /**
- * Returns the flag for marking the type of the byte array.
- *
- * @param value Object we are storing.
- * @return int marker
- */
- public static int getMarkerFlag( Object value ) {
-
- if ( value instanceof Byte )
- return MemCachedClient.MARKER_BYTE;
-
- if ( value instanceof Boolean )
- return MemCachedClient.MARKER_BOOLEAN;
-
- if ( value instanceof Integer )
- return MemCachedClient.MARKER_INTEGER;
-
- if ( value instanceof Long )
- return MemCachedClient.MARKER_LONG;
-
- if ( value instanceof Character )
- return MemCachedClient.MARKER_CHARACTER;
-
- if ( value instanceof String )
- return MemCachedClient.MARKER_STRING;
-
- if ( value instanceof StringBuffer )
- return MemCachedClient.MARKER_STRINGBUFFER;
-
- if ( value instanceof Float )
- return MemCachedClient.MARKER_FLOAT;
-
- if ( value instanceof Short )
- return MemCachedClient.MARKER_SHORT;
-
- if ( value instanceof Double )
- return MemCachedClient.MARKER_DOUBLE;
-
- if ( value instanceof Date )
- return MemCachedClient.MARKER_DATE;
-
- if ( value instanceof StringBuilder )
- return MemCachedClient.MARKER_STRINGBUILDER;
-
- if ( value instanceof byte[] )
- return MemCachedClient.MARKER_BYTEARR;
-
- return -1;
- }
-
- /**
- * Encodes supported types
- *
- * @param value Object to encode.
- * @return byte array
- *
- * @throws Exception If fail to encode.
- */
- public static byte[] encode( Object value ) throws Exception {
-
- if ( value instanceof Byte )
- return encode( (Byte)value );
-
- if ( value instanceof Boolean )
- return encode( (Boolean)value );
-
- if ( value instanceof Integer )
- return encode( ((Integer)value).intValue() );
-
- if ( value instanceof Long )
- return encode( ((Long)value).longValue() );
-
- if ( value instanceof Character )
- return encode( (Character)value );
-
- if ( value instanceof String )
- return encode( (String)value );
-
- if ( value instanceof StringBuffer )
- return encode( (StringBuffer)value );
-
- if ( value instanceof Float )
- return encode( ((Float)value).floatValue() );
-
- if ( value instanceof Short )
- return encode( (Short)value );
-
- if ( value instanceof Double )
- return encode( ((Double)value).doubleValue() );
-
- if ( value instanceof Date )
- return encode( (Date)value);
-
- if ( value instanceof StringBuilder )
- return encode( (StringBuilder)value );
-
- if ( value instanceof byte[] )
- return encode( (byte[])value );
-
- return null;
- }
-
- protected static byte[] encode( Byte value ) {
- byte[] b = new byte[1];
- b[0] = value.byteValue();
- return b;
- }
-
- protected static byte[] encode( Boolean value ) {
- byte[] b = new byte[1];
-
- if ( value.booleanValue() )
- b[0] = 1;
- else
- b[0] = 0;
-
- return b;
- }
-
- protected static byte[] encode( int value ) {
- return getBytes( value );
- }
-
- protected static byte[] encode( long value ) throws Exception {
- return getBytes( value );
- }
-
- protected static byte[] encode( Date value ) {
- return getBytes( value.getTime() );
- }
-
- protected static byte[] encode( Character value ) {
- return encode( value.charValue() );
- }
-
- protected static byte[] encode( String value ) throws Exception {
- return value.getBytes( "UTF-8" );
- }
-
- protected static byte[] encode( StringBuffer value ) throws Exception {
- return encode( value.toString() );
- }
-
- protected static byte[] encode( float value ) throws Exception {
- return encode( (int)Float.floatToIntBits( value ) );
- }
-
- protected static byte[] encode( Short value ) throws Exception {
- return encode( (int)value.shortValue() );
- }
-
- protected static byte[] encode( double value ) throws Exception {
- return encode( (long)Double.doubleToLongBits( value ) );
- }
-
- protected static byte[] encode( StringBuilder value ) throws Exception {
- return encode( value.toString() );
- }
-
- protected static byte[] encode( byte[] value ) {
- return value;
- }
-
- protected static byte[] getBytes( long value ) {
- byte[] b = new byte[8];
- b[0] = (byte)((value >> 56) & 0xFF);
- b[1] = (byte)((value >> 48) & 0xFF);
- b[2] = (byte)((value >> 40) & 0xFF);
- b[3] = (byte)((value >> 32) & 0xFF);
- b[4] = (byte)((value >> 24) & 0xFF);
- b[5] = (byte)((value >> 16) & 0xFF);
- b[6] = (byte)((value >> 8) & 0xFF);
- b[7] = (byte)((value >> 0) & 0xFF);
- return b;
- }
-
- protected static byte[] getBytes( int value ) {
- byte[] b = new byte[4];
- b[0] = (byte)((value >> 24) & 0xFF);
- b[1] = (byte)((value >> 16) & 0xFF);
- b[2] = (byte)((value >> 8) & 0xFF);
- b[3] = (byte)((value >> 0) & 0xFF);
- return b;
- }
-
- /**
- * Decodes byte array using memcache flag to determine type.
- *
- * @param b
- * @param marker
- * @return
- * @throws Exception
- */
- public static Object decode( byte[] b, int flag ) throws Exception {
-
- if ( b.length < 1 )
- return null;
-
-
- if ( ( flag & MemCachedClient.MARKER_BYTE ) == MemCachedClient.MARKER_BYTE )
- return decodeByte( b );
-
- if ( ( flag & MemCachedClient.MARKER_BOOLEAN ) == MemCachedClient.MARKER_BOOLEAN )
- return decodeBoolean( b );
-
- if ( ( flag & MemCachedClient.MARKER_INTEGER ) == MemCachedClient.MARKER_INTEGER )
- return decodeInteger( b );
-
- if ( ( flag & MemCachedClient.MARKER_LONG ) == MemCachedClient.MARKER_LONG )
- return decodeLong( b );
-
- if ( ( flag & MemCachedClient.MARKER_CHARACTER ) == MemCachedClient.MARKER_CHARACTER )
- return decodeCharacter( b );
-
- if ( ( flag & MemCachedClient.MARKER_STRING ) == MemCachedClient.MARKER_STRING )
- return decodeString( b );
-
- if ( ( flag & MemCachedClient.MARKER_STRINGBUFFER ) == MemCachedClient.MARKER_STRINGBUFFER )
- return decodeStringBuffer( b );
-
- if ( ( flag & MemCachedClient.MARKER_FLOAT ) == MemCachedClient.MARKER_FLOAT )
- return decodeFloat( b );
-
- if ( ( flag & MemCachedClient.MARKER_SHORT ) == MemCachedClient.MARKER_SHORT )
- return decodeShort( b );
-
- if ( ( flag & MemCachedClient.MARKER_DOUBLE ) == MemCachedClient.MARKER_DOUBLE )
- return decodeDouble( b );
-
- if ( ( flag & MemCachedClient.MARKER_DATE ) == MemCachedClient.MARKER_DATE )
- return decodeDate( b );
-
- if ( ( flag & MemCachedClient.MARKER_STRINGBUILDER ) == MemCachedClient.MARKER_STRINGBUILDER )
- return decodeStringBuilder( b );
-
- if ( ( flag & MemCachedClient.MARKER_BYTEARR ) == MemCachedClient.MARKER_BYTEARR )
- return decodeByteArr( b );
-
- return null;
- }
-
- // decode methods
- protected static Byte decodeByte( byte[] b ) {
- return new Byte( b[0] );
- }
-
- protected static Boolean decodeBoolean( byte[] b ) {
- boolean value = b[0] == 1;
- return ( value ) ? Boolean.TRUE : Boolean.FALSE;
- }
-
- protected static Integer decodeInteger( byte[] b ) {
- return new Integer( toInt( b ) );
- }
-
- protected static Long decodeLong( byte[] b ) throws Exception {
- return new Long( toLong( b ) );
- }
-
- protected static Character decodeCharacter( byte[] b ) {
- return new Character( (char)decodeInteger( b ).intValue() );
- }
-
- protected static String decodeString( byte[] b ) throws Exception {
- return new String( b, "UTF-8" );
- }
-
- protected static StringBuffer decodeStringBuffer( byte[] b ) throws Exception {
- return new StringBuffer( decodeString( b ) );
- }
-
- protected static Float decodeFloat( byte[] b ) throws Exception {
- Integer l = decodeInteger( b );
- return new Float( Float.intBitsToFloat( l.intValue() ) );
- }
-
- protected static Short decodeShort( byte[] b ) throws Exception {
- return new Short( (short)decodeInteger( b ).intValue() );
- }
-
- protected static Double decodeDouble( byte[] b ) throws Exception {
- Long l = decodeLong( b );
- return new Double( Double.longBitsToDouble( l.longValue() ) );
- }
-
- protected static Date decodeDate( byte[] b ) {
- return new Date( toLong( b ) );
- }
-
- protected static StringBuilder decodeStringBuilder( byte[] b ) throws Exception {
- return new StringBuilder( decodeString( b ) );
- }
-
- protected static byte[] decodeByteArr( byte[] b ) {
- return b;
- }
-
- /**
- * This works by taking each of the bit patterns and converting them to
- * ints taking into account 2s complement and then adding them..
- *
- * @param b
- * @return
- */
- protected static int toInt( byte[] b ) {
- return (((((int) b[3]) & 0xFF) << 32) +
- ((((int) b[2]) & 0xFF) << 40) +
- ((((int) b[1]) & 0xFF) << 48) +
- ((((int) b[0]) & 0xFF) << 56));
- }
-
- protected static long toLong( byte[] b ) {
- return ((((long) b[7]) & 0xFF) +
- ((((long) b[6]) & 0xFF) << 8) +
- ((((long) b[5]) & 0xFF) << 16) +
- ((((long) b[4]) & 0xFF) << 24) +
- ((((long) b[3]) & 0xFF) << 32) +
- ((((long) b[2]) & 0xFF) << 40) +
- ((((long) b[1]) & 0xFF) << 48) +
- ((((long) b[0]) & 0xFF) << 56));
- }
-}
diff --git a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/NestedIOException.java b/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/NestedIOException.java
deleted file mode 100644
index 6601598..0000000
--- a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/NestedIOException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * MemCached Java client
- * Copyright (c) 2007 Greg Whalin
- * All rights reserved.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the BSD license
- *
- * This library is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE.
- *
- * You should have received a copy of the BSD License along with this
- * library.
- *
- * @author Kevin A. Burton
- * @version 2.0
- */
-package com.alisoft.xplatform.asf.cache.memcached.client;
-
-import java.io.*;
-
-/**
- * Bridge class to provide nested Exceptions with IOException which has
- * constructors that don't take Throwables.
- *
- * @author Kevin Burton
- * @version 1.2
- */
-public class NestedIOException extends IOException {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * Create a new NestedIOException
instance.
- * @param cause object of type throwable
- */
- public NestedIOException( Throwable cause ) {
- super( cause.getMessage() );
- super.initCause( cause );
- }
-
- public NestedIOException( String message, Throwable cause ) {
- super( message );
- initCause( cause );
- }
-}
diff --git a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/SockIOPool.java b/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/SockIOPool.java
deleted file mode 100644
index dbd983a..0000000
--- a/memcache-client-forjava/src/java/com/alisoft/xplatform/asf/cache/memcached/client/SockIOPool.java
+++ /dev/null
@@ -1,2628 +0,0 @@
-/**
- *
- */
-package com.alisoft.xplatform.asf.cache.memcached.client;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Map;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Date;
-import java.util.Arrays;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.zip.*;
-import java.net.*;
-import java.io.*;
-import java.nio.channels.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.log4j.Logger;
-
-/**
- *
- * This class is a connection pool for maintaning a pool of persistent
- * connections
to memcached servers.
- *
- * The pool must be initialized prior to use. This should typically be early on
- * in the lifecycle of the JVM instance.
- * An example of initializing using defaults:
- *
- *
- * static
- * {
- * String[] serverlist = { "cache0.server.com:12345",
- * "cache1.server.com:12345" };
- *
- * SockIOPool pool = SockIOPool.getInstance();
- * pool.setServers(serverlist);
- * pool.initialize();
- * }
- *
- *
- * An example of initializing using defaults and providing weights for
- * servers:
- *
- *
- * static
- * {
- * String[] serverlist = { "cache0.server.com:12345",
- * "cache1.server.com:12345" };
- * Integer[] weights = { new Integer(5), new Integer(2) };
- *
- * SockIOPool pool = SockIOPool.getInstance();
- * pool.setServers(serverlist);
- * pool.setWeights(weights);
- * pool.initialize();
- * }
- *
- *
- * An example of initializing overriding defaults:
- *
- *
- * static
- * {
- * String[] serverlist = { "cache0.server.com:12345",
- * "cache1.server.com:12345" };
- * Integer[] weights = { new Integer(5), new Integer(2) };
- * int initialConnections = 10;
- * int minSpareConnections = 5;
- * int maxSpareConnections = 50;
- * long maxIdleTime = 1000 * 60 * 30; // 30 minutes
- * long maxBusyTime = 1000 * 60 * 5; // 5 minutes
- * long maintThreadSleep = 1000 * 5; // 5 seconds
- * int socketTimeOut = 1000 * 3; // 3 seconds to block on reads
- * int socketConnectTO = 1000 * 3; // 3 seconds to block on initial connections. If 0, then will use blocking connect (default)
- * boolean failover = false; // turn off auto-failover in event of server down
- * boolean nagleAlg = false; // turn off Nagle's algorithm on all sockets in pool
- * boolean aliveCheck = false; // disable health check of socket on checkout
- *
- * SockIOPool pool = SockIOPool.getInstance();
- * pool.setServers(serverlist);
- * pool.setWeights(weights);
- * pool.setInitConn(initialConnections);
- * pool.setMinConn(minSpareConnections);
- * pool.setMaxConn(maxSpareConnections);
- * pool.setMaxIdle(maxIdleTime);
- * pool.setMaxBusyTime(maxBusyTime);
- * pool.setMaintSleep(maintThreadSleep);
- * pool.setSocketTO(socketTimeOut);
- * pool.setNagle(nagleAlg);
- * pool.setHashingAlg(SockIOPool.NEW_COMPAT_HASH);
- * pool.setAliveCheck(true);
- * pool.initialize();
- * }
- *
- *
- * The easiest manner in which to initialize the pool is to set the servers and
- * rely on defaults as in the first example.
After pool is initialized, a
- * client will request a SockIO object by calling getSock with the cache key
- * The client must always close the SockIO object when finished, which will
- * return the connection back to the pool.
- * An example of retrieving a SockIO object:
- *
- *
- * SockIOPool.SockIO sock = SockIOPool.getInstance().getSock( key );
- * try {
- * sock.write( "version\r\n" );
- * sock.flush();
- * System.out.println( "Version: " + sock.readLine() );
- * }
- * catch (IOException ioe) { System.out.println( "io exception thrown" ) };
- *
- * sock.close();
- *
- *
- * ŻSocketԴأЧ
- *
- * @author wenchu.cenwc
- *
- */
-public class SockIOPool
-{
- // logger
- private static Logger log = Logger.getLogger(SockIOPool.class.getName());
-
- // store instances of pools
- private static ConcurrentMap pools = new ConcurrentHashMap();
-
- // avoid recurring construction
- private static ThreadLocal MD5 = new ThreadLocal()
- {
- @Override
- protected MessageDigest initialValue()
- {
- try
- {
- return MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e)
- {
- log.error("++++ no md5 algorithm found");
- throw new IllegalStateException("++++ no md5 algorythm found");
- }
- }
- };
-
- // Constants
-
- /**
- * socket״̬ʹãæ
- */
- private static final int SOCKET_STATUS_BUSY = 1;
- /**
- * socket״̬ѾʧЧ
- */
- private static final int SOCKET_STATUS_DEAD = 2;
-
- private static final int SOCKET_STATUS_ACTIVE = 3;
-
- public static final int NATIVE_HASH = 0; // native String.hashCode();
- public static final int OLD_COMPAT_HASH = 1; // original compatibility
- // hashing algorithm (works
- // with other clients)
- public static final int NEW_COMPAT_HASH = 2; // new CRC32 based
- // compatibility hashing
- // algorithm (works with
- // other clients)
- public static final int CONSISTENT_HASH = 3; // MD5 Based -- Stops
- // thrashing when a server
- // added or removed
-
- public static final long MAX_RETRY_DELAY = 10 * 60 * 1000; // max of 10
- // minute delay
- // for fall off
-
- public static final Random random = new Random();
-
- // Pool data
- private MaintThread maintThread;
- private boolean initialized = false;
- @SuppressWarnings("unused")
- private int maxCreate = 1; // this will be initialized by pool when the
- // pool is initialized
-
- // initial, min and max pool sizes
- private int poolMultiplier = 3;
- private int initConn = 1;
- private int minConn = 1;
- private int maxConn = 10;
- private long maxIdle = 1000 * 60 * 5; // max idle time for avail sockets
- private long maxBusyTime = 1000 * 30; // max idle time for avail sockets
- private long maintSleep = 1000 * 30; // maintenance thread sleep time
- private int socketTO = 1000 * 30; // default timeout of socket reads
- private int socketConnectTO = 1000 * 3; // default timeout of socket
- // connections
- @SuppressWarnings("unused")
- private static int recBufferSize = 128;//ݵĻС
-
- private boolean aliveCheck = false; // default to not check each connection
- // for being alive
- private boolean failover = true; // default to failover in event of cache
- // server dead
- private boolean failback = true; // only used if failover is also set ...
- // controls putting a dead server back
- // into rotation
- private boolean nagle = true; // enable/disable Nagle's algorithm
- private int hashingAlg = NATIVE_HASH; // default to using the native hash
- // as it is the fastest
-
- // locks
- private final ReentrantLock hostDeadLock = new ReentrantLock();
- private final ReentrantLock initDeadLock = new ReentrantLock();
-
- // list of all servers
- private String[] servers;
- private Integer[] weights;
- private Integer totalWeight = 0;
-
- private List buckets;
- private TreeMap consistentBuckets;
-
- // dead server map
- private ConcurrentMap hostDead;
- private ConcurrentMap hostDeadDur;
-
- // map to hold all available sockets
- // map to hold socket status;
- private ConcurrentMap> socketPool;
- private Map fastPool;
-
- private static final byte[] B_VERSION = "version\r\n".getBytes();
-
- // empty constructor
- protected SockIOPool()
- {
- }
-
- /**
- * Factory to create/retrieve new pools given a unique poolName.
- *
- * @param poolName
- * unique name of the pool
- * @return instance of SockIOPool
- */
- public static SockIOPool getInstance(String poolName)
- {
- SockIOPool pool;
-
- if (!pools.containsKey(poolName))
- {
- pool = new SockIOPool();
- pools.putIfAbsent(poolName, pool);
- }
-
- return pools.get(poolName);
- }
-
- public static String getPoolUsage(String poolName)
- {
- StringBuilder result = new StringBuilder();
-
- if (pools.containsKey(poolName))
- {
- SockIOPool sockIOPool = pools.get(poolName);
-
- int total = 0;
- int busy = 0;
- int dead = 0;
-
- Iterator> socketIter = sockIOPool.socketPool.values().iterator();
-
- while(socketIter.hasNext())
- {
- ConcurrentMap status = socketIter.next();
-
- total += status.size();
-
- Iterator iter = status.values().iterator();
-
- while(iter.hasNext())
- {
- int value = iter.next();
-
- if (value == SOCKET_STATUS_BUSY)
- busy++;
-
- if (value == SOCKET_STATUS_DEAD)
- dead++;
- }
- }
-
- result.append("SockIOPool ").append(poolName).append(" : ")
- .append(" total socket: ").append(total).append(" , busy socket: ")
- .append(busy).append(" , dead socket: ").append(dead);
- }
-
- return result.toString();
- }
-
- /**
- * get new Instance form pool
- * @param poolName
- * @return
- */
- public static SockIOPool getNewInstance(String poolName)
- {
- SockIOPool pool;
-
- if (!pools.containsKey(poolName))
- {
- pool = new SockIOPool();
- pools.putIfAbsent(poolName, pool);
- }
- else
- {
- SockIOPool newpool = new SockIOPool();
- pool = pools.get(poolName);
- pools.put(poolName, newpool);
-
- try
- {
- pool.shutDown();
- }
- catch(Exception ex)
- {
- log.error("shutdown old pool error!",ex);
- }
-
- }
-
- return pools.get(poolName);
- }
-
- public static void removeInstance(String poolName)
- {
- if (pools.containsKey(poolName))
- {
- SockIOPool pool = pools.get(poolName);
-
- try
- {
- pool.shutDown();
- }
- catch(Exception ex)
- {
- log.error("shutdown old pool error!",ex);
- }
-
- pools.remove(poolName);
- }
- }
-
- /**
- * Single argument version of factory used for back compat. Simply creates a
- * pool named "default".
- *
- * @return instance of SockIOPool
- */
- public static SockIOPool getInstance()
- {
- return getInstance("default");
- }
-
- /**
- * Initializes the pool.
- */
- public void initialize()
- {
-
- // check to see if already initialized
- if (initialized && (buckets != null || consistentBuckets != null)
- && (socketPool != null))
- {
- log.error("++++ trying to initialize an already initialized pool");
- return;
- }
-
- initDeadLock.lock();
- try
- {
- // check to see if already initialized
- if (initialized && (buckets != null || consistentBuckets != null)
- && (socketPool != null))
- {
- log
- .error("++++ trying to initialize an already initialized pool");
- return;
- }
-
- // pools
- socketPool = new ConcurrentHashMap>(
- servers.length * initConn);
-
- fastPool = new HashMap();
-
- hostDeadDur = new ConcurrentHashMap();
- hostDead = new ConcurrentHashMap();
- maxCreate = (poolMultiplier > minConn) ? minConn : minConn
- / poolMultiplier; // only create up to maxCreate
- // connections at once
-
- if (log.isDebugEnabled())
- {
- log.debug("++++ initializing pool with following settings:");
- log.debug("++++ initial size: " + initConn);
- log.debug("++++ min spare : " + minConn);
- log.debug("++++ max spare : " + maxConn);
- }
-
- // if servers is not set, or it empty, then
- // throw a runtime exception
- if (servers == null || servers.length <= 0)
- {
- log.error("++++ trying to initialize with no servers");
- throw new IllegalStateException(
- "++++ trying to initialize with no servers");
- }
-
- // initalize our internal hashing structures
- if (this.hashingAlg == CONSISTENT_HASH)
- populateConsistentBuckets();
- else
- populateBuckets();
-
- // mark pool as initialized
- this.initialized = true;
-
- // start maint thread
- if (this.maintSleep > 0)
- this.startMaintThread();
-
- } finally
- {
- initDeadLock.unlock();
- }
-
- }
-
- private void populateBuckets()
- {
- if (log.isDebugEnabled())
- log
- .debug("++++ initializing internal hashing structure for consistent hashing");
-
- // store buckets in tree map
- this.buckets = new ArrayList();
-
- for (int i = 0; i < servers.length; i++)
- {
- if (this.weights != null && this.weights.length > i)
- {
- for (int k = 0; k < this.weights[i].intValue(); k++)
- {
- this.buckets.add(servers[i]);
- if (log.isDebugEnabled())
- log.debug("++++ added " + servers[i]
- + " to server bucket");
- }
- } else
- {
- this.buckets.add(servers[i]);
- if (log.isDebugEnabled())
- log.debug("++++ added " + servers[i] + " to server bucket");
- }
-
- // create initial connections
- if (log.isDebugEnabled())
- log.debug("+++ creating initial connections (" + initConn
- + ") for host: " + servers[i]);
-
- for (int j = 0; j < initConn; j++)
- {
- SockIO socket = createSocket(servers[i]);
- if (socket == null)
- {
- log.error("++++ failed to create connection to: "
- + servers[i] + " -- only " + j + " created.");
- break;
- }
-
- addSocketToPool(socketPool, servers[i], socket,
- SOCKET_STATUS_ACTIVE,SOCKET_STATUS_ACTIVE, true);
-
- if (log.isDebugEnabled())
- log.debug("++++ created and added socket: "
- + socket.toString() + " for host " + servers[i]);
- }
- }
- }
-
- private void populateConsistentBuckets()
- {
- if (log.isDebugEnabled())
- log
- .debug("++++ initializing internal hashing structure for consistent hashing");
-
- // store buckets in tree map
- this.consistentBuckets = new TreeMap();
-
- MessageDigest md5 = MD5.get();
- if (this.totalWeight <= 0 && this.weights != null)
- {
- for (int i = 0; i < this.weights.length; i++)
- this.totalWeight += (this.weights[i] == null) ? 1
- : this.weights[i];
- } else if (this.weights == null)
- {
- this.totalWeight = this.servers.length;
- }
-
- for (int i = 0; i < servers.length; i++)
- {
- int thisWeight = 1;
- if (this.weights != null && this.weights[i] != null)
- thisWeight = this.weights[i];
-
- double factor = Math
- .floor(((double) (40 * this.servers.length * thisWeight))
- / (double) this.totalWeight);
-
- for (long j = 0; j < factor; j++)
- {
- byte[] d = md5.digest((servers[i] + "-" + j).getBytes());
- for (int h = 0; h < 4; h++)
- {
- Long k = ((long) (d[3 + h * 4] & 0xFF) << 24)
- | ((long) (d[2 + h * 4] & 0xFF) << 16)
- | ((long) (d[1 + h * 4] & 0xFF) << 8)
- | ((long) (d[0 + h * 4] & 0xFF));
-
- consistentBuckets.put(k, servers[i]);
- if (log.isDebugEnabled())
- log.debug("++++ added " + servers[i]
- + " to server bucket");
- }
- }
-
- // create initial connections
- if (log.isDebugEnabled())
- log.debug("+++ creating initial connections (" + initConn
- + ") for host: " + servers[i]);
-
- for (int j = 0; j < initConn; j++)
- {
- SockIO socket = createSocket(servers[i]);
- if (socket == null)
- {
- log.error("++++ failed to create connection to: "
- + servers[i] + " -- only " + j + " created.");
- break;
- }
-
- addSocketToPool(socketPool, servers[i], socket
- ,SOCKET_STATUS_ACTIVE,SOCKET_STATUS_ACTIVE, true);
-
- if (log.isDebugEnabled())
- log.debug("++++ created and added socket: "
- + socket.toString() + " for host " + servers[i]);
- }
- }
- }
-
- /**
- * Creates a new SockIO obj for the given server.
- *
- * If server fails to connect, then return null and do not try
again
- * until a duration has passed. This duration will grow
by doubling
- * after each failed attempt to connect.
- *
- * @param host
- * host:port to connect to
- * @return SockIO obj or null if failed to create
- */
- protected SockIO createSocket(String host)
- {
-
- SockIO socket = null;
-
- // if host is dead, then we don't need to try again
- // until the dead status has expired
- // we do not try to put back in if failback is off
- hostDeadLock.lock();
- try
- {
- if (failover && failback && hostDead.containsKey(host)
- && hostDeadDur.containsKey(host))
- {
-
- Date store = hostDead.get(host);
- long expire = hostDeadDur.get(host).longValue();
-
- if ((store.getTime() + expire) > System.currentTimeMillis())
- return null;
- }
- } finally
- {
- hostDeadLock.unlock();
- }
-
- try
- {
- socket = new SockIO(this, host, this.socketTO,
- this.socketConnectTO, this.nagle);
-
- if (!socket.isConnected())
- {
- log.error("++++ failed to get SockIO obj for: " + host
- + " -- new socket is not connected");
- addSocketToPool(socketPool, host, socket
- ,SOCKET_STATUS_DEAD,SOCKET_STATUS_DEAD,true);
- // socket = null;
- }
- } catch (Exception ex)
- {
- log.error("++++ failed to get SockIO obj for: " + host);
- log.error(ex.getMessage(), ex);
- socket = null;
- }
-
- // if we failed to get socket, then mark
- // host dead for a duration which falls off
- hostDeadLock.lock();
- try
- {
- if (socket == null)
- {
- Date now = new Date();
- hostDead.put(host, now);
-
- long expire = (hostDeadDur.containsKey(host)) ? (((Long) hostDeadDur
- .get(host)).longValue() * 2)
- : 1000;
-
- if (expire > MAX_RETRY_DELAY)
- expire = MAX_RETRY_DELAY;
-
- hostDeadDur.put(host, new Long(expire));
- if (log.isDebugEnabled())
- log.debug("++++ ignoring dead host: " + host + " for "
- + expire + " ms");
-
- // also clear all entries for this host from availPool
- clearHostFromPool(host);
- } else
- {
- if (log.isDebugEnabled())
- log.debug("++++ created socket (" + socket.toString()
- + ") for host: " + host);
- if (hostDead.containsKey(host) || hostDeadDur.containsKey(host))
- {
- hostDead.remove(host);
- hostDeadDur.remove(host);
- }
- }
- } finally
- {
- hostDeadLock.unlock();
- }
-
- return socket;
- }
-
- /**
- * @param key
- * @return
- */
- public String getHost(String key)
- {
- return getHost(key, null);
- }
-
- /**
- * Gets the host that a particular key / hashcode resides on.
- *
- * @param key
- * @param hashcode
- * @return
- */
- public String getHost(String key, Integer hashcode)
- {
- SockIO socket = getSock(key, hashcode);
- String host = socket.getHost();
- socket.close();
- return host;
- }
-
- /**
- * Returns appropriate SockIO object given string cache key.
- *
- * @param key
- * hashcode for cache key
- * @return SockIO obj connected to server
- */
- public SockIO getSock(String key)
- {
- return getSock(key, null);
- }
-
- /**
- * Returns appropriate SockIO object given string cache key and optional
- * hashcode.
- *
- * Trys to get SockIO from pool. Fails over to additional pools in event of
- * server failure.
- *
- * @param key
- * hashcode for cache key
- * @param hashCode
- * if not null, then the int hashcode to use
- * @return SockIO obj connected to server
- */
- public SockIO getSock(String key, Integer hashCode)
- {
-
- if (log.isDebugEnabled())
- log.debug("cache socket pick " + key + " " + hashCode);
-
- if (!this.initialized)
- {
- log.error("attempting to get SockIO from uninitialized pool!");
- return null;
- }
-
- // if no servers return null
- if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 0)
- || (buckets != null && buckets.size() == 0))
- return null;
-
- // if only one server, return it
- if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 0)
- || (buckets != null && buckets.size() == 1))
- {
-
- SockIO sock = (this.hashingAlg == CONSISTENT_HASH) ? getConnection(consistentBuckets
- .get(consistentBuckets.firstKey()))
- : getConnection(buckets.get(0));
-
- if (sock != null && sock.isConnected())
- {
- if (aliveCheck)
- {
- if (!sock.isAlive())
- {
- sock.close();
- try
- {
- if (socketPool.get(sock.getHost()) != null)
- socketPool.get(sock.getHost()).remove(sock);
-
- sock.trueClose();
- } catch (IOException ioe)
- {
- log.error("failed to close dead socket");
- }
-
- sock = null;
- }
- }
- } else
- {
- if (sock != null)
- {
- addSocketToPool(socketPool, sock.host, sock,SOCKET_STATUS_DEAD,
- SOCKET_STATUS_DEAD, true);
- // sock = null;
- }
- }
-
- return sock;
- }
-
- // from here on, we are working w/ multiple servers
- // keep trying different servers until we find one
- // making sure we only try each server one time
- Set tryServers = new HashSet(Arrays.asList(servers));
-
- // get initial bucket
- long bucket = getBucket(key, hashCode);
- String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets
- .get(bucket)
- : buckets.get((int) bucket);
-
- while (!tryServers.isEmpty())
- {
-
- // try to get socket from bucket
- SockIO sock = getConnection(server);
-
- if (log.isDebugEnabled())
- log.debug("cache choose " + server + " for " + key);
-
- if (sock != null && sock.isConnected())
- {
- if (aliveCheck)
- {
- if (sock.isAlive())
- {
- return sock;
- } else
- {
- sock.close();
- try
- {
-
- if (socketPool.get(sock.getHost()) != null)
- socketPool.get(sock.getHost()).remove(sock);
-
- sock.trueClose();
-
- } catch (IOException ioe)
- {
- log.error("failed to close dead socket");
- }
- sock = null;
- }
- } else
- {
- return sock;
- }
- } else
- {
- if (sock != null)
- {
- addSocketToPool(socketPool, sock.host, sock,SOCKET_STATUS_DEAD,
- SOCKET_STATUS_DEAD, true);
- // sock = null;
- }
- }
-
- // if we do not want to failover, then bail here
- if (!failover)
- return null;
-
- // log that we tried
- tryServers.remove(server);
-
- if (tryServers.isEmpty())
- break;
-
- // if we failed to get a socket from this server
- // then we try again by adding an incrementer to the
- // current key and then rehashing
- int rehashTries = 0;
- while (!tryServers.contains(server))
- {
-
- String newKey = new StringBuilder().append(rehashTries).append(
- key).toString();
-
- // String.format( "%s%s", rehashTries, key );
- if (log.isDebugEnabled())
- log.debug("rehashing with: " + newKey);
-
- bucket = getBucket(newKey, null);
- server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets
- .get(bucket)
- : buckets.get((int) bucket);
-
- rehashTries++;
- }
- }
-
- return null;
- }
-
- /**
- * Returns a SockIO object from the pool for the passed in host.
- *
- * Meant to be called from a more intelligent method
which handles
- * choosing appropriate server
and failover.
- *
- * @param host
- * host from which to retrieve object
- * @return SockIO object or null if fail to retrieve one
- */
- public SockIO getConnection(String host)
- {
-
- if (!this.initialized)
- {
- log.error("attempting to get SockIO from uninitialized pool!");
- return null;
- }
-
- if (host == null)
- return null;
-
- // if we have items in the pool
- // then we can return it
- if (socketPool != null && !socketPool.isEmpty())
- {
- // take first connected socket
- Map aSockets = socketPool.get(host);
-
- //fast check
- SockIO socket = fastPool.get(host);
- if (socket != null)
- {
- if (isFreeSocket(socket,aSockets))
- {
- return socket;
- }
- }
-
- if (aSockets != null && !aSockets.isEmpty())
- {
- int start = (random.nextInt() % aSockets.size());
-
- if (start < 0)
- start*= -1;
-
- int count = 0;
-
- for (Iterator i = aSockets.keySet().iterator();
- i.hasNext();)
- {
- if (count < start)
- {
- i.next();
- count++;
- continue;
- }
-
- socket = i.next();
-
- if (isFreeSocket(socket,aSockets))
- return socket;
- }
-
- for (Iterator i = aSockets.keySet().iterator();
- i.hasNext();)
- {
- if (count > 0)
- {
- socket = i.next();
- if (isFreeSocket(socket,aSockets))
- return socket;
-
- count--;
- }
- else
- break;
-
- }
-
- }
- }
-
- // create one socket -- let the maint thread take care of creating more
- SockIO socket = createSocket(host);
- if (socket != null)
- {
- addSocketToPool(socketPool, host, socket,
- SOCKET_STATUS_BUSY,SOCKET_STATUS_BUSY, true);
- }
-
- return socket;
- }
-
- private boolean isFreeSocket(SockIO socket,Map socketMap)
- {
- if (socket.isConnected())
- {
- if (socketMap.get(socket) == SOCKET_STATUS_ACTIVE)
- {
-
- if (!addSocketToPool(socketPool, socket.getHost(), socket
- ,SOCKET_STATUS_ACTIVE,SOCKET_STATUS_BUSY, false))
- return false;
-
- if (log.isDebugEnabled())
- log.debug("++++ moving socket for host ("
- + socket.getHost() + ") to busy pool ... socket: "
- + socket);
-
- // return socket
- return true;
-
- }
-
- } else
- {
- // add to deadpool for later reaping
- addSocketToPool(socketPool, socket.getHost(), socket,SOCKET_STATUS_DEAD,
- SOCKET_STATUS_DEAD, true);
- }
-
-
- return false;
- }
-
- /**
- * Adds a socket and value to a given pool for the given host.
- *
- * @param pool
- * pool to add to
- * @param host
- * host this socket is connected to
- * @param socket
- * socket to add
- */
- protected boolean addSocketToPool(
- ConcurrentMap> pool, String host,
- SockIO socket, T oldValue,T newValue, boolean needReplace)
- {
- boolean result = false;
-
- ConcurrentMap sockets;
-
- if (!pool.containsKey(host))
- {
- sockets = new ConcurrentHashMap();
- pool.putIfAbsent(host, sockets);
- }
-
- sockets = pool.get(host);
-
- if (sockets != null)
- {
- if (needReplace)
- {
- sockets.put(socket, newValue);
- result = true;
- }
- else
- {
- return sockets.replace(socket, oldValue, newValue);
- }
- }
-
- return result;
- }
-
- /**
- * @param host
- * @param socket
- * @param oldStatus
- * @param newStatus
- */
- protected void updateStatusPool(String host, SockIO socket, int newStatus)
- {
- if (socketPool.containsKey(host))
- socketPool.get(host).replace(socket, newStatus);
- }
-
- /**
- * Closes and removes all sockets from specified pool for host. THIS METHOD
- * IS NOT THREADSAFE, SO BE CAREFUL WHEN USING!
- *
- * Internal utility method.
- *
- * @param pool
- * pool to clear
- * @param host
- * host to clear
- */
- protected void clearHostFromPool(String host)
- {
- Map sockets = socketPool.remove(host);
-
- if (sockets != null)
- {
-
- if (sockets.size() > 0)
- {
- for (SockIO socket : sockets.keySet())
- {
- sockets.remove(socket);
-
- try
- {
- socket.trueClose();
- } catch (IOException ioe)
- {
- log.error("++++ failed to close socket: "
- + ioe.getMessage());
- }
-
- socket = null;
- }
- }
-
- }
- }
-
- /**
- * Checks a SockIO object in with the pool.
- *
- * This will remove SocketIO from busy pool, and optionally
add to
- * avail pool.
- *
- * @param socket
- * socket to return
- * @param addToAvail
- * add to avail pool if true
- */
- private void checkIn(SockIO socket, boolean addToAvail)
- {
-
- String host = socket.getHost();
- if (log.isDebugEnabled())
- log.debug("++++ calling check-in on socket: " + socket.toString()
- + " for host: " + host);
-
- // remove from the busy pool
- if (log.isDebugEnabled())
- log.debug("++++ removing socket (" + socket.toString()
- + ") from busy pool for host: " + host);
-
- if (socketPool.containsKey(host)
- && socketPool.get(host).containsKey(socket)
- && socketPool.get(host).get(socket) == SOCKET_STATUS_BUSY)
- {
- addSocketToPool(socketPool, host, socket,
- SOCKET_STATUS_ACTIVE,SOCKET_STATUS_ACTIVE,true);
-
- fastPool.put(host, socket);
- }
-
- if (socket.isConnected() && addToAvail)
- {
- // add to avail pool
- if (log.isDebugEnabled())
- log.debug("++++ returning socket (" + socket.toString()
- + " to avail pool for host: " + host);
- } else
- {
- addSocketToPool(socketPool, host, socket
- ,SOCKET_STATUS_DEAD,SOCKET_STATUS_DEAD, true);
- // socket = null;
- }
-
- }
-
- /**
- * Returns a socket to the avail pool.
- *
- * This is called from SockIO.close(). Calling this method
directly
- * without closing the SockIO object first
will cause an IOException
- * to be thrown.
- *
- * @param socket
- * socket to return
- */
- private void checkIn(SockIO socket)
- {
- checkIn(socket, true);
- }
-
- /**
- * Closes all sockets in the passed in pool.
- *
- * Internal utility method.
- *
- * @param pool
- * pool to close
- */
- protected void closeSocketPool()
- {
- for (Iterator i = socketPool.keySet().iterator(); i.hasNext();)
- {
- String host = i.next();
- Map sockets = socketPool.get(host);
-
- for (SockIO socket : sockets.keySet())
- {
- sockets.remove(socket);
-
- try
- {
- socket.trueClose(false);
- } catch (IOException ioe)
- {
- log.error("++++ failed to trueClose socket: "
- + socket.toString() + " for host: " + host);
- }
-
- socket = null;
- }
- }
- }
-
- /**
- * Shuts down the pool.
- *
- * Cleanly closes all sockets.
Stops the maint thread.
Nulls out
- * all internal maps
- */
- public void shutDown()
- {
- if (log.isDebugEnabled())
- log.debug("++++ SockIOPool shutting down...");
-
- if (maintThread != null && maintThread.isRunning())
- {
- // stop the main thread
- stopMaintThread();
-
- // wait for the thread to finish
- while (maintThread.isRunning())
- {
- if (log.isDebugEnabled())
- log.debug("++++ waiting for main thread to finish run +++");
- try
- {
- Thread.sleep(500);
- } catch (Exception ex)
- {
- }
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("++++ closing all internal pools.");
- closeSocketPool();
-
- socketPool.clear();
- fastPool.clear();
- socketPool = null;
- fastPool = null;
- buckets = null;
- consistentBuckets = null;
- hostDeadDur = null;
- hostDead = null;
- maintThread = null;
- initialized = false;
- if (log.isDebugEnabled())
- log.debug("++++ SockIOPool finished shutting down.");
-
- }
-
- /**
- * Starts the maintenance thread.
- *
- * This thread will manage the size of the active pool
as well as move
- * any closed, but not checked in sockets
back to the available pool.
- */
- protected void startMaintThread()
- {
-
- if (maintThread != null)
- {
-
- if (maintThread.isRunning())
- {
- log.error("main thread already running");
- } else
- {
- maintThread.start();
- }
- } else
- {
- maintThread = new MaintThread(this);
- maintThread.setInterval(this.maintSleep);
- maintThread.start();
- }
- }
-
- /**
- * Stops the maintenance thread.
- */
- protected void stopMaintThread()
- {
- if (maintThread != null && maintThread.isRunning())
- maintThread.stopThread();
- }
-
- /**
- * Runs self maintenance on all internal pools.
- *
- * This is typically called by the maintenance thread to manage pool size.
- */
- protected void selfMaint()
- {
- if (log.isDebugEnabled())
- log.debug("++++ Starting self maintenance....");
-
- // go through avail sockets and create sockets
- // as needed to maintain pool settings
- Map needSockets = new HashMap();
-
- // find out how many to create
- for (Iterator i = socketPool.keySet().iterator(); i.hasNext();)
- {
- String host = i.next();
- ConcurrentMap sockets = socketPool.get(host);
-
- if (sockets == null)
- {
- sockets = new ConcurrentHashMap();
- socketPool.putIfAbsent(host, sockets);
- sockets = socketPool.get(host);
- }
-
- if (log.isDebugEnabled())
- log.debug("++++ Size of avail pool for host (" + host + ") = "
- + sockets.size());
-
- // if pool is too small (n < minSpare)
- if (sockets != null && sockets.size() < minConn)
- {
- // need to create new sockets
- int need = minConn - sockets.size();
- needSockets.put(host, need);
- }
- }
-
- // now create
- for (String host : needSockets.keySet())
- {
- Integer need = needSockets.get(host);
-
- if (log.isDebugEnabled())
- log.debug("++++ Need to create " + need
- + " new sockets for pool for host: " + host);
-
- for (int j = 0; j < need; j++)
- {
- SockIO socket = createSocket(host);
-
- if (socket == null)
- break;
-
- addSocketToPool(socketPool, host, socket
- ,SOCKET_STATUS_ACTIVE,SOCKET_STATUS_ACTIVE, true);
- }
-
- }
-
- for (Iterator i = socketPool.keySet().iterator(); i.hasNext();)
- {
- String host = i.next();
- ConcurrentMap sockets = socketPool.get(host);
-
- if (log.isDebugEnabled())
- log.debug("++++ Size of avail pool for host (" + host + ") = "
- + sockets.size());
-
- int active = 0;
-
- if (sockets == null)
- {
- sockets = new ConcurrentHashMap();
- socketPool.putIfAbsent(host, sockets);
- sockets = socketPool.get(host);
- }
-
- Iterator iter = sockets.values().iterator();
-
- while(iter.hasNext())
- {
- if(iter.next() == SOCKET_STATUS_ACTIVE)
- active++;
- }
-
- if (sockets != null && (active > maxConn))
- {
- // need to close down some sockets
- int diff = active - maxConn;
- int needToClose = (diff <= poolMultiplier) ? diff : (diff)
- / poolMultiplier;
-
- if (log.isDebugEnabled())
- log.debug("++++ need to remove " + needToClose
- + " spare sockets for pool for host: " + host);
-
- for (Iterator j = sockets.keySet().iterator(); j
- .hasNext();)
- {
- if (needToClose <= 0)
- break;
-
- // remove stale entries
- SockIO socket = j.next();
-
- // remove from the availPool
- if (sockets.get(socket) == SOCKET_STATUS_ACTIVE)
- {
- if (addSocketToPool(socketPool, host, socket
- ,SOCKET_STATUS_ACTIVE,SOCKET_STATUS_DEAD, false))
- needToClose--;
- }
-
- }
- }
- }
-
- // finally clean out the deadPool
- for (Iterator i = socketPool.keySet().iterator(); i.hasNext();)
- {
- String host = i.next();
- ConcurrentMap sockets = socketPool.get(host);
-
- // loop through all connections and check to see if we have any hung
- // connections
-
- for (Iterator j = sockets.keySet().iterator(); j.hasNext();)
- {
- // remove stale entries
- SockIO socket = j.next();
-
- try
- {
- Integer status = null;
- if (sockets != null && socket != null)
- status = sockets.get(socket);
-
- if (status != null && status == SOCKET_STATUS_DEAD)
- {
-
- if (socketPool.containsKey(host))
- socketPool.get(host).remove(socket);
-
- socket.trueClose(false);
-
- socket = null;
- }
- } catch (Exception ex)
- {
- log.error("++++ failed to close SockIO obj from deadPool");
- log.error(ex.getMessage(), ex);
- }
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("+++ ending self maintenance.");
-
- }
-
- /**
- * Returns state of pool.
- *
- * @return true
if initialized.
- */
- public boolean isInitialized()
- {
- return initialized;
- }
-
- /**
- * Sets the list of all cache servers.
- *
- * @param servers
- * String array of servers [host:port]
- */
- public void setServers(String[] servers)
- {
- this.servers = servers;
- }
-
- /**
- * Returns the current list of all cache servers.
- *
- * @return String array of servers [host:port]
- */
- public String[] getServers()
- {
- return this.servers;
- }
-
- /**
- * Sets the list of weights to apply to the server list.
- *
- * This is an int array with each element corresponding to an element
- * in the same position in the server String array.
- *
- * @param weights
- * Integer array of weights
- */
- public void setWeights(Integer[] weights)
- {
- this.weights = weights;
- }
-
- /**
- * Returns the current list of weights.
- *
- * @return int array of weights
- */
- public Integer[] getWeights()
- {
- return this.weights;
- }
-
- /**
- * Sets the initial number of connections per server in the available pool.
- *
- * @param initConn
- * int number of connections
- */
- public void setInitConn(int initConn)
- {
- this.initConn = initConn;
- }
-
- /**
- * Returns the current setting for the initial number of connections per
- * server in the available pool.
- *
- * @return number of connections
- */
- public int getInitConn()
- {
- return this.initConn;
- }
-
- /**
- * Sets the minimum number of spare connections to maintain in our available
- * pool.
- *
- * @param minConn
- * number of connections
- */
- public void setMinConn(int minConn)
- {
- this.minConn = minConn;
- }
-
- /**
- * Returns the minimum number of spare connections in available pool.
- *
- * @return number of connections
- */
- public int getMinConn()
- {
- return this.minConn;
- }
-
- /**
- * Sets the maximum number of spare connections allowed in our available
- * pool.
- *
- * @param maxConn
- * number of connections
- */
- public void setMaxConn(int maxConn)
- {
- this.maxConn = maxConn;
- }
-
- /**
- * Returns the maximum number of spare connections allowed in available
- * pool.
- *
- * @return number of connections
- */
- public int getMaxConn()
- {
- return this.maxConn;
- }
-
- /**
- * Sets the max idle time for threads in the available pool.
- *
- * @param maxIdle
- * idle time in ms
- */
- public void setMaxIdle(long maxIdle)
- {
- this.maxIdle = maxIdle;
- }
-
- /**
- * Returns the current max idle setting.
- *
- * @return max idle setting in ms
- */
- public long getMaxIdle()
- {
- return this.maxIdle;
- }
-
- /**
- * Sets the max busy time for threads in the busy pool.
- *
- * @param maxBusyTime
- * idle time in ms
- */
- public void setMaxBusyTime(long maxBusyTime)
- {
- this.maxBusyTime = maxBusyTime;
- }
-
- /**
- * Returns the current max busy setting.
- *
- * @return max busy setting in ms
- */
- public long getMaxBusy()
- {
- return this.maxBusyTime;
- }
-
- /**
- * Set the sleep time between runs of the pool maintenance thread. If set to
- * 0, then the maint thread will not be started.
- *
- * @param maintSleep
- * sleep time in ms
- */
- public void setMaintSleep(long maintSleep)
- {
- this.maintSleep = maintSleep;
- }
-
- /**
- * Returns the current maint thread sleep time.
- *
- * @return sleep time in ms
- */
- public long getMaintSleep()
- {
- return this.maintSleep;
- }
-
- /**
- * Sets the socket timeout for reads.
- *
- * @param socketTO
- * timeout in ms
- */
- public void setSocketTO(int socketTO)
- {
- this.socketTO = socketTO;
- }
-
- /**
- * Returns the socket timeout for reads.
- *
- * @return timeout in ms
- */
- public int getSocketTO()
- {
- return this.socketTO;
- }
-
- /**
- * Sets the socket timeout for connect.
- *
- * @param socketConnectTO
- * timeout in ms
- */
- public void setSocketConnectTO(int socketConnectTO)
- {
- this.socketConnectTO = socketConnectTO;
- }
-
- /**
- * Returns the socket timeout for connect.
- *
- * @return timeout in ms
- */
- public int getSocketConnectTO()
- {
- return this.socketConnectTO;
- }
-
- /**
- * Sets the failover flag for the pool.
- *
- * If this flag is set to true, and a socket fails to connect,
the
- * pool will attempt to return a socket from another server
if one
- * exists. If set to false, then getting a socket
will return null if
- * it fails to connect to the requested server.
- *
- * @param failover
- * true/false
- */
- public void setFailover(boolean failover)
- {
- this.failover = failover;
- }
-
- /**
- * Returns current state of failover flag.
- *
- * @return true/false
- */
- public boolean getFailover()
- {
- return this.failover;
- }
-
- /**
- * Sets the failback flag for the pool.
- *
- * If this is true and we have marked a host as dead, will try to bring it
- * back. If it is false, we will never try to resurrect a dead host.
- *
- * @param failback
- * true/false
- */
- public void setFailback(boolean failback)
- {
- this.failback = failback;
- }
-
- /**
- * Returns current state of failover flag.
- *
- * @return true/false
- */
- public boolean getFailback()
- {
- return this.failback;
- }
-
- /**
- * Sets the aliveCheck flag for the pool.
- *
- * When true, this will attempt to talk to the server on every connection
- * checkout to make sure the connection is still valid. This adds extra
- * network chatter and thus is defaulted off. May be useful if you want to
- * ensure you do not have any problems talking to the server on a dead
- * connection.
- *
- * @param aliveCheck
- * true/false
- */
- public void setAliveCheck(boolean aliveCheck)
- {
- this.aliveCheck = aliveCheck;
- }
-
- /**
- * Returns the current status of the aliveCheck flag.
- *
- * @return true / false
- */
- public boolean getAliveCheck()
- {
- return this.aliveCheck;
- }
-
- /**
- * Sets the Nagle alg flag for the pool.
- *
- * If false, will turn off Nagle's algorithm on all sockets created.
- *
- * @param nagle
- * true/false
- */
- public void setNagle(boolean nagle)
- {
- this.nagle = nagle;
- }
-
- /**
- * Returns current status of nagle flag
- *
- * @return true/false
- */
- public boolean getNagle()
- {
- return this.nagle;
- }
-
- /**
- * Sets the hashing algorithm we will use.
- *
- * The types are as follows.
- *
- * SockIOPool.NATIVE_HASH (0) - native String.hashCode() - fast (cached) but
- * not compatible with other clients SockIOPool.OLD_COMPAT_HASH (1) -
- * original compatibility hashing alg (works with other clients)
- * SockIOPool.NEW_COMPAT_HASH (2) - new CRC32 based compatibility hashing
- * algorithm (fast and works with other clients)
- *
- * @param alg
- * int value representing hashing algorithm
- */
- public void setHashingAlg(int alg)
- {
- this.hashingAlg = alg;
- }
-
- /**
- * Returns current status of customHash flag
- *
- * @return true/false
- */
- public int getHashingAlg()
- {
- return this.hashingAlg;
- }
-
- /**
- * Internal private hashing method.
- *
- * This is the original hashing algorithm from other clients. Found to be
- * slow and have poor distribution.
- *
- * @param key
- * String to hash
- * @return hashCode for this string using our own hashing algorithm
- */
- private static long origCompatHashingAlg(String key)
- {
- long hash = 0;
- char[] cArr = key.toCharArray();
-
- for (int i = 0; i < cArr.length; ++i)
- {
- hash = (hash * 33) + cArr[i];
- }
-
- return hash;
- }
-
- /**
- * Internal private hashing method.
- *
- * This is the new hashing algorithm from other clients. Found to be fast
- * and have very good distribution.
- *
- * UPDATE: This is dog slow under java
- *
- * @param key
- * @return
- */
- private static long newCompatHashingAlg(String key)
- {
- CRC32 checksum = new CRC32();
- checksum.update(key.getBytes());
- long crc = checksum.getValue();
- return (crc >> 16) & 0x7fff;
- }
-
- /**
- * Internal private hashing method.
- *
- * MD5 based hash algorithm for use in the consistent hashing approach.
- *
- * @param key
- * @return
- */
- private static long md5HashingAlg(String key)
- {
- MessageDigest md5 = MD5.get();
- md5.reset();
- md5.update(key.getBytes());
- byte[] bKey = md5.digest();
- long res = ((long) (bKey[3] & 0xFF) << 24)
- | ((long) (bKey[2] & 0xFF) << 16)
- | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF);
- return res;
- }
-
- /**
- * Returns a bucket to check for a given key.
- *
- * @param key
- * String key cache is stored under
- * @return int bucket
- */
- private long getHash(String key, Integer hashCode)
- {
-
- if (hashCode != null)
- {
- if (hashingAlg == CONSISTENT_HASH)
- return hashCode.longValue() & 0xffffffffL;
- else
- return hashCode.longValue();
- } else
- {
- switch (hashingAlg)
- {
- case NATIVE_HASH:
- return (long) key.hashCode();
- case OLD_COMPAT_HASH:
- return origCompatHashingAlg(key);
- case NEW_COMPAT_HASH:
- return newCompatHashingAlg(key);
- case CONSISTENT_HASH:
- return md5HashingAlg(key);
- default:
- // use the native hash as a default
- hashingAlg = NATIVE_HASH;
- return (long) key.hashCode();
- }
- }
- }
-
- private long getBucket(String key, Integer hashCode)
- {
- long hc = getHash(key, hashCode);
-
- if (this.hashingAlg == CONSISTENT_HASH)
- {
- return findPointFor(hc);
- } else
- {
- long bucket = hc % buckets.size();
- if (bucket < 0)
- bucket *= -1;
- return bucket;
- }
- }
-
- /**
- * Gets the first available key equal or above the given one, if none found,
- * returns the first k in the bucket
- *
- * @param k
- * key
- * @return
- */
- private Long findPointFor(Long hv)
- {
- // this works in java 6, but still want to release support for java5
- // Long k = this.consistentBuckets.ceilingKey( hv );
- // return ( k == null ) ? this.consistentBuckets.firstKey() : k;
-
- SortedMap tmap = this.consistentBuckets.tailMap(hv);
-
- return (tmap.isEmpty()) ? this.consistentBuckets.firstKey() : tmap
- .firstKey();
- }
-
-
- /**
- * Class which extends thread and handles maintenance of the pool.
- *
- * @author greg whalin
- * @version 1.5
- */
- protected static class MaintThread extends Thread
- {
-
- // logger
- private static Logger log = Logger.getLogger(MaintThread.class
- .getName());
-
- private SockIOPool pool;
- private long interval = 1000 * 3; // every 3 seconds
- private boolean stopThread = false;
- private boolean running;
-
- protected MaintThread(SockIOPool pool)
- {
- this.pool = pool;
- this.setDaemon(true);
- this.setName("MaintThread");
- }
-
- public void setInterval(long interval)
- {
- this.interval = interval;
- }
-
- public boolean isRunning()
- {
- return this.running;
- }
-
- /**
- * sets stop variable and interupts any wait
- */
- public void stopThread()
- {
- this.stopThread = true;
- this.interrupt();
- }
-
- /**
- * Start the thread.
- */
- public void run()
- {
- this.running = true;
-
- while (!this.stopThread)
- {
- try
- {
- Thread.sleep(interval);
-
- // if pool is initialized, then
- // run the maintenance method on itself
- if (pool.isInitialized())
- pool.selfMaint();
-
- } catch (Exception e)
- {
- if (e instanceof java.lang.InterruptedException)
- log.info("MaintThread stop !");
- else
- log.error("MaintThread error !", e);
- break;
- }
- }
-
- this.running = false;
- }
- }
-
- /**
- * MemCached Java client, utility class for Socket IO.
- *
- * This class is a wrapper around a Socket and its streams.
- *
- * @author greg whalin
- * @author Richard 'toast' Russo
- * @version 1.5
- */
- public static class SockIO
- {
-
- // logger
- private static Logger log = Logger.getLogger(SockIO.class.getName());
-
- // pool
- private SockIOPool pool;
-
- // data
- private String host;
- private Socket sock;
-
- private DataInputStream in;
- private BufferedOutputStream out;
-
- private byte[] recBuf;
-
- private int recBufSize = 1028;
- private int recIndex = 0;
- //жǷҪӴڿ״̬
- private long aliveTimeStamp = 0;
-
- /**
- * creates a new SockIO object wrapping a socket connection to
- * host:port, and its input and output streams
- *
- * @param pool
- * Pool this object is tied to
- * @param host
- * host to connect to
- * @param port
- * port to connect to
- * @param timeout
- * int ms to block on data for read
- * @param connectTimeout
- * timeout (in ms) for initial connection
- * @param noDelay
- * TCP NODELAY option?
- * @throws IOException
- * if an io error occurrs when creating socket
- * @throws UnknownHostException
- * if hostname is invalid
- */
- public SockIO(SockIOPool pool, String host, int port, int timeout,
- int connectTimeout, boolean noDelay) throws IOException,
- UnknownHostException
- {
-
- this.pool = pool;
-
- recBuf = new byte[recBufSize];
-
- // get a socket channel
- sock = getSocket(host, port, connectTimeout);
-
- if (timeout >= 0)
- sock.setSoTimeout(timeout);
-
- // testing only
- sock.setTcpNoDelay(noDelay);
-
- // wrap streams
- in = new DataInputStream(sock.getInputStream());
- out = new BufferedOutputStream(sock.getOutputStream());
-
- this.host = host + ":" + port;
- }
-
- /**
- * creates a new SockIO object wrapping a socket connection to
- * host:port, and its input and output streams
- *
- * @param host
- * hostname:port
- * @param timeout
- * read timeout value for connected socket
- * @param connectTimeout
- * timeout for initial connections
- * @param noDelay
- * TCP NODELAY option?
- * @throws IOException
- * if an io error occurrs when creating socket
- * @throws UnknownHostException
- * if hostname is invalid
- */
- public SockIO(SockIOPool pool, String host, int timeout,
- int connectTimeout, boolean noDelay) throws IOException,
- UnknownHostException
- {
-
- this.pool = pool;
-
- recBuf = new byte[recBufSize];
-
- //String[] ip = host.split(":");
- int index = host.indexOf(":");
-
- if(index <= 0)
- throw new RuntimeException(new StringBuilder().append("host :")
- .append(host).append(" is error,check config file!").toString());
-
- // get socket: default is to use non-blocking connect
- sock = getSocket(host.substring(0,index), Integer.parseInt(host.substring(index+1)), connectTimeout);
-
- if (timeout >= 0)
- this.sock.setSoTimeout(timeout);
-
- // testing only
- sock.setTcpNoDelay(noDelay);
-
- // wrap streams
- in = new DataInputStream(sock.getInputStream());
- out = new BufferedOutputStream(sock.getOutputStream());
-
- this.host = host;
- }
-
- /**
- * Method which gets a connection from SocketChannel.
- *
- * @param host
- * host to establish connection to
- * @param port
- * port on that host
- * @param timeout
- * connection timeout in ms
- *
- * @return connected socket
- * @throws IOException
- * if errors connecting or if connection times out
- */
- protected static Socket getSocket(String host, int port, int timeout)
- throws IOException
- {
- SocketChannel sock = SocketChannel.open();
- sock.socket().connect(new InetSocketAddress(host, port), timeout);
- return sock.socket();
- }
-
- /**
- * Lets caller get access to underlying channel.
- *
- * @return the backing SocketChannel
- */
- public SocketChannel getChannel()
- {
- return sock.getChannel();
- }
-
- /**
- * returns the host this socket is connected to
- *
- * @return String representation of host (hostname:port)
- */
- public String getHost()
- {
- return this.host;
- }
-
-
-
- /**
- * closes socket and all streams connected to it
- *
- * @throws IOException
- * if fails to close streams or socket
- */
- public void trueClose() throws IOException
- {
- trueClose(true);
- }
-
- /**
- * closes socket and all streams connected to it
- *
- * @throws IOException
- * if fails to close streams or socket
- */
- public void trueClose(boolean addToDeadPool) throws IOException
- {
- if (log.isDebugEnabled())
- log.debug("++++ Closing socket for real: " + toString());
-
- //alive state clear
- aliveTimeStamp = 0;
-
- recBuf = new byte[recBufSize];
- recIndex = 0;
-
- boolean err = false;
- StringBuilder errMsg = new StringBuilder();
-
- if (in == null || out == null || sock == null)
- {
- err = true;
- errMsg
- .append("++++ socket or its streams already null in trueClose call");
- }
-
- if (in != null)
- {
- try
- {
- in.close();
- } catch (IOException ioe)
- {
- log.error("++++ error closing input stream for socket: "
- + toString() + " for host: " + getHost());
- log.error(ioe.getMessage(), ioe);
- errMsg
- .append("++++ error closing input stream for socket: "
- + toString()
- + " for host: "
- + getHost()
- + "\n");
- errMsg.append(ioe.getMessage());
- err = true;
- }
- }
-
- if (out != null)
- {
- try
- {
- out.close();
- } catch (IOException ioe)
- {
- log.error("++++ error closing output stream for socket: "
- + toString() + " for host: " + getHost());
- log.error(ioe.getMessage(), ioe);
- errMsg
- .append("++++ error closing output stream for socket: "
- + toString()
- + " for host: "
- + getHost()
- + "\n");
- errMsg.append(ioe.getMessage());
- err = true;
- }
- }
-
- if (sock != null)
- {
- try
- {
- sock.close();
- } catch (IOException ioe)
- {
- log.error("++++ error closing socket: " + toString()
- + " for host: " + getHost());
- log.error(ioe.getMessage(), ioe);
- errMsg.append("++++ error closing socket: " + toString()
- + " for host: " + getHost() + "\n");
- errMsg.append(ioe.getMessage());
- err = true;
- }
- }
-
- // check in to pool
- if (addToDeadPool && sock != null)
- pool.checkIn(this, false);
-
- in = null;
- out = null;
- sock = null;
-
- if (err)
- throw new IOException(errMsg.toString());
- }
-
- /**
- * sets closed flag and checks in to connection pool but does not close
- * connections
- */
- void close()
- {
- // check in to pool
- if (log.isDebugEnabled())
- log.debug("++++ marking socket (" + this.toString()
- + ") as closed and available to return to avail pool");
-
- recBuf = new byte[recBufSize];
- recIndex = 0;
-
- pool.checkIn(this);
- }
-
- /**
- * checks if the connection is open
- *
- * @return true if connected
- */
- boolean isConnected()
- {
- return (sock != null && sock.isConnected());
- }
-
- /*
- * checks to see that the connection is still working
- *
- * @return true if still alive
- */
- boolean isAlive()
- {
-
- if (!isConnected())
- {
- //alive state clear
- aliveTimeStamp = 0;
- return false;
- }
-
-
- // try to talk to the server w/ a dumb query to ask its version
- boolean needcheck = true;
-
- if (aliveTimeStamp > 0)
- {
- long interval = System.currentTimeMillis() - aliveTimeStamp;
-
- if (interval < 100)
- needcheck = false;
- }
-
-
- if (needcheck)
- {
- try
- {
- this.write(B_VERSION);
- this.flush();
- this.readLine();
-
- //update alive state
- aliveTimeStamp = System.currentTimeMillis();
-
- } catch (IOException ex)
- {
- return false;
- }
- }
-
-
- return true;
- }
-
-
- public byte[] readBytes(int length) throws IOException
- {
- if (sock == null || !sock.isConnected())
- {
- log.error("++++ attempting to read from closed socket");
- throw new IOException(
- "++++ attempting to read from closed socket");
- }
-
- byte[] result = null;
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- if (recIndex >= length)
- {
- bos.write(recBuf,0,length);
-
- byte[] newBuf = new byte[recBufSize];
-
- if (recIndex > length)
- System.arraycopy(recBuf,length,newBuf,0,recIndex - length);
-
- recBuf = newBuf;
- recIndex = recIndex - length;
- }
- else
- {
- int totalread = length;
-
- if (recIndex > 0)
- {
- totalread = totalread - recIndex;
- bos.write(recBuf,0,recIndex);
-
- recBuf = new byte[recBufSize];
- recIndex = 0;
- }
-
- int readCount = 0;
-
- while(totalread > 0)
- {
- if ((readCount = in.read(recBuf)) > 0)
- {
- if (totalread > readCount)
- {
- bos.write(recBuf,0,readCount);
- recBuf = new byte[recBufSize];
- recIndex = 0;
- }
- else
- {
- bos.write(recBuf,0,totalread);
- byte[] newBuf = new byte[recBufSize];
- System.arraycopy(recBuf,totalread,newBuf,0,readCount - totalread);
-
- recBuf = newBuf;
- recIndex = readCount - totalread;
- }
-
- totalread = totalread - readCount;
- }
- }
-
- }
-
- result = bos.toByteArray();
-
- if (result == null || (result != null && result.length <= 0 && recIndex <= 0))
- {
- throw new IOException(
- "++++ Stream appears to be dead, so closing it down");
- }
-
- //update alive state
- aliveTimeStamp = System.currentTimeMillis();
-
- return result;
- }
-
- /**
- * reads a line intentionally not using the deprecated readLine method
- * from DataInputStream
- *
- * @return String that was read in
- * @throws IOException
- * if io problems during read
- */
- public String readLine() throws IOException
- {
- if (sock == null || !sock.isConnected())
- {
- log.error("++++ attempting to read from closed socket");
- throw new IOException(
- "++++ attempting to read from closed socket");
- }
-
- String result = null;
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- //StringBuilder content = new StringBuilder();
- int readCount = 0;
-
- // some recbuf releave
- if (recIndex > 0 && read(bos))
- {
- return bos.toString();
- }
-
- while((readCount = in.read(recBuf,recIndex,recBuf.length - recIndex)) > 0)
- {
- recIndex = recIndex + readCount;
-
- if (read(bos))
- break;
- }
-
- result = bos.toString();
-
- if (result == null || (result != null && result.length() <= 0 && recIndex <= 0))
- {
- throw new IOException(
- "++++ Stream appears to be dead, so closing it down");
- }
-
- //update alive state
- aliveTimeStamp = System.currentTimeMillis();
-
- return result;
-
- }
-
- private boolean read(ByteArrayOutputStream bos)
- {
- boolean result = false;
- int index = -1;
-
- for(int i = 0 ; i < recIndex-1; i++)
- {
- if(recBuf[i] == 13 && recBuf[i+1] == 10)
- {
- index = i;
- break;
- }
- }
-
- if (index >= 0)
- {
- //strBuilder.append(new String(recBuf,0,index));
- bos.write(recBuf,0,index);
-
- byte[] newBuf = new byte[recBufSize];
-
- if (recIndex > index+2)
- System.arraycopy(recBuf,index+2,newBuf,0,recIndex-index-2);
-
- recBuf = newBuf;
- recIndex = recIndex-index-2;
-
- result = true;
- }
- else
- {
- //߽
- if (recBuf[recIndex-1] == 13)
- {
- //strBuilder.append(new String(recBuf,0,recIndex-1));
- bos.write(recBuf,0,recIndex-1);
- recBuf = new byte[recBufSize];
- recBuf[0] = 13;
- recIndex = 1;
- }
- else
- {
- //strBuilder.append(new String(recBuf,0,recIndex));
- bos.write(recBuf,0,recIndex);
- recBuf = new byte[recBufSize];
- recIndex = 0;
- }
-
- }
-
- return result;
- }
-
-
- /**
- * flushes output stream
- *
- * @throws IOException
- * if io problems during read
- */
- void flush() throws IOException
- {
- if (sock == null || !sock.isConnected())
- {
- log.error("++++ attempting to write to closed socket");
- throw new IOException(
- "++++ attempting to write to closed socket");
- }
- out.flush();
- }
-
- /**
- * writes a byte array to the output stream
- *
- * @param b
- * byte array to write
- * @throws IOException
- * if an io error happens
- */
- void write(byte[] b) throws IOException
- {
- if (sock == null || !sock.isConnected())
- {
- log.error("++++ attempting to write to closed socket");
- throw new IOException(
- "++++ attempting to write to closed socket");
- }
- out.write(b);
- }
-
- /**
- * use the sockets hashcode for this object so we can key off of SockIOs
- *
- * @return int hashcode
- */
- public int hashCode()
- {
- return (sock == null) ? 0 : sock.hashCode();
- }
-
- /**
- * returns the string representation of this socket
- *
- * @return string
- */
- public String toString()
- {
- return (sock == null) ? "" : sock.toString();
- }
-
- /**
- * Hack to reap any leaking children.
- */
- protected void finalize() throws Throwable
- {
- try
- {
- if (sock != null)
- {
- log
- .error("++++ closing potentially leaked socket in finalize");
- sock.close();
- sock = null;
- }
- } catch (Throwable t)
- {
- log.error(t.getMessage(), t);
-
- } finally
- {
- super.finalize();
- }
- }
-
- }
-
-}