Skip to content

Commit 725e869

Browse files
authored
[confighttp] Allow compression list for a server to be overridden (#10295)
--------- Signed-off-by: Juraci Paixão Kröhling <[email protected]>
1 parent 6ca551e commit 725e869

File tree

5 files changed

+114
-51
lines changed

5 files changed

+114
-51
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: confighttp
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Allow the compression list to be overridden
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10295]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Allows Collector administrators to control which compression algorithms to enable for HTTP-based receivers.

config/confighttp/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ will not be enabled.
7575
not set, browsers use a default of 5 seconds.
7676
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
7777
- `max_request_body_size`: configures the maximum allowed body size in bytes for a single request. Default: `0` (no restriction)
78+
- `compression_algorithms`: configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate"]
7879
- [`tls`](../configtls/README.md)
7980
- [`auth`](../configauth/README.md)
8081

@@ -98,6 +99,7 @@ receivers:
9899
- Example-Header
99100
max_age: 7200
100101
endpoint: 0.0.0.0:55690
102+
compression_algorithms: ["", "gzip"]
101103
processors:
102104
attributes:
103105
actions:

config/confighttp/compression.go

Lines changed: 58 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,53 @@ type compressRoundTripper struct {
2525
compressor *compressor
2626
}
2727

28+
var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){
29+
"": func(io.ReadCloser) (io.ReadCloser, error) {
30+
// Not a compressed payload. Nothing to do.
31+
return nil, nil
32+
},
33+
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
34+
gr, err := gzip.NewReader(body)
35+
if err != nil {
36+
return nil, err
37+
}
38+
return gr, nil
39+
},
40+
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
41+
zr, err := zstd.NewReader(
42+
body,
43+
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
44+
// for our use-case (a server accepting decoding http requests).
45+
// Disabling async improves performance (I benchmarked it previously when working
46+
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
47+
zstd.WithDecoderConcurrency(1),
48+
)
49+
if err != nil {
50+
return nil, err
51+
}
52+
return zr.IOReadCloser(), nil
53+
},
54+
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
55+
zr, err := zlib.NewReader(body)
56+
if err != nil {
57+
return nil, err
58+
}
59+
return zr, nil
60+
},
61+
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
62+
sr := snappy.NewReader(body)
63+
sb := new(bytes.Buffer)
64+
_, err := io.Copy(sb, sr)
65+
if err != nil {
66+
return nil, err
67+
}
68+
if err = body.Close(); err != nil {
69+
return nil, err
70+
}
71+
return io.NopCloser(sb), nil
72+
},
73+
}
74+
2875
func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) {
2976
encoder, err := newCompressor(compressionType)
3077
if err != nil {
@@ -77,64 +124,27 @@ type decompressor struct {
77124
// by identifying the compression format in the "Content-Encoding" header and re-writing
78125
// request body so that the handlers further in the chain can work on decompressed data.
79126
// It supports gzip and deflate/zlib compression.
80-
func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
127+
func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), enableDecoders []string, decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
81128
errHandler := defaultErrorHandler
82129
if eh != nil {
83130
errHandler = eh
84131
}
85132

133+
enabled := map[string]func(body io.ReadCloser) (io.ReadCloser, error){}
134+
for _, dec := range enableDecoders {
135+
enabled[dec] = availableDecoders[dec]
136+
137+
if dec == "deflate" {
138+
enabled["deflate"] = availableDecoders["zlib"]
139+
}
140+
}
141+
86142
d := &decompressor{
87143
maxRequestBodySize: maxRequestBodySize,
88144
errHandler: errHandler,
89145
base: h,
90-
decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){
91-
"": func(io.ReadCloser) (io.ReadCloser, error) {
92-
// Not a compressed payload. Nothing to do.
93-
return nil, nil
94-
},
95-
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
96-
gr, err := gzip.NewReader(body)
97-
if err != nil {
98-
return nil, err
99-
}
100-
return gr, nil
101-
},
102-
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
103-
zr, err := zstd.NewReader(
104-
body,
105-
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
106-
// for our use-case (a server accepting decoding http requests).
107-
// Disabling async improves performance (I benchmarked it previously when working
108-
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
109-
zstd.WithDecoderConcurrency(1),
110-
)
111-
if err != nil {
112-
return nil, err
113-
}
114-
return zr.IOReadCloser(), nil
115-
},
116-
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
117-
zr, err := zlib.NewReader(body)
118-
if err != nil {
119-
return nil, err
120-
}
121-
return zr, nil
122-
},
123-
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
124-
sr := snappy.NewReader(body)
125-
sb := new(bytes.Buffer)
126-
_, err := io.Copy(sb, sr)
127-
if err != nil {
128-
return nil, err
129-
}
130-
if err = body.Close(); err != nil {
131-
return nil, err
132-
}
133-
return io.NopCloser(sb), nil
134-
},
135-
},
146+
decoders: enabled,
136147
}
137-
d.decoders["deflate"] = d.decoders["zlib"]
138148

139149
for key, dec := range decoders {
140150
d.decoders[key] = dec

config/confighttp/compression_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func TestHTTPCustomDecompression(t *testing.T) {
134134
return io.NopCloser(strings.NewReader("decompressed body")), nil
135135
},
136136
}
137-
srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, decoders))
137+
srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, decoders))
138138

139139
t.Cleanup(srv.Close)
140140

@@ -253,7 +253,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
253253
require.NoError(t, err, "failed to read request body: %v", err)
254254
assert.EqualValues(t, testBody, string(body))
255255
w.WriteHeader(http.StatusOK)
256-
}), defaultMaxRequestBodySize, defaultErrorHandler, noDecoders))
256+
}), defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, noDecoders))
257257
t.Cleanup(srv.Close)
258258

259259
req, err := http.NewRequest(http.MethodGet, srv.URL, tt.reqBody)
@@ -349,6 +349,31 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) {
349349
require.Error(t, err)
350350
}
351351

352+
func TestOverrideCompressionList(t *testing.T) {
353+
// prepare
354+
configuredDecoders := []string{"none", "zlib"}
355+
356+
srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
357+
w.WriteHeader(http.StatusOK)
358+
}), defaultMaxRequestBodySize, defaultErrorHandler, configuredDecoders, nil))
359+
t.Cleanup(srv.Close)
360+
361+
req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappy(t, []byte("123decompressed body")))
362+
require.NoError(t, err, "failed to create request to test handler")
363+
req.Header.Set("Content-Encoding", "snappy")
364+
365+
client := http.Client{}
366+
367+
// test
368+
res, err := client.Do(req)
369+
require.NoError(t, err)
370+
371+
// verify
372+
assert.Equal(t, http.StatusBadRequest, res.StatusCode, "test handler returned unexpected status code ")
373+
_, err = io.ReadAll(res.Body)
374+
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
375+
}
376+
352377
func compressGzip(t testing.TB, body []byte) *bytes.Buffer {
353378
var buf bytes.Buffer
354379
gw := gzip.NewWriter(&buf)

config/confighttp/confighttp.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131

3232
const headerContentEncoding = "Content-Encoding"
3333
const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB
34+
var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate"}
3435

3536
// ClientConfig defines settings for creating an HTTP client.
3637
type ClientConfig struct {
@@ -283,6 +284,9 @@ type ServerConfig struct {
283284
// Additional headers attached to each HTTP response sent to the client.
284285
// Header values are opaque since they may be sensitive.
285286
ResponseHeaders map[string]configopaque.String `mapstructure:"response_headers"`
287+
288+
// CompressionAlgorithms configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate"]
289+
CompressionAlgorithms []string `mapstructure:"compression_algorithms"`
286290
}
287291

288292
// ToListener creates a net.Listener.
@@ -348,7 +352,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin
348352
hss.MaxRequestBodySize = defaultMaxRequestBodySize
349353
}
350354

351-
handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, serverOpts.decoders)
355+
if hss.CompressionAlgorithms == nil {
356+
hss.CompressionAlgorithms = defaultCompressionAlgorithms
357+
}
358+
359+
handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, hss.CompressionAlgorithms, serverOpts.decoders)
352360

353361
if hss.MaxRequestBodySize > 0 {
354362
handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)

0 commit comments

Comments
 (0)