@@ -30,7 +30,9 @@ groups() ->
30
30
[
31
31
{cluster_size_2 , [], [
32
32
policy_ttl ,
33
- operator_policy_ttl
33
+ operator_policy_ttl ,
34
+ operator_retroactive_policy_ttl ,
35
+ operator_retroactive_policy_publish_ttl
34
36
]}
35
37
].
36
38
@@ -112,6 +114,49 @@ operator_policy_ttl(Config) ->
112
114
rabbit_ct_client_helpers :close_connection (Conn ),
113
115
passed .
114
116
117
+ operator_retroactive_policy_ttl (Config ) ->
118
+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
119
+ Q = <<" policy_ttl-queue" >>,
120
+ declare (Ch , Q ),
121
+ publish (Ch , Q , lists :seq (1 , 50 )),
122
+ % Operator policy will override
123
+ rabbit_ct_broker_helpers :set_operator_policy (Config , 0 , <<" ttl-policy-op" >>,
124
+ <<" policy_ttl-queue" >>, <<" all" >>, [{<<" message-ttl" >>, 1 }]),
125
+
126
+ % % Old messages are not expired
127
+ timer :sleep (50 ),
128
+ get_messages (50 , Ch , Q ),
129
+ delete (Ch , Q ),
130
+
131
+ rabbit_ct_broker_helpers :clear_operator_policy (Config , 0 , <<" ttl-policy-op" >>),
132
+
133
+ rabbit_ct_client_helpers :close_channel (Ch ),
134
+ rabbit_ct_client_helpers :close_connection (Conn ),
135
+ passed .
136
+
137
+ operator_retroactive_policy_publish_ttl (Config ) ->
138
+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
139
+ Q = <<" policy_ttl-queue" >>,
140
+ declare (Ch , Q ),
141
+ publish (Ch , Q , lists :seq (1 , 50 )),
142
+ % Operator policy will override
143
+ rabbit_ct_broker_helpers :set_operator_policy (Config , 0 , <<" ttl-policy-op" >>,
144
+ <<" policy_ttl-queue" >>, <<" all" >>, [{<<" message-ttl" >>, 1 }]),
145
+
146
+ % % Old messages are not expired, new ones only expire when they get to the head of
147
+ % % the queue
148
+ publish (Ch , Q , lists :seq (1 , 25 )),
149
+ timer :sleep (50 ),
150
+ [[<<" policy_ttl-queue" >>, <<" 75" >>]] = rabbit_ct_broker_helpers :rabbitmqctl_list (Config , 0 , [" list_queues" ]),
151
+ get_messages (50 , Ch , Q ),
152
+ delete (Ch , Q ),
153
+
154
+ rabbit_ct_broker_helpers :clear_operator_policy (Config , 0 , <<" ttl-policy-op" >>),
155
+
156
+ rabbit_ct_client_helpers :close_channel (Ch ),
157
+ rabbit_ct_client_helpers :close_connection (Conn ),
158
+ passed .
159
+
115
160
% %----------------------------------------------------------------------------
116
161
117
162
@@ -154,4 +199,14 @@ consume(Ch, Q, Ack) ->
154
199
get_empty (Ch , Q ) ->
155
200
# 'basic.get_empty' {} = amqp_channel :call (Ch , # 'basic.get' {queue = Q }).
156
201
202
+ get_messages (0 , Ch , Q ) ->
203
+ get_empty (Ch , Q );
204
+ get_messages (Number , Ch , Q ) ->
205
+ case amqp_channel :call (Ch , # 'basic.get' {queue = Q }) of
206
+ {# 'basic.get_ok' {}, _ } ->
207
+ get_messages (Number - 1 , Ch , Q );
208
+ # 'basic.get_empty' {} ->
209
+ exit (failed )
210
+ end .
211
+
157
212
% %----------------------------------------------------------------------------
0 commit comments