Skip to content

Commit 780e64b

Browse files
committed
Start support for token refresh
1 parent 4adcd1d commit 780e64b

File tree

10 files changed

+374
-35
lines changed

10 files changed

+374
-35
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
7676
private final Lock instanceLock = new ReentrantLock();
7777
private final boolean filterExpressionsSupported, setTokenSupported;
7878
private volatile ExecutorService dispatchingExecutorService;
79-
private final Credentials credentials;
79+
private final Credentials.Registration credentialsRegistration;
8080

8181
AmqpConnection(AmqpConnectionBuilder builder) {
8282
super(builder.listeners());
@@ -119,9 +119,14 @@ final class AmqpConnection extends ResourceBase implements Connection {
119119
instanceof UsernamePasswordCredentialsProvider) {
120120
UsernamePasswordCredentialsProvider credentialsProvider =
121121
(UsernamePasswordCredentialsProvider) connectionSettings.credentialsProvider();
122-
this.credentials = new UsernamePasswordCredentials(credentialsProvider);
122+
Credentials credentials = new UsernamePasswordCredentials(credentialsProvider);
123+
this.credentialsRegistration =
124+
credentials.register(
125+
(u, p) -> {
126+
((AmqpManagement) management()).setToken(p);
127+
});
123128
} else {
124-
this.credentials = callback -> {};
129+
this.credentialsRegistration = Credentials.NO_OP.register((u, p) -> {});
125130
}
126131
LOGGER.debug("Opening native connection for connection '{}'...", this.name());
127132
NativeConnectionWrapper ncw =
@@ -199,7 +204,7 @@ private NativeConnectionWrapper connect(
199204
List<Address> addresses) {
200205

201206
ConnectionOptions connectionOptions = new ConnectionOptions();
202-
credentials.configure(new TokenConnectionCallback(connectionOptions));
207+
credentialsRegistration.connect(new TokenConnectionCallback(connectionOptions));
203208
connectionOptions.virtualHost("vhost:" + connectionSettings.virtualHost());
204209
connectionOptions.saslOptions().addAllowedMechanism(connectionSettings.saslMechanism());
205210
connectionOptions.idleTimeout(
@@ -726,6 +731,7 @@ long id() {
726731
private void close(Throwable cause) {
727732
if (this.closed.compareAndSet(false, true)) {
728733
this.state(CLOSING, cause);
734+
this.credentialsRegistration.unregister();
729735
this.environment.removeConnection(this);
730736
if (this.topologyListener instanceof AutoCloseable) {
731737
try {

src/main/java/com/rabbitmq/client/amqp/impl/Credentials.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,43 @@
1919

2020
interface Credentials {
2121

22-
void configure(ConnectionCallback callback);
22+
Credentials NO_OP = new NoOpCredentials();
23+
24+
Registration register(RefreshCallback refreshCallback);
25+
26+
interface Registration {
27+
28+
void connect(ConnectionCallback callback);
29+
30+
void unregister();
31+
}
2332

2433
interface ConnectionCallback {
2534

2635
ConnectionCallback username(String username);
2736

2837
ConnectionCallback password(String password);
2938
}
39+
40+
interface RefreshCallback {
41+
42+
void refresh(String username, String password);
43+
}
44+
45+
class NoOpCredentials implements Credentials {
46+
47+
@Override
48+
public Registration register(RefreshCallback refreshCallback) {
49+
return new NoOpRegistration();
50+
}
51+
}
52+
53+
class NoOpRegistration implements Registration {
54+
55+
@Override
56+
public void connect(ConnectionCallback callback) {}
57+
58+
@Override
59+
public void unregister() {}
60+
}
3061
}

src/main/java/com/rabbitmq/client/amqp/impl/OAuthCredentialsProvider.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,18 @@
1919

2020
import com.rabbitmq.client.amqp.UsernamePasswordCredentialsProvider;
2121
import com.rabbitmq.client.amqp.oauth.Token;
22-
import com.rabbitmq.client.amqp.oauth.TokenParser;
2322
import com.rabbitmq.client.amqp.oauth.TokenRequester;
2423
import java.util.concurrent.locks.Lock;
2524
import java.util.concurrent.locks.ReentrantLock;
2625

2726
final class OAuthCredentialsProvider implements UsernamePasswordCredentialsProvider {
2827

2928
private final TokenRequester requester;
30-
private final TokenParser parser;
3129
private volatile Token token;
3230
private final Lock lock = new ReentrantLock();
3331

34-
OAuthCredentialsProvider(TokenRequester requester, TokenParser parser) {
32+
OAuthCredentialsProvider(TokenRequester requester) {
3533
this.requester = requester;
36-
this.parser = parser;
3734
}
3835

3936
@Override
@@ -46,7 +43,7 @@ public String getPassword() {
4643
lock.lock();
4744
try {
4845
if (token == null || token.expirationTime() < System.currentTimeMillis()) {
49-
this.token = this.parser.parse(this.requester.request());
46+
this.token = this.requester.request();
5047
}
5148
} finally {
5249
lock.unlock();
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import com.rabbitmq.client.amqp.oauth.Token;
21+
import com.rabbitmq.client.amqp.oauth.TokenRequester;
22+
import java.time.Duration;
23+
import java.util.Map;
24+
import java.util.Objects;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.ScheduledFuture;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicLong;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
33+
34+
final class TokenCredentials implements Credentials {
35+
36+
private final TokenRequester requester;
37+
private final ScheduledExecutorService scheduledExecutorService;
38+
private volatile Token token;
39+
private final Lock lock = new ReentrantLock();
40+
private final Map<Long, RegistrationImpl> registrations = new ConcurrentHashMap<>();
41+
private final AtomicLong registrationSequence = new AtomicLong(0);
42+
private final AtomicBoolean schedulingRenewal = new AtomicBoolean(false);
43+
private volatile ScheduledFuture<?> renewalTask;
44+
45+
TokenCredentials(TokenRequester requester, ScheduledExecutorService scheduledExecutorService) {
46+
this.requester = requester;
47+
this.scheduledExecutorService = scheduledExecutorService;
48+
}
49+
50+
private void lock() {
51+
this.lock.lock();
52+
}
53+
54+
private void unlock() {
55+
this.lock.unlock();
56+
}
57+
58+
private boolean expiresSoon(Token t) {
59+
return t.expirationTime() < System.currentTimeMillis() - 20_000;
60+
}
61+
62+
private Duration delayBeforeTokenRenewal(Token token) {
63+
long expiresIn = token.expirationTime() - System.currentTimeMillis();
64+
long delay = (long) (expiresIn * 0.8);
65+
return Duration.ofMillis(delay);
66+
}
67+
68+
private Token getToken() {
69+
return requester.request();
70+
}
71+
72+
@Override
73+
public Registration register(RefreshCallback refreshCallback) {
74+
Long id = this.registrationSequence.getAndIncrement();
75+
RegistrationImpl registration = new RegistrationImpl(id, refreshCallback);
76+
this.registrations.put(id, registration);
77+
return registration;
78+
}
79+
80+
private void refresh() {
81+
this.scheduledExecutorService.execute(
82+
() -> {
83+
for (RegistrationImpl registration : this.registrations.values()) {
84+
if (!registration.isClosed() && !this.token.equals(registration.registrationToken)) {
85+
// the registration does not have the new token yet
86+
registration.refreshCallback.refresh("", this.token.value());
87+
registration.registrationToken = this.token;
88+
}
89+
}
90+
});
91+
}
92+
93+
private void token(Token t) {
94+
if (!t.equals(this.token)) {
95+
this.token = t;
96+
if (this.schedulingRenewal.compareAndSet(false, true)) {
97+
if (this.renewalTask != null) {
98+
this.renewalTask.cancel(false);
99+
}
100+
Duration delay = delayBeforeTokenRenewal(t);
101+
if (delay.isZero() || delay.isNegative()) {
102+
delay = Duration.ofSeconds(1);
103+
}
104+
// TODO check delay is > 0, schedule 1 second later at least
105+
this.renewalTask =
106+
this.scheduledExecutorService.schedule(
107+
() -> {
108+
Token previousToken = this.token;
109+
this.lock();
110+
try {
111+
if (this.token.equals(previousToken)) {
112+
Token newToken = getToken();
113+
token(newToken);
114+
refresh();
115+
}
116+
} finally {
117+
unlock();
118+
}
119+
},
120+
delay.toMillis(),
121+
TimeUnit.MILLISECONDS);
122+
this.schedulingRenewal.set(false);
123+
}
124+
}
125+
}
126+
127+
private final class RegistrationImpl implements Registration {
128+
129+
private final Long id;
130+
private final RefreshCallback refreshCallback;
131+
private volatile Token registrationToken;
132+
private final AtomicBoolean closed = new AtomicBoolean(false);
133+
134+
private RegistrationImpl(Long id, RefreshCallback refreshCallback) {
135+
this.id = id;
136+
this.refreshCallback = refreshCallback;
137+
}
138+
139+
@Override
140+
public void connect(ConnectionCallback callback) {
141+
boolean shouldRefresh = false;
142+
lock();
143+
try {
144+
if (token == null) {
145+
token(getToken());
146+
} else if (expiresSoon(token)) {
147+
shouldRefresh = true;
148+
token(getToken());
149+
}
150+
this.registrationToken = token;
151+
} finally {
152+
unlock();
153+
}
154+
155+
callback.username("").password(this.registrationToken.value());
156+
if (shouldRefresh) {
157+
refresh();
158+
}
159+
}
160+
161+
@Override
162+
public void unregister() {
163+
if (this.closed.compareAndSet(false, true)) {
164+
registrations.remove(this.id);
165+
}
166+
}
167+
168+
private boolean isClosed() {
169+
return this.closed.get();
170+
}
171+
172+
@Override
173+
public boolean equals(Object o) {
174+
if (o == null || getClass() != o.getClass()) return false;
175+
RegistrationImpl that = (RegistrationImpl) o;
176+
return Objects.equals(id, that.id);
177+
}
178+
179+
@Override
180+
public int hashCode() {
181+
return Objects.hashCode(id);
182+
}
183+
}
184+
}

src/main/java/com/rabbitmq/client/amqp/impl/UsernamePasswordCredentials.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,18 @@ final class UsernamePasswordCredentials implements Credentials {
2828
}
2929

3030
@Override
31-
public void configure(ConnectionCallback callback) {
32-
callback.username(this.provider.getUsername()).password(this.provider.getPassword());
31+
public Registration register(RefreshCallback refreshCallback) {
32+
return new RegistrationImpl();
33+
}
34+
35+
private final class RegistrationImpl implements Registration {
36+
37+
@Override
38+
public void connect(ConnectionCallback callback) {
39+
callback.username(provider.getUsername()).password(provider.getPassword());
40+
}
41+
42+
@Override
43+
public void unregister() {}
3344
}
3445
}

src/main/java/com/rabbitmq/client/amqp/oauth/HttpTokenRequester.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public final class HttpTokenRequester implements TokenRequester {
5151
private final HttpClient client;
5252
private final Consumer<HttpRequest.Builder> requestBuilderConsumer;
5353

54+
private TokenParser parser;
55+
5456
public HttpTokenRequester(
5557
String tokenEndpointUri,
5658
String clientId,
@@ -60,7 +62,8 @@ public HttpTokenRequester(
6062
HostnameVerifier hostnameVerifier,
6163
SSLSocketFactory sslSocketFactory,
6264
Consumer<HttpClient.Builder> clientBuilderConsumer,
63-
Consumer<HttpRequest.Builder> requestBuilderConsumer) {
65+
Consumer<HttpRequest.Builder> requestBuilderConsumer,
66+
TokenParser parser) {
6467
try {
6568
this.tokenEndpointUri = new URI(tokenEndpointUri);
6669
} catch (URISyntaxException e) {
@@ -72,6 +75,7 @@ public HttpTokenRequester(
7275
this.parameters = Map.copyOf(parameters);
7376
this.hostnameVerifier = hostnameVerifier;
7477
this.sslSocketFactory = sslSocketFactory;
78+
this.parser = parser;
7579
if (requestBuilderConsumer == null) {
7680
this.requestBuilderConsumer =
7781
requestBuilder ->
@@ -95,7 +99,7 @@ public HttpTokenRequester(
9599
}
96100

97101
@Override
98-
public String request() {
102+
public Token request() {
99103
StringBuilder urlParameters = new StringBuilder();
100104
encode(urlParameters, "grant_type", grantType);
101105
for (Map.Entry<String, String> parameter : parameters.entrySet()) {
@@ -117,7 +121,7 @@ public String request() {
117121
this.client.send(request, HttpResponse.BodyHandlers.ofString(UTF_8));
118122
checkStatusCode(response.statusCode());
119123
checkContentType(response.headers().firstValue("content-type").orElse(null));
120-
return response.body();
124+
return this.parser.parse(response.body());
121125
} catch (IOException e) {
122126
throw new OAuthException("Error while retrieving OAuth 2 token", e);
123127
} catch (InterruptedException e) {

src/main/java/com/rabbitmq/client/amqp/oauth/TokenRequester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,5 @@
1919

2020
public interface TokenRequester {
2121

22-
String request();
22+
Token request();
2323
}

0 commit comments

Comments
 (0)