Skip to content

Commit cb0c046

Browse files
committed
Initial code
0 parents  commit cb0c046

File tree

6 files changed

+867
-0
lines changed

6 files changed

+867
-0
lines changed

client.go

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
package http
2+
3+
import (
4+
"crypto/tls"
5+
"expvar"
6+
"fmt"
7+
"io"
8+
"io/ioutil"
9+
"net/http"
10+
"net/url"
11+
"time"
12+
13+
"github.com/elastic/beats/libbeat/common"
14+
"github.com/elastic/beats/libbeat/logp"
15+
"github.com/elastic/beats/libbeat/outputs/transport"
16+
)
17+
18+
type Client struct {
19+
Connection
20+
params map[string]string
21+
22+
// additional configs
23+
compressionLevel int
24+
proxyURL *url.URL
25+
}
26+
27+
type Connection struct {
28+
URL string
29+
Username string
30+
Password string
31+
32+
http *http.Client
33+
connected bool
34+
35+
encoder bodyEncoder
36+
}
37+
38+
// Metrics that can retrieved through the expvar web interface.
39+
var (
40+
ackedEvents = expvar.NewInt("libbeatHttpPublishedAndAckedEvents")
41+
eventsNotAcked = expvar.NewInt("libbeatHttpPublishedButNotAckedEvents")
42+
publishEventsCallCount = expvar.NewInt("libbeatHttpPublishEventsCallCount")
43+
44+
statReadBytes = expvar.NewInt("libbeatHttpPublishReadBytes")
45+
statWriteBytes = expvar.NewInt("libbeatHttpPublishWriteBytes")
46+
statReadErrors = expvar.NewInt("libbeatHttpPublishReadErrors")
47+
statWriteErrors = expvar.NewInt("libbeatHttpPublishWriteErrors")
48+
)
49+
50+
func NewClient(
51+
hostURL string, proxyURL *url.URL, tls *tls.Config,
52+
username, password string,
53+
params map[string]string,
54+
timeout time.Duration,
55+
compression int,
56+
) (*Client, error) {
57+
proxy := http.ProxyFromEnvironment
58+
if proxyURL != nil {
59+
proxy = http.ProxyURL(proxyURL)
60+
}
61+
62+
logp.Info("Http url: %s", hostURL)
63+
64+
dialer := transport.NetDialer(timeout)
65+
dialer = transport.StatsDialer(dialer, &transport.IOStats{
66+
Read: statReadBytes,
67+
Write: statWriteBytes,
68+
ReadErrors: statReadErrors,
69+
WriteErrors: statWriteErrors,
70+
})
71+
72+
var err error
73+
var encoder bodyEncoder
74+
if compression == 0 {
75+
encoder = newJSONEncoder(nil)
76+
} else {
77+
encoder, err = newGzipEncoder(compression, nil)
78+
if err != nil {
79+
return nil, err
80+
}
81+
}
82+
83+
client := &Client{
84+
Connection: Connection{
85+
URL: hostURL,
86+
Username: username,
87+
Password: password,
88+
http: &http.Client{
89+
Transport: &http.Transport{
90+
Dial: dialer.Dial,
91+
TLSClientConfig: tls,
92+
Proxy: proxy,
93+
},
94+
Timeout: timeout,
95+
},
96+
encoder: encoder,
97+
},
98+
params: params,
99+
100+
proxyURL: proxyURL,
101+
}
102+
103+
return client, nil
104+
}
105+
106+
func (client *Client) Clone() *Client {
107+
// when cloning the connection callback and params are not copied. A
108+
// client's close is for example generated for topology-map support. With params
109+
// most likely containing the ingest node pipeline and default callback trying to
110+
// create install a template, we don't want these to be included in the clone.
111+
112+
transport := client.http.Transport.(*http.Transport)
113+
c, _ := NewClient(
114+
client.URL,
115+
client.proxyURL,
116+
transport.TLSClientConfig,
117+
client.Username,
118+
client.Password,
119+
nil, // XXX: do not pass params?
120+
client.http.Timeout,
121+
client.compressionLevel,
122+
)
123+
return c
124+
}
125+
126+
func (conn *Connection) Connect(timeout time.Duration) error {
127+
conn.connected = true
128+
return nil
129+
}
130+
131+
func (conn *Connection) IsConnected() bool {
132+
return conn.connected
133+
}
134+
135+
func (conn *Connection) Close() error {
136+
conn.connected = false
137+
return nil
138+
}
139+
140+
// PublishEvents posts all events to the http endpoint. On error a slice with all
141+
// events not published will be returned.
142+
func (client *Client) PublishEvents(
143+
events []common.MapStr,
144+
) ([]common.MapStr, error) {
145+
begin := time.Now()
146+
publishEventsCallCount.Add(1)
147+
148+
if len(events) == 0 {
149+
return nil, nil
150+
}
151+
152+
if !client.connected {
153+
return events, ErrNotConnected
154+
}
155+
156+
var failedEvents []common.MapStr
157+
158+
sendErr := error(nil)
159+
for _, event := range events {
160+
sendErr = client.PublishEvent(event)
161+
// TODO more gracefully handle failures return the failed events
162+
// below instead of bailing out directly here:
163+
if sendErr != nil {
164+
return nil, sendErr
165+
}
166+
}
167+
168+
debugf("PublishEvents: %d metrics have been published over HTTP in %v.",
169+
len(events),
170+
time.Now().Sub(begin))
171+
172+
ackedEvents.Add(int64(len(events) - len(failedEvents)))
173+
eventsNotAcked.Add(int64(len(failedEvents)))
174+
if len(failedEvents) > 0 {
175+
return failedEvents, sendErr
176+
}
177+
178+
return nil, nil
179+
}
180+
181+
func (client *Client) PublishEvent(event common.MapStr) error {
182+
if !client.connected {
183+
return ErrNotConnected
184+
}
185+
186+
debugf("Publish event: %s", event)
187+
188+
status, _, err := client.request("POST", "", client.params, event)
189+
if err != nil {
190+
logp.Warn("Fail to insert a single event: %s", err)
191+
if err == ErrJSONEncodeFailed {
192+
// don't retry unencodable values
193+
return nil
194+
}
195+
}
196+
switch {
197+
case status == 0: // event was not send yet
198+
return nil
199+
case status >= 500 || status == 429: // server error, retry
200+
return err
201+
case status >= 300 && status < 500:
202+
// other error => don't retry
203+
return nil
204+
}
205+
206+
return nil
207+
}
208+
209+
func (conn *Connection) request(
210+
method, path string,
211+
params map[string]string,
212+
body interface{},
213+
) (int, []byte, error) {
214+
url := makeURL(conn.URL, path, "", params)
215+
debugf("%s %s %v", method, url, body)
216+
217+
if body == nil {
218+
return conn.execRequest(method, url, nil)
219+
}
220+
221+
if err := conn.encoder.Marshal(body); err != nil {
222+
logp.Warn("Failed to json encode body (%v): %#v", err, body)
223+
return 0, nil, ErrJSONEncodeFailed
224+
}
225+
return conn.execRequest(method, url, conn.encoder.Reader())
226+
}
227+
228+
func (conn *Connection) execRequest(
229+
method, url string,
230+
body io.Reader,
231+
) (int, []byte, error) {
232+
req, err := http.NewRequest(method, url, body)
233+
if err != nil {
234+
logp.Warn("Failed to create request", err)
235+
return 0, nil, err
236+
}
237+
if body != nil {
238+
conn.encoder.AddHeader(&req.Header)
239+
}
240+
return conn.execHTTPRequest(req)
241+
}
242+
243+
func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) {
244+
req.Header.Add("Accept", "application/json")
245+
if conn.Username != "" || conn.Password != "" {
246+
req.SetBasicAuth(conn.Username, conn.Password)
247+
}
248+
249+
resp, err := conn.http.Do(req)
250+
if err != nil {
251+
conn.connected = false
252+
return 0, nil, err
253+
}
254+
defer closing(resp.Body)
255+
256+
status := resp.StatusCode
257+
if status >= 300 {
258+
conn.connected = false
259+
return status, nil, fmt.Errorf("%v", resp.Status)
260+
}
261+
262+
obj, err := ioutil.ReadAll(resp.Body)
263+
if err != nil {
264+
conn.connected = false
265+
return status, nil, err
266+
}
267+
return status, obj, nil
268+
}
269+
270+
func closing(c io.Closer) {
271+
err := c.Close()
272+
if err != nil {
273+
logp.Warn("Close failed with: %v", err)
274+
}
275+
}

config.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package http
2+
3+
import (
4+
"time"
5+
6+
"github.com/elastic/beats/libbeat/outputs"
7+
)
8+
9+
type httpConfig struct {
10+
Protocol string `config:"protocol"`
11+
Path string `config:"path"`
12+
Params map[string]string `config:"parameters"`
13+
Username string `config:"username"`
14+
Password string `config:"password"`
15+
ProxyURL string `config:"proxy_url"`
16+
LoadBalance bool `config:"loadbalance"`
17+
CompressionLevel int `config:"compression_level" validate:"min=0, max=9"`
18+
TLS *outputs.TLSConfig `config:"tls"`
19+
MaxRetries int `config:"max_retries"`
20+
Timeout time.Duration `config:"timeout"`
21+
}
22+
23+
var (
24+
defaultConfig = httpConfig{
25+
Protocol: "",
26+
Path: "",
27+
ProxyURL: "",
28+
Username: "",
29+
Password: "",
30+
Timeout: 90 * time.Second,
31+
CompressionLevel: 0,
32+
TLS: nil,
33+
MaxRetries: 3,
34+
LoadBalance: true,
35+
}
36+
)
37+
38+
func (c *httpConfig) Validate() error {
39+
if c.ProxyURL != "" {
40+
if _, err := parseProxyURL(c.ProxyURL); err != nil {
41+
return err
42+
}
43+
}
44+
45+
return nil
46+
}

0 commit comments

Comments
 (0)