Skip to content

Commit

Permalink
added entities for webhook events and webhook sends (#92)
Browse files Browse the repository at this point in the history
* added entities for webhook events and webhook sends

* models and adapters for webhook events and sends

* tests pass

* storing sends and events

* added methods to resource for getting sends.

* added payload method to get by keycloak type and id

* added a rest method for getting the webhooks that were triggered by a keycloak event

* move storeEvent to process. got webhook entity out of adapter.

* updated to null or empty check to fix missing UID

* bumping versions to 26.1.1

* check for existing webhookevent before storing twice. fix authdetails json deserialization bug. revisit backoff default values

* bug in webhookevent storage condition. set type in resend response

* storeSend synchronized. null check in getStatusMessage. check for existing WebhookSendEntity before store send-- merge if so.

* catch EntityExistsException. synchronizing storeEvent

* removed supplier. check for stored event type.

* separated user event condition methods. bumped pom version.
  • Loading branch information
xgp authored Feb 11, 2025
1 parent e1c4152 commit 1b22817
Show file tree
Hide file tree
Showing 23 changed files with 1,192 additions and 45 deletions.
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ Configuration values:
| `retry` | N | true | Should it use exponential backoff to retry on non 2xx response |
| `backoffInitialInterval` | N | 500 | Initial interval value in milliseconds |
| `backoffMaxElapsedTime` | N | 900000 | Maximum elapsed time in milliseconds |
| `backoffMaxInterval` | N | 60000 | Maximum back off time in milliseconds |
| `backoffMultiplier` | N | 1.5 | Multiplier value (E.g. 1.5 is 50% increase per back off) |
| `backoffMaxInterval` | N | 180000 | Maximum back off time in milliseconds |
| `backoffMultiplier` | N | 5 | Multiplier value (E.g. 1.5 is 50% increase per back off) |
| `backoffRandomizationFactor` | N | 0.5 | Randomization factor (E.g. 0.5 results in a random period ranging between 50% below and 50% above the retry interval) |

### Adding Configuration to your EventListenerProvider
Expand Down Expand Up @@ -177,6 +177,19 @@ The webhook object has this format:

For creating and updating of webhooks, `id`, `createdBy` and `createdAt` are ignored. `secret` is not sent when fetching webhooks.

#### Storing webhook events and sends

This extension contains the functionality to store and retrieve the payload that was sent to a webhook, as well as the sending status. In order to enable this functionality, you must set the SPI config variable `--spi-events-listener-ext-event-webhook-store-webhook-events=true` and ensure that your realm settings have events and admin events enabled, which causes them to be stored using the configured `EventStoreProvider`.

This also enables a few additional custom REST endpoints for querying information about the payload and status of webhook sends.

| Path | Method | Payload | Returns | Description |
| ---------------------------------- | -------- | -------------- | ----------------------- | -------------- |
| `/auth/realms/:realm/webhooks/:id/sends` | `GET` | `first`, `max` query params for pagination | Webhook send objects (brief) | Get webhook sends |
| `/auth/realms/:realm/webhooks/:id/sends/:sid` | `GET` | | Webhook send object (with payload) | Get a webhook send |
| `/auth/realms/:realm/webhooks/:id/sends/:sid/resend` | `POST` | | `202` | Resend a webhook payload |


##### Example

To create a webhook for all events on the `master` realm:
Expand Down
17 changes: 10 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.java.package>io.phasetwo.keycloak.events</main.java.package>
<junit.version>5.11.2</junit.version>
<keycloak.version>26.0.2</keycloak.version>
<keycloak-admin-client.version>26.0.1</keycloak-admin-client.version>
<lombok.version>1.18.34</lombok.version>
<keycloak.version>26.1.2</keycloak.version>
<keycloak-admin-client.version>26.0.4</keycloak-admin-client.version>
<lombok.version>1.18.36</lombok.version>
<auto-service.version>1.1.1</auto-service.version>
<ossrh.url>https://s01.oss.sonatype.org</ossrh.url>
</properties>
Expand Down Expand Up @@ -117,8 +117,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<version>3.5.2</version>
<configuration>
<classpathDependencyExcludes>
<classpathDependencyExclude>org.jboss.slf4j:slf4j-jboss-logmanager</classpathDependencyExclude>
</classpathDependencyExcludes>
<systemPropertyVariables>
<keycloak-version>${keycloak.version}</keycloak-version>
</systemPropertyVariables>
Expand Down Expand Up @@ -232,13 +235,13 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.3</version>
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.dasniko</groupId>
<artifactId>testcontainers-keycloak</artifactId>
<version>3.5.1</version>
<version>3.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ BackOff getBackOff() {
return new ExponentialBackOff.Builder()
.setInitialIntervalMillis(getIntOr(config, BACKOFF_INITIAL_INTERVAL, 500))
.setMaxElapsedTimeMillis(getIntOr(config, BACKOFF_MAX_ELAPSED_TIME, 900000))
.setMaxIntervalMillis(getIntOr(config, BACKOFF_MAX_INTERVAL, 60000))
.setMultiplier(getDoubleOr(config, BACKOFF_MULTIPLIER, 1.5))
.setMaxIntervalMillis(getIntOr(config, BACKOFF_MAX_INTERVAL, 180000))
.setMultiplier(getDoubleOr(config, BACKOFF_MULTIPLIER, 5))
.setRandomizationFactor(getDoubleOr(config, BACKOFF_RANDOMIZATION_FACTOR, 0.5))
.build();
}
Expand Down Expand Up @@ -81,6 +81,7 @@ protected void send(
LegacySimpleHttp.Response response = request.asResponse();
int status = response.getStatus();
log.debugf("sent to %s (%d)", targetUri, status);
doAfterSend(task, status);
if (status < HTTP_OK || status >= HTTP_MULT_CHOICE) { // any 2xx is acceptable
log.warnf("Sending failure (Server response:%d)", status);
throw new SenderException(true);
Expand All @@ -94,6 +95,16 @@ protected void send(
}
}

protected final void doAfterSend(SenderTask task, int httpStatus) {
try {
afterSend(task, httpStatus);
} catch (Exception e) {
log.warn("Error afterSend", e);
}
}

protected void afterSend(SenderTask task, int httpStatus) {}

protected String hmacFor(Object o, String sharedSecret, String algorithm) {
try {
String data = JsonSerialization.writeValueAsString(o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public void onEvent(AdminEvent event, boolean b) {

@Override
public void close() {
// close this instance of the event listener
log.debugf("called close() on SenderEventListenerProvider");
log.tracef("called close() on SenderEventListenerProvider");
}

class SenderTask {
Expand Down Expand Up @@ -110,14 +109,14 @@ protected void schedule(SenderTask task, long delay, TimeUnit unit) {
try {
send(task);
} catch (SenderException | IOException e) {
log.debug("sending exception", e);
log.trace("sending exception", e);
if (e instanceof SenderException && !((SenderException) e).isRetryable()) return;
log.debugf(
log.tracef(
"BackOff policy is %s",
BackOff.STOP_BACKOFF == task.getBackOff() ? "STOP" : "BACKOFF");
long backOffTime = task.getBackOff().nextBackOffMillis();
if (backOffTime == BackOff.STOP) return;
log.debugf("retrying in %d due to %s", backOffTime, e.getCause());
log.tracef("retrying in %d due to %s", backOffTime, e.getCause());
schedule(task, backOffTime, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.warn("Uncaught Sender error", t);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
package io.phasetwo.keycloak.events;

import com.google.common.base.Strings;
import io.phasetwo.keycloak.model.KeycloakEventType;
import io.phasetwo.keycloak.model.WebhookEventModel;
import io.phasetwo.keycloak.model.WebhookModel;
import io.phasetwo.keycloak.model.WebhookProvider;
import io.phasetwo.keycloak.model.WebhookSendModel;
import io.phasetwo.keycloak.representation.ExtendedAdminEvent;
import io.phasetwo.keycloak.representation.ExtendedAuthDetails;
import java.io.IOException;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.jbosslog.JBossLog;
import org.keycloak.events.Event;
import org.keycloak.events.EventType;
import org.keycloak.events.admin.AdminEvent;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.util.JsonSerialization;

@JBossLog
public class WebhookSenderEventListenerProvider extends HttpSenderEventListenerProvider {
Expand All @@ -31,12 +38,15 @@ public class WebhookSenderEventListenerProvider extends HttpSenderEventListenerP
private final RunnableTransaction runnableTrx;
private final KeycloakSessionFactory factory;

private final boolean storeWebhookEvents;
private final WebhookProvider webhooks;

private final String systemUri;
private final String systemSecret;
private final String systemAlgorithm;

public WebhookSenderEventListenerProvider(
KeycloakSession session, ScheduledExecutorService exec) {
KeycloakSession session, ScheduledExecutorService exec, boolean storeWebhookEvents) {
super(session, exec);
this.factory = session.getKeycloakSessionFactory();
this.runnableTrx = new RunnableTransaction();
Expand All @@ -45,14 +55,18 @@ public WebhookSenderEventListenerProvider(
this.systemUri = System.getenv(WEBHOOK_URI_ENV);
this.systemSecret = System.getenv(WEBHOOK_SECRET_ENV);
this.systemAlgorithm = System.getenv(WEBHOOK_ALGORITHM_ENV);
// should we store webhook events and sends?
this.storeWebhookEvents = storeWebhookEvents;
this.webhooks = session.getProvider(WebhookProvider.class);
}

@Override
public void onEvent(Event event) {
log.debugf("onEvent %s %s", event.getType(), event.getId());
try {
ExtendedAdminEvent customEvent = completeAdminEventAttributes("", event);
runnableTrx.addRunnable(() -> processEvent(customEvent, event.getRealmId()));
runnableTrx.addRunnable(
() -> processEvent(KeycloakEventType.USER, customEvent, event.getRealmId()));
} catch (Exception e) {
log.warn("Error converting and scheduling event: " + event, e);
}
Expand All @@ -67,27 +81,69 @@ public void onEvent(AdminEvent adminEvent, boolean b) {
adminEvent.getResourcePath());
try {
ExtendedAdminEvent customEvent = completeAdminEventAttributes("", adminEvent);
runnableTrx.addRunnable(() -> processEvent(customEvent, adminEvent.getRealmId()));
runnableTrx.addRunnable(
() -> processEvent(KeycloakEventType.ADMIN, customEvent, adminEvent.getRealmId()));
} catch (Exception e) {
log.warn("Error converting and scheduling event: " + adminEvent, e);
}
}

/** Update the event with a unique uid */
public void processEvent(ExtendedAdminEvent customEvent, String realmId) {
processEvent(
() -> {
customEvent.setUid(KeycloakModelUtils.generateId());
return customEvent;
},
realmId);
private synchronized void storeEvent(
KeycloakSession session, KeycloakEventType type, ExtendedAdminEvent event) {
if (!storeWebhookEvents) {
log.tracef("storeWebhookEvents is %s. skipping...", storeWebhookEvents);
return;
}
if (!type.keycloakNative()) {
log.tracef("%S event. Skipping event storage.", type);
return;
}

RealmModel realm = session.realms().getRealm(event.getRealmId());
Set<String> eventTypes = realm.getEnabledEventTypesStream().collect(Collectors.toSet());
EventType eventType = event.getNativeType();
if (type == KeycloakEventType.USER && !realm.isEventsEnabled()) {
log.tracef("USER events disabled for realm %s", realm.getName());
return;
}
if (type == KeycloakEventType.USER
&& !(eventTypes.isEmpty() && eventType.isSaveByDefault()
|| eventTypes.contains(eventType.name()))) {
log.tracef(
"USER events not persisted for event type %s for realm %s ", eventType, realm.getName());
return;
}

if (type == KeycloakEventType.ADMIN && !realm.isAdminEventsEnabled()) {
log.tracef("ADMIN events disabled for realm %s", realm.getName());
return;
}

// look it up first, as we might have multiple webhooks
WebhookEventModel we = webhooks.getEvent(realm, type, event.getId());
if (we != null) {
log.tracef("Webhook event %s already stored. Skipping.", event.getId());
return;
}

we = webhooks.storeEvent(realm, type, event.getId(), event);
log.tracef(
"Webhook event stored [%s] %s, %s, %s, %s",
we.getId(), event.getRealmId(), we.getEventType(), we.getEventId(), we.getAdminEventId());
}

public void processEvent(ExtendedAdminEvent event, String realmId) {
processEvent(KeycloakEventType.fromTypeString(event.getType()), event, realmId);
}

/** Schedule dispatch to all webhooks and system */
private void processEvent(Supplier<ExtendedAdminEvent> supplier, String realmId) {
private void processEvent(KeycloakEventType type, ExtendedAdminEvent event, String realmId) {
KeycloakModelUtils.runJobInTransaction(
factory,
(session) -> {
if (type.keycloakNative()) {
storeEvent(session, type, event);
}
RealmModel realm = session.realms().getRealm(realmId);
WebhookProvider webhooks = session.getProvider(WebhookProvider.class);
webhooks
Expand All @@ -96,20 +152,76 @@ private void processEvent(Supplier<ExtendedAdminEvent> supplier, String realmId)
.filter(w -> !Strings.isNullOrEmpty(w.getUrl()))
.forEach(
w -> {
ExtendedAdminEvent customEvent = supplier.get();
ExtendedAdminEvent customEvent = clone(event);
customEvent.setUid(KeycloakModelUtils.generateId());
log.tracef("Got custom event with UID %s", customEvent.getUid());
if (!enabledFor(w, customEvent)) return;
schedule(customEvent, w.getUrl(), w.getSecret(), w.getAlgorithm());
schedule(w, customEvent);
});
// for system owner catch-all
if (!Strings.isNullOrEmpty(systemUri)) {
schedule(supplier.get(), systemUri, systemSecret, systemAlgorithm);
ExtendedAdminEvent customEvent = clone(event);
customEvent.setUid(KeycloakModelUtils.generateId());
schedule(null, customEvent, systemUri, systemSecret, systemAlgorithm);
}
});
}

@Override
protected synchronized void afterSend(final SenderTask task, final int httpStatus) {
if (task.getProperties().get("webhookId") == null) return;
final ExtendedAdminEvent customEvent = (ExtendedAdminEvent) task.getEvent();
if (!KeycloakEventType.fromTypeString(customEvent.getType()).keycloakNative()) {
log.tracef("%s event type. Skipping send storage.", customEvent.getType());
return;
}
KeycloakModelUtils.runJobInTransaction(
factory,
(session) -> {
RealmModel realm = session.realms().getRealm(customEvent.getRealmId());
WebhookProvider webhooks = session.getProvider(WebhookProvider.class);
WebhookModel webhook =
webhooks.getWebhookById(realm, task.getProperties().get("webhookId"));
WebhookEventModel event =
webhooks.getEvent(
realm,
KeycloakEventType.fromTypeString(customEvent.getType()),
customEvent.getId());
if (event == null) {
log.tracef(
"No event for [%s] %s. Skipping send storage.",
customEvent.getType(), customEvent.getId());
} else {
// look it up first, as we might be here for a retry/resend
WebhookSendModel webhookSend = webhooks.getSendById(realm, customEvent.getUid());
if (webhookSend == null) {
webhookSend =
webhooks.storeSend(webhook, event, customEvent.getUid(), customEvent.getType());
}
webhookSend.setStatus(httpStatus);
webhookSend.incrementRetries();
webhookSend.setSentAt(new Date());
}
});
}

public void schedule(WebhookModel webhook, ExtendedAdminEvent customEvent) {
schedule(
webhook.getId(),
customEvent,
webhook.getUrl(),
webhook.getSecret(),
webhook.getAlgorithm());
}

private void schedule(
ExtendedAdminEvent customEvent, String url, String secret, String algorithm) {
String webhookId,
ExtendedAdminEvent customEvent,
String url,
String secret,
String algorithm) {
SenderTask task = new SenderTask(customEvent, getBackOff());
task.getProperties().put("webhookId", webhookId);
task.getProperties().put("url", url);
task.getProperties().put("secret", secret);
task.getProperties().put("algorithm", algorithm);
Expand Down Expand Up @@ -198,14 +310,27 @@ private ExtendedAdminEvent completeExtendedAuthDetails(ExtendedAdminEvent event)
details.setSessionId(
session.getContext().getAuthenticationSession().getParentSession().getId());
} catch (Exception e) {
log.debug("couldn't get sessionId", e);
log.tracef("couldn't get sessionId: %s", e.getMessage());
}
try {
details.setRealmId(
session.getContext().getAuthenticationSession().getParentSession().getRealm().getName());
} catch (Exception e) {
log.debug("couldn't get realmId", e);
log.tracef("couldn't get realmId: %s", e.getMessage());
}
return event;
}

/** deep clone the event */
private static ExtendedAdminEvent clone(ExtendedAdminEvent event) {
try {
ExtendedAdminEvent customEvent =
JsonSerialization.readValue(
JsonSerialization.writeValueAsString(event), ExtendedAdminEvent.class);
customEvent.setUid(null);
return customEvent;
} catch (IOException e) {
throw new IllegalStateException("Event can't be cloned because of serialization issue.", e);
}
}
}
Loading

0 comments on commit 1b22817

Please sign in to comment.