Big Data Hadoop PDF
Big Data Hadoop PDF
Big Data Hadoop PDF
Autor:
Sergio Losada Rodríguez
Tutor:
Pablo Nebrera Herrera
Profesor asociado
El tribunal nombrado para juzgar el Proyecto arriba indicado, compuesto por los siguientes miembros:
Presidente:
Vocales:
Secretario:
Sevilla, 2014
Este proyecto marca el final de una de las etapas más intensas e inolvidables de mi vida como es la
Universidad, pero el comienzo de todo un mundo de posibilidades, caminos y objetivos.
Quiero agradecer haber podido llegar hasta aquí a cada uno de los miembros de mi familia, a los que están y
los que estuvieron. Sobre todo a mis padres por el esfuerzo que realizan día a día para que yo haya podido
tener la oportunidad de realizar estos estudios, ¡muchísimas gracias a los dos! Sin olvidarme de esos pequeños
corazones que tengo como primas que solo pueden transmitir alegría y felicidad.
Tampoco olvidaré todos los buenos ratos y estudios con “BLAAAS MIT”, ellos han sido los culpables de que
estos cuatro años hayan sido una aventura que hemos podido vivir juntos.
Agradecer también a Kannan Kalidasan por ayudarme en mis primeros pasos con Hadoop y siempre estar
disponible para solucionar algún problema, y a cada persona que voluntaria o involuntariamente ha puesto un
grano de arena en este proyecto.
Por último, dar las gracias a mi tutor Pablo Nebrera y a la empresa ENEO Tecnología por permitirme realizar
este proyecto con ellos, ser parte del equipo, y hacer que mi primera experiencia en una empresa sea
inolvidable.
i
Resumen
Este proyecto realiza una inmersión en el tema de Big Data. Su objetivo es el almacenamiento de grandes
cantidades de eventos de seguridad utilizando para ello una base de datos distribuida, para que posteriormente
dicha información pueda ser filtrada y extraida realizando consultas a la misma. El estudio se centrará en la
configuración e implantación de la arquitectura Big Data al proyecto redBorder Horama, y posteriormente, se
realizará un estudio estadístico (DataMining) sobre los datos que sean consultados, en búsqueda de anomalías,
agrupamientos o clasificaciones.
Para ello se han utilizado distintas herramientas de Apache, como pueden ser Hadoop, para la base de
datos; Ambari, encargada de instalaciones y monitorización; Pig, para las consultas a Hadoop; Kafka, para el
sistema de mensajería y otras no pertenecientes a Apache como Snort, para la captura de los eventos o la
librería WEKA, para el uso de funciones estadísticas en la parte del análisis y procesamiento de los datos.
El proyecto ha sido una introducción al mundo de BigData, a los problemas que están resueltos y a todos
aquellos que aún quedan por resolver, las distintas herramientas existentes y cómo cooperan entre ellas para
resolver dichos problemas ante tales cantidades de eventos. Y, sobre todo, la importancia del análisis de datos
en la actualidad con herramientas o funciones estadísticas para obtener una mejor comprensión de los mismos,
útiles para empresas en cuestión de rendimiento, seguridad o marketing.
iii
Abstract
The overall objective of this project is to collect and store massive volume of security event logs in
distributed system. Later the required information are retrieved by processing the data and filtered by
consulting the database. Also this study will focus on configuration and integration of Big Data
architecture into redBorder Horama platform. Finally statistical study of the data will be done by using
Data Mining techniques like outlier detection, clustering or classifications.
Highly scalable and most popular Big Data stacks are used for different purpose to achieve the project's
goal. Hadoop as Distributed processing Software . On top of Hadoop layer, there are various integrated
processing systems like Pig as consulting agent with database, Kafka used as messaging system , Snort as
detection system of network intrusion. For data analysis and statistical modeling , Weka Libraries are
used.
This project leveraged and integrated the Big Data technologies with current system to solve the problems
and provide real time solutions in network events based processing. Additionally, through data analytics
functionalities , it helps to get better data insights about the events, which benefits the companies to find
the hidden business opportunities in terms of performance, security and marketing.
v
Índice
Agradecimientos
i
Resumen
iii
Abstract
v
Índice
vii
Índice
de
Tablas
ix
Índice
de
Figuras
xi
1
Introducción
1
1.1
Motivación
1
1.2
Objetivos
1
1.3
Metodología
1
2
Arquitectura
3
2.1
ZooKeeper
3
2.2
Apache
Kafka
4
2.2.1
Introducción
4
2.2.2
Arquitectura
y
funcionamiento
4
2.3
Snort
7
2.3.1
Introducción
7
2.3.2
Arquitectura
y
funcionamiento
7
2.3.3
Alternativas
9
2.3.4
Uso
en
el
proyecto
10
2.4
Camus
10
3
Hadoop
13
3.1
Introducción
13
3.2
HDFS
14
3.2.1
Diseño
14
3.2.2
Demonios
14
3.2.3
Leer
y
escribir
datos
16
3.2.4
Alta
disponibilidad
17
3.2.5
Herramientas
de
línea
de
comandos
19
3.3
MapReduce
&
YARN
20
3.3.1
Introducción
20
3.3.2
Fases
MapReduce
20
3.3.3
Arquitectura
y
demonios
22
3.4
Puesta
en
marcha
23
3.4.1
Instalación
23
3.4.2
Configuración
24
3.4.3
Arranque
27
3.5
Hadoop
Single
Node
&
Hadoop
Clúster
28
3.5.1
Single
Node
28
3.5.2
Clúster
mode
33
vii
3.6
Mantenimiento
del
clúster
35
3.6.1
Añadir
o
dar
de
baja
un
DataNode
35
3.6.2
Comprobar
la
salud
e
integridad
del
Sistema
de
ficheros
35
3.6.3
Balanceo
de
bloques
de
datos
en
HDFS
36
3.7
Monitorización
-‐
Ambari
37
3.8
Backup
and
Recovery
41
4
Pig
43
4.1
Introducción
43
4.2
Filosofía
de
PIG
43
4.3
Latin
Pig
44
4.3.1
Alternativas
44
4.4
Objetivos
e
implementación
44
4.4.1
Script
en
Ruby
46
4.4.2
Ejecución
47
5
DataMining
49
5.1
Introducción
DataMining
49
5.2
Objetivos
50
5.3
Clasificación
52
5.4
Clústerización
60
5.5
Outlier
Detection
62
6
Presupuesto
63
7
Conclusiones
64
7.1
Conclusiones
64
7.2
Mejoras
y
líneas
futuras
64
Referencias
67
Anexo
A
:
Configuración
de
Hadoop
70
Anexo
B
:
Script
de
Ruby
82
Anexo
C
:
DataMining
Code
(Java)
86
Índice
de
Conceptos
111
ÍNDICE DE TABLAS
ix
ÍNDICE DE FIGURAS
xi
Figura 35: (Ambari) Opciones de instalación 38
Figura 36: (Ambari) Proceso de instalación de los nodos 1 38
Figura 37: (Ambari) Proceso de instalación de los nodos 2 39
Figura 38: Selección de demonios para cada nodo 39
Figura 39: Añadir propiedades 40
Figura 40: Resumen de la instalación 40
Figura 41: Proceso de instalación de demonios 40
Figura 42: Monitorización mediante Ambari 41
Figura 43: Datos, información y conocimiento 50
Figura 44: Formato JSON de los eventos 54
Figura 45: UI DataMining 54
Figura 46: Algoritmos de clasificación 54
Figura 47: Directorio de resultados de clasificación 55
Figura 48: J48 tree 56
Figura 49: Resumen de los resultados del J48 57
Figura 50: Todos los resultados de algoritmos de clasificación 57
Figura 51: resultado del análisis de clasificación 59
Figura 52: Mejores métodos de clasificación 58
Figura 53: Opciones de agrupamiento 59
Figura 54: Resultados agrupamiento (1 atributo) 61
Figura 55: Mejores resultados de agrupamiento 60
Figura 56: Partes de los mejores métodos por separado que después se sumarán 61
Figura 57: Mejores agrupadores para el DataSet completo 60
Figura 58: Resultado del análisis de detección de anomalías 62
Figura 59: Resumen del análisis de detección de anomalías 61
xiii
1 INTRODUCCIÓN
1.1 Motivación
Desde hace unos años la cantidad de datos que se están generando ha incrementado exponencialmente,
prediciéndose que el tráfico fluyendo a través de internet en 2014 alcanzará aproximadamente los 667
exabytes (106 TB). Gran parte de esa enorme cantidad de datos termina almacenándose en discos duros de
hogares, bases de datos de empresas, etc.
Este hecho dificulta el proceso de almacenamiento y procesamiento de los mismos. Por ejemplo Facebook
debe procesar 300 millones de fotos al día y 2.7 billones de Likes lo que hace aproximadamente un total de
105 TB cada media hora. Aprovechando que el hardware también está mejorando sus prestaciones
(procesadores, memorias…) hacen falta algoritmos y software que lo acompañen para ser capaz de procesar
tales cantidades de información. Es lo que se conoce en la actualidad con el término Big Data.
En este proyecto se aplicará dicho software y algoritmos a eventos de seguridad recibidos en una red,
recogidos por un IPS, y se incorporarán al proyecto redBorder Horama, donde deberán trabajar conjuntamente
con el resto de servicios del mismo.
1.2 Objetivos
En primer lugar, el propósito general de este proyecto se puede dividir en tres partes: Almacenamiento,
consultas y procesamiento de los datos.
Debe abarcar desde que cientos, miles o millones de eventos que capture el IPS sean almacenados en una base
de datos capaz de trabajar con tales cantidades de información. Que toda esa información se pueda extraer
posteriormente de forma fácil y filtrándola según las necesidades del usuario como cualquier consulta a una
base de datos.
Y por último, ser capaz de procesar esos datos con una serie de algoritmos que muestren que información
esconden miles o millones de eventos.
1.3 Metodología
Antes de comenzar con el proyecto, el primer capítulo mostrarán los pilares sobre los que se sostiene el
mismo, de dónde provienen esos datos, cómo han sido recogidos, y cómo se van encaminando hasta el
corazón del proyecto, que será la base de datos distribuida Apache Hadoop (Capítulo 3).
En Apache Hadoop será donde se almacenen todos los eventos de forma estructurada y distribuida en distintos
nodos. Se abarcarán todos los posibles campos que implica trabajar con esta base de datos, como pueden ser
1
2 Introducción
instalación (tanto manualmente como a través de un wizard), escritura, lectura, distribución, copias de
seguridad, monitorización…
Una vez todos esos datos se encuentran correctamente almacenados se abarcará el proceso de extracción de los
mismos, que se podrá ver en el Capítulo 4, en el que se utilizará una herramienta también de Apache que
trabaja conjuntamente con Hadoop denominada Pig.
En el Capítulo 5 se mostrarán los distintos algoritmos y cómo se han implementado para procesar la
información que se ha estado almacenando. Y por supuesto la utilidad de cada uno y que tipo de conocimiento
aportan.
Finalmente en el último Capítulo se mostrarán las conclusiones del proyecto y las futuras líneas de mejora para
el mismo.
2 ARQUITECTURA
E
ste capítulo se centrará en todos los componentes de la arquitectura del proyecto que son necesarios
comprender para el posterior entendimiento del resto de los componentes fundamentales, como Hadoop
o Pig.
Las herramientas que se detallan a continuación, como ZooKeeper, Kafka, Snort o Camus son los pilares sobre
los que se sostiene el proyecto. La buena configuración y que realicen correctamente las funciones que les
corresponde es fundamental para conseguir trabajar con Hadoop (Capítulo 3).
Se comenzará viendo una introducción a cada uno, las partes fundamentales del mismo y que funcionalidad
cumple en este proyecto.
2.1 ZooKeeper
ZooKeeper es un servicio de coordinación de alto rendimiento para aplicaciones distribuidas que permite
mantener conexiones estables entre servidores con distintas tecnologías.
Pertenece a Apache Software Foundation, aunque es un subproyecto de Hadoop. Provee un servicio de
configuración centralizada y registro de nombres de código abierto para grandes Sistemas Distribuidosi,
sincronización… todo ello en una interfaz simple.
Empezó como parte de Hadoop pero el interés despertado y su uso general hizo que pasara a ser promovido
por Apache Top Level Project. La arquitectura de ZooKeeper soporta alta disponibilidadii a través de servicios
redundantes. Existen maestros y clientes. Se puede tener más de un maestro dando la opción a los clientes de
acceder a la información en cualquiera de los mismos, aunque solo uno (leader) puede escribir en disco para
guardar la información, como puede apreciarse en la Figura 1. Los nodos ZooKeeper guardan sus datos en un
espacio de nombres jerárquico como hace un sistema de archivos. Los clientes pueden leer y escribir desde/a
los nodos y de esta forma tienen un servicio de configuración compartido.
3
4 Arquitectura
El motivo de esta breve introducción a ZooKeeper es porque se usa en bastantes sistemas distribuidos como
Apache Kafka y Apache Hadoop, dos de los pilares de este proyecto.
Hadoop ha incorporado ZooKeeper en su funcionamiento a partir de la versión 2.X (las cuales se verán en este
proyecto). Hadoop eligió aprovechar la gestión de clústers de ZooKeeper para agrupar capacidades en vez de
desarrollar la suya propia, por lo que en resumen, ZooKeeper proporciona servicios operacionales a los
clústers de Hadoop.
2.2.1 Introducción
Apache Kafka es un sistema de mensajería Publish-Suscribe distribuido y es capaz de ofrecer un alto
rendimiento. En este proyecto se usa ya que es el lugar por el que Hadoop consigue toda la información con la
que trabajará. Está diseñado con los siguientes objetivos:
• Mensajería persistente que proporciona un rendimiento constante en el tiempo.
• Alto rendimiento: Incluso con hardware modesto, un único broker puede manejar cientos de
megabytes de lectura y escritura por segundo desde miles de clientes.
• Soporte para la partición de mensajes a través de los servidores de Kafka y consumo distribuido en
un clúster de máquinas consumidoras, manteniendo la ordenación por partición.
• Soporte para la carga de datos en paralelo en Hadoop.
• Distribuido: Los mensajes se conservan en el disco y se replican dentro del clúster para evitar la
pérdida de datos. Cada broker puede manejar terabytes de mensajes sin impacto en el rendimiento.
Kafka proporciona un mecanismo de carga paralela en Hadoop, así como la capacidad de partición en tiempo
real del consumo en un clúster.
Utiliza ZooKeeper para el descubrimiento de nodos y sincronizar todo el clúster. Como se puede apreciar en la
Figura anterior, se pueden distinguir distintos tipos de componentes además de otros que se explican a
continuación:
Ø Topic y Logs: Un topic es una categoría donde los mensajes son publicados. Para cada topic, el clúster
de Kafka mantiene un log particionado como se puede apreciar en la Figura 3.
Ø Consumidores: Kafka proporciona a los consumidores una abstracción que engloba los modelos de
mensajería queuing y publish-subscribe, y se denomina el consumer group. Los consumidores se
etiquetan con el nombre de un consumer group, y cada mensaje publicado en un topic se entrega a
la instacia de un consumidor dentro de su consumer group. Dichas instancias pueden estar
separadas en distintos procesos o máquinas.
Si todas las instancias del consumidor pertenecen al mismo consumer group, entonces la cola
trabaja como una cola que balancea carga a través de los consumidores. En cambio, si las
instancias de los consumidores pertenecen a distintos grupos, la cola trabajará de la forma publish-
subscribe y todos los mensajes serán transmitidos a todos los consumidores. De todas formas, para
conseguir escalabilidad y alta disponibilidad cada grupo consta de múltiples instancias de
consumidor.
Como se dijo al explicar ZooKeeper, éste es usado por Apache Kafka, los consumidores lo usan para
mantener la referencia del offset por el que van leyendo.
Por ejemplo, como se puede apreciar en la Figura 4, disponemos de tres productores, dos brokers y dos
consumidores, el primero con tres instancias y el segundo con dos instancias.
Big Data, Hadoop y DataMining sobre eventos de seguridad 7
2.3 Snort
2.3.1 Introducción
Snort trabaja en un importante campo en el mundo de la seguirdad de la red, es multiplataforma y una
herramienta ligera de detección de intrusiones de red que puede usarse para vigilar redes TCP/IP y detectar
una amplia variedad de tráfico de red sospechoso, así como ataques.
Snort también puede ser desplegado rápidamente para llenar agujeros portenciales en la cobertura de la
seguridad de una red, como por ejemplo, cuando un nuevo ataque surge y los proveedores de seguridad
comerciales son lentos para liberar nuevas firmas de reconocimiento a dicho ataque.
Por tanto puede decirse que Snort es un sniffer y logger de paquetes basado en libpcapiii, que puede utilizarse
como un sistema de detección de intrusos en una red (NIDS). Cuenta con normas utilizadas para realizar
patrones de coincidencia de contenidos y detectar distintos tipos de ataques, tales como desbordamiento del
buffer, escaneo de puertos, ataques CGIiv y muchos más.
Snort tiene la capacidad de dar la alerta en tiempo real, con alertas que son enviadas a syslogv. Su motor de
detección se programa usando un lenguaje simple que describe la pareja <tests de paquete, acción a realizar>.
Su facilidad de uso simplifica y acelera el desarrollo de nuevas reglas de detección (llegando a obtener reglas
de exploits en cuestión de horas).
• Preprocesadores. Permiten extender las funcionalidades preparando los datos para la detección.
Existen diferentes tipos de preprocesadores dependiendo del tráfico que se quiera analizar.
• Motor de detección. Analiza los paquetes en base a las reglas definidas para detectar los ataques.
• Archivo de reglas. Definen el conjunto de reglas que regirán el análisis de los paquetes detectados.
• Plugins de detección. Partes del software que son compilados con Snort y se usan para modificar el
motor de detección.
• Plugins de salida. Permiten definir qué, cómo y dónde se guardan las alertas y los correspondientes
paquetes de red que las generaron. Pueden ser archivos de texto, bases de datos, servidor syslog,
etc.
2.3.3 Alternativas
Snort no es la única solución disponible en el mercado para los mismos fines. A continuación se verán algunos
de los rivales que pueden encontrarse, con sus respectivas ventajas e inconvenientes.
2.3.3.1 tcpdump
Snort es bastante similar a tcpdumpvi pero se centra más en aplicaciones de seguridad de rastreo de paquetes.
La mayor de las características que Snort tiene, mientras que tcpdump no, es la inspección del paylaodvii. Snort
decodifica la capa de aplicación de un paquete y aplica reglas que recogen el tráfico que tiene datos específicos
contenidos dentro de dicha capa. Esto permite a Snort detectar muchos tipos de actividades hostiles contenidas
en la carga útil del paquete.
Otra ventaja es que la pantalla de salida decodificada de Snort es algo más fácil de usar que la salida de
tcpdump. Snort se centra en la recogida de los paquetes tan pronto como sea posible y procesarlos en su motor
de detección.
Una de las características más importante que ambos comparten es la capacidad de filtrar el tráfico con
comandos Berkeley Packet Filter (BPFviii). Esto permite que el tráfico sea recogido basándose en una variedad
de campos específicos. Por ejemplo, mediante comandos BPF ambas herramientas pueden ser instruidas para
procesar el tráfico TCP solamente. Mientras que tcpdump recogería todo el tráfico TCP, Snort puede utilizar
sus reglas flexibles para realizar funciones adicionales, como buscar y guardar sólo aquellos paquetes que
tienen en su campo TCP un determinado contenido o que contienen peticiones web que asciendan hasta una
vulnerabildiad CGI.
2.3.3.2 NFR
NFR es un IDS que da a los usuarios una herramienta de gran alcance para luchar contra el acceso ilegal a una
red. Con la flexibilidad que aporta, el administrador de red puede saber un poco mejor acerca de quién tiene
acceso a su red o qué tipo de tráfico generan sus empleados.
Snort comparte algunos de los conceptos funcionales con NFR, pero NFR es una herramienta de análisis de
red más flexible y completa. Consecuentemente implica más complejidad, por ejemplo, escribir una regla de
Snort para detectar un nuevo ataque toma solo unos minutos una vez que se ha encontrado la firma del ataque,
mientras que realizar lo mismo en NFR conlleva más tiempo, y en temas de seguridad la velocidad con la que
frenar los ataques puede ser crucial.
2.3.3.3 Suricata
Durante años, Snort ha sido el estándar para la detección en código abierto de sistemas de intrusión y
prevención (IDS/IPS). Su motor combina los beneficios de las firmas, los protocolos y la inspección basada en
anomalías y se ha convertido en una de las mayores implementaciones de IDS/IPS en el mundo.
Suricata va por el mismo camino aunque se encuentra menos extendido. Fundado por la Open Information
Security Foundation (OISF), también se basa en firmas pero integra algunas técnicas revolucionarias. Su
motor de detección incorpora un normalizador y analizador http que proporciona un procesamiento muy
avanzado de secuencias http, lo que permite la comprensión de tráfico en el séptimo nivel del modelo OSI.
En pruebas de rendimiento realizadas por expertos se puede comprobar que aún hoy día Snort es más preciso y
efectivo que Suricata aunque la evolución de este último esta convirtiéndole en un digno rival de Snort.
10 Arquitectura
2.4 Camus
Camus es una tubería entre la cola Kafka y HDFS (Hadoop Data File System). Sobre HDFS se hablará
detalladamente en la parte de Hadoop, pero en resumen es el sistema de ficheros donde se quieren almacenar
los eventos.
Camus es una tarea MapReduce (Capítulo 3.3) que distribuye los datos que va obteniendo de la cola Kafka.
Sus características son las siguientes:
Big Data, Hadoop y DataMining sobre eventos de seguridad 11
Es usado en LinkedIn donde procesa 10 billones de mensajes cada día, por lo que el rendimiento es muy bueno
pese a que es un proyecto joven.
Posteriormente en el apartado de HDFS (Capítulo 3.2) se detallarán los directorios que se van a crear y cómo
va a organizarse la información, pero se puede adelantar que para la ejecución de Camus, utilizaremos Cron.
Cron es un demonio que ejecuta comandos en unos intervalos determinados. En este proyecto se ha
configurado Cron para que se ejecute de hora en hora, de manera que crea un árbol como el mostrado en la
siguiente Figura, en la que los directorios se van ramificando hasta llegar a las horas, donde en su interior se
encontrarán los eventos almacenados en formato .json.gz.
En la Figura 9 se puede apreciar un esquema de esta parte del camino que siguen los eventos. Cómo los
eventos procedentes de Snort pasan por la cola Kafka, y camus los va recogiendo de hora en hora, e
insertándolos en HDFS.
H
adoop es en la actualidad el software de código abierto más extendido en el mundo del Big Data que
sirve para almacenar y analizar cantidades masivas de datos.
En este capítulo se explotará este software desde distintos puntos de vista. Viendo sus partes, como se
comunican entre ellas, su puesta en marcha, mantenimiento, monitorización, copias de seguridad y
recuperación de las misma, etc.
3.1 Introducción
En estos últimos años, ha habido una importante diferencia en el almacenamiento, manejo y procesamiento de
los datos. Las compañías están almacenando más datos de más fuentes en más formatos con los que nunca
antes habían trabajado.
Con el análisis de esos datos se busca conocer más sobre lo que representan (ya pueden ser las personas,
buscadores, logs, o cualquier cosa que sea relevante para una organización). El almacenamiento y
procesamiento de esos datos no es un problema nuevo, el fraude en el comercio, detección de anomalías,
análisis demográfico y muchas otras aplicaciones han tenido que tratar con estos problemas durante décadas.
Apache Hadoop proporciona una infraestructura pragmática, rentable y escalable para construir muchos tipos
de aplicaciones que resuelven diversos tipos de problemas de los que se ha hablado antes. Compuesto por un
sistema de ficheros distribuido llamado Hadoop Distributed Filesystem y una capa de computación que
implementa todo el procesamiento llamada MapReduce, Hadoop es un proyecto Open Source y un sistema
capaz de procesar enormes cantidades de datos.
Hadoop usa un clúster sin un hardware ni infraestructura de red en particular, funciona como una sola
plataforma donde se desarrollan todos los procesos. La computación perteneciente a MapReduce está realizada
en paralelo, para que proporcione a los desarrolladores un nivel de abstracción que les permita obviar temas de
sincronización entre sus distintos nodos.
El interés y la investigación que Hadoop ha levantado entre sus usuarios ha creado un ecosistema sobre este
software, tanto de forma comercial como open Source. Algunos de los subproyectos que se han generado a
raíz de Hadoop son: Hive, Pig, ZooKeeper o Ambari entre otros (algunos de ellos se verán en este proyecto).
La primera de las características que identifican a este software es su sistema de ficheros de datos (HDFS) que
se verá en el siguiente apartado.
13
14 Hadoop
3.2 HDFS
3.2.1 Diseño
HDFS sigue en muchos aspectos el diseño tradicional del sistema de archivos. Se almacenan como bloques y
metadatos, y se guarda la localización de los mismos para después obtener los datos con un mapeo de los
bloques que lo componen, estructura del árbol, permisos, etc. Muy similar a ext3, por ejemplo.
HDFS es lo que se denomina un sistema de ficheros en espacio de usuario, que es una manera de decir que el
sistema de ficheros se ejecuta fuera del Kernel. Esto nos proporciona simpleza, flexibilidad y posiblemente
más seguro a la hora de ponerlo en práctica. Consecuentemente, no se monta HDFS como se realizaría con
ext3 y requiere aplicaciones explícitamente para su uso.
Otra de los puntos más fuertes de HDFS es que es un sistema de ficheros distribuido. Estos tipos de sistemas
se usan para superar el límite que un disco duro individual o una máquina puede proporcionar. Cada máquina
del clúster almacena un subconjunto de datos que componen el sistema de archivos completo con la idea de
tener varias máquinas con distintos discos duros y así distribuir toda la información en ellos. Los metadatos se
almacenan en un servidor centralizado actuando como un directorio de bloques y dotándolo de una visión
global del estado del sistema de archivos.
Otra diferencia con respecto al resto de sistemas de archivos es su tamaño de bloque. Es común que estos
sistemas utilicen un tamaño de 4KB o 8KB para sus datos. Hadoop en cambio utiliza un tamaño de bloque
significativamente mayor, 64MB por defecto, aunque los administradores de este tipo de clústers lo suelen
elevar a 128MB o 256MB. El aumento del tamaño de los bloques provoca que los datos se escriban en trozos
contiguos más grandes en el disco, que a su vez significa que se pueden escribir y leer en operaciones
secuenciales más grandes. Esto minimiza las búsqueda de los bloques contiguos (es donde más tiempo se
pierde), y mejora el rendimiento en operaciones de I/O.
En lugar de recurrir a la protección de esos datos, HDFS replica cada bloque en varias máquinas del clúster (3
veces por defecto). Los cambios que se realizan en un bloque también se traspasan a sus réplicas, por lo que
las aplicaciones pueden leer de cualquiera de los bloques disponibles. Tener multiples réplicas significa tener
más fallos, pero serán más fácilmente tolerados. HDFS rastrea activamente y gestiona el número de réplicas
disponibles de un bloque, de forma que si el número de copias de uno de estos bloques está por debajo del
factor de réplicas establecido, HDFS genera automáticamente una copia de las réplicas restantes. Los
desarrolladores con sus aplicaciones no quieren preocuparse de copias, discos, metadatos… simplemente
poder realizar operaciones I/O de la forma más fácil posible, por lo que HDFS presenta el sistema de ficheros
como uno de alto-nivel con operaciones y conceptos familiares (POSIXx).
3.2.2 Demonios
Hay tres tipos de demonios que conforman HDFS y cada uno tiene un papel distinto. Se puede ver en la
siguiente tabla.
Tabla 1: HDFS Daemons
posteriormente.
Los bloques son más que trozos de un archivo. El DataNode (DN) es el responsable de almacenar y recuperar
dichos datos, tiene acceso local directo a uno o más discos. En los sistemas de producción dichos discos son
para uso exclusivo de Hadoop. El tamaño para almacenamiento puede ser añadido al clúster añadiendo más
DataNodes. Como se ha dicho antes en vez de incorporar seguridad a los datos se aprovechan las réplicas por
si se estropea algún DataNode tener los datos disponibles en otros. Esto además nos proporciona una mejora
de rendimiento ya que si una aplicación requiere de esos datos y los bloques están siendo usados por otras
aplicaciones, podrá utilizar las copias alojadas en otros DataNodes sin necesidad de esperar a que terminen el
resto.
Mientras que los DataNodes son los responsables de almacenar los bloques de datos, el NameNode (NN) es el
demonio que guarda los metadatos del sistema de archivos y mantiene una imagen completa del mismo. Los
clientes se conectan al NameNode para realizar operaciones con el sistema de ficheros, pero los bloques de
datos se transmiten desde y hacia DataNodes directamente por lo que el ancho de banda no está limitado por
un solo nodo. Los DataNodes informan regularmente de su estado al NameNode, de esta forma en un instante
determinado el NameNode tiene una visión completa de todos los DataNodes del clúster, su salud, los bloques
que tienen disponible, etc. Podemos ver la estructura de estos tres demonios en la figura 10.
Cuando un DataNode se inicia, así como a cada hora a partir de entonces, envía lo que se denomina un
informe de bloques al NameNode, en el que lista todos los bloques que el DataNode tiene actualmente en sus
discos y permite al NameNode realizar un seguimiento de los cambios que en ellos se produzcan. Esto permite
también que ante un fallo en algún DataNode, el NameNode podrá repartir la información que éste tenía
anteriormente a otro DataNode nuevo o a los que ya disponía que sigan operativos. El único inconveniente de
todo este proceso es que en el arranque del clúster, el NameNode tiene que esperar a que le llegue toda la
información posible de los datos desde los DataNodes para poder crear toda la estructura y comenzar a
funcionar.
El último de los demonios que queda por ver es el SecondaryNameNode. Despista su nombre ya que no solo
16 Hadoop
se trata de una copia de seguridad del NameNode, éste será detallado en punto posterior.
Lectura
Como podemos ver en la Figura 11 y suponiendo que ya esté alojado en HDFS el fichero /user/Sergio/foo.txt,
el cliente empieza contactando con el NameNode indicándole que archivo desearía leer. El NameNode
comprueba la existencia del archivo y sus permisos, en caso de que exista y con permisos a ese usuario, el
NameNode responde al cliente con el primer identificador de bloque y la lista de DataNodes en el que existe
una copia del mismo.
En el caso de que el NameNode no esté operativo, no lo estará HDFS, por lo que no será posible realizar la
lectura del fichero. Con los ID de bloque y las direcciones de los DataNodes, el cliente ya puede ponerse en
contacto con estos últimos y leer los bloques de datos que necesite, terminándose este proceso cuando se lee el
último bloque del fichero o el cliente cierra la secuencia del archivo.
Otro problema que puede surgir es que en el proceso de lectura el DataNode muera, en ese caso, la biblioteca
automáticamente intentará leer de otra copia del mismo bloque.
Escritura
La escritura de archivos HDFS es un poco más compleja que la lectura. Si se considera el caso más simple
representado en la Figura 12, un cliente está creando un nuevo archivo, realiza una solicitud para abrir un
archivo con su nombre de escritura utilizando la API de Hadoop FileSystem. Se envía esa solicitud al
NameNode para crear el metadato (solo si el usuario tiene permisos para hacerlo).
Una vez realizada la entrada de ese metadato, se envía al cliente una respuesta en la que se le indica que el
fichero se ha abierto correctamente y que ya puede empezar a escribir datos. A medida que el cliente va
escribiendo datos, se va fragmentando en paquetes, que se colocan en cola en la memoria. Un hilo separado
del cliente va consumiendo los paquetes de esa cola, a medida que va necesitando más bloques o es necesario
la realización de réplicas, se establece una conexión directa al primer DataNode, al segundo, tercero… esto
forma una tubería de replicación. Los DataNodes envían un asentimiento, y cuando les llega al cliente, este
Big Data, Hadoop y DataMining sobre eventos de seguridad 17
sabrá que los datos se han escrito en todos los nodos o en los que haya sido necesario replicar. A medida que
va necesitando más espacio, supondrá más bloques. Cuando ha terminado de enviar el último bloque, el cliente
indica que el archivo está completo.
Como administrador de un clúster no se puede permitir que la caída del mismo dependa de un solo punto de
fallo, en este caso el NameNode lo ha sido durante muchos años, pero la comunidad ha invertido mucho
tiempo en conseguir solventar esta vulnerabilidad.
La alta disponibilidad del NameNode (HA) se despliega como un par activo/pasivo de namenodes. Los edits se
van modificando en ambos NameNodes, de forma que si se cae el NameNode el Secondary NameNode debe
estar completamente preparado para asumir el mando.
Esa comunicación entre ambos se puede realizar de forma manual o de forma automática. En la primera, se
debe enviar un comando para efectuar la comunicación entre NameNodes. En cambio, cuando se realiza de
forma automática, cada NameNode ejecuta un proceso adicional llamado controlador de conmutación por
error (failover controller) que se puede ver en la Figura 14, que monitoriza la salud del proceso y coordina las
transacciones entre ambos.
Por ejempo para listar y copiar archivos del/hacia HDFS deben utilizarse las siguientes líneas:
Lo que se ha realizado en las siguientes líneas no es más que subir un fichero a HDFS, al instentar listarlo en el
directorio actual no aparece ya que no está en el sistema propio de ficheros. Pero al bajarse el mismo fichero
que se subió en el directorio actual ya si existirá y se tendrá acceso a él. Con esto se quiere ver que el sistema
20 Hadoop
de ficheros local y HDFS son completamente independientes. Se podrán subir y bajar ficheros entre ellos y
para acceder a HDFS deberá de escribirse hadoop fs para tener acceso a todas los comandos que Hadoop
ofrece.
Ejemplo
Si se imagina un problema que consiste en contar el número de apariciones de cada palabra dentro de un texto
plano. En la fase input se va a transformar el texto plano en una lista de claves/valor donde la clave podría ser
el desplazamiento del primer carácter de cada sentencia del texto y la lista de valores estaría formada por las
palabras que conforman cada una de estas sentencias. Esta lista de claves/valor está distribuida entre los
distintos nodos del clúster.
La función de mapeo cogerá un elemento de la lista clave/valor. Si la palabra aparece por primera vez, e creará
un par clave/valor, donde la clave será la palabra y el valor se pondrá a uno. Si la palabra ya ha aparecido se
aumentará en una unidad de valor asociado a esa palabra. Finalmente la función Map, emitirá un conjunto de
pares clave/valor donde la clave será una palabra y el valor será el número de apariciones de esa palabra dentro
de cada una de las sentencias.
En la fase Shuffle, se tendrá en cada nodo del clúster un conjunto formado por pares donde cada clave será una
palabra y el valor será igual al número de aparicicones de esta en una sentencia concreta del texto plano
22 Hadoop
original. Durante esta fase se va a asegurar que cada par que comparte la misma clave sea enviada al mismo
nodo reduce.
Finalmente, a cada función Reduce en cada uno de los nodos se le pasará como parámetro una clave (en
nuestro caso la palabra) y una lista con el valor de las apariciones en cada una de las sentencias del texto plano.
El algoritmo aplicado será sumar dicha lista de valores para obtener el número de apariciones de cada una de
las palabras dentro del texto original. Una vez obtenido el número de apariciones de cada palabra, se deberá
escribir su resultado a la salida estándar, un sistema de ficheros HDFS o a otro tipo de sistema como otra base
de datos donde poder evaluar dicho resultado.
Cambios conceptuales
En MR1 un cliente enviaba una tarea al JobTracker, y éste lo desmenuzaba en tasks que remitía a los
TaskTrackers.
En Yarn, el cliente envía una aplicación al ResourceManager, que remite esta petición al ApplicationManager.
Los ApplicationMaster son los responsables de obtener un container (pack de recursos) del Scheduler para
ejecutar su aplicación, lanzarla, monitorizarla y relanzarla si fuera necesario. El ApplicationMaanager se
encarga de mantener levantados los ApplicationesMasters. ZooKeeper se mantiene a su vez como vigilante de
los nodos.
En la siguiente figura puede verse un esquema de YARN con todo lo explicado anteriormente.
Además de los demonios vistos anteriormente, es opcional pero se puede incorporar otro demonio más
llamado JobHistory Server. En él se podrá ver en tiempo real todos las tareas que se estén ejecutando y las que
ya se hayan ejecutado, viendo cuáles han sido satisfactorios, cuáles han fallado… etc.
En este caso si se accede al directorio de Hadoop 2.4 podrá encontrarse tanto la versión en formato .tar.gz
como la versión src.
En el proyecto se trabajará con máquinas “Centos release 6.5”. En esta ocasión se alojará la carpeta completa
de Hadoop en /opt/rb/var/hadoop. Por lo que con wget se puede descargar de la página mencionada
anteriormente.
Una vez se encuentra todo Hadoop alojado en /opt/rb/var/hadoop, se debe establecer las variables de entorno
$HADOOP_HOME al directorio anterior, y para los ficheros de configuración $HADOOP_CONF_DIR que
debe ser /opt/rb/var/hadoop/etc/hadoop.
En la siguiente figura puede verse la estructura de directorios que se tiene dentro de la carpeta de hadoop.
3.4.2 Configuración
En el directorio de Hadoop que se ha visto anteriormente, se puede ver la carpeta etc, donde se podrá encontrar
los archivos de configuración ($HADOOP_CONF_DIR).
Los 4 pilares de la configuración de Hadoop son los archivos: core-site.xml, hdfs-site.xml, mapred-site.xml y
yarn-site.xml. Debido a la longitud de los mismos, se ha añadido cada uno de ellos en el Anexo A.
Aquí solo se verán las propiedades más relevantes de algunos de ellos.
Nota: el valor de estas propiedades se ha obtenido de nodos alojados en la empresa.
Tabla 2: core-site.xml
CORE-SITE.XML
Tabla 3: hdfs-site.xml
HDFS-SITE.XML
bloques.
Tabla 4: mapred-site.xml
MAPRED-SITE.XML
Tabla 5: yarn-site.xml
YARN-SITE.XML
El otro problema que surge con la configuración es el tema de la memoria. Ésta debe de ponderarse según la
memoria total del nodo, es decir, al NameNode no le corresponderá la misma memoria si el ordenador tiene 4
GB de RAM que si tiene 16Gb.
Por lo que en un fichero de configuración se le ha asignado una ponderación a cada uno de los nodos:
…
{:name => "hadoop_nodemanager", :count => 8},
{:name => "hadoop_datanode", :count => 3},
{:name => "hadoop_namenode", :count => 3},
{:name => "hadoop_historyserver", :count => 2},
{:name => "hadoop_resourcemanager", :count => 2},
…
Como se puede apreciar al nodo que más memoria se le ha concedido será al NodeManager ya que será el que
tenga que ejecutar las tareas que el ResourceManager le asigne, y por tanto, el que en un momento puntual
más memoria RAM requerirá.
Estos parámetros hacen que se genere su correspondiente archivo donde se encuentran los KB que le han sido
asignados:
[root@pablo02 sysconfig]# ls | grep hadoop
hadoop_datanode
hadoop_historyserver
hadoop_namenode
Big Data, Hadoop y DataMining sobre eventos de seguridad 27
hadoop_nodemanager
hadoop_resourcemanager
[root@pablo02 sysconfig]# cat hadoop_*
MEMTOTAL=1164377
MEMTOTAL=776251
MEMTOTAL=1164377
MEMTOTAL=3105007
MEMTOTAL=776251
Por lo que en las plantillas .erb (embedded ruby) se accede a esos valores
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx<%= @memory_services["hadoop_nodemanager"].nil? ?
"827392": (@memory_services["hadoop_nodemanager"]*0.8).to_i
%>K
</value>
</property>
Y de esta forma se está configurando los valores que a la memoria respecta de mapred-site.xml y yarn-
site.xml, según la memoria total del nodo en el que se encuentra alojado, solventándose así los dos
problemas que se tenían en un comienzo.
3.4.3 Arranque
Para comenzar a utilizar Hadoop lo primero que se deberá hacer es formatear el HDFS.
Para ello, dentro de la carpeta sbin de Hadoop se podrá realizar de la siguiente forma:
- JobHistory
$HADOOP_HOME/sbin/mr-jobhistory-daemon.sh start historyserver --config
$HADOOP_CONF_DIR
28 Hadoop
Una vez arrancados todos los nodos, se podrá comprobar si todo está funcionando correctamente accediendo a
las páginas webs de cada uno de ellos, o simplemente listando mediante ls el contenido de hadoop como se vio
anteriormente en el apartado de línea de comandos.
Las páginas webs se mostrarán en el siguiente apartado con el levantamiento del singleNode y del clúster.
Como se puede apreciar, se lista los subdirectorios que están por debajo del directorio raíz de HDFS. Se
puede subir ficheros, descargar ficheros con los comandos vistos en el apartado anterior.
Ahora se mostrará la vista desde la interfaz web de los distintos nodos.
Si se accede a la del NameNode, podrá verse la información que se muestra en las siguientes figuras.
Ip_namenode:50070 (La ip y el puerto por el que escucha se a configurado en hdfs-site.xml)
Big Data, Hadoop y DataMining sobre eventos de seguridad 29
Se puede ver la fecha en la que se inició el clúster, la versión de Hadoop y algunos identificadores. Se puede
apreciar que en la parte superior se tiene un menú donde acceder a la ventana de los DataNodes, cómo ha ido
el proceso de arranque, etc.
Si se continua descendiendo en la misma pestaña se puede encontrar un resumen del estado del sistema de
archivos.
En esta imagen se aprecia el número de bloques, memoria total y usada, tanto la memoria que se ha reservado
para HDFS como la memoria que se ha dejado para utilizar de forma no distribuida. También en la parte
inferior se ve cuántos DataNodes están activos, en este caso como se está trabajando en un único nodo solo se
dispone de 1 DataNode operativo. También se aprecian los DataNodes que se han dado de baja, o a los que se
le está dando.
También se puede comprobar que los ficheros image y edits se encuentran en el directorio
/var/lib/hadoop/name.
En el menú de arriba, al seleccionar la pestaña Datanodes se mostrará la información de cada uno de los
DataNodes, en este caso se verá que se dispone de un DataNode en funcionamiento (junto con sus
características) y ninguno ha sido dado de baja.
Por ejemplo se dispone de una carpeta en el directorio raíz de HDFS llamada /rb, se puede crear una copia de
seguridad o instantánea de ese directorio.
Los pasos a seguir serán habilitarle a dicho directorio la posibilidad de crear Snapshots sobre él, y realizarle el
mismo.
[root@pablo05 hadoop]# hdfs dfsadmin -allowSnapshot /rb
Allowing snaphot on /rb succeeded
[root@pablo05 hadoop]# hdfs dfs -createSnapshot /rb
Created snapshot /rb/.snapshot/s20140828-124722.329
Por lo que al acceder a la pestaña de Snapshots se verá que se ha creado una instantánea de ese
directorio.
Big Data, Hadoop y DataMining sobre eventos de seguridad 31
En la siguiente pestaña se mostrará qué tal ha ido el arranque del clúster (Startup Progress). En él se puede
apreciar cómo comienza cargándose la fsimage, y posteriormente se le aplican los cambios de edits.
Y, por último, en Utilities, se puede acceder tanto al contenido del sistema de ficheros como a los logs.
Todo lo visto con anterioridad pertenece al NameNode, si se accede a la ip del DataNode y puerto 50075 se
podrá ver la información relativa al DataNode.
Para el ResourceManager (puerto 8021) se pueden ver las tareas pendientes, las que están ejecutándose, la
memoria total para ejecutar dichos procesos, los NodeManagers que tiene disponibles (Active Nodes).
Por último, el JobHistory (puerto 19888) muestra solo los trabajos que se están ejecutando y se han ejecutado,
si el resultado ha sido satisfactorio, o en el caso de que haya fallado, se muestran los logs del error, por lo que
es una buena página donde se puede hacer el seguimiento de todos los procesos MapReduce que trabajen en
nuestro clúster.
Si se vuelve a navegar por las interfaces web de los distintos nodos se puede ver como ha incrementado el
número de DataNodes o NodeManagers (y consecuentemente la capacidad de almacenamiento y de
procesamiento).
Como se puede apreciar la salud del sistema de archivos es saludable (HEALTHY). Además ofrece ese
resumen en el que se muestra el total de bloques almacenados, validados, replicados, etc.
suficiente.
El equilibrador funciona calculando primero el recuento medio de bloques por DataNode y luego la desviación
de cada DataNode con respecto a la media, definiendo nodos sobreutilizados y subutilizados. El umbral suele
rondar el 10% aunque en el caso de tener por ejemplo dos nodos puede llegar hasta el 20% de variación.
El inconveniente de ejecutar el equilibrador es el gran ancho de banda que puede llegar a consumir,
afortunadamente a través de una propiedad modificable de los archivos de configuración vistos anteriormente
se puede limitar ese flujo con dfs.balance.bandwidthPerSec (bytes). La transferencia de datos no pasa a través
de la máquina que ejecuta el equilibrador. Para poder ejecutar el equilibrador se deberá seguir los siguientes
pasos:
1. Primero convertirse en superusuario HDFS o un usuario con privilegios suficientes para el manejo de
estos datos.
2. Ejecutar hadoop balancer –threshold N para lanzar el balanceador en primer plano, donde N es el
porcentaje de bloques dentro de los cuales debe encontrarse la diferencia de un DataNode con otro.
3. Supervisar la salida o logs, si desea utilizar el balanceador en background para comprobar el seguimiento
del proceso.
Si se necesita añadir alguna propiedad en particular de las que se mecionaron anteriormente en el apartado de
la configuración se podrá hacer en este paso.
Y finalmente se tiene una dashboard donde poder monitorizar el clúster de Hadoop, memoria, Network Usage,
CPU, capacidad, YARN…
Ahora en cambio se realizará la copia a otro lugar dentro del mismo clúster.
42 Hadoop
A la hora de realizar un recovery, será el proceso inverso, de un clúster donde se tenga almacenado el backup,
se realizará la copia a el nuevo clúster de trabajo para recuperar el estado que tenía en el momento que se
realizó la copia de seguridad.
4 PIG
H
oy día las empresas almacenan millones de GigaBytes y TeraBytes diarios, toda esa información es un
bien muy preciado que podría ser considerado como materia prima. De esos datos o eventos, se quiere
obtener una información o conocimiento. El análisis estadístico juega un papel muy importante en ese
sentido, y será abarcado en el Punto 5 de este proyecto. Pero para realizar dicho análisis de los datos, puede
que interese realizar un filtrado antes, por ejemplo tener los eventos que vayan desde las 18 h hasta las 20 h de
ayer, o del mes pasado…
O, por otro lado, se quiere realizar, en el caso de los eventos de seguridad con los que se está trabajando en este
proyecto, un filtrado por IP origen e IP destino y que muestre por pantalla cuál es el Payload que más se repite
en ese conjunto de eventos.
Por lo que pasa a tomar un peso muy importante toda la parte de la extracción de la información para los
distintos usos que empresas o usuarios les quieran dar. En este capítulo se hablará de todo el proceso seguido
para la extracción y filtrado de datos utilizando para ello una herramienta también de Apache.
4.1 Introducción
Originalmente desarrollado por Yahoo en el 2006 fue adoptado por la Apache Software Foundation un año
después. Apache Pig es una plataforma para el análisis de grandes conjuntos de datos que consta de un
lenguaje de alto nivel para expresar programas de análisis.
La mejor característica de Pig es que su estructura es susceptible a la parelización, lo que a su vez le permite
manejar enormes cantidades de información. La capa de infraestructura de Pig está formada por un compilador
que produce secuencias MapReduce, lo que permite a un nivel de abstracción para los usuarios que solo se
deben preocupar de analizar los datos y dedicar de esta forma menos tiempo a desarrollar aplicaciones
MapReduce.
El lenguaje que utiliza es Pig Latin, que crea estructuras parecidas a SQL, de manera que en lugar de escribir
aplicaciones separadas de MapReduce, se pueda crear un script en Pig Latin que es automáticamente
paralelizado y distribuido a lo largo del clúster, codifica las sentencias que realizan la carga, escaneo, búsqueda
y filtrado de los datos de entrada y sentencias para el formateado y almacenamiento de los datos de salida.
Estos datos pueden ser estructurados mediante un schema y así su acceso será más sencillo.
Los usuarios de esta plataforma van desde Yahoo (creador), LinkedIn, Twitter hasta usuarios y empresas que
necesitan acceder a grandes cantidades de datos. Más de la mitad de los procesos que hoy día son ejecutados
en Hadoop están basados en Latin Pig.
43
44 Pig
• Pigs eat anything: Al igual que cualquier cerdo, éste también se come todo tipo de datos:
estructurados, semiestructurados o no estructurados.
• Pigs lives anywhere: A pesar de que Pig fue implementado en Hadoop, no está orientado únicamente
a esta plataforma. Su propósito es ser un lenguaje de procesamiento paralelo.
• Pigs are domestic animals: Diseñados para ser controlado y usado fácilmente por los usuarios.
• Pigs Fly: Pig procesa datos rápidamente. La intención es mejorar el rendimiento y no las
características, lo que evita que demasiada funcionalidad le impida “volar”.
4.3.1 Alternativas
Exiten tres posibilidades para el manejo de grandes cantidades de datos como son Hive, Pig o crear
aplicaciones MapReduce en Java.
Se necesitará MapReduce Java cuando se requiera profundizar y controlar los más mínimos detalles de las
operaciones que se realizan sobre los datos. No tan preciso como hacerlo con Java es crear esos procesos en
Pig o Hive. Estos últimos aportan un nivel de flexibilidad del que nos podemos ayudar para conseguir el
objetivo que se busque. No hay una regla definida para saber cuál usar, depende totalmente del caso de uso
particular, sobre qué base de datos se usará, el tipo de tratamiento que necesitan esos datos… son parámetros
importantes para decidir la mejor opción
A continuación se muestran ventajas e inconvenientes de cada uno de ellos:
• MapReduce
- Ventajas: trabaja con datos estructurados y no estructurados, muy bueno para escribir programas
con lógica compleja.
- Inconvenientes: mucho tiempo de desarrollo, dificil añadir funcionalidades.
• Hive
- Ventajas: poco tiempo de desarrollo, fácil añadir funcionalidades.
- Inconvenientes: no es fácil implementar scripts que impliquen lógica compleja y trabaja solo con
datos estructurados.
• Pig
- Ventajas: trabaja tanto con datos estructurados como no estructurados, además de que se pueden
añadir funcionalidades fácilmente.
- Inconvenientes: Hay que aprender un nuevo lenguaje (Latin Pig) para que después sea convertido
a MapReduce.
El objetivo fundamental es, sobre los eventos, poder obtener el Payload y las veces que se repite cada uno,
aplicando los filtros que el usuario considere necesarios.
Para ello se ha creado un script en Pig, como el siguiente:
REGISTER 'json_simple-1.1.jar';
REGISTER 'elephant-bird-core-3.0.5.jar';
REGISTER 'elephant-bird-pig-4.4.jar';
REGISTER 'elephant-bird-hadoop-compat-4.1.jar';
data = LOAD ficherosACargar
USING com.twitter.elephantbird.pig.load.JsonLoader() as (
json: map[]
);
tuples = FOREACH data GENERATE camposACargar
filter1 = FILTER tuples BY filtroARealizar
info= FOREACH filter1 GENERATE payload;
groupBy = GROUP info BY payload;
countBy = FOREACH groupBy GENERATE group, COUNT (info);
DUMP countBy;
STORE countBy INTO outputFile USING PigStorage('\t');
Como se puede comprobar en las primeras líneas del archivo se incluyen esas 4 librerías que necesitarán para
la ejecución del mismo. Posteriormente se cargarán los ficheros que se vayan a filtrar separados por una coma
(ficherosACargar), y para ello se usará JsonLoader debido a que los eventos están almacenados en formato
JSON. Esta carga de los datos se realizará como un mapeo de todos ellos, de esta forma no hay que cargarlos
uno a uno especificando si se trata de un String, Integer, etc. Si se añade un nuevo campo a los eventos ese
campo también será cargado como parte del mapeo y no habrá que añadirlo a mano.
Posteriormente por cada evento que se haya cargado (data) se generarán tuplas con los campos indicados en
camposACargar. Sobre esas tuplas se aplicarán los filtros que el usuario haya indicado en filtroARealizar.
En este momento se dispone únicamente de tuplas que cumplan los filtros que se han impuesto, con los
campos que se hayan cargado, incluido el campo Payload que es el que queremos obtener. Sobre esas tuplas se
retendrá el campo Payload, se agrupará y contabilizará cuántas veces aparece el mismo.
Con DUMP se podrá imprimir el resultado por pantalla, y con STORE se dará la posibilidad de almacenar el
resultado el el path outputFile.
En la siguientes líneas podrá verse una serie de ejemplos de algunas ejecuciones del script. En la primera
puede apreciarse una serie de ficheros recogidos de HDFS con los que se trabajará. En la siguiente puede verse
como solo se cargarán los campos timestamp y payload, por lo que el filtro para obtener los payloads solo se
filtrará respecto al tiempo. Dicho filtro puede verse en el camo filtroARealizar que en este caso realiza un filtro
entre dos timestamp. En la última de las variables (outputFile) se indicará la dirección donde queramos
almacenar los resultados.
ficherosACargar : 'hdfs://192.168.101.230:8020/rb/raw/data/rb_event/hourly/2014
/06/09/17/rb_event.0.0.1055.62479.gz,hdfs://192.168.101.230:8020/rb/raw/d
ata/rb_event/hourly/2014/06/09/17/rb_event.0.0.33424.95903.gz,hdfs://192.
168.101.230:8020/rb/raw/data/rb_event/hourly/2014/06/09/17/rb_event.0.1.3
2478.93974.gz,hdfs://192.168.101.230:8020/rb/raw/data/rb_event/hourly/201
46 Pig
4/06/09/17/rb_event.0.1.997.61496.gz'
camposACargar : (INT)json#'timestamp' as timestamp,
(CHARARRAY)json#'payload' as payload;
filtroARealizar : BY timestamp > 1402334580 AND timestamp < 1402334640;
Como se ha visto en los anteriores párrafos, se disponen de algunas variables como los ficheros a cargar, si se
quiere o no guardar el resultado en un archivo a la salida, filtro a pasar… por lo que se ha optado por generar
un script en Ruby del que se hablará en el siguiente apartado.
El objetivo de este script es que según los parámetros que se le pasen al mismo se generará un script en Latin
Pig, se ejecutará, se mostrarán o guardarán los resultados y se borrará el script de Pig temporal.
La información de ayuda que se ve arriba se mostrará con el parámetro [-h] help.
El script se alimenta como mínimo del parámetro [-t] que el el timestamp de comienzo. Se tiene dos opciones
para la ejecución del script, si el parámetro [-f] no se encuentra en la ejecución, el análisis de los eventos se
centrará en el minuto en el que se encuentra ese timestamp. En cambio, si se desea continuar filtrando
añadiendo el parámetro [-f] el análisis se llevará a cabo desde el timestamp que se ha especificado hasta hoy
día, es decir hasta los últimos eventos registrados.
Por ejemplo, en la siguiente figura se ha especificado el timestamp correspondiente a la fecha y hora que se
sitúa justo en la parte superior. En este caso no se ejecuta con el parámetro –f por lo que el análisis se llevará a
cabo en el minuto en el que se encuentra ese timestamp (línea azul), desde el minuto 45 al minuto 46.
Big Data, Hadoop y DataMining sobre eventos de seguridad 47
En el caso de especificar el parámetro –f, como se puede apreciar en la siguiente figura, la línea azul engloba
desde el timestamp de comienzo hasta hoy día.
Con el parámetro [-p] se imprimirá por pantalla el script de Pig que se genera justo antes de que se ejecute.
El parámetro [-w] (write) se utiliza para especificar un fichero de salida donde almacenar los resultados, por lo
que, por defecto, se almacenarán en dos ficheros en HDFS. Uno de ellos será el resultado del script de Pig, y el
otro serán los datos del anterior pero tratados por Ruby para darle fotmato JSON.
Por último queda mencionar los parámetros que se pueden filtrar:
• [-r] Sensor_IP: IP del sensor.
• [-s] Source_IP: IP de origen.
• [-d] Destination_IP: IP de destino.
• [-i] Sig_id: Signature id, es un identificador único de la regla (una regla se caracteriza por sig:gid
siendo gid el identificador del grupo).
4.4.2 Ejecución
Como se puede ver en el Anexo B, el script de Ruby visto en el apartado anterior, además de formar el script
de Pig se encargará de ejecutarlo. Al especificar el directorio donde se quiere que se guarden los resultados Pig
creará un archivo con el valor de los campos separados por comas y dentro de paréntesis.
Dicho resultado es poco legible por lo que el script de Ruby también se encargará de recoger el resultado y
parsearlo a formato JSON para facilitar la comprensión del mismo.
A continuación se muestra el resultado de una ejecución, ya parseado a formato JSON por el script de Ruby,
en él puede apreciarse que lo que se está buscando es colocar en orden de mayor ocurrencia los payloads junto
con los campos que lo acompañan. Por ejemplo, el primero ha ocurrido en 13 ocasiones. El payload del
segundo y tercer evento es el mismo pero no comparten el campo sig_id, por lo que no son completamente
iguales aunque cada uno de ellos ha ocurrido en 12 ocasiones.
Es una manera fácil y rápida de comprobar si existe alguna anomalía, por ejemplo en el caso de que la media
de ocurrencia de la consulta que se ha realizado sea de 20 y uno de ellos tenga una ocurrencia de 300 en el
mismo margen de tiempo.
Big Data, Hadoop y DataMining sobre eventos de seguridad 49
5 DATAMINING
E
n este capítulo se abarcará la última de las fases de este proyecto, en la que se partirá de los datos, se
generará información y se obtendrá un conocimiento. Todos aquellos datos que se han generado y
almacenado, deberán de ser útiles y para ello se abarcará esta sección con un campo de las ciencias de la
computación que intenta descubrir patrones en grandes volúmenes de datos. Se buscará obtener con la ayuda
de distintas librerías y distintos algoritmos una vista comprensible de lo que ha ocurrido en cientos, miles o
millones de eventos.
Se usará para ello distintos métodos entre los que se encuentran clasificación, agrupamiento (o clústering) y
outlier detection (o detección de anomalías). El objetivo es aportarle inteligencia al sistema, que sea capaz de
aprender a partir del conocimiento que va adquiriendo, y así poder mejorar los resultados de futuros análisis.
Actualmente todas estas ventajas han desembocado en un abuso del almacenamiento de la información en las
bases de datos. Se puede decir que las empresas almacenan tres tipos de datos, aquellos que solo guardan en el
disco duro, aquellos que guardan en el disco duro pero además se requiere hacer consultas, por lo que se leen,
y finalmente aquellos que además de hacer consultas se analizan y son los que proporcionan en conjunto
verdadero conocimiento y apoyan en la toma de decisiones. Es necesario entonces contar con tecnologías que
ayuden a explotar el potencial de este tipo de datos.
El Data Mining surge como una tecnología que intenta ayudar a comprender el contenido de una base de
datos.
De forma general, los datos son la “materia prima bruta”. En el momento que el usuario les atribuye algún
significado especial pasan a convertirse en “información”. Cuando los especialistas elaboran o encuentran
un modelo, haciendo que la interpretación entre la información y ese modelo represente un valor agregado,
entonces se estará hablando de “conocimiento”.
En la siguiente figura se muestra la jerarquía que existe en una base de datos, entre datos, información y
conocimiento. El área interna dentro del triángulo representa la estrecha unión entre dato e información, no así
entre la información y conocimiento. El data mining trabaja en el nivel superior buscando patrones,
comportamientos, agrupaciones, secuencias, tendencias o asociaciones que puedan generar algún modelo que
permita comprender mejor el dominio para ayudar en una posible toma de decisión.
5.2 Objetivos
Como se ha dicho anteriormente, en el conjunto de datos que se va almacenando, se tendrán cientos, miles y
millones de eventos almacenados. Dichos eventos son nuestra materia prima, y de ellos se quiere obtener una
información que nos aporte a nuestro entender más conocimiento que el que nos aportan miles de líneas llenas
de datos.
Como se mencionó al comienzo del capítulo, se van a utilizar tres métodos de DataMining como son la
clasificación, clústerización y detección de anomalías.
Se necesitan distintos algoritmos estadísticos para la realización de los anteriores métodos. Para ello y tras una
larga búsqueda por internet y realización de pruebas, se barajó la posibilidad de usar una de las tres librerías
que se detallan a continuación.
Apache Mahout fue la primera posibilidad que se barajó junto con WEKA aunque más tarde se realizaron
pruebas con ELKI.
ELKI es un software escrito en Java con algoritmos que buscan analizar los datos del clúster en búsqueda de
detección de anomalías, aunque también proporciona algoritmos de clasificación por lo que en principio era un
candidato fuerte de cara al proyecto. Está diseñado para ser fácilmente desplegado por estudiantes de este
campo y recoge contribuciones de los usuarios con aportaciones, correcciones y nuevos métodos. El problema
es que oponía dificultad de cara a recoger el formato de los eventos que se almacenaron en Hadoop y los
Big Data, Hadoop y DataMining sobre eventos de seguridad 51
resultados no mostraban con claridad y exactitud algunos de los parámetros que más interesaban, como pueden
ser errores absolutos, relativos, instancias clasificadas correctamente, etc.
Por lo que la decisión volvía a centrarse entre Mahout y WEKA. Ambas disponen de una comunidad muy
grande de usuarios, ofreciendo la oportunidad de encontrar más documentación, más algoritmos, parsers,
foros…
Mahout no soporta tantos algoritmos como hace WEKA. Uno de sus mayores objetivos de Mahout es la
escalabilidad y sobre todo es fuerte en las Recomendaciones. Estas últimas son otro tipo de métodos muy útiles
pero que no serán de mucha utilidad en la finalidad de este proyecto, por ejemplo Netflix o Google lo usan,
recogen una gran cantidad de datos de cada persona, de las películas que ven o las páginas por las que navega,
y la valoración de las mismas. Con esa información intentará clasificar a dicha persona en un grupo de
individuos que tengan los mismos o semenjantes gustos, de esa forma una película que le haya gustado a las
personas que se encuentren en ese grupo será una buena recomendación de cara al individuo que entra en
Netflix buscando una película que ver.
WEKA abarca muchos más algoritmos y dispone de una interfaz de usuario intuitiva y que muestra resultados
gráficamente muy interesantes. Dispone de funciones para almacenar los resultados en ficheros y obtener
valores estadísticos, que después se usarán para una serie de bucles intentando encontrar la mejor opción para
el análisis de esos datos. Todo esto se detallará en los siguientes apartados.
Valorando ambos y ya que se presupone que la mayoría de la gente no entiende ni conoce muchos de los
algoritmos estadísticos con los que se trabajará a continuación, se ha optado por elegir WEKA ganando más
variedad de métodos y posibilidades. Además el fuerte de Mahout son las recomendaciones, por lo que al no
ser un objetivo en este proyecto tampoco suponía un punto muy grande a favor.
En el diagrama puede comprobarse todo los casos de uso que se ofrecen en la interfaz para el usuario.
52 DataMining
5.3 Clasificación
El proceso de clasificación consiste en asignar un conjunto de datos a grupos fijados, de manera que se
minimice la probabilidad de clasificación errónea. Por ejemplo, en este caso, el problema podría ser tener que
dividir la base de datos de los eventos que se han almacenado en grupos que sean los más homogéneos
posibles con respecto a cada una de las variables o campos que los eventos tienen.
Es una de las tareas más frecuentes en DataMining. El modelo que obtenemos con los datos que se han
introducido y las clases que se han generado será capaz de decir para un nuevo ejemplo cual será la clase a la
que pertenezca.
Para el uso de este método de DataMining se usará como se ha dicho anteriormente la librería WEKA. Se
dispone de 34 métodos a aplicar. A continuación se hará una breve introducción de alguno de ellos indicando
también el grupo en el que se encuentran:
BAYES
FUNCTIONS
LAZY
META
RULES
TREES
Entre los más interesantes y conocidos se encuentran J48, perteneciente al grupo de TREES (que se encargan
de dibujar un árbol con la clasificación que se ha establecido).
Para aplicar el/los método/s de clasificación de los que se disponen, se ha creado una interfaz de usuario
54 DataMining
(Figura 45) donde se puede seleccionar la opción de clasificación (aparte de las que se verán en los siguientes
apartados), preguntará qué atributo del evento queremos clasificar.
Los eventos están almacenados en formato JSON como se puede apreciar en la siguiente figura:
Tras elegir el atributo classification (aunque podría ser cualquier otro), se selecciona que no se quieren utilizar
todos los métodos, y entre los métodos disponibles se elegirá el J48.
Big Data, Hadoop y DataMining sobre eventos de seguridad 55
Como resultado, se generará un fichero correspondiente a J48 en la carpeta de classification (dentro del
directorio de resultados).
En el interior del fichero se puede encontrar todo el análisis aplicado a los datos.
Comienza con los distintos datos que se pueden encontrar (si son Nominales, enteros, si faltan…). Después se
especifica qué método es el que se va a utilizar, en el caso de este ejemplo será J48 con los valores por defecto
habiendo eliminado del análisis el campo Payload (que es el remove 8 que se puede encontrar en la siguiente
línea) ya que puede provocar mucha distorsión en los resultados.
FilteredClassifier using weka.classifiers.trees.J48 -C 0.25 -M 2 on data
filtered through weka.filters.unsupervised.attribute.Remove -R 8
56 DataMining
En la anterior figura o árbol se puede apreciar que se ha clasificado los distintos valores del campo
classification según el resto de los parámetros, y cuántas coincidencias se ha encontrado de cada uno de ellos.
Por ejemplo, cuando se da esta serie de circunstancias priority < 1, rev <= 6, dst_port > 5222 encontraremos
que el valor del campo classification es “Attempted Administrator User Privilege Gain” y ocurre en 15
ocasiones. Por lo que de esta forma, en el caso de una IP origen de la que se estén recibiendo muchos eventos,
podrá verse en una ojeada al árbol qué serie de circunstancias o patrones están ocurriendo en el resto de
campos que provocan que aparezca dicha IP (como el puerto, el origen… y de esta forma poder crear alguna
regla en el firewall para impedir que dichos eventos sigan apareciendo masivamente).
Big Data, Hadoop y DataMining sobre eventos de seguridad 57
Y para terminar ofrece un resumen de la calidad del método de clasificación utilizado, donde se puede apreciar
valores como las instancias clasificadas correctamente, el índice Kappa, error cuadrático medio, error relativo
o absoluto, etc
A continuación se puede apreciar que se han generado todos los ficheros con sus resultados y el fichero
results.txt.
Si se comprueba el interior de results.txt se puede apreciar los distintos campos nombrados anteriormente,
acompañando su correspondiente método.
Con algunos de estos valores puede realizarse una aproximación para obtener cuál es el mejor de los métodos
para el conjunto de eventos con el que se está trabajando. La fórmula escogida ha sido:
" CCI * 0.7 " RRSE % %
$ + $1− ' * 0.3' * κ
# 100 # 100 & &
CCI: Correctly Classified Instances
RRSE: Root Relative Squared Error
κ: Coeficiente Kappa
Este coeficiente Kappa mide el acuerdo entre dos evaluadores al clasificar los elementos de N en C categorías
mutuamente excluyentes. Un coeficiente Kappa = 1 supone un acuerdo entre los evaluadores del 100% en la
decisión que se ha tomado, y en el caso del valor 0 supone aleatoriedad completa, por lo que a la hora de
obtener el resultado de la fórmula se precisa que haya habido la menos aleatoriedad possible, o lo que es lo
mismo, seguridad en el resultado.
En el caso del ejemplo con el que se está trabajando se obtendrá el siguiente resultado del análisis. Donde
puede comprobarse que con esos 3 métodos se obtendrá el major resultado, y en su caso el campo de
instancias clasificadas correctamente ha sido de un 100%.
5.4 Clústerización
El proceso de clustering o agrupamiento, consiste en subdividir un conjunto de datos en grupos mutuamente
excluyentes de tal manera que cada miembro de un grupo esté lo más cercano posible a otro elemento, y
grupos diferentes estén los más lejos posible entre sí, de modo que la distancia está medida respecto a todas las
variables disponibles.
Un ejemplo de aplicación de clustering podría ser el caso en el que una empresa desea introducirse en el
mercado de bebidas, pero antes hace una encuesta de mercado para averiguar si existen grupos de clientes con
costumbres particulares en el consumo de bebidas. Por tanto, una vez realizado el agrupamiento, la empresa se
introducirá en el grupo que esté menos servido por la competencia.
En el uso de clústering se pueden utilizar los 5 algoritmos que se introducen a continuación.
De la misma forma que en el apartado anterior, ahora se puede elegir entre buscar el mejor algoritmo para
clústerizar un solo campo o el mejor método de clústerización de cara al dataset completo.
Se accederá a este fichero para buscar con cuál o cuáles algoritmo/s se va a encontrar los mejores resultados.
En el caso de utilizar el campo classification, se obtendrá el siguiente resultado, en el que el mejor método es
el algoritmo EM con 10 clústers y ha clasificado correctamente sobre el 62% de las instancias del dataset.
…
Figura 56: Partes de los mejores métodos por separado
En las tablas se puede ver qué método es el mejor para cada uno de los campos. Por lo que al final, se sumará
todos los valores de cada posición de las distintas tablas y se mostrará que método ha salido exitoso en más
ocasiones.
En este caso, al tener gran cantidad de campos que solo disponen de un valor, los métodos con un solo clúster
han predominado ante el resto, aunque EM con 2 y 10 clústers y MakeDensityBasedClústerer con 2 clústers
también aportan buenos resultados.
Como resumen del análisis se muestra por terminal el número de outliers y extremes values.
6 PRESUPUESTO
En este capítulo se realizará un cálculo aproximado del coste de este proyecto. Compuesto por las necesidades
hardware (para el desarrollo del mismo) y las horas dedicadas.
El sistema operativo que corre en el anfitrión será redBorder Horama (Centos). A todo esto si fuera necesario
la incorporación de un equipo portátil, bastaría que tuviera una características normales, es decir toda la
capacidad de memoria y procesamiento correrá en los servidores colocados en el anfitrión. Por lo que, con un
equipo de 300 - 400€ sería suficiente.
En conclusión, para la realización de este proyecto será necesario un presupuesto de entre 26.000 – 27.000 €.
64 Conclusiones
7 CONCLUSIONES
7.1 Conclusiones
En este proyecto se ha podido comprobar a pequeña escala el problema al que se enfrentan día a día grandes
empresas como Facebook, Twitter, LinkedIn… y se han utilizado las mismas soluciones que ellas
implementan.
Ha sido una introducción al mundo del Big Data y se han estudiado y utilizado una serie de herramientas que
ayudan a los datos, en este caso eventos, a realizar el recorrido a lo largo de las distintas etapas: captura,
almacenamiento, persistencia, consulta, procesado y análisis de los mismos.
A pesar de ello se puede considerar que el corazón del proyecto ha sido Hadoop y con él, se ha trabajado en
temas de clústers, alta disponibilidad, procesos MapReduce, copias de seguridad y rendimiento de la red. Ha
sido el primer contacto con una base de datos distribuida y la seguridad que ofrece, las funcionalidades que
aporta y lo que se ha aprendido con ella recompensa todas las horas que se le ha dedicado.
En la parte de DataMining se ha hecho énfasis en conseguir que cualquier persona sin conocimientos
estadísticos tenga a su disposición la posibilidad de realizar el análisis de su DataSet obteniendo los mejores
resultados, gracias al motor que se ha creado que busca dichos resultados, ya sea para temas de clasificación o
en temas de agrupamiento.
Como mención a la relación de este proyecto con los estudios universitarios cursados, se han utilizado distintos
lenguajes de programación, algunos de ellos vistos en asignaturas y otros estudiados aparte. Abundan
conocimientos necesarios de la seguridad y monitorización de la red como son el IPS, Ambari, iptables,
cortafuegos, etc. La metodología del desarrollo de un proyecto de estas características ha sido explicada en la
asignatura de proyectos de telemática, y también, la administración de sistemas operativos Linux ha sido de
gran ayuda a la hora de trabajar con los distintos servicios que conviven en este proyecto.
También ha sido la primera toma de contacto en una empresa, en la que se ha trabajado junto con otros
compañeros y, sin duda, dejando a un lado los conocimientos adquiridos en el proyecto, es una experiencia
totalmente diferente a desarrollar un proyecto en solitario.
Cada pocos meses todas las herramientas que se han utilizado en este proyecto aumentan su versión, se
corrigen errores y se añaden nuevas funcionalidades, por lo que, estar al tanto de las nuevas versiones siempre
será necesario para la continuidad de este proyecto.
Una posible y notable mejora es poder recoger los ficheros de resultados creados en la parte de DataMining
(los cuales relacionan cada uno de los algoritmos con porcentajes, calidad de análisis, errores… ) con un
lenguaje estadístico como R y poder representar en gráficas e informes de una forma más visual todos los
resultados obtenidos y las conclusiones del análisis.
Y, por último, de cara a la empresa, será utilizado próximamente, además de en eventos del IPS (cubierto en
este proyecto) en logs y malware. Además, en la actualidad se le está dando uso trabajando junto con Spark,
que es un framework para el análisis de datos en un clúster. La ventaja de pertenecer a Apache es la buena
integración que tiene con los frameworks en los que Apache está trabajando en la actualidad.
Big Data, Hadoop y DataMining sobre eventos de seguridad 65
REFERENCIAS
[1] O'Reilly Media - Big Data Now [Libro]
Editorial: O'Reilly ISBN: 978-1-449-37420
[2] Eric Sammer - Hadoop Operations [Libro]
Editorial: O'Reilly ISBN: 978-1-449-32705-7
[3] Tom White - Hadoop, The Definitive Guide [Libro]
Editorial: O'Reilly ISBN: 978-1-449-31152-0
[4] Jason Venner - Pro Hadoop [Libro]
Editorial: Apress ISBN: 978-1-4302-1942-2
[5] Alan Gates - Programming Pig [Libro]
Editorial: O'Reilly ISBN: 978-1-449-30264-1
[6] Anand Rajaraman, Jure Leskovec, Jeffrey D. Ullman - Mining of Massive Datasets [En línea] [Documento
PDF]
Disponible: http://infolab.stanford.edu/~ullman/mmds/book.pdf
[7] Ted Dunning, Ellen Friedman - Practical Machine Learning [Libro]
Editorial: O'Reilly ISBN: 978-1-491-90408-4
[8] Dhruba Borthakur - HDFS, Arquitecture and Design [En línea] [Documento PDF]
Disponible: http://hadoop.apache.org/docs/r0.18.0/hdfs_design.pdf
[9] Tyson Condie, Neil Conway, Peter Alvaro, Koseph M. Hellerstein, Khaled Elmeleegy, Russell Sears -
MapReduce Online [En línea] [Documento PDF]
Disponible: http://db.cs.berkeley.edu/papers/nsdi10-hop.pdf
[10] Apache Software Foundation - Apache Hadoop 2.4 [En línea] [Web]
Disponible: http://hadoop.apache.org/docs/r2.4.0/
[11] The University of Waikato - Documentation of Weka [En línea] [Web]
Disponible: http://www.cs.waikato.ac.nz/ml/weka/documentation.html
[12] Cloudera - CDH Hadoop Documentation [En línea] [Web]
Disponible: http://www.cloudera.com/content/support/en/documentation.html
[13] Hortonworks- Hadoop & Ambari Documentation [En línea] [Web]
Disponible: http://hortonworks.com
[14] Apache Software Foundation - Ambari [En línea] [Web]
Disponible: http://ambari.apache.org
[15] Apache Software Foundation - Mahout Documentation [En línea] [Web]
Disponible: https://mahout.apache.org
[16] Index-Structures - ELKI Documentation [En línea] [Web]
Disponible: http://elki.dbs.ifi.lmu.de
67
68
Referencias
[17] Vinod Kumar - Signature based intrusion detection system using Snort [En línea] [Documento PDF]
Disponible :
http://www.google.es/url?sa=t&rct=j&q=&esrc=s&source=web&cd=3&ved=0CDkQFjAC&url=http%3A
%2F%2Fwww.ijcait.com%2FIJCAIT%2Findex.php%2Fwww-
ijcs%2Farticle%2Fdownload%2F171%2F81&ei=mLMVVIZbzsfsBtORgYgC&usg=AFQjCNHcAGNZZIk-
qS1UWxOmOO_DTzrAMg&sig2=FZqoBgyuDKESetj-Eaubtw&bvm=bv.75097201,d.ZGU&cad=rja
[18] Martin Roesch - Lightwitght Intrusion Detection For Networks [En línea] [Documento PDF]
Disponible: http://static.usenix.org/event/lisa99/full_papers/roesch/roesch.pdf
[19] Apache Software Foundation - Apache Kafka Documentation [En línea] [Web]
Disponible: http://kafka.apache.org
[20] The Snort Project - Snort Users Manual [En línea] [Documento PDF]
Disponible:
https://s3.amazonaws.com/snort-org-
site/production/document_files/files/000/000/001/original/snort_manual.pdf?AWSAccessKeyId=AKIAIXACIE
D2SPMSC7GA&Expires=1410712450&Signature=IZHzFjKkxIvLXjGz4RE5X7p5TZo%3D
[21] LinkedIn - Camus Documentation [En línea] [Web]
Disponible: https://github.com/linkedin/camus
[22] Apache Software Foundation - ZooKeeper Documentation [En línea] [Web]
Disponible: http://zookeeper.apache.org
[23] Ekpe Okorafor, Mensah Kwabena Patrick - Avaliability of JobTracker Machine In Hadoop / MapReudce
ZooKeeper in Coordinated Clusters. [En línea] [Documento PDF]
Disponible: http://airccse.org/journal/acij/papers/0512acij02.pdf
[24] The University of Waikato -Classes and Functions of Weka [En línea] [Web]
Disponible: http://weka.sourceforge.net/doc.dev/overview-summary.html
[25] Yogesh Kumar, Krishan Kumar, Gulshan Kumar - Feature Selection For Intrusion Detection Systems
[Libro]
Editorial: Lambert ISBN: 978-3-659-51510-1
[26] Michael Collins - Network Security Through Data Analysis [Libro]
Editorial: O'Reilly ISBN: 978-1-449-35790-0
[27] JayJacobs, Bob Rudis - Data-Driven Security [Libro]
Editorial: Wiley ISBN: 978-1-118-79372-5
[28] Sean Owen, Robin Anil, Ted Dunning, Ellen Friedman - Mahout in Action [Libro]
Editorial: Manning ISBN: 978-1-935-518268-9
Big Data, Hadoop y DataMining sobre eventos de seguridad 69
70
Anexo A : Configuración de Hadoop
/etc/hadoop/core-site.xml
http://www.apache.org/licenses/LICENSE-2.0
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://pablo06:8020</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>360</value>
</property>
<property>
<name>mapreduce.jobtracker.webinterface.trusted</name>
<value>false</value>
</property>
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization</value>
</property>
<property>
<name>fs.checkpoint.size</name>
<value>256</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>false</value>
</property>
<property>
<name>ipc.client.connection.maxidletime</name>
<value>30000</value>
Big Data, Hadoop y DataMining sobre eventos de seguridad 71
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.co
mpress.DefaultCodec</value>
</property>
<property>
<name>ipc.client.idlethreshold</name>
<value>8000</value>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>ipc.client.connect.max.retries</name>
<value>50</value>
</property>
</configuration>
/etc/hadoop/hdfs-site.xml
http://www.apache.org/licenses/LICENSE-2.0
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.rpc-address</name>
<value>pablo06:8020</value>
72
Anexo A : Configuración de Hadoop
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/var/lib/hadoop/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/var/lib/hadoop/data</value>
</property>
<property>
<name>dfs.namenode.safemode.threshold-pct</name>
<value>1.0f</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>1073741824</value>
</property>
<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>1024</value>
</property>
<property>
<name>dfs.namenode.stale.datanode.interval</name>
<value>30000</value>
</property>
<property>
<name>dfs.clúster.administrators</name>
<value>hadoop</value>
</property>
<property>
<name>dfs.blockreport.initialDelay</name>
<value>120</value>
</property>
<property>
<name>dfs.namenode.accesstime.precision</name>
<value>0</value>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>5</value>
</property>
<property>
<name>dfs.namenode.avoid.write.stale.datanode</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<property>
<name>dfs.replication.max</name>
<value>50</value>
</property>
<property>
<name>fs.permissions.umask-mode</name>
<value>022</value>
</property>
<property>
Big Data, Hadoop y DataMining sobre eventos de seguridad 73
<name>dfs.datanode.address</name>
<value>pablo02:50010</value>
</property>
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>21600</value>
</property>
<property>
<name>dfs.block.access.token.enable</name>
<value>true</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/rb/var/hadoop/etc/hadoop/dfs.exclude</value>
</property>
<property>
<name>dfs.namenode.checkpoint.edits.dir</name>
<value>${dfs.namenode.checkpoint.dir}</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>dfs.datanode.http.address</name>
<value>0.0.0.0:50075</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<property>
<name>dfs.datanode.ipc.address</name>
<value>pablo02:8010</value>
</property>
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>6250000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>3</value>
</property>
<property>
<name>dfs.datanode.failed.volumes.tolerated</name>
<value>0</value>
</property>
<property>
<name>dfs.namenode.avoid.read.stale.datanode</name>
<value>true</value>
</property>
<property>
<name>dfs.client.block.write.replace-datanode-on-
failure.policy</name>
<value>NEVER</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
74
Anexo A : Configuración de Hadoop
<value>true</value>
</property>
<property>
<name>dfs.datanode.data.dir.perm</name>
<value>750</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>0.0.0.0:50070</value>
</property>
</configuration>
/etc/hadoop/mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
<configuration>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>268</value>
</property>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx440710K</value>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx275444K</value>
</property>
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>107</value>
</property>
Big Data, Hadoop y DataMining sobre eventos de seguridad 75
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx881420K</value>
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx550888K</value>
</property>
<property>
<name>mapred.map.child.java.opts</name>
<value>-Xmx550888K</value>
</property>
<property>
<name>mapred.reduce.child.java.opts</name>
<value>-Xmx550888K</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>537</value>
</property>
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>107</value>
</property>
<property>
<name>mapreduce.map.memory.mb</name>
<value>537</value>
</property>
<property>
<name>mapreduce.clúster.administrators</name>
<value>root</value>
</property>
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>50</value>
</property>
<property>
<name>yarn.app.mapreduce.am.admin-command-opts</name>
<value>-Djava.net.preferIPv4Stack=true -
Dhadoop.metrics.log.level=WARN </value>
</property>
<property>
<name>mapreduce.admin.reduce.child.java.opts</name>
<value>-Djava.net.preferIPv4Stack=true -
Dhadoop.metrics.log.level=WARN</value>
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOM
E/share/hadoop/mapreduce/lib/*</value>
</property>
<property>
<name>yarn.app.mapreduce.am.log.level</name>
<value>INFO</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
76
Anexo A : Configuración de Hadoop
<value>0.0.0.0:19888</value>
</property>
<property>
<name>mapreduce.reduce.input.buffer.percent</name>
<value>0.0</value>
</property>
<property>
<name>mapreduce.admin.map.child.java.opts</name>
<value>-Djava.net.preferIPv4Stack=true -
Dhadoop.metrics.log.level=WARN</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>BLOCK</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>pablo02:10020</value>
</property>
<property>
<name>mapreduce.reduce.log.level</name>
<value>INFO</value>
</property>
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/var/lib/hadoop/mr-history-done</value>
</property>
<property>
<name>mapreduce.admin.user.env</name>
<value>LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/
native/`$JAVA_HOME/bin/java -d32 -version &> /dev/null;if [ $?
-eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`</value>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>false</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>false</value>
</property>
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.7</value>
</property>
<property>
<name>mapreduce.am.max-attempts</name>
<value>2</value>
</property>
<property>
<name>mapreduce.map.output.compress</name>
<value>false</value>
</property>
<property>
<name>mapreduce.reduce.shuffle.merge.percent</name>
<value>0.66</value>
</property>
Big Data, Hadoop y DataMining sobre eventos de seguridad 77
<property>
<name>mapreduce.map.log.level</name>
<value>INFO</value>
</property>
<property>
<name>yarn.app.mapreduce.am.staging-dir</name>
<value>/user</value>
</property>
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/mr-history/tmp</value>
</property>
<property>
<name>mapreduce.map.speculative</name>
<value>false</value>
</property>
<property>
<name>mapreduce.shuffle.port</name>
<value>13562</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
<value>0.05</value>
</property>
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.7</value>
</property>
<property>
<name>mapreduce.task.timeout</name>
<value>300000</value>
</property>
<property>
<name>mapreduce.jobtracker.address</name>
<value>pablo02:8021</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
78
Anexo A : Configuración de Hadoop
/etc/hadoop/yarn-site.xml
<?xml version="1.0"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
<configuration>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>537</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>1075</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>1075</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-
services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>pablo02:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>pablo02:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>pablo02:8031</value>
</property>
<property>
Big Data, Hadoop y DataMining sobre eventos de seguridad 79
<name>yarn.resourcemanager.admin.address</name>
<value>pablo02:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>0.0.0.0:8021</value>
</property>
<property>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/app-logs</value>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>/var/lib/hadoop/yarn</value>
</property>
<property>
<name>yarn.nodemanager.container-executor.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecu
tor</value>
</property>
<property>
<name>yarn.nodemanager.health-checker.interval-ms</name>
<value>135000</value>
</property>
<property>
<name>yarn.nodemanager.admin-env</name>
<value>MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX</value>
</property>
<property>
<name>yarn.application.classpath</name>
<value>/opt/rb/var/hadoop/etc/hadoop,/opt/rb/var/hadoop/share/hadoop/c
ommon/*,/opt/rb/var/hadoop/share/hadoop/common/lib/*,/opt/rb/var/hadoo
p/share/hadoop/hdfs/*,/opt/rb/var/hadoop/share/hadoop/hdfs/lib/*,/opt/
rb/var/hadoop/share/hadoop/mapreduce/*,/opt/rb/var/hadoop/share/hadoop
/mapreduce/lib/*,/opt/rb/var/hadoop/share/hadoop/hadoop-
mapreduce/*,/opt/rb/var/hadoop/share/hadoop/yarn/*,/opt/rb/var/hadoop/
share/hadoop/yarn/lib/*,/opt/rb/var/hadoop/share/hadoop/hadoop-
yarn/*</value>
</property>
<property>
<name>yarn.nodemanager.linux-container-executor.group</name>
<value>root</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacit
y.CapacityScheduler</value>
</property>
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>2</value>
</property>
<property>
<name>yarn.nodemanager.address</name>
<value>pablo02:45454</value>
80
Anexo A : Configuración de Hadoop
</property>
<property>
<name>yarn.nodemanager.delete.debug-delay-sec</name>
<value>600</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>pablo02</value>
</property>
<property>
<name>yarn.acl.enable</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.remote-app-log-dir-suffix</name>
<value>logs</value>
</property>
<property>
<name>yarn.nodemanager.log-dirs</name>
<value>/var/log/hadoop/nodemanager</value>
</property>
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>2592000</value>
</property>
<property>
<name>yarn.nodemanager.log.retain-second</name>
<value>604800</value>
</property>
<property>
<name>yarn.log.server.url</name>
<value>http://pablo02:19888/jobhistory/logs</value>
</property>
<property>
<name>yarn.nodemanager.disk-health-checker.min-healthy-
disks</name>
<value>0.25</value>
</property>
<property>
<name>yarn.nodemanager.health-checker.script.timeout-ms</name>
<value>60000</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.container-monitor.interval-ms</name>
<value>3000</value>
</property>
<property>
<name>yarn.nodemanager.log-aggregation.compression-type</name>
<value>gz</value>
</property>
Big Data, Hadoop y DataMining sobre eventos de seguridad 81
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>10</value>
</property>
<property>
<name>yarn.admin.acl</name>
<value>*</value>
</property>
</configuration>
82
Anexo B : Script de Ruby
#!/usr/bin/ruby
require "getopt/std"
require 'active_support/all'
PIGFILE="/tmp/rb_filter_ips-#{rand(10000)}.pig"
NAMENODE= "hadoopnamenode.redborder.clúster"
opt = Getopt::Std.getopts("t:fr:s:d:i:w:hp")
load_str=" (INT)json#'timestamp' as timestamp"
filter_str=" "
if opt["h"]
# Help
puts ("Usage: filter_ips.rb -t TIMESTAMP_START [-w
OUTPUT_NAME_FOLDER] [-f] [-r SENSOR_IP] [-s SOURCE_IP ] [-d
DESTINATION_IP] [-i SIG_ID] [-h] [-p]
-h -> help (options)
-p -> show PIG file
-t -> [ NECESSARY ] Time_start.
-f -> [ OPTIONAL ] Continue filtering.
-r -> [ OPTIONAL ] Sensor_IP
-s -> [ OPTIONAL ] Source_IP
-d -> [ OPTIONAL ] Destination_IP
-i -> [ OPTIONAL ] Sig_id
-w -> [ OPTIONAL ] Output folder")
else
output =""
aux=""
aux2=""
flag=0
# TIME
if opt["t"]
# Get time variables
start_date = Time.at(opt['t'].to_i).utc
# timestamp start minute
time_tstamp1 = (start_date - start_date.sec).to_i
# timestamp stop minute
time_tstamp2 = ((start_date - start_date.sec) + 60).to_i
# time now
end_date = Time.now.utc
flag=1
elsif flag == 1
# only filter a minute (no [-f])
filter_str = filter_str.concat("timestamp > #{time_tstamp1} AND
timestamp < #{time_tstamp2}")
output = "hdfs:\/\/#{NAMENODE}:8020"
output = output.concat(`hdfs dfs -ls
/rb/raw/data/rb_event/hourly/#{start_date.strftime("%Y")}/#{start_date
.strftime("%m")}/#{start_date.strftime("%d")}/#{start_date.strftime("%
H")}/* | grep raw/data | awk '{print $8}' | tr '\n' ',' | sed
's/,/,hdfs:\\/\\/#{NAMENODE}:8020/g' | sed
's/hdfs:\\/\\/#{NAMENODE}:8020$//'`)
output = output.chomp(',')
end
if opt["r"]
# Filter by sensor_ip
load_str = load_str.concat(", (CHARARRAY)json#'sensor_ip' as
sensor_ip")
filter_str = filter_str.concat(" AND sensor_ip eq
'#{opt['r']}'")
flag=1
end
if opt["s"]
# Filter by source
load_str = load_str.concat(", (CHARARRAY)json#'src' as src")
filter_str = filter_str.concat(" AND src eq '#{opt['s']}'")
flag=1
end
if opt["d"]
# Filter by destination
load_str = load_str.concat(", (CHARARRAY)json#'dst' as dst")
filter_str = filter_str.concat(" AND dst eq '#{opt['d']}'")
flag=1
end
if opt["i"]
# Filter by sig_id
load_str = load_str.concat(", (INT)json#'sig_id' as sig_id")
84
Anexo B : Script de Ruby
# Not filter?
if flag==0
puts("You must filter any field")
# Make file.pig
elsif
File.open(PIGFILE, 'w') do |filter|
filter.puts "REGISTER 'json_simple-1.1.jar';
REGISTER 'elephant-bird-core-3.0.5.jar';
REGISTER 'elephant-bird-pig-4.4.jar';
REGISTER 'elephant-bird-hadoop-compat-4.1.jar';
data = LOAD '" + output + "'
USING com.twitter.elephantbird.pig.load.JsonLoader() as (
json: map[]
);
tuples = FOREACH data GENERATE" + load_str + "
filter1 = FILTER tuples BY" + filter_str + "
info= FOREACH filter1 GENERATE payload;
if opt["w"]
# write output in a file
filter.puts "STORE countBy INTO '#{opt['w']}' USING
PigStorage('\t');"
end
# Close file.pig
filter.close
if opt["p"]
# show pig file
file = File.open("#{PIGFILE}", "rb")
contents = file.read
puts contents
file.close
end
if opt["w"]
# write output in a file
puts "Result stored in
hdfs://#{NAMENODE}:8020/user/root/#{opt['w']}"
Big Data, Hadoop y DataMining sobre eventos de seguridad 85
else
puts execution
end
end
end
end
86
Anexo C : DataMining Code (Java)
JSON2Arff
import java.io.*;
import java.util.Arrays;
import java.util.List;
import org.json.CDL;
import org.json.JSONArray;
import weka.core.Instances;
import weka.core.converters.ArffSaver;
import weka.core.converters.CSVLoader;
/**
* Created by sergio
*/
/**
* takes 2 arguments:
* - JSON input file
* - ARFF output file
*/
// Load CSV
CSVLoader loader = new CSVLoader();
loader.setSource(new ByteArrayInputStream(csv.getBytes("UTF-
8")));
loader.setNominalAttributes(classifier_Index.toString());
Instances data = loader.getDataSet();
Big Data, Hadoop y DataMining sobre eventos de seguridad 87
// Save ARFF
ArffSaver saver = new ArffSaver();
saver.setInstances(data);
saver.setFile(new File(output_File));
saver.setDestination(new File(output_File));
saver.writeBatch();
}
Client (main)
import java.io.*;
/**
* Created by sergio
*/
/**
* UI for the user
*/
public static void main(String[] args) throws Exception {
outlier...
String all; // Use all classification algorithms?
String output; // Write result in a output file?
String inputFile; // Input file
String class_Attribute; // Attribute for classification
String class_Algorithm; // Algorithm for classification
String clus_Attribute; // Attribute for clústering
String clus_Option; // Clústering 1 field or ALL fields
Integer a,b,c; // Flags
Integer[] counts; // Number of outlier detection &
extremes values
Integer[][] toPrint; // Best clústerer solution summary
Integer[][][] bestMethod = new Integer[34][5][10];
Double[][] solution = new Double[5][10];
int cont = 1; // Continue in while action
String[] allAlgorithms =
{"EM","FarthestFirst","HierarchicalClústerer",
"MakeDensityBasedClústerer","SimpleKMeans"};
String[] allFields =
{"action","classification","dgmlen","domain","domain_id","dst_name","d
st_net",
"dst_net_name","dst_port","dst_port_name","ethdst","ethlength","ethsrc
","iplen","l4_proto",
"l4_proto_name","msg","priority","rev","sensor_id","sensor_ip","sensor
_name","sig_generator",
"sig_id","src_net","src_net_name","src_port","tcpack","tcpflags",
"tcplen","tcpseq","tcpwindow","tos","ttl","type"};
while (cont == 1) {
/** CLASSIFICATION */
if (option.equals("1")) {
// ALGORITHMS
System.out.println("Do you want to use all the
Classification Algorithms? [yes/no]");
all = readFromCommandLine();
if (all.equals("yes")) {
Big Data, Hadoop y DataMining sobre eventos de seguridad 89
// Classification by all the methods
class_Algorithm = "all";
output="yes";
} else {
// Classification by 1 method
System.out.println("Select which Classification
Algorithm do you want to use:");
printClassMethods();
class_Algorithm = readFromCommandLine();
// Do you want save the results in a output file?
System.out.println("Do you want to write the
result in a output file? [yes/no]");
output = readFromCommandLine();
}
// START CLASSIFICATION
ClassificationAttribute.startClass(inputFile, output,
class_Algorithm, class_Attribute);
/** CLÚSTERING */
} else if (option.equals("2")) {
if (clus_Option.equals("1")){
// Best Clústering Method for 1 Attribute
System.out.println("Now, the following algorithms
will be used for [1-10] clústers for the attribute that you want to
clúster: \n");
printClusMethods();
printAttributes();
clus_Attribute = readFromCommandLine();
} else if (clus_Option.equals("2")) {
// Best Clústering Method for your dataSet
90
Anexo C : DataMining Code (Java)
for (a=0;a<5;a++){
for (b=0;b<10;b++){
solution[a][b] = 0.0;
}
}
solution[b][c]=solution[b][c]+bestMethod[a][b][c];
}
}
}
// Print table
for (b=0;b<5;b++){
for (c=0;c<10;c++){
System.out.print(solution[b][c].intValue()
+ " ");
}
System.out.println("");
}
// Print solution
toPrint = BestClústerer.findMax(solution);
System.out.println("The best methods to clúster
this dataSet are: ");
for (a=0;a<5;a++){
for (b=0;b<10;b++){
if (toPrint[a][b]==1){
System.out.println(" - " +
allAlgorithms[a] + ": " + (b+1) + " clústers.");
}
Big Data, Hadoop y DataMining sobre eventos de seguridad 91
}
}
} else {
// Wrong option
System.out.println("Wrong option, please select a
option in the range [1,2]");
}
cont = 0;
System.out.println("Processing...");
OutlierAttribute.startOutlier(inputFile,
"classification");
counts = OutlierAttribute.countOutlier();
cont = 0;
/** EXIT */
} else if (option.equals("0")) {
}
}
ClassificationAttribute
import weka.classifiers.Evaluation;
import weka.classifiers.bayes.BayesNet;
import weka.classifiers.bayes.NaiveBayes;
import weka.classifiers.bayes.NaiveBayesMultinomialText;
import weka.classifiers.bayes.NaiveBayesUpdateable;
import weka.classifiers.functions.Logistic;
import weka.classifiers.functions.MultilayerPerceptron;
import weka.classifiers.functions.SMO;
import weka.classifiers.functions.SimpleLogistic;
import weka.classifiers.lazy.IBk;
import weka.classifiers.lazy.KStar;
import weka.classifiers.lazy.LWL;
import weka.classifiers.meta.*;
import weka.classifiers.misc.SerializedClassifier;
import weka.classifiers.rules.*;
import weka.classifiers.trees.*;
import weka.core.Instances;
import weka.core.converters.*;
import weka.filters.unsupervised.attribute.Remove;
import java.io.FileOutputStream;
import java.io.PrintStream;
/**
* Created by sergio
*/
/**
* takes a dataset as first argument and our classifier field as
second argument
*
* @params input_File, output_YesNot,
classifier_algorithm, classifier_field
* @throws Exception if something goes wrong
*/
public static void startClass (String input_File, String
output_YesNot, String classifier_algorithm, String classifier_field)
throws Exception {
"AttributeSelectedClassifier","Bagging","ClassificationViaRegression",
"CVParameterSelection","FilteredClassifier","LogitBoost","MultiClassCl
assifier",
"MultiScheme","RandomCommittee","RandomSubSpace","Stacking","Vote",
"DecisionTable","JRip","OneR","PART","ZeroR","DecisionStump","J48","LM
T",
"RandomForest","RandomTree","REPTree"};
} else {
// Print the result in a output file if output_YesNot =
96
Anexo C : DataMining Code (Java)
"yes"
PrintStream original = System.out;
if (output_YesNot.equals("yes")) {
PrintStream out = new PrintStream(new
FileOutputStream("results/classification/" + classifier_algorithm +
".txt"));
System.setOut(out);
}
/**
* Filter and Classifiers
*/
protected static void useFilterClassifier(Instances data, String
classifier_algorithm ,Integer payload_Index) throws Exception {
/** NaiveBayes */
else if (classifier_algorithm.equals("NaiveBayes")) {
NaiveBayes classificator = new NaiveBayes();
fc.setClassifier(classificator);
}
Big Data, Hadoop y DataMining sobre eventos de seguridad 97
/** NaiveBayesMultinomialText */
else if
(classifier_algorithm.equals("NaiveBayesMultinomialText")) {
NaiveBayesMultinomialText classificator = new
NaiveBayesMultinomialText();
fc.setClassifier(classificator);
}
/** NaiveBayesUpdateable */
else if (classifier_algorithm.equals("NaiveBayesUpdateable"))
{
NaiveBayesUpdateable classificator = new
NaiveBayesUpdateable();
fc.setClassifier(classificator);
}
/** Logistic */
else if (classifier_algorithm.equals("Logistic")) {
Logistic classificator = new Logistic();
fc.setClassifier(classificator);
}
/** MultilayerPerceptron */
else if (classifier_algorithm.equals("MultilayerPerceptron"))
{
MultilayerPerceptron classificator = new
MultilayerPerceptron();
fc.setClassifier(classificator);
}
/** SimpleLogistic */
else if (classifier_algorithm.equals("SimpleLogistic")) {
SimpleLogistic classificator = new SimpleLogistic();
fc.setClassifier(classificator);
}
/** SMO */
else if (classifier_algorithm.equals("SMO")) {
SMO classificator = new SMO();
fc.setClassifier(classificator);
}
/** IBk */
else if (classifier_algorithm.equals("IBk")) {
IBk classificator = new IBk();
fc.setClassifier(classificator);
}
/** KStar */
else if (classifier_algorithm.equals("KStar")) {
KStar classificator = new KStar();
fc.setClassifier(classificator);
}
/** LWL */
else if (classifier_algorithm.equals("LWL")) {
LWL classificator = new LWL();
fc.setClassifier(classificator);
98
Anexo C : DataMining Code (Java)
/** AdaBoostM1 */
else if (classifier_algorithm.equals("AdaBoostM1")) {
AdaBoostM1 classificator = new AdaBoostM1();
fc.setClassifier(classificator);
}
/** AttributeSelectedClassifier */
else if
(classifier_algorithm.equals("AttributeSelectedClassifier")) {
AttributeSelectedClassifier classificator = new
AttributeSelectedClassifier();
fc.setClassifier(classificator);
}
/** Bagging */
else if (classifier_algorithm.equals("Bagging")) {
Bagging classificator = new Bagging();
fc.setClassifier(classificator);
}
/** ClassificationViaRegression */
else if
(classifier_algorithm.equals("ClassificationViaRegression")) {
ClassificationViaRegression classificator = new
ClassificationViaRegression();
fc.setClassifier(classificator);
}
/** CostSensitiveClassifier */
else if
(classifier_algorithm.equals("CostSensitiveClassifier")) {
CostSensitiveClassifier classificator = new
CostSensitiveClassifier();
fc.setClassifier(classificator);
}
/** CVParameterSelection */
else if (classifier_algorithm.equals("CVParameterSelection"))
{
CVParameterSelection classificator = new
CVParameterSelection();
fc.setClassifier(classificator);
}
/** FilteredClassifier */
else if (classifier_algorithm.equals("FilteredClassifier")) {
FilteredClassifier classificator = new
FilteredClassifier();
fc.setClassifier(classificator);
}
/** LogitBoost */
else if (classifier_algorithm.equals("LogitBoost")) {
LogitBoost classificator = new LogitBoost();
fc.setClassifier(classificator);
}
Big Data, Hadoop y DataMining sobre eventos de seguridad 99
/** MultiClassClassifier */
else if (classifier_algorithm.equals("MultiClassClassifier"))
{
MultiClassClassifier classificator = new
MultiClassClassifier();
fc.setClassifier(classificator);
}
/** MultiClassClassifierUpdateable */
else if
(classifier_algorithm.equals("MultiClassClassifierUpdateable")) {
MultiClassClassifierUpdateable classificator = new
MultiClassClassifierUpdateable();
fc.setClassifier(classificator);
}
/** MultiScheme */
else if (classifier_algorithm.equals("MultiScheme")) {
MultiScheme classificator = new MultiScheme();
fc.setClassifier(classificator);
}
/** RandomCommittee */
else if (classifier_algorithm.equals("RandomCommittee")) {
RandomCommittee classificator = new RandomCommittee();
fc.setClassifier(classificator);
}
/** RandomSubSpace */
else if (classifier_algorithm.equals("RandomSubSpace")) {
RandomSubSpace classificator = new RandomSubSpace();
fc.setClassifier(classificator);
}
/** Stacking */
else if (classifier_algorithm.equals("Stacking")) {
Stacking classificator = new Stacking();
fc.setClassifier(classificator);
}
/** Vote */
else if (classifier_algorithm.equals("Vote")) {
Vote classificator = new Vote();
fc.setClassifier(classificator);
}
/** SerializedClassifier */
else if (classifier_algorithm.equals("SerializedClassifier"))
{
SerializedClassifier classificator = new
SerializedClassifier();
fc.setClassifier(classificator);
}
/** DecisionTable */
else if (classifier_algorithm.equals("DecisionTable")) {
DecisionTable classificator = new DecisionTable();
fc.setClassifier(classificator);
100
Anexo C : DataMining Code (Java)
/** JRip */
else if (classifier_algorithm.equals("JRip")) {
JRip classificator = new JRip();
fc.setClassifier(classificator);
}
/** OneR */
else if (classifier_algorithm.equals("OneR")) {
OneR classificator = new OneR();
fc.setClassifier(classificator);
}
/** PART */
else if (classifier_algorithm.equals("PART")) {
PART classificator = new PART();
fc.setClassifier(classificator);
}
/** ZeroR */
else if (classifier_algorithm.equals("ZeroR")) {
ZeroR classificator = new ZeroR();
fc.setClassifier(classificator);
}
/** DecisionStump */
else if (classifier_algorithm.equals("DecisionStump")) {
DecisionStump classificator = new DecisionStump();
fc.setClassifier(classificator);
}
/** J48 */
else if (classifier_algorithm.equals("J48")) {
J48 classificator = new J48();
fc.setClassifier(classificator);
}
/** LMT */
else if (classifier_algorithm.equals("LMT")) {
LMT classificator = new LMT();
fc.setClassifier(classificator);
}
/** RandomForest */
else if (classifier_algorithm.equals("RandomForest")) {
RandomForest classificator = new RandomForest();
fc.setClassifier(classificator);
}
/** RandomTree */
else if (classifier_algorithm.equals("RandomTree")) {
RandomTree classificator = new RandomTree();
fc.setClassifier(classificator);
}
/** REPTree */
else if (classifier_algorithm.equals("REPTree")) {
Big Data, Hadoop y DataMining sobre eventos de seguridad 101
REPTree classificator = new REPTree();
fc.setClassifier(classificator);
}
if (flag == 1) {
// Execute & Results
fc.buildClassifier(data);
System.out.println(fc.toString());
Evaluation evaluation = new Evaluation(data);
evaluation.evaluateModel(fc, data);
System.out.println(evaluation.toSummaryString());
System.out.println(evaluation.toMatrixString());
}
}
}
BestClassifier
import java.io.*;
/**
* Created by sergio
*/
}
System.setOut(original);
ClústeringAttribute
import weka.clústerers.*;
import weka.core.Instances;
import weka.core.converters.ConverterUtils;
import weka.filters.Filter;
import weka.filters.unsupervised.attribute.Remove;
import java.io.FileOutputStream;
import java.io.PrintStream;
/**
* Created by sergio
*/
// Clúster process
for(int i=0; i<allAlgorithms.length; i++){
PrintStream original = System.out;
/**
Big Data, Hadoop y DataMining sobre eventos de seguridad 105
* Filter and Clústers Methods
*/
protected static void useFiltClúster(Instances data, String
clúster_algorithm) throws Exception {
/** FarthestFirst */
else if (clúster_algorithm.equals("FarthestFirst")) {
for(int i=1; i<11; i++) {
FarthestFirst clústerer = new FarthestFirst();
clústerer.setNumClústers(i);
clústerer.buildClústerer(dataClústerer);
ClústerEvaluation eval = new ClústerEvaluation();
eval.setClústerer(clústerer);
eval.evaluateClústerer(data);
System.out.println(eval.clústerResultsToString());
}
}
/** HierarchicalClústerer */
else if (clúster_algorithm.equals("HierarchicalClústerer")) {
for(int i=1; i<11; i++) {
HierarchicalClústerer clústerer = new
HierarchicalClústerer();
clústerer.setNumClústers(i);
clústerer.buildClústerer(dataClústerer);
ClústerEvaluation eval = new ClústerEvaluation();
eval.setClústerer(clústerer);
eval.evaluateClústerer(data);
System.out.println(eval.clústerResultsToString());
}
}
/** MakeDensityBasedClústerer */
else if
(clúster_algorithm.equals("MakeDensityBasedClústerer")) {
for(int i=1; i<11; i++) {
MakeDensityBasedClústerer clústerer = new
106
Anexo C : DataMining Code (Java)
MakeDensityBasedClústerer();
clústerer.setNumClústers(i);
clústerer.buildClústerer(dataClústerer);
ClústerEvaluation eval = new ClústerEvaluation();
eval.setClústerer(clústerer);
eval.evaluateClústerer(data);
System.out.println(eval.clústerResultsToString());
}
}
/** SimpleKMeans */
else if (clúster_algorithm.equals("SimpleKMeans")) {
for(int i=1; i<11; i++){
SimpleKMeans clústerer = new SimpleKMeans();
clústerer.setNumClústers(i);
clústerer.buildClústerer(dataClústerer);
ClústerEvaluation eval = new ClústerEvaluation();
eval.setClústerer(clústerer);
eval.evaluateClústerer(data);
System.out.println(eval.clústerResultsToString());
}
}
BestClústerer
import java.io.*;
/**
* Created by sergio
*/
return bestMethod;
}
var_Max = var_aux;
for (k=0;k<5;k++){
for (l=0;l<10;l++){
bestMethod[k][l] = 0;
}
}
for (i=0;i<5;i++){
for (j=0;j<10;j++){
var_aux = values[i][j];
if (var_aux >= var_Max){
i_Aux = i;
j_Aux = j+1;
if (var_aux > var_Max){
for (k=0;k<5;k++){
for (l=0;l<10;l++){
bestMethod[k][l] = 0;
}
}
bestMethod[i][j]=1;
} else {
bestMethod[i][j]=1;
}
var_Max = var_aux;
}
}
}
return bestMethod;
}
}
OutlierAttribute
import weka.core.Instances;
import weka.core.converters.ConverterUtils;
import weka.filters.Filter;
import weka.filters.unsupervised.attribute.InterquartileRange;
import weka.filters.unsupervised.attribute.Remove;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
/**
* Created by sergio
*/
// InterquartileRange process
InterquartileRange quartil = new InterquartileRange();
quartil.setOptions(weka.core.Utils.splitOptions("-R first-last
-O 5.0 -E 10.0 -do-not-check-capabilities"));
quartil.setInputFormat(newData);
Instances newData2 = Filter.useFilter(newData, quartil);
// Save data
BufferedWriter writer = new BufferedWriter(
new
FileWriter("results/outlierDetection/results.arff"));
writer.write(newData2.toString());
writer.newLine();
writer.flush();
writer.close();
}
return counts;
}
}
ÍNDICE DE CONCEPTOS
i
Sistemas Distribuidos: Sistemas cuyos componentes hardware y software, que están conectados en red, se
ii
Alta disponibilidad: protocolo de diseño de sistema y su implementación asociada que asegura cierto grado
de continuidad operacional ante algún tipo de fallo como la caída de algún nodo o servicio.
iii
Libpcap: es una interfaz independiente del sistema que captura paquetes a nivel de usuario. Proporciona un
framework portatil para monitorizar la red. Las aplicaciones incluyen la recopilación de estadísticas de red,
control de seguridad…
iv
Ataques CGI: Common Gateway Interface es un estándar que permite comunicar programas del lado cliente
con servidores de información (HTTP por ejemplo). Ampliamente usado en el desarrollo de aplicaciones Web,
pero pueden generar problemas de seguridad, vulnerabilidades como que se puede llamar al CGI directamente
desde la línea URL del browser, que estén mal diseñados, que llamen a otros programas a través del uso de
funciones que abren shells…
v
Syslog: Es un estándar para el envoi de mensajes de registro en una red informática IP. Por syslog se conoce
tanto al protocol de red como a la aplicación o biblioteca que envía los mensajes de registro.
vi
tcpdump: es una herramienta en línea de commandos cuya utilidad principal es analizar el tráfico que circula
por la red.
vii
Payload: es una function adicional que posee cierta amenaza en particular. La traducción exacta del ingles
es “carga útil”, y se refiere a acciones adicionales como pueden ser virus, gusanos o troyanos (robo de datos,
eliminación de archivos, sobre-escritura del disco, reemplazo del BIOS…). Aclaracion: un payload no es
necesariamente malign, sino que refiere también a efectos secundarios nocivos para el ordenador.
viii
BPF: (Berkeley Packet Filter) ofrece en algunos sistemas Unix, una interfaz de enlace de datos entre capas
que permiten que los paquetes de capa de enlace sean enviados y recibidos, además si el controlador de la
interfaz de red es compatible con el modo promiscuo, permite que la interfaz sea puesta en ese modo, por lo
que todos los paquetes de red, incluso los destinados a otros anfitriones, se pueden recibir.
ix
Avro: es un sistema de serialización de datos, dispone de ricas estructuras de datos, un rápido y compacto
formateo de datos binaries, RPC, simple integración con lenguajes dinámicos…
x
POSIX: Interfaz de sistema operative portable, es una norma escrita por la IEEE que define una interfaz
estandar del sistema operativo y el entorno.
111