Skip to content

Commit c6d69ff

Browse files
Logstash sample
1 parent f94f5be commit c6d69ff

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
input {
2+
stdin {
3+
type => "mysql_event"
4+
format => "json_event"
5+
debug => true
6+
}
7+
}
8+
output {
9+
stdout { debug => true debug_format => "json"}
10+
elasticsearch { embedded => true }
11+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
#
5+
# Output logstash events to the console
6+
#
7+
# You can pipe it to logstash like this:
8+
# python examples/logstash/mysql_to_logstash.py | java -jar logstash-1.1.13-flatjar.jar agent -f examples/logstash/logstash-simple.conf
9+
10+
import json
11+
import time
12+
import sys
13+
14+
from pymysqlreplication import BinLogStreamReader
15+
from pymysqlreplication.row_event import (
16+
DeleteRowsEvent,
17+
UpdateRowsEvent,
18+
WriteRowsEvent,
19+
)
20+
21+
MYSQL_SETTINGS = {
22+
"host": "127.0.0.1",
23+
"port": 3306,
24+
"user": "root",
25+
"passwd": ""
26+
}
27+
28+
29+
def main():
30+
stream = BinLogStreamReader(
31+
connection_settings=MYSQL_SETTINGS,
32+
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
33+
34+
for binlogevent in stream:
35+
for row in binlogevent.rows:
36+
event = {}
37+
event["schema"] = binlogevent.schema
38+
event["table"] = binlogevent.table
39+
40+
if isinstance(binlogevent, DeleteRowsEvent):
41+
event["action"] = "delete"
42+
event = dict(event.items() + row["values"].items())
43+
elif isinstance(binlogevent, UpdateRowsEvent):
44+
event["action"] = "update"
45+
event = dict(event.items() + row["after_values"].items())
46+
elif isinstance(binlogevent, WriteRowsEvent):
47+
event["action"] = "insert"
48+
event = dict(event.items() + row["values"].items())
49+
print json.dumps(event)
50+
sys.stdout.flush()
51+
52+
53+
stream.close()
54+
55+
56+
if __name__ == "__main__":
57+
main()

0 commit comments

Comments
 (0)