Skip to content

Commit

Permalink
Debounce upload events (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
tung-vu-td authored Apr 22, 2021
1 parent c347b40 commit c80a5cb
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 12 deletions.
74 changes: 74 additions & 0 deletions src/main/java/com/treasuredata/android/Debouncer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.treasuredata.android;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class Debouncer <T> {
interface Callback<T> {
void call(T arg);
}

private final ScheduledExecutorService sched = Executors.newScheduledThreadPool(1);
private final ConcurrentHashMap<T, TimerTask> delayedMap = new ConcurrentHashMap<T, TimerTask>();
private final Callback<T> callback;
private final int interval;

Debouncer(Callback<T> c, int interval) {
this.callback = c;
this.interval = interval;
}

void call(T key) {
TimerTask task = new TimerTask(key);

TimerTask prev;
do {
prev = delayedMap.putIfAbsent(key, task);
if (prev == null)
sched.schedule(task, interval, TimeUnit.MILLISECONDS);
} while (prev != null && !prev.extend()); // Exit only if new task was added to map, or existing task was extended successfully
}

void terminate() {
sched.shutdownNow();
}

// The task that wakes up when the wait time elapses
private class TimerTask implements Runnable {
private final T key;
private long dueTime;
private final Object lock = new Object();

TimerTask(T key) {
this.key = key;
extend();
}

boolean extend() {
synchronized (lock) {
if (dueTime < 0) // Task has been shutdown
return false;
dueTime = System.currentTimeMillis() + interval;
return true;
}
}

public void run() {
synchronized (lock) {
long remaining = dueTime - System.currentTimeMillis();
if (remaining > 0) { // Re-schedule task
sched.schedule(this, remaining, TimeUnit.MILLISECONDS);
} else { // Mark as terminated and invoke callback
dueTime = -1;
try {
callback.call(key);
} finally {
delayedMap.remove(key);
}
}
}
}
}
}
30 changes: 20 additions & 10 deletions src/main/java/com/treasuredata/android/TreasureData.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class TreasureData implements CDPClient {

private final AtomicBoolean isInAppPurchaseEventTracking = new AtomicBoolean(false);
private CDPClientImpl cdpClientDelegate;
private Debouncer debouncer;

public static TreasureData initializeSharedInstance(Context context, String apiKey) {
synchronized (TreasureData.class) {
Expand Down Expand Up @@ -604,17 +605,26 @@ public void uploadEvents() {
uploadEventsWithCallback(null);
}

public void uploadEventsWithCallback(TDCallback callback) {

if (client == null) {
Log.w(TAG, "TDClient is null");
return;
}

if (callback == null) {
callback = uploadEventsCallBack;
public void uploadEventsWithCallback(final TDCallback callback) {
if (debouncer == null) {
debouncer = new Debouncer(new Debouncer.Callback() {
@Override
public void call(Object key) {
if (client == null) {
Log.w(TAG, "TDClient is null");
return;
}

if (callback == null) {
client.sendQueuedEventsAsync(null, createKeenCallback(LABEL_UPLOAD_EVENTS, uploadEventsCallBack));
} else {
client.sendQueuedEventsAsync(null, createKeenCallback(LABEL_UPLOAD_EVENTS, callback));
}
debouncer = null;
}
}, 100);
}
client.sendQueuedEventsAsync(null, createKeenCallback(LABEL_UPLOAD_EVENTS, callback));
debouncer.call("uploadEvents");
}

private static KeenClient.KeenCallbackWithErrorCode createKeenCallback(final String methodName, final TDCallback callback) {
Expand Down
6 changes: 4 additions & 2 deletions src/test/java/com/treasuredata/android/TreasureDataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -805,11 +805,12 @@ public void testAddEventWithNullTableName() throws IOException {
assertEquals(0, client.addedEvent.size());
}

public void testUploadEventsWithSuccess() throws IOException {
public void testUploadEventsWithSuccess() throws IOException, InterruptedException {
enableCallbackForAddEvent();
enableCallbackForUploadEvents();

td.uploadEvents();
Thread.sleep(100);
assertFalse(onSuccessCalledForAddEvent);
assertNull(exceptionOnFailedCalledForAddEvent);
assertNull(errorCodeForAddEvent);
Expand All @@ -819,14 +820,15 @@ public void testUploadEventsWithSuccess() throws IOException {
assertEquals(0, client.addedEvent.size());
}

public void testUploadEventsWithError() throws IOException {
public void testUploadEventsWithError() throws IOException, InterruptedException {
client.exceptionOnSendQueuedEventsCalled = new IllegalArgumentException("foo bar");
client.errorCodeOnSendQueuedEventsCalled = KeenClient.ERROR_CODE_NETWORK_ERROR;

enableCallbackForAddEvent();
enableCallbackForUploadEvents();

td.uploadEvents();
Thread.sleep(100);
assertFalse(onSuccessCalledForAddEvent);
assertNull(exceptionOnFailedCalledForAddEvent);
assertFalse(onSuccessCalledForUploadEvents);
Expand Down

0 comments on commit c80a5cb

Please sign in to comment.