Skip to content

Commit

Permalink
Merge pull request #139 from samuelfu/samuelfu/timer
Browse files Browse the repository at this point in the history
Add percentile timer metric for gzip compression ratio
  • Loading branch information
pkarumanchi9 authored Jun 29, 2023
2 parents cca61a4 + f028968 commit b8f252e
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/**
* Copyright (C) 2006-2009 Dustin Sallings
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
* IN THE SOFTWARE.
*/

package com.netflix.evcache;

import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.BaseSerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.transcoders.TranscoderUtils;
import net.spy.memcached.util.StringUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;


/**
* Transcoder that serializes and compresses objects.
*/
/**
 * Transcoder that serializes and (when beneficial) gzip-compresses objects,
 * recording a percentile-timer metric with the compression ratio (compressed
 * size as a percentage of the original size) for every value that crosses the
 * compression threshold.
 *
 * <p>Encoding behavior mirrors spymemcached's {@code SerializingTranscoder}:
 * primitives and {@code Date} are packed with {@link TranscoderUtils}, strings
 * are stored raw, byte arrays pass through, and everything else is Java
 * serialized; payloads larger than {@code compressionThreshold} are compressed
 * only if that actually shrinks them.
 */
public class EVCacheSerializingTranscoder extends BaseSerializingTranscoder implements
        Transcoder<Object> {

    // General flags
    static final int SERIALIZED = 1;
    static final int COMPRESSED = 2;

    // Special flags for specially handled types.
    private static final int SPECIAL_MASK = 0xff00;
    static final int SPECIAL_BOOLEAN = (1 << 8);
    static final int SPECIAL_INT = (2 << 8);
    static final int SPECIAL_LONG = (3 << 8);
    static final int SPECIAL_DATE = (4 << 8);
    static final int SPECIAL_BYTE = (5 << 8);
    static final int SPECIAL_FLOAT = (6 << 8);
    static final int SPECIAL_DOUBLE = (7 << 8);
    static final int SPECIAL_BYTEARRAY = (8 << 8);

    static final String COMPRESSION = "COMPRESSION_METRIC";

    private final TranscoderUtils tu = new TranscoderUtils(true);

    // Lazily created percentile timer for the compression-ratio metric.
    // volatile: transcoder instances are shared across memcached I/O threads,
    // so the lazily-published Timer must be safely visible to all of them.
    // (Worst case two threads race and both create a timer; the registry
    // returns the same underlying meter for identical name+tags, so this is
    // benign.)
    private volatile Timer timer;

    /**
     * Get a serializing transcoder with the default max data size.
     */
    public EVCacheSerializingTranscoder() {
        this(CachedData.MAX_SIZE);
    }

    /**
     * Get a serializing transcoder that specifies the max data size.
     *
     * @param max maximum encoded payload size in bytes
     */
    public EVCacheSerializingTranscoder(int max) {
        super(max);
    }

    /**
     * Decode asynchronously when the payload was compressed or serialized,
     * since those paths do non-trivial CPU work.
     */
    @Override
    public boolean asyncDecode(CachedData d) {
        if ((d.getFlags() & COMPRESSED) != 0 || (d.getFlags() & SERIALIZED) != 0) {
            return true;
        }
        return super.asyncDecode(d);
    }

    /*
     * (non-Javadoc)
     *
     * @see net.spy.memcached.Transcoder#decode(net.spy.memcached.CachedData)
     */
    public Object decode(CachedData d) {
        byte[] data = d.getData();
        Object rv = null;
        if ((d.getFlags() & COMPRESSED) != 0) {
            data = decompress(d.getData());
        }
        // The SERIALIZED bit wins over the special-type flags; otherwise the
        // special flag selects the primitive decoder, and a payload with no
        // flags at all is treated as a plain string.
        int flags = d.getFlags() & SPECIAL_MASK;
        if ((d.getFlags() & SERIALIZED) != 0 && data != null) {
            rv = deserialize(data);
        } else if (flags != 0 && data != null) {
            switch (flags) {
            case SPECIAL_BOOLEAN:
                rv = Boolean.valueOf(tu.decodeBoolean(data));
                break;
            case SPECIAL_INT:
                rv = Integer.valueOf(tu.decodeInt(data));
                break;
            case SPECIAL_LONG:
                rv = Long.valueOf(tu.decodeLong(data));
                break;
            case SPECIAL_DATE:
                rv = new Date(tu.decodeLong(data));
                break;
            case SPECIAL_BYTE:
                rv = Byte.valueOf(tu.decodeByte(data));
                break;
            case SPECIAL_FLOAT:
                // valueOf over the deprecated boxing constructor; identical value.
                rv = Float.valueOf(Float.intBitsToFloat(tu.decodeInt(data)));
                break;
            case SPECIAL_DOUBLE:
                rv = Double.valueOf(Double.longBitsToDouble(tu.decodeLong(data)));
                break;
            case SPECIAL_BYTEARRAY:
                rv = data;
                break;
            default:
                getLogger().warn("Undecodeable with flags %x", flags);
            }
        } else {
            rv = decodeString(data);
        }
        return rv;
    }

    /*
     * (non-Javadoc)
     *
     * @see net.spy.memcached.Transcoder#encode(java.lang.Object)
     */
    public CachedData encode(Object o) {
        byte[] b = null;
        int flags = 0;
        if (o instanceof String) {
            b = encodeString((String) o);
            // JSON strings skip the compression path entirely and are stored
            // with no flags so any client can read them verbatim.
            if (StringUtils.isJsonObject((String) o)) {
                return new CachedData(flags, b, getMaxSize());
            }
        } else if (o instanceof Long) {
            b = tu.encodeLong((Long) o);
            flags |= SPECIAL_LONG;
        } else if (o instanceof Integer) {
            b = tu.encodeInt((Integer) o);
            flags |= SPECIAL_INT;
        } else if (o instanceof Boolean) {
            b = tu.encodeBoolean((Boolean) o);
            flags |= SPECIAL_BOOLEAN;
        } else if (o instanceof Date) {
            b = tu.encodeLong(((Date) o).getTime());
            flags |= SPECIAL_DATE;
        } else if (o instanceof Byte) {
            b = tu.encodeByte((Byte) o);
            flags |= SPECIAL_BYTE;
        } else if (o instanceof Float) {
            b = tu.encodeInt(Float.floatToRawIntBits((Float) o));
            flags |= SPECIAL_FLOAT;
        } else if (o instanceof Double) {
            b = tu.encodeLong(Double.doubleToRawLongBits((Double) o));
            flags |= SPECIAL_DOUBLE;
        } else if (o instanceof byte[]) {
            b = (byte[]) o;
            flags |= SPECIAL_BYTEARRAY;
        } else {
            b = serialize(o);
            flags |= SERIALIZED;
        }
        assert b != null;
        if (b.length > compressionThreshold) {
            byte[] compressed = compress(b);
            // BUGFIX: capture the pre-compression size BEFORE b may be
            // reassigned below. The previous code divided by b.length after
            // the reassignment, so the metric always reported 100% whenever
            // compression actually helped.
            final int originalLength = b.length;
            if (compressed.length < originalLength) {
                getLogger().debug("Compressed %s from %d to %d",
                        o.getClass().getName(), b.length, compressed.length);
                b = compressed;
                flags |= COMPRESSED;
            } else {
                getLogger().info("Compression increased the size of %s from %d to %d",
                        o.getClass().getName(), b.length, compressed.length);
            }

            // Ratio expressed as a percentage of the original size; may
            // exceed 100 when compression grew the payload.
            long compressionRatio = Math.round((double) compressed.length / originalLength * 100);
            updateTimerWithCompressionRatio(compressionRatio);
        }
        return new CachedData(flags, b, getMaxSize());
    }

    /**
     * Record the gzip compression ratio (as a percentage) into a lazily
     * created percentile timer, tagged with the compression type.
     *
     * <p>The ratio is recorded through the Timer API in milliseconds purely
     * as a numeric carrier for the percentile distribution.
     *
     * @param ratioPercentage compressed size as a percentage of original size
     */
    private void updateTimerWithCompressionRatio(long ratioPercentage) {
        Timer t = timer; // single volatile read
        if (t == null) {
            final List<Tag> tagList = new ArrayList<Tag>(1);
            tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, "gzip"));
            t = EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList, Duration.ofMillis(100));
            timer = t;
        }

        t.record(ratioPercentage, TimeUnit.MILLISECONDS);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import com.netflix.evcache.util.EVCacheConfig;

import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.SerializingTranscoder;

public class EVCacheTranscoder extends SerializingTranscoder {
public class EVCacheTranscoder extends EVCacheSerializingTranscoder {

public EVCacheTranscoder() {
this(EVCacheConfig.getInstance().getPropertyRepository().get("default.evcache.max.data.size", Integer.class).orElse(20 * 1024 * 1024).get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ public String getStatusCode(StatusCode sc) {
*/
public static final String OVERALL_CALL = "evcache.client.call";
public static final String OVERALL_KEYS_SIZE = "evcache.client.call.keys.size";
public static final String COMPRESSION_RATIO = "evcache.client.compression.ratio";

/**
* External IPC Metric Names
Expand Down Expand Up @@ -350,6 +351,7 @@ public String getStatusCode(StatusCode sc) {
public static final String EVENT_STAGE = "evc.event.stage";
public static final String CONNECTION = "evc.connection.type";
public static final String TLS = "evc.connection.tls";
public static final String COMPRESSION_TYPE = "evc.compression.type";

/**
* Metric Tags Values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netflix.evcache.EVCacheException;
import com.netflix.evcache.EVCacheLatch;
import com.netflix.evcache.EVCacheReadQueueException;
import com.netflix.evcache.EVCacheSerializingTranscoder;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.operation.EVCacheFutures;
import com.netflix.evcache.operation.EVCacheItem;
Expand All @@ -54,7 +55,6 @@
import net.spy.memcached.internal.ListenableFuture;
import net.spy.memcached.internal.OperationCompletionListener;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.transcoders.SerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import rx.Scheduler;
import rx.Single;
Expand Down Expand Up @@ -91,7 +91,7 @@ public class EVCacheClient {
private final Property<String> encoderBase;

private final ChunkTranscoder chunkingTranscoder;
private final SerializingTranscoder decodingTranscoder;
private final EVCacheSerializingTranscoder decodingTranscoder;
private static final int SPECIAL_BYTEARRAY = (8 << 8);
private final EVCacheClientPool pool;
// private Counter addCounter = null;
Expand Down Expand Up @@ -143,7 +143,7 @@ public class EVCacheClient {
this.evcacheMemcachedClient = new EVCacheMemcachedClient(connectionFactory, memcachedNodesInZone, readTimeout, this);
this.evcacheMemcachedClient.addObserver(connectionObserver);

this.decodingTranscoder = new SerializingTranscoder(Integer.MAX_VALUE);
this.decodingTranscoder = new EVCacheSerializingTranscoder(Integer.MAX_VALUE);
decodingTranscoder.setCompressionThreshold(Integer.MAX_VALUE);

this.hashKeyByServerGroup = EVCacheConfig.getInstance().getPropertyRepository().get(this.serverGroup.getName() + ".hash.key", Boolean.class).orElse(null);
Expand Down Expand Up @@ -1498,7 +1498,7 @@ public ChunkTranscoder getChunkingTranscoder() {
return chunkingTranscoder;
}

public SerializingTranscoder getDecodingTranscoder() {
public EVCacheSerializingTranscoder getDecodingTranscoder() {
return decodingTranscoder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.evcache.test;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
Expand All @@ -8,6 +9,9 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.netflix.evcache.EVCacheSerializingTranscoder;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.SerializingTranscoder;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
Expand Down Expand Up @@ -106,6 +110,7 @@ public void testAll() {
// testAppend();
testGet();
testGetWithPolicy();
testEVCacheTranscoder();
// testGetObservable();
// testGetAndTouch();
// testBulk();
Expand Down Expand Up @@ -315,6 +320,21 @@ public void testGetObservable() throws Exception {
// obs.doOnNext(new OnNextHandler(key)).doOnError(new OnErrorHandler(key)).subscribe();
}
}

/**
 * Verifies that EVCacheSerializingTranscoder produces byte-identical encoded
 * payloads to spymemcached's stock SerializingTranscoder for a string large
 * enough to exceed the compression threshold (i.e. the added metric
 * instrumentation does not alter the wire format).
 */
@Test(dependsOnMethods = { "testInsert" })
public void testEVCacheTranscoder() throws Exception {
EVCacheSerializingTranscoder evcacheTranscoder = new EVCacheSerializingTranscoder();
SerializingTranscoder serializingTranscoder = new SerializingTranscoder();

// long string to trigger compression
String val = "val_01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";

// Encode the same value with both transcoders; the byte payloads must match.
CachedData evCachedData = evcacheTranscoder.encode(val);
CachedData serializingCachedData = serializingTranscoder.encode(val);

assertTrue(Arrays.equals(evCachedData.getData(), serializingCachedData.getData()), "cacheData same" + evCachedData.toString());
if(log.isDebugEnabled()) log.debug("EVCacheTranscoder result equal to SerializingTranscoder: " + Arrays.equals(evCachedData.getData(), serializingCachedData.getData()));
}


class StatusChecker implements Runnable {
Expand Down

0 comments on commit b8f252e

Please sign in to comment.