Skip to content

Support token renewal #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
cache: 'maven'
- name: Start broker
run: ci/start-broker.sh
env:
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:main'
- name: Start toxiproxy
run: ci/start-toxiproxy.sh
- name: Display Java version
Expand Down
23 changes: 20 additions & 3 deletions ci/start-broker.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env bash

RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.0}
RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}

wait_for_message() {
while ! docker logs "$1" | grep -q "$2";
Expand All @@ -17,7 +17,7 @@ cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
chmod o+r rabbitmq-configuration/tls/*
chmod g+r rabbitmq-configuration/tls/*

echo "[rabbitmq_auth_mechanism_ssl]." >> rabbitmq-configuration/enabled_plugins
echo "[rabbitmq_auth_mechanism_ssl,rabbitmq_auth_backend_oauth2]." >> rabbitmq-configuration/enabled_plugins

echo "loopback_users = none

Expand All @@ -34,7 +34,24 @@ ssl_options.depth = 1

auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = ANONYMOUS
auth_mechanisms.3 = EXTERNAL" >> rabbitmq-configuration/rabbitmq.conf
auth_mechanisms.3 = EXTERNAL

auth_backends.1 = internal
auth_backends.2 = rabbit_auth_backend_oauth2" >> rabbitmq-configuration/rabbitmq.conf

echo "[
{rabbitmq_auth_backend_oauth2, [{key_config,
[{signing_keys,
#{<<\"token-key\">> =>
{map,
#{<<\"alg\">> => <<\"HS256\">>,
<<\"k\">> => <<\"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH\">>,
<<\"kid\">> => <<\"token-key\">>,
<<\"kty\">> => <<\"oct\">>,
<<\"use\">> => <<\"sig\">>,
<<\"value\">> => <<\"token-key\">>}}}}]},
{resource_server_id,<<\"rabbitmq\">>}]}
]." >> rabbitmq-configuration/advanced.config

echo "Running RabbitMQ ${RABBITMQ_IMAGE}"

Expand Down
11 changes: 10 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@
<assertj.version>3.26.3</assertj.version>
<mockito.version>5.14.2</mockito.version>
<jqwik.version>1.9.1</jqwik.version>
<amqp-client.version>5.20.0</amqp-client.version>
<amqp-client.version>5.22.0</amqp-client.version>
<micrometer-tracing-test.version>1.4.0</micrometer-tracing-test.version>
<micrometer-docs-generator.version>1.0.4</micrometer-docs-generator.version>
<jose4j.version>0.9.6</jose4j.version>
<maven.compiler.plugin.version>3.13.0</maven.compiler.plugin.version>
<maven.dependency.plugin.version>3.8.1</maven.dependency.plugin.version>
<maven-surefire-plugin.version>3.5.2</maven-surefire-plugin.version>
Expand Down Expand Up @@ -236,6 +237,14 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.bitbucket.b_c</groupId>
<artifactId>jose4j</artifactId>
<version>${jose4j.version}</version>
<scope>test</scope>
</dependency>


</dependencies>

<dependencyManagement>
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.rabbitmq.client.amqp.impl;

import static com.rabbitmq.client.amqp.Resource.State.*;
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;

import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.ObservationCollector;
Expand Down Expand Up @@ -72,7 +74,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
private final ConnectionSettings.AffinityStrategy affinityStrategy;
private final String name;
private final Lock instanceLock = new ReentrantLock();
private final boolean filterExpressionsSupported;
private final boolean filterExpressionsSupported, setTokenSupported;
private volatile ExecutorService dispatchingExecutorService;

AmqpConnection(AmqpConnectionBuilder builder) {
Expand Down Expand Up @@ -128,8 +130,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
ConnectionUtils.NO_RETRY_STRATEGY,
this.name());
this.sync(ncw);
this.filterExpressionsSupported =
Utils.supportFilterExpressions(brokerVersion(this.nativeConnection));
String brokerVesion = brokerVersion(this.nativeConnection);
this.filterExpressionsSupported = supportFilterExpressions(brokerVesion);
this.setTokenSupported = supportSetToken(brokerVesion);
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
this.state(OPEN);
this.environment.metricsCollector().openConnection();
Expand Down Expand Up @@ -331,6 +334,7 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
"Not recovering connection '{}' for error {}",
this.name(),
event.failureCause().getMessage());
close(ExceptionUtils.convert(ioex));
}
};

Expand Down Expand Up @@ -707,6 +711,10 @@ boolean filterExpressionsSupported() {
return this.filterExpressionsSupported;
}

boolean setTokenSupported() {
return this.setTokenSupported;
}

long id() {
return this.id;
}
Expand All @@ -730,10 +738,10 @@ private void close(Throwable cause) {
rpcServer.close();
}
for (AmqpPublisher publisher : this.publishers) {
publisher.close();
publisher.close(cause);
}
for (AmqpConsumer consumer : this.consumers) {
consumer.close();
consumer.close(cause);
}
try {
this.dispatchingExecutorService.shutdownNow();
Expand Down
24 changes: 9 additions & 15 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.rabbitmq.client.amqp.Resource.State.*;
import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.*;
import static java.time.Duration.ofSeconds;
import static java.util.Optional.ofNullable;

Expand Down Expand Up @@ -71,7 +72,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
private final ExecutorService dispatchingExecutorService;
private final java.util.function.Consumer<Delivery> nativeHandler;
private final java.util.function.Consumer<ClientException> nativeReceiverCloseHandler;
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
// native receiver internal state, accessed only in the native executor/scheduler
private ProtonReceiver protonReceiver;
private volatile Scheduler protonExecutor;
Expand Down Expand Up @@ -101,7 +102,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {

this.dispatchingExecutorService = connection.dispatchingExecutorService();
this.nativeHandler = createNativeHandler(messageHandler);
this.nativeReceiverCloseHandler =
this.nativeCloseHandler =
e ->
this.dispatchingExecutorService.submit(
() -> {
Expand All @@ -116,7 +117,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
this.filters,
this.subscriptionListener,
this.nativeHandler,
this.nativeReceiverCloseHandler);
this.nativeCloseHandler);
this.initStateFromNativeReceiver(this.nativeReceiver);
this.metricsCollector = this.connection.metricsCollector();
try {
Expand Down Expand Up @@ -277,8 +278,8 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
messageHandler.handle(context, message);
}
}
} catch (ClientLinkRemotelyClosedException e) {
if (ExceptionUtils.notFound(e) || ExceptionUtils.resourceDeleted(e)) {
} catch (ClientLinkRemotelyClosedException | ClientSessionRemotelyClosedException e) {
if (notFound(e) || resourceDeleted(e) || unauthorizedAccess(e)) {
this.close(ExceptionUtils.convert(e));
}
} catch (ClientConnectionRemotelyClosedException e) {
Expand All @@ -304,7 +305,7 @@ void recoverAfterConnectionFailure() {
this.filters,
this.subscriptionListener,
this.nativeHandler,
this.nativeReceiverCloseHandler),
this.nativeCloseHandler),
e -> {
boolean shouldRetry =
e instanceof AmqpException.AmqpResourceClosedException
Expand All @@ -327,7 +328,7 @@ void recoverAfterConnectionFailure() {
}
}

private void close(Throwable cause) {
void close(Throwable cause) {
if (this.closed.compareAndSet(false, true)) {
this.state(CLOSING, cause);
this.connection.removeConsumer(this);
Expand Down Expand Up @@ -533,13 +534,6 @@ private void handleException(Exception ex, String operation) {
}

private static boolean maybeCloseConsumerOnException(AmqpConsumer consumer, Exception ex) {
if (ex instanceof ClientLinkRemotelyClosedException) {
ClientLinkRemotelyClosedException e = (ClientLinkRemotelyClosedException) ex;
if (ExceptionUtils.notFound(e) || ExceptionUtils.resourceDeleted(e)) {
consumer.close(ExceptionUtils.convert(e));
return true;
}
}
return false;
return ExceptionUtils.maybeCloseConsumerOnException(consumer::close, ex);
}
}
23 changes: 23 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Management;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
Expand All @@ -38,6 +39,7 @@
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException;
import org.apache.qpid.protonj2.types.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -169,6 +171,27 @@ public UnbindSpecification unbind() {
return new AmqpBindingManagement.AmqpUnbindSpecification(this);
}

void setToken(String token) {
if (!this.connection.setTokenSupported()) {
throw new UnsupportedOperationException("Token renewal requires at least RabbitMQ 4.1.0");
}
checkAvailable();
UUID requestId = messageId();
try {
Message<?> request =
Message.create(new Binary(token.getBytes(StandardCharsets.UTF_8)))
.to("/auth/tokens")
.subject("PUT");

OutstandingRequest outstandingRequest = this.request(request, requestId);
outstandingRequest.block();

checkResponse(outstandingRequest, requestId, 204);
} catch (ClientException e) {
throw new AmqpException("Error on set-token operation", e);
}
}

@Override
public void close() {
if (this.initializing) {
Expand Down
36 changes: 31 additions & 5 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.qpid.protonj2.client.*;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
Expand All @@ -53,6 +54,8 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
private final Duration publishTimeout;
private final SessionHandler sessionHandler;
private volatile ObservationCollector.ConnectionInfo connectionInfo;
private final ExecutorService dispatchingExecutorService;
private final java.util.function.Consumer<ClientException> nativeCloseHandler;

AmqpPublisher(AmqpPublisherBuilder builder) {
super(builder.listeners());
Expand All @@ -63,7 +66,17 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
this.connection = builder.connection();
this.publishTimeout = builder.publishTimeout();
this.sessionHandler = this.connection.createSessionHandler();
this.sender = this.createSender(sessionHandler.session(), this.address, this.publishTimeout);
this.dispatchingExecutorService = connection.dispatchingExecutorService();
this.nativeCloseHandler =
e ->
this.dispatchingExecutorService.submit(
() -> {
// get result to make spotbugs happy
boolean ignored = maybeCloseConsumerOnException(this, e);
});
this.sender =
this.createSender(
sessionHandler.session(), this.address, this.publishTimeout, this.nativeCloseHandler);
this.metricsCollector = this.connection.metricsCollector();
this.observationCollector = this.connection.observationCollector();
this.state(OPEN);
Expand Down Expand Up @@ -154,7 +167,11 @@ private static MetricsCollector.PublishDisposition mapToPublishDisposition(Statu
void recoverAfterConnectionFailure() {
this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
this.sender =
this.createSender(this.sessionHandler.sessionNoCheck(), this.address, this.publishTimeout);
this.createSender(
this.sessionHandler.sessionNoCheck(),
this.address,
this.publishTimeout,
this.nativeCloseHandler);
}

@Override
Expand All @@ -164,14 +181,19 @@ public void close() {

// internal API

private Sender createSender(Session session, String address, Duration publishTimeout) {
private Sender createSender(
Session session,
String address,
Duration publishTimeout,
Consumer<ClientException> nativeCloseHandler) {
SenderOptions senderOptions =
new SenderOptions()
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
.sendTimeout(
publishTimeout.isNegative()
? ConnectionOptions.INFINITE
: publishTimeout.toMillis());
: publishTimeout.toMillis())
.closeHandler(nativeCloseHandler);
try {
Sender s =
address == null
Expand All @@ -183,7 +205,7 @@ private Sender createSender(Session session, String address, Duration publishTim
}
}

private void close(Throwable cause) {
void close(Throwable cause) {
if (this.closed.compareAndSet(false, true)) {
this.state(State.CLOSING, cause);
this.connection.removePublisher(this);
Expand All @@ -198,6 +220,10 @@ private void close(Throwable cause) {
}
}

private static boolean maybeCloseConsumerOnException(AmqpPublisher publisher, Exception ex) {
return ExceptionUtils.maybeCloseConsumerOnException(publisher::close, ex);
}

private static class DefaultContext implements Publisher.Context {

private final Message message;
Expand Down
Loading