Practico 2 - Nifi
Practico 2 - Nifi
Practico 2 - Nifi
Acceso e Interfaz 2
Componentes 3
Flowfile 3
Processors 3
Connectors 3
Ejemplo 1 - Moviendo Datos 4
Gestionando los errores 7
Ejemplo 2 - Atributos 11
Generando contenido 11
Añadiendo un atributo 12
Linaje de los datos 12
Guardando los datos en HDFS 13
Ejemplo 3 - Filtrado de datos 14
Lectura y división 14
Filtrado de FF 15
Grupos 17
Creando un grupo 17
Puertos 18
Funnels 20
Plantillas 21
Creando plantillas 21
Cargando plantillas 21
Ejemplo 4 - Trabajando con conjuntos de registros 22
Convirtiendo formatos 22
Renombrando el destino 23
Tarea 25
Acceso e Interfaz
IMPORTANTE! Iniciar el ambiente creado en Azure y recordar detenerlo al terminar de
trabajar.
Flowfile
Básicamente es el dato, el cual se persiste en disco tras su creación. En realidad es un
puntero al dato en su almacenamiento local, de esta manera se acelera su rendimiento. El
Flowfile se puede decir que se compone de dos partes:
● Contenido: el dato en sí.
● Atributos: metadatos en pares de clave/valor.
Processors
Encargado de ejecutar alguna transformación o regla sobre los datos o el flujo para generar
un nuevo Flowfile. La salida de un Processor es un Flowfile que será la entrada de otro
processor. Así pues, para implementar un flujo de datos en NiFi, crearemos una secuencia
de processors que reproduzcan las acciones y transformaciones que queremos realizar
sobre los datos.
Documentación oficial (https://nifi.apache.org/docs.html). Un ejemplo de los processors más
utilizados:
Connectors
Es una cola dirigida (con un origen y un destino que determinan un sentido) que une
diferentes procesadores y contiene los FF (FlowFiles) que todavía no se han ejecutado,
pudiendo definir diferentes prioridades (por ejemplo, FIFO o LIFO según necesitemos).
● En el lado izquierdo, encontramos una nube de etiquetas que nos permite filtrar los
procesadores.
● En la parte superior derecha, disponemos de un campo de búsqueda para encontrar
procesadores por su nombre.
● En la sección central, se muestra el listado de procesadores desde donde podemos
seleccionarlos.
Por lo tanto, utilizamos esta interfaz para buscar el procesador 'GetFile' y lo añadimos al
flujo. Este procesador permite recuperar un archivo desde una carpeta.
Propiedades de GetFile
Antes de cambiar al siguiente procesador, en la pestaña 'Settings', completamos el campo
'Name' con el valor 'ObtenerFichero'.
IMPORTANTE!
Si nos olvidamos de autoterminar las relaciones, o tenemos conexiones sin conectar, no
podremos iniciar los procesadores implicados. Esto lo tenemos que realizar para todos los
procesadores que tengamos en nuestro flujo de datos.
NiFi agrega automáticamente la propiedad 'filename' a todos los Flowfiles (FF). Esta
propiedad se puede consultar utilizando el Lenguaje de Expresiones de NiFi (NiFi EL) y, a
través del procesador 'UpdateAttribute', podemos modificar su valor.
Hemos optado por agregar un prefijo al nombre del archivo que consiste en la fecha del
sistema en formato de milisegundos, lo que nos dará nombres de archivo similares a
'1637151536113-fichero.txt'. Para lograr esto, agregamos un nuevo atributo llamado
'filename' haciendo clic en el icono '+' que aparece en la esquina superior derecha. Luego,
en su valor, utilizamos la expresión ${now():toNumber()}-${filename}:
Añadimos el atributo filename
Ejemplo 2 - Atributos
Generando contenido
1. Agregar un procesador del tipo GenerateFlowFile. Este procesador crea FF con
datos aleatorios o contenido personalizado, lo que resulta muy útil para probar y
depurar flujos de datos. En las opciones del procesador vamos a la pestaña de
propiedades y completamos los campos:
● Flow Size: 10 bytes
● Batch Size: 1 para que nos genere un FF por cada ejecución
● Data Format: Text
● Unique Flowfiles: true e indicamos que los FF van a ser únicos.
En la pestaña Scheduling de este procesador vamos a indicar que se ejecute cada 3
segundos (en el campo Run Schedule le ponemos como valor ‘3s’).
2. Una vez listo el generador, vamos a añadir el procesador ReplaceText con el que
cambiaremos el texto. Tras ello, conectamos ambos procesadores.
3. Como podemos ver, a la izquierda del nombre del procesador, hay un icono de
advertencia que nos indica que debemos configurar el nuevo procesador, y también
señala que ambas relaciones no están conectadas o que falta completarlas.
4. Ahora agregamos un procesador del tipo LogAttribute para mostrar los atributos del
Flujos de Archivos (FF) en el registro, y conectamos el procesador anterior
(ReplaceText) a este mediante la relación 'success'.
Añadiendo un atributo
Vamos a extraer el contenido del FF a un atributo mediante el procesador ExtractText.
7. En las propiedades, creamos una nueva propiedad (haciendo clic en el botón '+' en
la esquina superior derecha) que llamaremos 'contenido'. En su valor, ingresamos la
expresión '.*', que indica que deseamos que coincida con todo.
8. Una vez creado, colocamos este procesador entre los dos anteriores,
específicamente en la conexión que tiene la etiqueta 'matched', que es cuando ha
coincidido con la expresión regular. En la conexión 'unmatched', marcamos la opción
para que se autotermine. Verificamos que no tengamos ninguna advertencia en
ningún procesador.
Persistiendo FF en HDFS
Ejemplo 3 - Filtrado de datos
Para este ejemplo utilizaremos el csv de ventas sales.csv, el cual tiene la siguiente
estructura:
ProductID;Date;Zip;Units;Revenue;Country
725;1/15/1999;41540;1;115.5;Germany
787;6/6/2002;41540;1;314.9;Germany
788;6/6/2002;41540 ;1;314.9;Germany
Usando Apache NiFi, vamos a generar un nuevo archivo CSV que incluirá exclusivamente
los registros correspondientes a ventas realizadas en Francia.
Este proceso implicará la lectura del archivo fuente utilizando el procesador GetFile, la
descomposición de cada fila en un FF mediante el procesador SplitRecord, la aplicación de
un filtro a los datos mediante el procesador QueryRecord y, finalmente, el almacenamiento
de los resultados en disco mediante el procesador PutFile.
Lectura y división
1. Utilizar el procesador GetFile para leer el archivo fuente. Configurar la opción "Keep
Source File" en "true" para que el archivo fuente no se elimine después de la lectura.
Esto garantizará que el archivo original permanezca intacto. Y en la opción “File
Filter” ponemos ‘.*\.csv’
Configuración GetFile
2. A través del procesador SplitRecord, procederemos a dividir cada fila del archivo
CSV en FF. Para lograrlo, es necesario configurar tanto un RecordReader como un
RecordWriter para que NiFi pueda interactuar adecuadamente con el formato CSV.
NiFi ya proporciona varias implementaciones predefinidas que podemos emplear
con este propósito.
● En el Record Reader, seleccionamos Create new service, y elegimos
CVSReader.
● A su vez, en el Record Writer elegimos CVSRecordSetWriter.
Para configurar estos servicios:
a. Hacer clic en la flecha junto a cada servicio de controlador en la pantalla de
configuración.
b. Dentro de la configuración de cada servicio, verifica y ajusta los valores
según sea necesario. Configurar lo siguiente para CSVReader y
CSVRecordSetWriter:
● Establecer el separador de campos en ";" en el campo "Value
Separator" para ambos, ya que estás utilizando ";" como separador de
campos en tu archivo CSV.
● Marcar la casilla "Treat First Line as Header" como "true" si el archivo
CSV contiene una primera fila con encabezados.
c. Haz clic en el icono de rayo junto a cada servicio para activarlos. Esto
asegurará que estén listos para su uso en tus procesadores.
3. Finalmente, en el campo Records per Split le indicamos 1 para que coloque cada fila
en un FF.
Filtrado de FF
4. Mediante el procesador QueryRecord para ejecutar una consulta SQL contra los FF.
El resultado del nuevo FF será el resultado de la consulta, en este caso, las ventas
de más de una unidad realizadas en Francia.
Configurar el Record Reader y el Record Writer:
● Usa las mismas configuraciones de Record Reader y Record Writer que
seleccionaste previamente para trabajar con archivos CSV.
Configura la propiedad "Include Zero Record FlowFiles":
● Establecer la propiedad "Include Zero Record FlowFiles" en "false". Esto
evitará que los FF que no cumplan con la consulta se enrutan hacia el flujo.
Agregar la propiedad de consulta:
● Agrega una nueva propiedad llamada "FranciaMayor1".
● En el campo de contenido, colocar la consulta SQL que se desea ejecutar.
En este caso, la consulta sería algo como:
select * from Flowfile where Country = 'France' and Units > 1
5. Finalmente, igual que en el ejemplo 1, vamos a cambiarle el nombre a cada FF para
generar un archivo por cada resultado mediante UpdateAttribute (utilizar el UUID
como nombre) y persistimos los datos con PutFile.
Flujo completo - ejemplo 3
Grupos
En NiFi, existe un solo canvas de nivel superior, pero tiene la flexibilidad de que permite
construir y organizar múltiples flujos lógicos utilizando grupos de procesos. Cada grupo de
procesos representa un flujo lógico independiente en el canvas de nivel superior. Estos
flujos lógicos no necesariamente están conectados entre sí, lo que te permite estructurar y
organizar tu flujo de datos de manera modular y clara.
Esta característica es útil para administrar flujos de datos complejos donde diferentes partes
del flujo pueden ser independientes o tener diferentes funciones. Los grupos de procesos
ayudan a mantener la organización y facilitan la gestión de flujos lógicos separados en un
único entorno de NiFi.
Dentro de los grupos de procesos en NiFi, se utiliza Input Ports (puertos de entrada) y
Output Ports (puertos de salida) para controlar cómo los datos entran y salen del grupo.
Estos puertos son esenciales para definir la conectividad entre los grupos y para asegurarse
de que los flujos de datos se enruten de manera adecuada a través de la arquitectura de
NiFi.
Creando un grupo
Vamos a trabajar con un ejemplo básico que implica la lectura de un archivo de texto, la
división de su contenido en fragmentos y la visualización de algunos de sus atributos en el
registro.
Para probar esta configuración, se puede copiar cualquier archivo, como el README o el
NOTICE, en la carpeta de entrada que se ha indicado y luego verificar los registros para ver
la salida.
Para hacer esto, simplemente arrastrar el ícono de "Process Group" desde la barra superior
y asignarle un nombre, por ejemplo, "Split". Luego, coloca el procesador SplitText dentro de
este grupo. Esto permitirá organizar y modularizar tus flujos de datos de manera más
eficiente.
Puertos
Después de crear el grupo "Split", procedemos de la siguiente manera:
● Copiamos el procesador SplitText.
● Damos doble clic en el grupo para acceder a su interior y navegar un nivel más
profundo en el lienzo.
● Una vez dentro del grupo, pegamos el procesador SplitText copiado.
● Creamos un puerto de entrada llamado "entrada" y un puerto de salida llamado
"salida" dentro de este grupo.
● Establecemos conexiones desde estos puertos para conectarlos al procesador
SplitText, utilizando las mismas conexiones que estaban previamente configuradas.
De esta manera, hemos encapsulado el procesador SplitText dentro del grupo "Grupo" y
hemos habilitado puertos de entrada y salida para facilitar la interacción con el flujo de datos
dentro de este grupo. Esto mejora la organización y modularización de tu flujo de datos en
NiFi.
Salimos del grupo y conectamos los procesadores del nivel superior al grupo que hemos
creado. Luego, verificamos que el flujo de datos continúa funcionando correctamente.
Sustituimos el procesador por el grupo creado
Funnels
Los funnels son un tipo de componente que posibilita el trabajo en paralelo y posteriormente
la unión de múltiples flujos en uno solo, además de permitir la definición centralizada de su
prioridad.
Para ilustrar este concepto, vamos a utilizar varios procesadores GenerateFlowFile (en este
caso, utilizaremos 4) para generar datos que luego mostraremos a través de LogAttribute.
Creando plantillas
Si partimos del ejemplo anterior, podemos seleccionar todos los elementos que deseamos
incluir en la plantilla mediante la combinación de teclas "Shift" y arrastrando el ratón.
Una vez que hayamos seleccionado los componentes deseados, podemos utilizar el botón
derecho del ratón o acceder al menú "Operate" y luego seleccionar "Create Template".
Cargando plantillas
Desde el menú "Operate," buscar el icono que representa una plantilla junto a una flecha
hacia arriba. Esto permite seleccionar un archivo .xml que contiene la definición de la
plantilla.
Una vez que se haya cargado la plantilla, utilizar los controles en la parte superior del menú
para arrastrarla al área de trabajo.
Una vez descargado el archivo XML, puedes importarlo en NiFi, y luego arrastrar el
componente correspondiente a tu área de trabajo para utilizarlo.
Ejemplo 4 - Trabajando con conjuntos de registros
Utilizando los FlowFiles como registros y los procesadores del tipo Record (que ya hemos
empleado en el ejemplo 3, donde realizamos filtrado mediante una sentencia SQL),
podemos trabajar con los datos como un conjunto de registros en lugar de tratarlos de
manera individual.
Estos procesadores simplifican la construcción de flujos de datos, ya que nos permiten crear
procesadores que aceptan cualquier formato de datos sin la necesidad de lidiar con la
complejidad del análisis y la lógica de serialización. Otra ventaja significativa de este
enfoque es que podemos mantener los FlowFiles más grandes, cada uno de los cuales
contiene múltiples registros, lo que resulta en un mejor rendimiento.
Convirtiendo formatos
Vamos a convertir el archivo CSV del ejemplo 3, que contiene información sobre ventas a
formato JSON.
Podríamos optar por una configuración más simple que consta de un procesador GetFile
conectado a un ConvertRecord, y este a su vez conectado a un PutFile. Para asegurarnos
de que el archivo generado tenga una extensión que coincida con el formato al que lo
estamos convirtiendo, podemos agregar un procesador UpdateAttribute para modificar el
nombre del archivo antes de la serialización de los datos.
Configuración de ConvertRecord
Para el CSVReader, configurar el separador de campos con el ‘;’ e indicar que la primera fila
contiene un encabezado. Para el JSONRecordSetWriter no configurar nada.
Renombrando el destino
Necesitamos renombrar el fichero de salida. Para ello, necesitamos hacer uso del
procesador UpdateAttribute y utilizar el Nifi Expression Language para modificar la
propiedad filename y recortar la extensión y concatenar con la nueva mediante la expresión
${filename:substringBefore('.csv')}.json:
Modificando la extensión de filename
Tarea
1. Realizar el ejemplo 1, utilizando un archivo cuyo nombre sea número de estudiante
(ej: 254876.txt), adjuntar capturas de la cola y su contenido al leer el archivo antes
de moverlo a la carpeta destino.