Skip to content

Commit 3e5da54

Browse files
committed
Update according to upstream api changes
1 parent 114f0b9 commit 3e5da54

File tree

2 files changed

+74
-40
lines changed

2 files changed

+74
-40
lines changed

client.go

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package http
22

33
import (
4-
"crypto/tls"
54
"expvar"
65
"fmt"
76
"io"
@@ -12,18 +11,32 @@ import (
1211

1312
"github.com/elastic/beats/libbeat/logp"
1413
"github.com/elastic/beats/libbeat/outputs"
14+
"github.com/elastic/beats/libbeat/outputs/outil"
1515
"github.com/elastic/beats/libbeat/outputs/transport"
1616
)
1717

1818
type Client struct {
1919
Connection
20-
params map[string]string
20+
tlsConfig *transport.TLSConfig
21+
params map[string]string
2122

2223
// additional configs
2324
compressionLevel int
2425
proxyURL *url.URL
2526
}
2627

28+
type ClientSettings struct {
29+
URL string
30+
Proxy *url.URL
31+
TLS *transport.TLSConfig
32+
Username, Password string
33+
Parameters map[string]string
34+
Index outil.Selector
35+
Pipeline *outil.Selector
36+
Timeout time.Duration
37+
CompressionLevel int
38+
}
39+
2740
type Connection struct {
2841
URL string
2942
Username string
@@ -48,29 +61,38 @@ var (
4861
)
4962

5063
func NewClient(
51-
hostURL string, proxyURL *url.URL, tls *tls.Config,
52-
username, password string,
53-
params map[string]string,
54-
timeout time.Duration,
55-
compression int,
64+
s ClientSettings,
5665
) (*Client, error) {
5766
proxy := http.ProxyFromEnvironment
58-
if proxyURL != nil {
59-
proxy = http.ProxyURL(proxyURL)
67+
if s.Proxy != nil {
68+
proxy = http.ProxyURL(s.Proxy)
6069
}
6170

62-
logp.Info("Http url: %s", hostURL)
71+
logp.Info("Http url: %s", s.URL)
6372

64-
dialer := transport.NetDialer(timeout)
65-
dialer = transport.StatsDialer(dialer, &transport.IOStats{
73+
// TODO: add socks5 proxy support
74+
var dialer, tlsDialer transport.Dialer
75+
var err error
76+
77+
dialer = transport.NetDialer(s.Timeout)
78+
tlsDialer, err = transport.TLSDialer(dialer, s.TLS, s.Timeout)
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
iostats := &transport.IOStats{
6684
Read: statReadBytes,
6785
Write: statWriteBytes,
6886
ReadErrors: statReadErrors,
6987
WriteErrors: statWriteErrors,
70-
})
88+
}
89+
dialer = transport.StatsDialer(dialer, iostats)
90+
tlsDialer = transport.StatsDialer(tlsDialer, iostats)
91+
92+
params := s.Parameters
7193

72-
var err error
7394
var encoder bodyEncoder
95+
compression := s.CompressionLevel
7496
if compression == 0 {
7597
encoder = newJSONEncoder(nil)
7698
} else {
@@ -82,22 +104,23 @@ func NewClient(
82104

83105
client := &Client{
84106
Connection: Connection{
85-
URL: hostURL,
86-
Username: username,
87-
Password: password,
107+
URL: s.URL,
108+
Username: s.Username,
109+
Password: s.Password,
88110
http: &http.Client{
89111
Transport: &http.Transport{
90-
Dial: dialer.Dial,
91-
TLSClientConfig: tls,
92-
Proxy: proxy,
112+
Dial: dialer.Dial,
113+
DialTLS: tlsDialer.Dial,
114+
Proxy: proxy,
93115
},
94-
Timeout: timeout,
116+
Timeout: s.Timeout,
95117
},
96118
encoder: encoder,
97119
},
98120
params: params,
99121

100-
proxyURL: proxyURL,
122+
compressionLevel: compression,
123+
proxyURL: s.Proxy,
101124
}
102125

103126
return client, nil
@@ -108,17 +131,17 @@ func (client *Client) Clone() *Client {
108131
// client's close is for example generated for topology-map support. With params
109132
// most likely containing the ingest node pipeline and default callback trying to
110133
// create install a template, we don't want these to be included in the clone.
111-
112-
transport := client.http.Transport.(*http.Transport)
113134
c, _ := NewClient(
114-
client.URL,
115-
client.proxyURL,
116-
transport.TLSClientConfig,
117-
client.Username,
118-
client.Password,
119-
nil, // XXX: do not pass params?
120-
client.http.Timeout,
121-
client.compressionLevel,
135+
ClientSettings{
136+
URL: client.URL,
137+
Proxy: client.proxyURL,
138+
TLS: client.tlsConfig,
139+
Username: client.Username,
140+
Password: client.Password,
141+
Parameters: nil, // XXX: do not pass params?
142+
Timeout: client.http.Timeout,
143+
CompressionLevel: client.compressionLevel,
144+
},
122145
)
123146
return c
124147
}

http.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package http
22

33
import (
4-
"crypto/tls"
54
"errors"
65
"net/url"
76
"strings"
@@ -13,6 +12,7 @@ import (
1312
"github.com/elastic/beats/libbeat/outputs"
1413
"github.com/elastic/beats/libbeat/outputs/mode"
1514
"github.com/elastic/beats/libbeat/outputs/mode/modeutil"
15+
"github.com/elastic/beats/libbeat/outputs/transport"
1616
)
1717

1818
type httpOutput struct {
@@ -71,8 +71,13 @@ func (out *httpOutput) init(cfg *common.Config) error {
7171
var maxWaitRetry = time.Duration(60) * time.Second
7272

7373
loadBalance := config.LoadBalance
74-
m, err := modeutil.NewConnectionMode(clients, !loadBalance,
75-
maxAttempts, waitRetry, config.Timeout, maxWaitRetry)
74+
m, err := modeutil.NewConnectionMode(clients, modeutil.Settings{
75+
Failover: !loadBalance,
76+
MaxAttempts: maxAttempts,
77+
Timeout: config.Timeout,
78+
WaitRetry: waitRetry,
79+
MaxWaitRetry: maxWaitRetry,
80+
})
7681
if err != nil {
7782
return err
7883
}
@@ -83,7 +88,7 @@ func (out *httpOutput) init(cfg *common.Config) error {
8388
}
8489

8590
func makeClientFactory(
86-
tls *tls.Config,
91+
tls *transport.TLSConfig,
8792
config *httpConfig,
8893
out *httpOutput,
8994
) func(string) (mode.ProtocolClient, error) {
@@ -111,10 +116,16 @@ func makeClientFactory(
111116
params = nil
112117
}
113118

114-
return NewClient(
115-
hostURL, proxyURL, tls,
116-
config.Username, config.Password,
117-
params, config.Timeout, config.CompressionLevel)
119+
return NewClient(ClientSettings{
120+
URL: hostURL,
121+
Proxy: proxyURL,
122+
TLS: tls,
123+
Username: config.Username,
124+
Password: config.Password,
125+
Parameters: params,
126+
Timeout: config.Timeout,
127+
CompressionLevel: config.CompressionLevel,
128+
})
118129
}
119130
}
120131

0 commit comments

Comments
 (0)