Skip to content

Commit 6684904

Browse files
tvoranerentantekin
authored andcommitted
events: CE portion of adding vault_index to event metadata (#30725)
1 parent 0f001bc commit 6684904

File tree

5 files changed

+184
-22
lines changed

5 files changed

+184
-22
lines changed

changelog/30725.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:improvement
2+
events: Add `vault_index` to an event's metadata if the metadata contains `modified=true`, to support client consistency controls when reading from Vault in response to an event where storage was modified.
3+
```

vault/core.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1365,7 +1365,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
13651365
if err != nil {
13661366
return nil, err
13671367
}
1368-
events, err := eventbus.NewEventBus(nodeID, eventsLogger)
1368+
events, err := eventbus.NewEventBus(nodeID, eventsLogger, c)
13691369
if err != nil {
13701370
return nil, err
13711371
}

vault/core_stubs_oss.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ func (c *Core) EntWaitUntilWALShipped(ctx context.Context, index uint64) bool {
103103
return true
104104
}
105105

106+
func (c *Core) GetCurrentWALHeader() string {
107+
return ""
108+
}
109+
106110
func (c *Core) SecretsSyncLicensedActivated() bool { return false }
107111

108112
func (c *Core) IsMultisealEnabled() bool { return false }

vault/eventbus/bus.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
2020
"github.com/hashicorp/go-bexpr"
2121
"github.com/hashicorp/go-hclog"
22+
"github.com/hashicorp/go-secure-stdlib/parseutil"
2223
"github.com/hashicorp/go-uuid"
2324
"github.com/hashicorp/vault/helper/namespace"
2425
"github.com/hashicorp/vault/sdk/logical"
@@ -30,8 +31,9 @@ const (
3031
// eventTypeAll is purely internal to the event bus. We use it to send all
3132
// events down one big firehose, and pipelines define their own filtering
3233
// based on what each subscriber is interested in.
33-
eventTypeAll = "*"
34-
defaultTimeout = 60 * time.Second
34+
eventTypeAll = "*"
35+
defaultTimeout = 60 * time.Second
36+
eventMetadataVaultIndex = "vault_index"
3537
)
3638

3739
var (
@@ -55,6 +57,13 @@ type EventBus struct {
5557
timeout time.Duration
5658
filters *Filters
5759
cloudEventsFormatterFilter *cloudevents.FormatterFilter
60+
walGetter StorageWALGetter
61+
}
62+
63+
// StorageWALGetter is an interface used to fetch the current storage index
64+
// from core without importing core
65+
type StorageWALGetter interface {
66+
GetCurrentWALHeader() string
5867
}
5968

6069
type pluginEventBus struct {
@@ -111,6 +120,25 @@ func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo
111120
return data
112121
}
113122

123+
// getIndexForEvent returns the storage index (wal header) for events with
124+
// metadata.modified=true.
125+
func (bus *EventBus) getIndexForEvent(event *logical.EventReceived) (string, error) {
126+
if event.Event == nil || event.Event.Metadata == nil || bus.walGetter == nil {
127+
return "", nil
128+
}
129+
eventMetadataModified := event.Event.Metadata.GetFields()[logical.EventMetadataModified]
130+
if eventMetadataModified != nil {
131+
isModified, err := parseutil.ParseBool(eventMetadataModified.GetStringValue())
132+
if err != nil {
133+
return "", fmt.Errorf("failed to parse event metadata modified: %w", err)
134+
}
135+
if isModified {
136+
return bus.walGetter.GetCurrentWALHeader(), nil
137+
}
138+
}
139+
return "", nil
140+
}
141+
114142
// SendEventInternal sends an event to the event bus and routes it to all relevant subscribers.
115143
// This function does *not* wait for all subscribers to acknowledge before returning.
116144
// This function is meant to be used by trusted internal code, so it can specify details like the namespace
@@ -136,6 +164,13 @@ func (bus *EventBus) SendEventInternal(_ context.Context, ns *namespace.Namespac
136164
eventReceived.Event = data
137165
} else {
138166
eventReceived.Event = patchMountPath(data, pluginInfo)
167+
walStr, err := bus.getIndexForEvent(eventReceived)
168+
if err != nil {
169+
bus.logger.Warn("Failed to get index for event", "error", err)
170+
}
171+
if walStr != "" {
172+
eventReceived.Event.Metadata.Fields[eventMetadataVaultIndex] = structpb.NewStringValue(walStr)
173+
}
139174
}
140175

141176
// We can't easily know when the SendEvent is complete, so we can't call the cancel function.
@@ -170,7 +205,7 @@ func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.Even
170205
return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, false, data)
171206
}
172207

173-
func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) {
208+
func NewEventBus(localNodeID string, logger hclog.Logger, c StorageWALGetter) (*EventBus, error) {
174209
broker, err := eventlogger.NewBroker()
175210
if err != nil {
176211
return nil, err
@@ -205,6 +240,7 @@ func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) {
205240
timeout: defaultTimeout,
206241
cloudEventsFormatterFilter: cloudEventsFormatterFilter,
207242
filters: NewFilters(localNodeID),
243+
walGetter: c,
208244
}, nil
209245
}
210246

0 commit comments

Comments
 (0)