Practico 2 - Nifi

Descargar como pdf o txt
Descargar como pdf o txt
Está en la página 1de 25

Práctico 2 - NIFI

Acceso e Interfaz 2
Componentes 3
Flowfile 3
Processors 3
Connectors 3
Ejemplo 1 - Moviendo Datos 4
Gestionando los errores 7
Ejemplo 2 - Atributos 11
Generando contenido 11
Añadiendo un atributo 12
Linaje de los datos 12
Guardando los datos en HDFS 13
Ejemplo 3 - Filtrado de datos 14
Lectura y división 14
Filtrado de FF 15
Grupos 17
Creando un grupo 17
Puertos 18
Funnels 20
Plantillas 21
Creando plantillas 21
Cargando plantillas 21
Ejemplo 4 - Trabajando con conjuntos de registros 22
Convirtiendo formatos 22
Renombrando el destino 23
Tarea 25
Acceso e Interfaz
IMPORTANTE! Iniciar el ambiente creado en Azure y recordar detenerlo al terminar de
trabajar.

Introducir en el navegador la URL http://nifi.bigdata.ort.edu.uy/nifi/ y veremos un interfaz


similar a la siguiente imagen:

Página de inicio en Nifi


Hay que destacar cuatro zonas:
● Menú superior: con los iconos grandes (procesadores, puertos de entrada y salida,
etc...).
● Barra de debajo con iconos: indican el estado de la ejecución (hilos, procesadores
en marcha, detenidos, etc..).
● Cuadro Navigate para hacer zoom.
● Cuadro Operate con las opciones del flujo de trabajo o del recurso seleccionado.
● Zona de trabajo drag&drop (canvas).
Componentes

Flowfile
Básicamente es el dato, el cual se persiste en disco tras su creación. En realidad es un
puntero al dato en su almacenamiento local, de esta manera se acelera su rendimiento. El
Flowfile se puede decir que se compone de dos partes:
● Contenido: el dato en sí.
● Atributos: metadatos en pares de clave/valor.

Processors
Encargado de ejecutar alguna transformación o regla sobre los datos o el flujo para generar
un nuevo Flowfile. La salida de un Processor es un Flowfile que será la entrada de otro
processor. Así pues, para implementar un flujo de datos en NiFi, crearemos una secuencia
de processors que reproduzcan las acciones y transformaciones que queremos realizar
sobre los datos.
Documentación oficial (https://nifi.apache.org/docs.html). Un ejemplo de los processors más
utilizados:

● Transformación de datos: ReplaceText, JoltTransformJSON, CompressContent.


● Enrutado y mediación: RouteOnAttribute, RouteOnContent
● Acceso a base de datos: ExecuteSQL, ConvertJSONToSQL, PutSQL
● Extracción de atributos: EvaluateJsonPath, ExtractText, UpdateAttribute
● Interacción con el sistema: ExecuteProcess
● Ingestión de datos: GetFile, GetFTP, GetHTTP, GetHDFS
● Envío de datos: PutEmail, PutFile, PutFTP, PutKafka, PutMongo, PutHDFS
● División y agregación: SplitText, SplitJson, SplitXml, MergeContent
● HTTP: GetHTTP, ListenHTTP, PostHTTP
● AWS: FetchS3Object, PutS3Object, PutSNS, PutSQS

Connectors
Es una cola dirigida (con un origen y un destino que determinan un sentido) que une
diferentes procesadores y contiene los FF (FlowFiles) que todavía no se han ejecutado,
pudiendo definir diferentes prioridades (por ejemplo, FIFO o LIFO según necesitemos).

Los conectores van a unir la salida de un procesador con la entrada de otro (o un


procesador consigo mismo, por ejemplo, para realizar reintentos sobre una operación).
Ejemplo 1 - Moviendo Datos
Realicemos una práctica en NiFi para adentrarnos en su entorno, creando un flujo de datos
básico que traslade un archivo desde una ubicación a otra.
Pasos:
1. Seleccionamos un procesador (el primer icono grande) y lo arrastramos a nuestra
área de trabajo. Esto abrirá un cuadro de diálogo con tres secciones distintas:

Diálogo de elección de procesador

● En el lado izquierdo, encontramos una nube de etiquetas que nos permite filtrar los
procesadores.
● En la parte superior derecha, disponemos de un campo de búsqueda para encontrar
procesadores por su nombre.
● En la sección central, se muestra el listado de procesadores desde donde podemos
seleccionarlos.
Por lo tanto, utilizamos esta interfaz para buscar el procesador 'GetFile' y lo añadimos al
flujo. Este procesador permite recuperar un archivo desde una carpeta.

2. Damos doble clic en el elemento gráfico que representa nuestro procesador y, en la


pestaña de propiedades (Properties), especificamos el directorio de entrada desde el
cual el procesador deberá recoger el archivo. Esto lo hacemos a través de la
propiedad 'Input Directory'. En nuestro caso, le asignaremos el valor
'/home/ort/Documentos/in':

Propiedades de GetFile
Antes de cambiar al siguiente procesador, en la pestaña 'Settings', completamos el campo
'Name' con el valor 'ObtenerFichero'.

3. Luego, añadimos un nuevo procesador de tipo 'PutFile', y en las propiedades


establecemos el directorio de salida utilizando la propiedad 'directory' con el valor
'/home/ort/Documentos/out'

4. Una buena práctica es asignar nombres a los procesadores. Si observamos la


pestaña 'Settings', en el campo 'Name', ingresamos 'PonerFichero'.

5. En la pestaña ‘Relationships’, podemos configurar el comportamiento a seguir si el


procesador se ejecuta correctamente ('success') o si falla ('failure'). Dado que vamos
a hacer que este procesador sea el paso final, configuraremos ambas opciones para
que se autoterminen, marcando ambas casillas:
Finalización PutFile

IMPORTANTE!
Si nos olvidamos de autoterminar las relaciones, o tenemos conexiones sin conectar, no
podremos iniciar los procesadores implicados. Esto lo tenemos que realizar para todos los
procesadores que tengamos en nuestro flujo de datos.

6. Conectamos ambos procesadores mediante la creación de una conexión. Para


hacerlo, seleccionamos el primer procesador y, después de hacer clic en el icono de
la flecha que aparece al mantener el ratón sobre él, lo arrastramos hasta el segundo
procesador.
Conexión mediante un conector entre procesadores

7. Antes de iniciar el primer procesador, creamos un pequeño archivo en el directorio


que hemos especificado como entrada:

echo "Hola Mundo!" > hola_mundo.txt

8. Arrancamos el procesador haciendo clic con el botón derecho y seleccionando la


opción 'Start'. Comprobamos que el archivo ya no está en la carpeta 'in', pero
aparece en la cola ('Queued 1'). También podemos verificar que no está en la
carpeta 'out'

9. Finalmente, iniciamos el procesador 'Poner Fichero', y observaremos cómo la cola


se vacía y el archivo aparece en la carpeta 'out'.

Gestionando los errores


¿Qué ocurre si leemos un archivo con el mismo nombre dos veces? De acuerdo con la
configuración de nuestro flujo, solo se conservará la primera copia.

Si accedemos a la pestaña 'Properties' del procesador 'PonerFichero', podemos modificar


este comportamiento en la propiedad 'Conflict Resolution Strategy' a 'replace'. De esta
manera, se conservará la última copia del archivo en caso de conflictos con nombres de
archivo duplicados.
Propiedades de PutFile - gestión de conflictos

De hecho, en lugar de decidir si se ignora o se sobrescribe, lo más apropiado sería definir


un nuevo flujo que dependa del estado de finalización del procesador. De esta manera,
podríamos almacenar todos los archivos que llegan con el mismo nombre para su posterior
análisis.

Por lo tanto, vamos a eliminar la autoterminación que habíamos configurado previamente en


el procesador 'PonerFichero'. En su lugar, configuraremos el flujo para que, en caso de
error, redirija los archivos a un nuevo procesador 'PutFile' que los coloque en una carpeta
diferente (por ejemplo, '/home/ort/Documentos/conflictos').

Flujo failure para los ficheros repetidos


Si bien ahora tenemos un mecanismo para almacenar los archivos que tienen el mismo
nombre, aún solo se guardará una copia (seguimos enfrentando el mismo problema que
antes, pero esta vez solo con archivos duplicados).

Por lo tanto, necesitamos renombrar los archivos antes de colocarlos en la carpeta de


conflictos para mantener un historial. Para lograr esto, debemos introducir un procesador
adicional que cambie el nombre de los archivos antes de almacenarlos.

NiFi agrega automáticamente la propiedad 'filename' a todos los Flowfiles (FF). Esta
propiedad se puede consultar utilizando el Lenguaje de Expresiones de NiFi (NiFi EL) y, a
través del procesador 'UpdateAttribute', podemos modificar su valor.

Vamos a colocar el procesador 'UpdateAttribute' antes de colocar los archivos en la carpeta


de conflictos:

Añadimos el procesador UpdateAttribute

Hemos optado por agregar un prefijo al nombre del archivo que consiste en la fecha del
sistema en formato de milisegundos, lo que nos dará nombres de archivo similares a
'1637151536113-fichero.txt'. Para lograr esto, agregamos un nuevo atributo llamado
'filename' haciendo clic en el icono '+' que aparece en la esquina superior derecha. Luego,
en su valor, utilizamos la expresión ${now():toNumber()}-${filename}:
Añadimos el atributo filename
Ejemplo 2 - Atributos

Generando contenido
1. Agregar un procesador del tipo GenerateFlowFile. Este procesador crea FF con
datos aleatorios o contenido personalizado, lo que resulta muy útil para probar y
depurar flujos de datos. En las opciones del procesador vamos a la pestaña de
propiedades y completamos los campos:
● Flow Size: 10 bytes
● Batch Size: 1 para que nos genere un FF por cada ejecución
● Data Format: Text
● Unique Flowfiles: true e indicamos que los FF van a ser únicos.
En la pestaña Scheduling de este procesador vamos a indicar que se ejecute cada 3
segundos (en el campo Run Schedule le ponemos como valor ‘3s’).

2. Una vez listo el generador, vamos a añadir el procesador ReplaceText con el que
cambiaremos el texto. Tras ello, conectamos ambos procesadores.

Conexión con ReplaceText

3. Como podemos ver, a la izquierda del nombre del procesador, hay un icono de
advertencia que nos indica que debemos configurar el nuevo procesador, y también
señala que ambas relaciones no están conectadas o que falta completarlas.

Para solucionar esto, configuramos la estrategia de reemplazo para que siempre


reemplace el contenido (en el campo 'Replacement Strategy' seleccionamos 'Always
Replace'), y al hacerlo, el campo 'Search Value' se desactiva. Además, en
'Replacement Value', indicamos simplemente 'prueba'. Finalmente, marcamos la
opción para que la conexión 'failure' se autotermine.

4. Ahora agregamos un procesador del tipo LogAttribute para mostrar los atributos del
Flujos de Archivos (FF) en el registro, y conectamos el procesador anterior
(ReplaceText) a este mediante la relación 'success'.

5. Arrancamos el primer procesador y revisamos la cola para verificar lo que ha


generado. Para hacerlo, seleccionamos la cola y elegimos la opción 'List queue' para
ver su contenido. Luego, seleccionamos un elemento en la cola y hacemos clic en el
icono del ojo para visualizar su contenido, y comprobamos que ha generado datos
aleatorios.

6. Si ejecutamos el siguiente procesador, notaremos que extrae el Flujo de Archivos


(FF) de la cola anterior y aparecerá en la cola siguiente. Si comprobamos su valor,
veremos que el valor original ha sido reemplazado por 'prueba'.

Añadiendo un atributo
Vamos a extraer el contenido del FF a un atributo mediante el procesador ExtractText.

7. En las propiedades, creamos una nueva propiedad (haciendo clic en el botón '+' en
la esquina superior derecha) que llamaremos 'contenido'. En su valor, ingresamos la
expresión '.*', que indica que deseamos que coincida con todo.

8. Una vez creado, colocamos este procesador entre los dos anteriores,
específicamente en la conexión que tiene la etiqueta 'matched', que es cuando ha
coincidido con la expresión regular. En la conexión 'unmatched', marcamos la opción
para que se autotermine. Verificamos que no tengamos ninguna advertencia en
ningún procesador.

Flujo completo - ejemplo 2

9. Por último, ejecutamos todos los procesadores y verificamos cómo en el registro


aparece el nuevo atributo creado. También podemos acceder a la cola y, en la parte
izquierda de cada flujo, hacer clic en el icono de la 'i' para comprobar la pestaña
'Attributes'.

Linaje de los datos


Para verificar los datos finales, es muy útil utilizar la opción de Data Provenance, que nos
proporciona un linaje de los datos.
Para hacerlo, sobre el procesador final, hacemos clic con el botón derecho y seleccionamos
la opción 'View Data Provenance'. Si seleccionamos uno de los flujos, a la derecha de cada
flujo, podemos hacer clic en el primer icono para ver un gráfico y un control deslizante que
modifica el gráfico en función del tiempo (en cada uno de los pasos, podemos hacer doble
clic para ver la información y el contenido del Flujos de Archivos en ese momento exacto).

Guardando los datos en HDFS


Para almacenar los datos en HDFS, necesitamos utilizar el procesador 'PutHDFS' y
configurar las propiedades correspondientes:
● Hadoop configuration resources: con la ruta de los archivos de configuración de
Hadoop, en nuestro caso: /home/ort/hadoop/etc/hadoop/hdfs-site.xml,
/home/ort/hadoop/etc/hadoop/core-site.xml
● Directory: con la carpeta de HDFS donde queremos almacenar los datos, por
ejemplo, /user/ort/nifi_out
Por lo tanto, agregaremos este procesador, nuevamente conectado con la relación
'matched' después de la extracción del texto. Esto permitirá que, además de escribir en el
registro, persistamos todo el contenido en HDFS.

Persistiendo FF en HDFS
Ejemplo 3 - Filtrado de datos
Para este ejemplo utilizaremos el csv de ventas sales.csv, el cual tiene la siguiente
estructura:

ProductID;Date;Zip;Units;Revenue;Country
725;1/15/1999;41540;1;115.5;Germany
787;6/6/2002;41540;1;314.9;Germany
788;6/6/2002;41540 ;1;314.9;Germany

Usando Apache NiFi, vamos a generar un nuevo archivo CSV que incluirá exclusivamente
los registros correspondientes a ventas realizadas en Francia.

Este proceso implicará la lectura del archivo fuente utilizando el procesador GetFile, la
descomposición de cada fila en un FF mediante el procesador SplitRecord, la aplicación de
un filtro a los datos mediante el procesador QueryRecord y, finalmente, el almacenamiento
de los resultados en disco mediante el procesador PutFile.

Lectura y división
1. Utilizar el procesador GetFile para leer el archivo fuente. Configurar la opción "Keep
Source File" en "true" para que el archivo fuente no se elimine después de la lectura.
Esto garantizará que el archivo original permanezca intacto. Y en la opción “File
Filter” ponemos ‘.*\.csv’

Configuración GetFile
2. A través del procesador SplitRecord, procederemos a dividir cada fila del archivo
CSV en FF. Para lograrlo, es necesario configurar tanto un RecordReader como un
RecordWriter para que NiFi pueda interactuar adecuadamente con el formato CSV.
NiFi ya proporciona varias implementaciones predefinidas que podemos emplear
con este propósito.
● En el Record Reader, seleccionamos Create new service, y elegimos
CVSReader.
● A su vez, en el Record Writer elegimos CVSRecordSetWriter.
Para configurar estos servicios:
a. Hacer clic en la flecha junto a cada servicio de controlador en la pantalla de
configuración.
b. Dentro de la configuración de cada servicio, verifica y ajusta los valores
según sea necesario. Configurar lo siguiente para CSVReader y
CSVRecordSetWriter:
● Establecer el separador de campos en ";" en el campo "Value
Separator" para ambos, ya que estás utilizando ";" como separador de
campos en tu archivo CSV.
● Marcar la casilla "Treat First Line as Header" como "true" si el archivo
CSV contiene una primera fila con encabezados.
c. Haz clic en el icono de rayo junto a cada servicio para activarlos. Esto
asegurará que estén listos para su uso en tus procesadores.
3. Finalmente, en el campo Records per Split le indicamos 1 para que coloque cada fila
en un FF.

Filtrado de FF
4. Mediante el procesador QueryRecord para ejecutar una consulta SQL contra los FF.
El resultado del nuevo FF será el resultado de la consulta, en este caso, las ventas
de más de una unidad realizadas en Francia.
Configurar el Record Reader y el Record Writer:
● Usa las mismas configuraciones de Record Reader y Record Writer que
seleccionaste previamente para trabajar con archivos CSV.
Configura la propiedad "Include Zero Record FlowFiles":
● Establecer la propiedad "Include Zero Record FlowFiles" en "false". Esto
evitará que los FF que no cumplan con la consulta se enrutan hacia el flujo.
Agregar la propiedad de consulta:
● Agrega una nueva propiedad llamada "FranciaMayor1".
● En el campo de contenido, colocar la consulta SQL que se desea ejecutar.
En este caso, la consulta sería algo como:
select * from Flowfile where Country = 'France' and Units > 1
5. Finalmente, igual que en el ejemplo 1, vamos a cambiarle el nombre a cada FF para
generar un archivo por cada resultado mediante UpdateAttribute (utilizar el UUID
como nombre) y persistimos los datos con PutFile.
Flujo completo - ejemplo 3
Grupos
En NiFi, existe un solo canvas de nivel superior, pero tiene la flexibilidad de que permite
construir y organizar múltiples flujos lógicos utilizando grupos de procesos. Cada grupo de
procesos representa un flujo lógico independiente en el canvas de nivel superior. Estos
flujos lógicos no necesariamente están conectados entre sí, lo que te permite estructurar y
organizar tu flujo de datos de manera modular y clara.

Esta característica es útil para administrar flujos de datos complejos donde diferentes partes
del flujo pueden ser independientes o tener diferentes funciones. Los grupos de procesos
ayudan a mantener la organización y facilitan la gestión de flujos lógicos separados en un
único entorno de NiFi.

Dentro de los grupos de procesos en NiFi, se utiliza Input Ports (puertos de entrada) y
Output Ports (puertos de salida) para controlar cómo los datos entran y salen del grupo.
Estos puertos son esenciales para definir la conectividad entre los grupos y para asegurarse
de que los flujos de datos se enruten de manera adecuada a través de la arquitectura de
NiFi.

En NiFi puedes identificar en qué nivel te encuentras en cualquier momento mediante la


notación que se muestra en la parte inferior izquierda de la interfaz gráfica. Esta notación
tiene un formato similar a:
Nifi Flow >> Subnivel >> Otro Nivel

Creando un grupo
Vamos a trabajar con un ejemplo básico que implica la lectura de un archivo de texto, la
división de su contenido en fragmentos y la visualización de algunos de sus atributos en el
registro.

Para lograr esto, conectaremos un procesador GetFile a un procesador SplitText y,


finalmente, a un procesador LogAttribute. El procesador SplitText nos permitirá dividir
cualquier flujo de texto en fragmentos, según el número de líneas deseado, y brinda
opciones para manejar encabezados, omitir líneas en blanco, entre otras configuraciones.
Dividimos un archivo en fragmentos

Para probar esta configuración, se puede copiar cualquier archivo, como el README o el
NOTICE, en la carpeta de entrada que se ha indicado y luego verificar los registros para ver
la salida.

Con la configuración funcionando, se puede encapsular el procesador SplitText dentro de un


grupo. Un grupo en NiFi encapsula la lógica de los procesadores y los trata como una
entidad única, similar a una caja negra.

Para hacer esto, simplemente arrastrar el ícono de "Process Group" desde la barra superior
y asignarle un nombre, por ejemplo, "Split". Luego, coloca el procesador SplitText dentro de
este grupo. Esto permitirá organizar y modularizar tus flujos de datos de manera más
eficiente.

Puertos
Después de crear el grupo "Split", procedemos de la siguiente manera:
● Copiamos el procesador SplitText.
● Damos doble clic en el grupo para acceder a su interior y navegar un nivel más
profundo en el lienzo.
● Una vez dentro del grupo, pegamos el procesador SplitText copiado.
● Creamos un puerto de entrada llamado "entrada" y un puerto de salida llamado
"salida" dentro de este grupo.
● Establecemos conexiones desde estos puertos para conectarlos al procesador
SplitText, utilizando las mismas conexiones que estaban previamente configuradas.

De esta manera, hemos encapsulado el procesador SplitText dentro del grupo "Grupo" y
hemos habilitado puertos de entrada y salida para facilitar la interacción con el flujo de datos
dentro de este grupo. Esto mejora la organización y modularización de tu flujo de datos en
NiFi.

Salimos del grupo y conectamos los procesadores del nivel superior al grupo que hemos
creado. Luego, verificamos que el flujo de datos continúa funcionando correctamente.
Sustituimos el procesador por el grupo creado
Funnels
Los funnels son un tipo de componente que posibilita el trabajo en paralelo y posteriormente
la unión de múltiples flujos en uno solo, además de permitir la definición centralizada de su
prioridad.

Para ilustrar este concepto, vamos a utilizar varios procesadores GenerateFlowFile (en este
caso, utilizaremos 4) para generar datos que luego mostraremos a través de LogAttribute.

Varios procesadores que apuntan a uno


Si deseamos reemplazar el procesador LogAttribute por otro tipo de procesador,
normalmente tendríamos que eliminar todas las conexiones y volver a conectar todo desde
cero. Sin embargo, para evitar este proceso tedioso, podemos agregar un Funnel que
actuará como un punto centralizador para todas las conexiones.

El Funnel agrupa las conexiones


Plantillas
NiFi ofrece la capacidad de trabajar con plantillas para facilitar la reutilización de flujos de
datos, así como la importación y exportación de estas plantillas.

Creando plantillas
Si partimos del ejemplo anterior, podemos seleccionar todos los elementos que deseamos
incluir en la plantilla mediante la combinación de teclas "Shift" y arrastrando el ratón.

Una vez que hayamos seleccionado los componentes deseados, podemos utilizar el botón
derecho del ratón o acceder al menú "Operate" y luego seleccionar "Create Template".

Si deseamos descargar una plantilla para utilizarla en otra instalación:


● En la esquina superior derecha, seleccionamos la opción "Templates" desde el
menú.
● En la lista de plantillas que tenemos cargadas, para cada una de ellas, disponemos
de la opción de descargarla o eliminarla.

Cargando plantillas
Desde el menú "Operate," buscar el icono que representa una plantilla junto a una flecha
hacia arriba. Esto permite seleccionar un archivo .xml que contiene la definición de la
plantilla.

Una vez que se haya cargado la plantilla, utilizar los controles en la parte superior del menú
para arrastrarla al área de trabajo.

Una de las ventajas más significativas de NiFi es la capacidad de utilizar plantillas


preexistentes. Existe una colección de plantillas en
https://github.com/hortonworks-gallery/nifi-templates.

En este caso, vamos a utilizar la plantilla de "CSV-to-JSON," la cual puedes descargar


desde
https://raw.githubusercontent.com/hortonworks-gallery/nifi-templates/master/templates/csv-t
o-json-flow.xml.

Una vez descargado el archivo XML, puedes importarlo en NiFi, y luego arrastrar el
componente correspondiente a tu área de trabajo para utilizarlo.
Ejemplo 4 - Trabajando con conjuntos de registros
Utilizando los FlowFiles como registros y los procesadores del tipo Record (que ya hemos
empleado en el ejemplo 3, donde realizamos filtrado mediante una sentencia SQL),
podemos trabajar con los datos como un conjunto de registros en lugar de tratarlos de
manera individual.

Estos procesadores simplifican la construcción de flujos de datos, ya que nos permiten crear
procesadores que aceptan cualquier formato de datos sin la necesidad de lidiar con la
complejidad del análisis y la lógica de serialización. Otra ventaja significativa de este
enfoque es que podemos mantener los FlowFiles más grandes, cada uno de los cuales
contiene múltiples registros, lo que resulta en un mejor rendimiento.

Existen tres componentes a resaltar:


● De lectura: AvroReader, CsvReader, ParquetReader, JsonPathReader,
JsonTreeReader, ScriptedReader, etc.
● De escritura: AvroRecordSetWriter, CsvRecordSetWriter, JsonRecordSetWriter,
FreeFormTextRecordSetWriter, ScriptedRecordSetWriter, etc.
● Procesador de registros:
○ ConvertRecord: convierte entre formatos y/o esquemas similares. Por
ejemplo, la conversión de CSV a Avro se puede realizar configurando
ConvertRecord con un CsvReader y un AvroRecordSetWriter.
○ LookupRecord: extrae uno o más campos de un registro y busca un valor
para esos campos en un LookupService (ya sea a un fichero CSV, XML,
accediendo a una base de datos o un servicio REST, etc...). Estos servicios
funcionan como un mapa, de manera que reciben la clave y el servicio
devuelve el valor.
○ QueryRecord: ejecuta una declaración SQL contra los registros y escribe los
resultados en el contenido del archivo de flujo.
○ ConsumeKafkaRecord_N_M y PublishKafkaRecord_N_M

Convirtiendo formatos
Vamos a convertir el archivo CSV del ejemplo 3, que contiene información sobre ventas a
formato JSON.

Podríamos optar por una configuración más simple que consta de un procesador GetFile
conectado a un ConvertRecord, y este a su vez conectado a un PutFile. Para asegurarnos
de que el archivo generado tenga una extensión que coincida con el formato al que lo
estamos convirtiendo, podemos agregar un procesador UpdateAttribute para modificar el
nombre del archivo antes de la serialización de los datos.

El flujo de datos resultante será similar a:


Conversión de formato mediante ConvertRecord

En el caso del ConvertRecord, hemos utilizado los siguientes elementos:

Configuración de ConvertRecord

Para el CSVReader, configurar el separador de campos con el ‘;’ e indicar que la primera fila
contiene un encabezado. Para el JSONRecordSetWriter no configurar nada.

Renombrando el destino
Necesitamos renombrar el fichero de salida. Para ello, necesitamos hacer uso del
procesador UpdateAttribute y utilizar el Nifi Expression Language para modificar la
propiedad filename y recortar la extensión y concatenar con la nueva mediante la expresión
${filename:substringBefore('.csv')}.json:
Modificando la extensión de filename
Tarea
1. Realizar el ejemplo 1, utilizando un archivo cuyo nombre sea número de estudiante
(ej: 254876.txt), adjuntar capturas de la cola y su contenido al leer el archivo antes
de moverlo a la carpeta destino.

2. Realizar el ejemplo 2, pero en vez de llamar al atributo ‘contenido’, ponerle ‘datos’ y


almacenar el resultado en HDFS en la ruta /user/ort/nifi/num_estudiante/ejemplo2.

3. Realizar el ejemplo 3, y comparar el resultado de ejecutarlo al hacer que el


SplitRecord contenga 10000 filas para cada FF.

4. Realizar el ejemplo 4, colocando los procesadores ConvertRecord y UpdateAttribute


dentro de un grupo de procesadores, con el nombre ejemplo5_num_estudiante
(ejemplo5_254876). Exportar todo el ejercicio como una plantilla, y adjuntarla a la
entrega.

También podría gustarte