Skip to content

Commit 0631133

Browse files
committed
[Monolog] Added ElasticsearchLogstashHandler
1 parent 25f1804 commit 0631133

File tree

4 files changed

+265
-0
lines changed

4 files changed

+265
-0
lines changed

src/Symfony/Bridge/Monolog/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
-----
66

77
* The `RouteProcessor` class has been made final
8+
* Added `ElasticsearchLogstashHandler`
89

910
4.3.0
1011
-----
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Bridge\Monolog\Handler;
13+
14+
use Monolog\Formatter\FormatterInterface;
15+
use Monolog\Formatter\LogstashFormatter;
16+
use Monolog\Handler\AbstractHandler;
17+
use Monolog\Logger;
18+
use Symfony\Component\HttpClient\HttpClient;
19+
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
20+
use Symfony\Contracts\HttpClient\HttpClientInterface;
21+
22+
/**
23+
* Push logs directly to Elasticsearch and format them according to Logstash specification.
24+
*
25+
* This handler dials directly with the HTTP interface of Elasticsearch. This
26+
* means it will slow down your application if Elasticsearch takes times to
27+
* answer. Even if all HTTP calls are done asynchronously.
28+
*
29+
* In a development environment, it's fine to keep the default configuration:
30+
* for each log, an HTTP request will be made to push the log to Elasticsearch.
31+
*
32+
* In a production environment, it's highly recommended to wrap this handler
33+
* in a handler with buffering capabilities (like the FingersCrossedHandler, or
34+
* BufferHandler) in order to call Elasticsearch only once with a bulk push. For
35+
* even better performance and fault tolerance, a proper ELK (https://www.elastic.co/what-is/elk-stack)
36+
* stack is recommended.
37+
*
38+
* @author Grégoire Pineau <lyrixx@lyrixx.info>
39+
*/
40+
class ElasticsearchLogstashHandler extends AbstractHandler
41+
{
42+
private $endpoint;
43+
private $index;
44+
private $client;
45+
private $responses;
46+
47+
public function __construct(string $endpoint = 'http://127.0.0.1:9200', string $index = 'monolog', HttpClientInterface $client = null, int $level = Logger::DEBUG, bool $bubble = true)
48+
{
49+
if (!interface_exists(HttpClientInterface::class)) {
50+
throw new \LogicException(sprintf('The %s handler needs an HTTP client. Try running "composer require symfony/http-client".', __CLASS__));
51+
}
52+
53+
parent::__construct($level, $bubble);
54+
$this->endpoint = $endpoint;
55+
$this->index = $index;
56+
$this->client = $client ?: HttpClient::create(['timeout' => 1]);
57+
$this->responses = new \SplObjectStorage();
58+
}
59+
60+
public function handle(array $record): bool
61+
{
62+
if (!$this->isHandling($record)) {
63+
return false;
64+
}
65+
66+
$this->sendToElasticsearch([$record]);
67+
68+
return !$this->bubble;
69+
}
70+
71+
public function handleBatch(array $records): void
72+
{
73+
$records = array_filter($records, [$this, 'isHandling']);
74+
75+
if ($records) {
76+
$this->sendToElasticsearch($records);
77+
}
78+
}
79+
80+
protected function getDefaultFormatter(): FormatterInterface
81+
{
82+
return new LogstashFormatter('application', null, null, 'ctxt_', LogstashFormatter::V1);
83+
}
84+
85+
private function sendToElasticsearch(array $records)
86+
{
87+
$formatter = $this->getFormatter();
88+
89+
$body = '';
90+
foreach ($records as $record) {
91+
if ($this->processors) {
92+
foreach ($this->processors as $processor) {
93+
$record = $processor($record);
94+
}
95+
}
96+
97+
$body .= json_encode([
98+
'index' => [
99+
'_index' => $this->index,
100+
'_type' => '_doc',
101+
],
102+
]);
103+
$body .= "\n";
104+
$body .= $formatter->format($record);
105+
$body .= "\n";
106+
}
107+
108+
$response = $this->client->request('POST', $this->endpoint.'/_bulk', [
109+
'body' => $body,
110+
'headers' => [
111+
'Content-Type' => 'application/json',
112+
],
113+
]);
114+
115+
$this->responses->attach($response);
116+
117+
$this->wait(false);
118+
}
119+
120+
public function __destruct()
121+
{
122+
$this->wait(true);
123+
}
124+
125+
private function wait(bool $blocking)
126+
{
127+
foreach ($this->client->stream($this->responses, $blocking ? null : 0.0) as $response => $chunk) {
128+
try {
129+
if ($chunk->isTimeout() && !$blocking) {
130+
continue;
131+
}
132+
if (!$chunk->isFirst() && !$chunk->isLast()) {
133+
continue;
134+
}
135+
if ($chunk->isLast()) {
136+
$this->responses->detach($response);
137+
}
138+
} catch (ExceptionInterface $e) {
139+
$this->responses->detach($response);
140+
error_log(sprintf("Could not push logs to Elasticsearch:\n%s", (string) $e));
141+
}
142+
}
143+
}
144+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Bridge\Monolog\Tests\Handler;
13+
14+
use Monolog\Formatter\FormatterInterface;
15+
use Monolog\Formatter\LogstashFormatter;
16+
use Monolog\Logger;
17+
use PHPUnit\Framework\TestCase;
18+
use Symfony\Bridge\Monolog\Handler\ElasticsearchLogstashHandler;
19+
use Symfony\Component\HttpClient\MockHttpClient;
20+
use Symfony\Component\HttpClient\Response\MockResponse;
21+
22+
class ElasticsearchLogstashHandlerTest extends TestCase
23+
{
24+
public function testHandle()
25+
{
26+
$callCount = 0;
27+
$responseFactory = function ($method, $url, $options) use (&$callCount) {
28+
$body = <<<EOBODY
29+
{"index":{"_index":"log","_type":"_doc"}}
30+
{"@timestamp":"2020-01-01T00:00:00.000000+01:00","@version":1,"host":"my hostname","message":"My info message","type":"application","channel":"app","level":"INFO"}
31+
32+
33+
EOBODY;
34+
35+
$this->assertSame('POST', $method);
36+
$this->assertSame('http://es:9200/_bulk', $url);
37+
$this->assertSame($body, $options['body']);
38+
$this->assertSame('content-type: application/json', $options['request_headers'][0]);
39+
++$callCount;
40+
41+
return new MockResponse();
42+
};
43+
44+
$handler = new ElasticsearchLogstashHandlerWithHardCodedHostname('http://es:9200', 'log', new MockHttpClient($responseFactory));
45+
46+
$record = [
47+
'message' => 'My info message',
48+
'context' => [],
49+
'level' => Logger::INFO,
50+
'level_name' => Logger::getLevelName(Logger::INFO),
51+
'channel' => 'app',
52+
'datetime' => new \DateTime('2020-01-01T00:00:00+01:00'),
53+
'extra' => [],
54+
];
55+
56+
$handler->handle($record);
57+
58+
$this->assertSame(1, $callCount);
59+
}
60+
61+
public function testBandleBatch()
62+
{
63+
$callCount = 0;
64+
$responseFactory = function ($method, $url, $options) use (&$callCount) {
65+
$body = <<<EOBODY
66+
{"index":{"_index":"log","_type":"_doc"}}
67+
{"@timestamp":"2020-01-01T00:00:00.000000+01:00","@version":1,"host":"my hostname","message":"My info message","type":"application","channel":"app","level":"INFO"}
68+
69+
{"index":{"_index":"log","_type":"_doc"}}
70+
{"@timestamp":"2020-01-01T00:00:01.000000+01:00","@version":1,"host":"my hostname","message":"My second message","type":"application","channel":"php","level":"WARNING"}
71+
72+
73+
EOBODY;
74+
75+
$this->assertSame('POST', $method);
76+
$this->assertSame('http://es:9200/_bulk', $url);
77+
$this->assertSame($body, $options['body']);
78+
$this->assertSame('content-type: application/json', $options['request_headers'][0]);
79+
++$callCount;
80+
81+
return new MockResponse();
82+
};
83+
84+
$handler = new ElasticsearchLogstashHandlerWithHardCodedHostname('http://es:9200', 'log', new MockHttpClient($responseFactory));
85+
86+
$records = [
87+
[
88+
'message' => 'My info message',
89+
'context' => [],
90+
'level' => Logger::INFO,
91+
'level_name' => Logger::getLevelName(Logger::INFO),
92+
'channel' => 'app',
93+
'datetime' => new \DateTime('2020-01-01T00:00:00+01:00'),
94+
'extra' => [],
95+
],
96+
[
97+
'message' => 'My second message',
98+
'context' => [],
99+
'level' => Logger::WARNING,
100+
'level_name' => Logger::getLevelName(Logger::WARNING),
101+
'channel' => 'php',
102+
'datetime' => new \DateTime('2020-01-01T00:00:01+01:00'),
103+
'extra' => [],
104+
],
105+
];
106+
107+
$handler->handleBatch($records);
108+
109+
$this->assertSame(1, $callCount);
110+
}
111+
}
112+
113+
class ElasticsearchLogstashHandlerWithHardCodedHostname extends ElasticsearchLogstashHandler
114+
{
115+
protected function getDefaultFormatter(): FormatterInterface
116+
{
117+
return new LogstashFormatter('application', 'my hostname', null, 'ctxt_', LogstashFormatter::V1);
118+
}
119+
}

src/Symfony/Bridge/Monolog/composer.json

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
},
2424
"require-dev": {
2525
"symfony/console": "^3.4|^4.0|^5.0",
26+
"symfony/http-client": "^4.4|^5.0",
2627
"symfony/security-core": "^3.4|^4.0|^5.0",
2728
"symfony/var-dumper": "^3.4|^4.0|^5.0"
2829
},

0 commit comments

Comments
 (0)