
DagOn*: Executing direct acyclic graphs as parallel jobs on anything

1st Raffaele Montella
Department of Science and Technologies
University of Naples "Parthenope"
Naples, Italy
raffaele.montella@uniparthenope.it

2nd Diana Di Luccio
Department of Science and Technologies
University of Naples "Parthenope"
Naples, Italy
diana.diluccio@uniparthenope.it

3rd Sokol Kosta
Department of Electronic Systems
Aalborg University Copenhagen
Copenhagen, Denmark
sok@cmi.aau.dk

Abstract—The democratization of computational resources, thanks to the advent of public, private, and hybrid clouds, has changed the rules in many science fields. For decades, one of the efforts of computer scientists and computer engineers has been the development of tools able to simplify the access of computational scientists to high-end computational resources. Nowadays, however, any science field can be considered "computational", so the availability of powerful but easy-to-manage workflow engines is crucial. In this work, we present DagOn* (Direct acyclic graph On anything), a lightweight Python library implementing a workflow engine able to execute parallel jobs represented by direct acyclic graphs on any combination of local machines, on-premise high performance computing clusters, containers, and cloud-based virtual infrastructures. We use a real-world production-level application for weather and marine forecasts to illustrate the use of this new workflow engine.

Index Terms—Workflow, Data Intensive, Cloud Computing, Direct Acyclic Graph, Parallel Processing

I. INTRODUCTION

The success of a novel emerging technology can be proven by its pervasive use by common people having no scientific or technical knowledge of how it works under the hood. Remarkable examples of this concept can be found in Geographic Information Systems, Cloud Computing and Internet of Things based technologies used by common people in their everyday life.

Applying the same concept to science, we can state that, at the time of writing, any science field can be considered computational science, as opposed to a decade ago, when one of the many efforts of computer scientists and computer engineers was making the interaction of domain scientists with computational resources simpler.

Following the same path, it is arguable that the next big thing will involve data science and parallel techniques to deal with big data.

Workflow engines play a primary role in computational sciences because of the availability of cloud-provided elastic and virtually infinite computational resources. In this scenario, the computation orchestrator acts as a cornerstone in the search for performance, affordability, reliability, availability and, above all, reproducibility of any computational experiment.

The massive use of computational models and data analysis techniques has been part of the scientific approach for the last few decades. In earth system sciences, material sciences, high energy physics, and biological and medical studies, performing complex computations on heterogeneous facilities, repeated on different data-sets, has become one of the main investigation tools in support of (and sometimes in substitution of) field or laboratory activities.

The rise of machine learning techniques, above all deep neural networks, unchained the power of big data analysis and, at the same time, sped up the need to manage computing power in a straightforward fashion.

Workflow engines for data-intensive science have existed since the beginning of the grid computing era [1]: Triana [2], Taverna [3], Kepler [4], Unicore [5] and Pegasus [6], among others.

In this paper, we present DagOn* (Direct acyclic graph On anything, named after the Phoenicians' god known by the ancient Greeks as Triton; the "*" symbol is the wild card for "anything"), an open source Python library and related components enabling the execution of direct acyclic graph (DAG) jobs on anything, ranging from the local machine to virtual HPC clusters hosted on private, public or hybrid clouds.

In the rest of the paper, we refer to a node of the DAG (which represents the workflow) as a task. Each task must be wrapped by an execution envelope ensuring the definition of a sandbox in which the software has to be executed. A task consumes input data and produces output data. Tasks are assembled using a specific scripting language. The parents/siblings relationship can be implicit, when each dependence is automatically carried out from data dependencies, or it can be declared in an explicit way through the definition language.

DagOn* is a production-oriented workflow engine targeting computational scientists, and for this purpose it has been developed to meet the following requirements:
• integration in the Python environment;
• minimal footprint in terms of the storage room used for the external software components' execution sandboxes;
• avoidance of any workflow-engine-centered management of data (i.e., of the need to share the file system between computational nodes);
• easy definition of tasks with direct use of Python scripts, web interaction, external software component execution, cloud-hosted virtual machines or containerized tools;
• execution site independence.
The development of DagOn* has been motivated by our operational application for routinely produced weather and marine forecasts.

The rest of this document is organized as follows: in Section II, related work is discussed and the approach requirements are described; the motivations are carried by Section III, where we state the DagOn* position with respect to the related work; Section IV is about design strategies, parallelism, and the distribution approach; Section V describes the DagOn* application lifecycle in detail; Section VI is about the real-world operational application that drove the development of DagOn*; finally, Section VII gives concluding remarks and draws the path for future research development.

II. RELATED WORK

There is a plethora of available workflow engines devoting their features to enabling the user to run complex applications involving up to millions of jobs on different computational resources [7]. Among the most recent literature, there are a number of Python-based tools and libraries designed to support workflow management, such as PaPy [8], for parallel and distributed data-processing pipelines; PyCOMPSs [9], a framework to facilitate the development of parallel computational workflows; FireWorks [10], a dynamic workflow system designed for high-throughput applications; and Dask [11], a parallel computing library designed for parallel analytics. The management strategy of each workflow engine is pretty similar [12].

To the best of our knowledge, we can consider Swift (II-A), Parsl (II-B) and Galaxy (II-C) as the workflow environments most closely related to the presented work.

A. Swift

Swift is a scripting language designed for composing application programs into parallel applications which can be executed on multi-core processors, clusters and clouds, using a C-like syntax with dataflow-driven semantics and implicit parallelism [13].

Unlike most other scripting languages, Swift focuses on the issues that arise from the concurrent execution, composition, and coordination of many independent (and, typically, distributed) computational tasks. The language expresses the execution of programs that consume and produce file-resident datasets.

B. Parsl

Based on the Swift [14] model, Parsl [15] is a software component leveraging the Python parallel scripting library, supporting asynchronous and implicitly parallel data-oriented workflows. Parsl brings advanced parallel workflow capabilities to scripts (or applications) written in Python. Parsl scripts allow selected Python functions and external applications (called Apps) to be connected by shared input/output data objects into flexible parallel workflows. Parsl abstracts the specific execution environment, allowing the same script to be executed on multi-core processors, clusters, clouds, and supercomputers [16].

When a Parsl script is run, the Parsl library causes annotated functions (Apps) to be intercepted by the Parsl execution fabric, which captures and serializes their parameters, analyzes their dependencies, and runs them on selected resources (sites). The execution fabric brings dependency awareness to Apps by introducing data futures. A data future is a construct that includes the real result as well as the status and exceptions for that asynchronous function invocation.

If one App is responsible for writing a future, other Apps are blocked from reading it until it is written. This feature allows Apps to execute in parallel whenever they do not share dependencies or their data dependencies have been resolved [15].
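As an illustration of this future-based model, the following minimal script uses Parsl's python_app decorator and the local-threads example configuration shipped with the Parsl documentation; it is a sketch, and details may vary between Parsl versions:

import parsl
from parsl import python_app
from parsl.configs.local_threads import config

parsl.load(config)

@python_app
def double(x):
    return 2 * x

@python_app
def add(a, b):
    return a + b

# Apps return futures immediately; passing futures as arguments makes
# 'add' wait until both 'double' invocations have produced their values.
f1 = double(3)
f2 = double(4)
total = add(f1, f2)
print(total.result())  # blocks until the result is available: 14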
C. Galaxy

Galaxy is a popular, web-based genomic workbench which enables users to perform computational data processing [17]. Galaxy has established a significant community of users and developers thanks to an approach focused on building a collaborative environment for performing complex analyses, with automatic and unobtrusive provenance tracking [18], allowing transparent sharing of not only the precise computational details underlying an analysis, but also intent, context, and narrative [19].

Galaxy Pages are the principal means to communicate research performed in Galaxy. Users can create their Pages as interactive, web-based documents describing a complete genomics experiment. The computational experiments are documented and published with all computational outputs directly connected, allowing readers to view any level of experiment detail and to reproduce some or all of the experiment, extracting methods to be modified and reused [20].

Leveraging the FACE-IT Globus Galaxy [21], the work in [22] describes how the Galaxy workflow engine has been extended to support earth system related applications, focusing on agricultural and climate modelling, weather/marine forecast production and food quality assessment for mussel farms [23].

III. MOTIVATIONS

DagOn*, the workflow engine proposed in this paper, is the result of our experience in executing massive workflows on a heterogeneous infrastructure powered by diverse and different computing resources spread across local web farms and cloud facilities.

Because of our application domain (both production and on-demand [24] earth system modelling, Internet of Things data processing [25] and machine learning tasks [26]), Swift could represent a valid alternative to Galaxy. Nevertheless, while the Swift parallel programming language is perfect for complex workflow descriptions based on data flow, executing software across multiple sites and spawning tasks with high performance, it still lacks system integration.

Parsl learned the lesson from Swift, offering a workflow engine as a Python library able to execute both Python tasks and external software in a straightforward fashion.
We too learned the lesson from Swift, developing a data-flow-oriented workflow engine, but offering the task flow definition as a feature. As Parsl does, we offer our workflow engine as a Python library but, once a workflow has been defined, its representation as a JSON file can be saved to be used via a web graphical user interface, as in Galaxy. DagOn* represents data files used by different external tasks using a custom namespace identified by the workflow:// schema in order to resolve dependencies automatically. The management of the staging operations is performed automatically by the workflow engine, as is the persistence of the scratch directories, minimizing the storage footprint.

A DagOn*-based application can be developed using multiple workflows with reciprocal interaction. The developer can implement custom task components extending the ones already available. As in Galaxy, the developer can implement data types wrapping the namespace-based data set management.

At the time of writing, DagOn* supports Python native tasks, web tasks, batch tasks, SLURM tasks, AWS tasks, and container tasks.
IV. DESIGN

DagOn* has two main components: a Python library implementing the application lifecycle at runtime, and a service component for workflow monitoring and management (Fig. 1).

Fig. 1. The DagOn* architectural schema.

A DagOn* application is developed as a Python script using any Python extension with a regular sequential approach. Parallel tasks are defined using the Task component. The DagOn* Runtime performs the interaction with the actual executors, such as on-premises local or remote resources, virtual machines instanced on public/private/hybrid clouds, and containers.

The Task component takes charge of the dependencies, the interaction with the workflow and the file staging system, and it participates in the monitoring process in conjunction with the DagOn* Service. More specialized Task types are designed as extensions of the Task component. The different task types are classified as follows (a sketch of their combined use follows this list):

• Native. Regular Python functions which are executed locally in a concurrent application thread. This kind of task has been designed to implement lightweight operations that are convenient to execute in parallel, with no need for specific computing power or storage capacity. This type of task shares the same object scope as the DagOn* application.

• Web. Web tasks have been designed to enable the developer to make workflows interact with remote resources accessible using web services. This task type performs SOAP web service consumption and REST API invocations. It waits for an external event and interacts with other DagOn* workflows leveraging the DagOn* Service. Two or more DagOn* workflow apps can interact with each other using the web services produced by the DagOn* Service and this kind of task. In this scenario, the task is able to stop its execution until it receives data or other forms of notification via web-socket.

• Batch. These are external software components executed using SSH or a local scheduler. This kind of task component, alongside the Cloud and the Container task types, is the base interface to external executors. While Native and Web tasks are executed within the DagOn* application, a Batch task represents an external software component that can be executed on the same machine, on another machine in the same cluster, or on any remote machine the user has the permission to access and perform program execution on. The external software is executed in a scratch directory acting as a sandbox for a customized running environment managed by the DagOn* garbage collector (IV-B). The data dependencies are defined using the DagOn* workflow:// schema (IV-A). The DagOn* design takes advantage of the object oriented paradigm, defining a plug-in architecture for the actual batch executors. The current DagOn* implementation enables the developer to use an external software component running on the same machine that runs the DagOn* application in a native way; it issues an SSH remote shell command or submits the job to SLURM [27] as local scheduler.

• Cloud. A task can be embedded in a virtual machine image and executed on the cloud. Cloud tasks leverage cloud-specific libraries, such as Boto for Amazon Web Services (https://aws.amazon.com/it/sdk-for-python/), or cloud-generic components, such as Apache Libcloud (https://libcloud.apache.org), using a cloud interface meta-model paradigm [28]. When this kind of task is used, a virtual machine image has to be prepared and stored in the cloud image repository. The image has to provide the life support to the external software, with the needed configuration for the stage operations. The virtual machine image can be prepared with the support of a shared file system, GridFTP data transfer (http://toolkit.globus.org/toolkit/docs/latest-stable/gridftp/) or just the regular secure copy. At the time of executing the task, a new virtual machine instance is created using the previously prepared virtual machine image. When the instance is up and running, a local scratch directory is created and data are staged in. Once data are correctly staged in the remote scratch directory, the actual external software is launched. The outputs can be explicitly staged out to a well-known data sink, or they remain in the scratch directory as long as they are still needed. The DagOn* garbage collector (IV-B) takes charge of the virtual machine stop and/or terminate management.

• Container. A task can be represented by a container script and executed on a containerized infrastructure. Lightweight container technology has recently arisen as an alternative to complete virtualization, saving consistent CPU and input/output costs [29]. This kind of task is one of the most ambitious features of DagOn*. While its specifications have been formally defined, the actual implementation is limited to a naive approach working with Docker scripts [30], wrapping each task at the workflow level and running each task inside a container. This approach limits the scope of the container, while it has consistent costs for the startup and shutdown operations. At the time of executing a containerized task, the container manager takes charge of the container description script, in which the features needed to bind the external software component have previously been coded: the data transfer mechanism (shared file system, GridFTP or secure copy), the creation of the scratch directory, and the external software component itself. When the container is ready, the stage-in process begins and, once it is completed, the external software is actually launched. As already described for the cloud task type, the outputs can be explicitly staged out to a well-known data sink, or they remain in the scratch directory as long as they are still needed. The DagOn* garbage collector (IV-B) takes charge of the container shutdown management.
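The following sketch shows how tasks of different types could be mixed in a single workflow. Only SBatch and Workflow appear in the paper's own listing (Section V-A); the other constructor names and the import layout are hypothetical placeholders, not the actual DagOn* API:

from dagon import Workflow, Batch, SBatch, DockerTask  # hypothetical import layout

settings = {}  # engine configuration, omitted as in the Section V-A listing

workflow = Workflow("mixed-executors", settings)

# Batch task: external tool run locally or via SSH (hypothetical class name).
prepare = Batch("prepare", "./preprocess.sh input.nc")

# SLURM-scheduled task, as in the Section V-A listing; the argument string
# uses the workflow:// schema to declare the data dependency on "prepare".
simulate = SBatch("simulate", "./run_model.sh workflow:///prepare")

# Containerized task wrapping a tool into a Docker image (hypothetical class name).
plot = DockerTask("plot", "python plot.py workflow:///simulate", image="myorg/plots:latest")

for task in (prepare, simulate, plot):
    workflow.add_task(task)
workflow.make_dependencies()  # dependencies inferred from the workflow:// references
workflow.run()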
The DagOn* application developer can extend any Task component in order to design the app in terms of Tools, as in Galaxy, providing the final user with a more component-oriented approach.

The Workflow component is the container for the tasks. The parallel relations between tasks can be defined explicitly (task flow) or using the DagOn* peculiar data dependence model (data flow) based on the workflow:// schema. This component is in charge of the evaluation of all the dependencies between tasks, and it manages the data movement from one task to another when those dependencies involve data.

The Workflow component performs the initial registration and the final de-registration of the managed workflow on the DagOn* Service. The same DagOn* application can be built using one or more Workflow instances, each uniquely identified by a 128-bit universally unique identifier. In this way, multiple workflows belonging to the same or different DagOn* applications can interact with each other via the DagOn* Service and the Web tasks. The Workflow component has also been designed to manage a checkpoint/resume feature.

The Stager component is not directly used by the DagOn* application developer; it is managed by the Workflow component in order to mitigate the need to physically copy data produced by one task and used by another. At the time of performing the actual data stage-in, this component automatically selects the data transfer model according to the kind of task. This happens when all the dependencies are resolved, just before the external software component has to be run.

The Stager component creates a symbolic data link, avoiding useless explicit data copying, when the external software component runs on the local machine or on a remote machine sharing the same file system. We consider that this approach simplifies the overall workflow data management. The Stager component selects the GridFTP copy, if available, or the regular secure copy otherwise, when the external software component has to be executed on a remote machine, a virtual machine instanced in a public, private or hybrid cloud, or a container running in a local or dedicated infrastructure.
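The selection logic just described can be summarized by the following sketch; it is purely illustrative and not the actual Stager implementation:

import os

def choose_transfer(shares_filesystem: bool, gridftp_available: bool) -> str:
    # Local or shared-filesystem execution: no copy at all, just a link.
    if shares_filesystem:
        return "symlink"
    # Remote machine, cloud instance or container: prefer GridFTP, else scp.
    return "gridftp" if gridftp_available else "scp"

def stage_in(src_path: str, dst_path: str, method: str) -> None:
    if method == "symlink":
        os.symlink(src_path, dst_path)
    else:
        # Placeholder for invoking the selected transfer tool.
        print(f"transferring {src_path} -> {dst_path} via {method}")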
A. The workflow:// schema

The Batch component takes charge of the management of data dependencies using the DagOn* peculiar paradigm designed around the workflow:// schema.

As in previous similar solutions (i.e., Galaxy), the external software runs in a sandbox implemented as a scratch directory where a customized, highly configurable environment is enforced. The shell script wrapping the external software provides the environment configuration, such as the local directory structure, libraries, ancillary software components and configuration files. Input files have to be staged in (stage-in operation) in order to be processed. The outputs are produced in the form of files (or directories), considering the scratch directory as the root. Usually, at the end of the process, the results have to be staged out (stage-out operation) for the next usage.

The DagOn* design considers the workflow:// schema as the root of the current workflow's virtual file system. Under these conditions, workflow:///task_unique_name/ is the root of the scratch directory created by the DagOn* Runtime. If two or more workflows have to interact (regardless of whether they belong to the same or different applications), the use of the workflow:// schema remains consistent, identifying any resource as workflow://workflow_uuid/task_unique_name/. The workflow:// schema can also be used in conjunction with the DagOn* Service to receive web-socket based notifications.
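A minimal resolver for both reference forms can be sketched as follows; this is illustrative only, since the actual resolution logic is internal to the DagOn* Runtime, and the scratch root location is an assumption:

from urllib.parse import urlparse

SCRATCH_ROOT = "/tmp/dagon"  # assumed location of the scratch directories

def resolve(reference: str, current_workflow_uuid: str) -> str:
    url = urlparse(reference)
    assert url.scheme == "workflow"
    # workflow://<workflow_uuid>/<task>/... : cross-workflow reference;
    # workflow:///<task>/...               : reference within the current workflow.
    workflow_uuid = url.netloc or current_workflow_uuid
    return f"{SCRATCH_ROOT}/{workflow_uuid}{url.path}"

# e.g. workflow:///a/output.nc -> /tmp/dagon/<uuid of current workflow>/a/output.nc
print(resolve("workflow:///a/output.nc", "1f6e..."))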
B. The garbage collector

Batch tasks can be defined by one or more references to data files produced by previously executed tasks. Because multiple tasks can use data from one or more other tasks' scratch directories, the DagOn* Runtime takes into account the usage of each task's results. The role of the DagOn* garbage collector is to track the storage and computational resources allocated during task execution and to dispose of them when they are no longer needed.

In the case of an external software component executed on the local machine, or on a remote machine sharing the same file system, when the task scratch directory is no longer needed (all depending tasks are successfully completed) the directory is removed, making the temporary storage available for new computations.

In the case of cloud tasks, the garbage collector takes care of virtual machine stop or termination. Whether the same virtual machine instance has to be used by a single task or by multiple ones, advanced disposal policies can be enforced. In a pay-per-use scenario, where the account is billed on an hourly basis, the policies can even manage billing issues using customized logic provided by the developer. For example, the garbage collector could remove the remote scratch directory but keep the virtual machine instance alive, even if there are no more queued similar tasks, when the rest of the hour is already paid and a new upcoming task could be executed, saving the virtual machine instantiation and startup time. When a virtual machine instance is not needed anymore, and there are no conditions or enforced policies overriding the default rules, the garbage collector stops and eventually terminates it.

For containerized tasks, the role of the garbage collector is similar to the one performed with cloud tasks; the difference is related to where the container wrapping the task is actually hosted.
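The disposal of scratch directories amounts to reference counting over the task results, as in the following illustrative sketch (not the actual DagOn* code):

import shutil

class ScratchTracker:
    """Removes a task's scratch directory once every dependent task is done."""

    def __init__(self):
        self.waiting = {}  # producer task -> set of dependent tasks still running
        self.scratch = {}  # producer task -> scratch directory path

    def register(self, task, scratch_dir, dependents):
        self.scratch[task] = scratch_dir
        self.waiting[task] = set(dependents)

    def task_completed(self, finished_task):
        # The finished task no longer references its producers' data.
        for producer, pending in self.waiting.items():
            pending.discard(finished_task)
            if not pending:
                shutil.rmtree(self.scratch[producer], ignore_errors=True)
        # Forget producers whose storage has been reclaimed.
        self.waiting = {t: p for t, p in self.waiting.items() if p}

In the diamond example of Section V-A, task "a" would be registered with dependents {"b", "c"}: its scratch directory disappears as soon as both have completed.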
C. DagOn* Service

As depicted in Fig. 1, the DagOn* Service is a software component not belonging to the DagOn* Runtime (which is delivered embedded in the DagOn* library), and its deployment in a DagOn*-powered infrastructure is not mandatory. This component is devoted to monitoring tasks, interacting with the DagOn* Runtime using a REST API. This service can be used to track the status of a workflow, or of a task of a specified workflow, as described in Section V about the DagOn* application lifecycle.

The service publishes a directory of active workflows and enables the DagOn* application developer to implement interactions between different workflows of the same application, or belonging to different applications, using the Web tasks. A simple portal enables the users to check the workflows' overall execution status, task successes and failures, resource allocation and other insights.

The DagOn* Service has been designed with the idea of managing user authentication and authorization, resource allocation and pay-as-you-go issues.

V. APPLICATION LIFECYCLE

Due to previous experiences with Swift and FACE-IT Galaxy, we elected Python as the scripting language to describe our workflow-based applications managed by the DagOn* Runtime. We defined a programming model (see Section V-A) in order to enable the embedding of one or more workflows in regular Python applications. Fig. 2 shows the DagOn* lifecycle using a really simplified application. In order to describe the lifecycle in detail, we have to consider the interaction of the application with the DagOn* Runtime (Section V-B) and with the DagOn* Service (Section V-C). From the time axis point of view, in this example, we divide the whole lifecycle in 10 time steps.

Fig. 2. The DagOn* programming model and the application life cycle as an interaction sequence between the direct acyclic graph parallel job execution, the DagOn* Runtime and the DagOn* Service.

A. Programming model

The following listing represents the schema of a simple DagOn* application implementing a typical diamond-shaped direct acyclic graph:

import dagon
...
task_a = SBatch("a", "...")
task_b = SBatch("b", "... workflow:///a")
task_c = SBatch("c", "... workflow:///a")
task_d = SBatch("d", "... workflow:///b workflow:///c")
...
workflow = Workflow("myapp", settings)
workflow.add_task(task_a)
workflow.add_task(task_b)
workflow.add_task(task_c)
workflow.add_task(task_d)
workflow.make_dependencies()
workflow.run()
...
sys.exit(0)
The configuration part has been omitted in order to simplify the sample code. The tasks named "a", "b", "c" and "d" are defined as SLURM tasks. Within the same workflow, the name of each single task must be unique. In this listing sample, the actual shell script name is not shown, while the arguments needed to execute the external software component are specified using the workflow:// schema.

The task "a" is the workflow root and produces some data that will be used by the tasks "b" and "c". Finally, the task "d" ends the workflow, consuming the data produced by "b" and "c". In order to simplify the description of the workflow, we omitted that the data produced by "d" are staged out to a data sink acting as final user.

Once all tasks are defined, an instance of the Workflow object is created. This example is related to a workflow with task dependencies defined as data flow. Each task is added to the workflow and then the dependencies are resolved by explicitly invoking a method. In the case of task-flow-oriented workflows, when a task is added to the workflow, the lists of backward and forward dependencies have to be provided in an explicit fashion.

If all dependencies are coherent and consistent, the workflow is ready to be run by invoking the homonymous blocking method. Leveraging the Python environment, a DagOn* application can be divided into more workflows, each of them dynamically arranged, implementing branching and looping.

B. Runtime

The DagOn* Runtime performs the following basic operations, sketched in code after this list:
• Create Scratch Directory. A scratch directory is created when an external software component has to be executed on a local or remote machine. This operation ensures that the software component has its own isolation, at the best of the chosen execution context.
• Stage in. In order to be processed, the needed data have to be available to the external software component. The stage-in operation can be a simple local symbolic link or a fine high performance data transfer between remote resources. This operation is managed by the Stager component once the workflow:// schema is enforced in order to define the needed transfer operations.
• Run. This is the crucial task operation. Once the task is completed successfully, the related depending tasks can start their execution.
• Remove Scratch Directory. This operation is managed by the garbage collector.
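A minimal sketch of these four operations for a single locally executed batch task could look as follows (illustrative plain Python, not the DagOn* Runtime itself):

import os
import shutil
import subprocess
import tempfile

def execute_task(name, command, staged_inputs):
    # 1. Create Scratch Directory.
    scratch = tempfile.mkdtemp(prefix=f"dagon-{name}-")
    # 2. Stage in (local symbolic-link case).
    for src in staged_inputs:
        os.symlink(src, os.path.join(scratch, os.path.basename(src)))
    # 3. Run the external software inside the sandbox.
    result = subprocess.run(command, shell=True, cwd=scratch)
    if result.returncode != 0:
        raise RuntimeError(f"task {name} failed")
    # 4. Removal is deferred to the garbage collector.
    return scratch

scratch = execute_task("a", "hostname > host.txt", [])
shutil.rmtree(scratch)  # what the garbage collector would eventually do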
Step 1 is fully managed with a DagOn* Service interaction. At time step 2, the local scheduler SLURM is ready to execute the task "a". A scratch directory is created: the job representing the task is executed on the selected resource and terminates with success at time step 3.

At time steps 4 and 5, the local scheduler is ready to run the jobs related to the tasks "b" and "c", and the related scratch directories are created. The tasks "b" and "c" need the data produced by task "a". The workflow:// schema references are resolved and the Stager component performs the stage-in operation in a concurrent way. Once the data are in the tasks' scratch directories, each task is executed, ending with success (time steps 6 and 7).

At time step 7, both tasks "b" and "c" are completed: the data produced by task "a" are no longer needed and its scratch directory is deleted. Meanwhile, the job representing the task "d" is ready to be executed by the local scheduler, and its scratch directory is created at time step 8. Task "d" needs the data produced by tasks "b" and "c". The references are resolved using the workflow:// schema and the Stager component transfers the data into the task "d" scratch directory. When all data have been transferred, the task "d" is executed and successfully completed (time step 9). Finally, at time step 10, the scratch directories of the tasks "b", "c" and "d" are disposed.

Fig. 3. DagOn* real case application with WRF (weather forecast map at the top) and ROMS (sea surface current map at the bottom) off-line coupling. The sample forecasts are related to Campania Region (d03 computational domain) at 2018-08-08 Z12 UTC (simulations initialized at 2018-08-08 Z00 UTC).
C. Service

The interactions between the Runtime and the Service are performed, transparently from the point of view of the application developer, using REST APIs. The DagOn* Service performs the following basic tasks:
• Publish. The workflow configuration is acquired by the service and made available using REST APIs. This operation is performed both at the workflow execution start and at its termination.
• Register. A task registers with the service, updating its status each time there is a change in the execution lifecycle.
• Unregister. Once a task is completed, it removes itself from the service.

At time step 1, the DagOn* Runtime registers the workflow on the DagOn* Service using the JSON representation. In this way the service can provide updates about the workflow status. At time step 2, the task "a" performs its registration to the service, and it un-registers itself at step 3, when it is completed. Then, the tasks "b" and "c" perform their register/unregister operations between time steps 4 and 8. The task "d" notifies its status changes in time steps 8 and 9. Finally, at time step 10, the DagOn* Runtime interacts with the Service, pushing its status as terminated.
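These interactions reduce to plain HTTP calls, as in the sketch below; the endpoint paths and payloads are hypothetical, since the paper only states that Publish/Register/Unregister happen over a REST API:

import json
import urllib.request

SERVICE = "http://localhost:8080"  # assumed DagOn* Service address

def post(path, payload):
    req = urllib.request.Request(
        SERVICE + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"})
    return urllib.request.urlopen(req)

# Time step 1: the Runtime publishes the workflow's JSON representation.
post("/workflows", {"uuid": "1f6e...", "tasks": ["a", "b", "c", "d"]})
# Time step 2 onward: task "a" registers and updates its status on every change.
post("/workflows/1f6e.../tasks/a", {"status": "running"})
post("/workflows/1f6e.../tasks/a", {"status": "finished"})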
VI. USE CASE: PROVIDING OPERATIONAL WEATHER/MARINE FORECASTS

The scientific workflow shown in Fig. 4 has been configured using the DagOn* workflow engine. It is operational at the Campania Center for Atmospheric and Sea Monitoring and Modelling (CCMMMA, http://meteo.uniparthenope.it), with the aim of providing high resolution meteo-marine forecasts focused on the Center and South Tyrrhenian Sea, East Sector, and other services relative to atmosphere and sea management [31]. The configured model chain uses an off-line coupling approach in which the outputs of each involved numerical model are the initial or forcing conditions for the next component (see the blocks and the relative connections in Fig. 4).

The model chain starting point is the Weather Research and Forecasting (WRF) model [32], [33], which provides hourly (external time step of output) weather forecasts necessary to force the air quality model chain components (blue dotted line in Fig. 4) and the hydrodynamic components (pink dotted line in Fig. 4). The WRF model is initialized every three hours (according to the global dataset availability), with initial and boundary conditions produced by the National Centers for Environmental Prediction (NCEP) Global Forecast System (GFS) [34]. An example of the output of this part of the workflow application is visible in the upper side of Fig. 3.

Hydrodynamics is evaluated by two different models driven by the same forcing conditions (10 m wind speed and direction, provided hourly by the WRF model):
• the third generation model WaveWatch III (WW3) [35], [36], for the sea wave component;
• the Regional Ocean Modeling System (ROMS) [37], for the sea temperature, salinity and current components.

The water column initial and boundary conditions, necessary to the ROMS model, refer to the COPERNICUS service (http://marine.copernicus.eu). The ROMS model is coupled with the Lagrangian model WaComM (Water quality Community Model) [38] to forecast the circulation pattern of tracers spilled out by coastal sources.
The lower side of Fig. 3 represents the output of the ROMS model from our DagOn* workflow.

The models addressed to high resolution air quality forecasts are CALMET [39], [40], a diagnostic 3-dimensional meteorological model, and CHIMERE [41], a model designed to produce hourly forecasts of ozone, aerosols and other pollutants.

Currently this DagOn* workflow empowers three operational applications:
1) The circulation pattern tracers in the Lagrangian WaComM model are associated, using an empirical trend curve, to potential pollutants, with the aim of evaluating their interaction with the mussel farms [42], [43].
2) In the context of coastal management during a sea storm event, the framework Son Of Beach (SoB) takes the hourly offshore wave conditions provided by the WW3 model as input. It computes the beach run-up in order to forecast the coastal vulnerability related to coastal flooding during extreme weather events [44].
3) CALMET provides the atmospheric pattern driving the air pollution during a geo-localized accidental or maliciously induced wildfire [45].

The computational domain spatial discretization (grid model) supported by the different models, and the different forecasting scales, justify the introduction in the workflow of some tools (grey blocks in Fig. 4) supporting the data intensive processing (interpolation, rotation, re-sampling on different space and time scales, etc.).

Regardless of the scale (spatial resolution) of the different models, we can identify three computational domains with increasing spatial resolution: d01, extended to the European continent (and focusing on the Mediterranean Sea area); d02, at intermediate scale, including the Italian seas; d03, at regional scale, focused on the Center and South Tyrrhenian Sea, East Sector.

All models have a 1-hour external time step (with a different internal time step (∆t) for each model and domain), and almost all models (WRF, ROMS, WW3, CHIMERE and WaComM) support the restart mode to prevent them from cold starting from a quiet physical condition at each simulation. In the described workflow configuration, every 24 simulated hours the results are published on-line and the models continue their run starting from the archived restart point.

Tab. I summarizes the data size and the computational power involved in each domain for the WRF and ROMS models for one single hour of simulation results. The computational demand is remarkable, especially considering that all processing is done daily in a production fashion. This justifies the use of a dedicated on-premises computational infrastructure and storage facility that can be elastically extended leveraging public cloud resources [46].

VII. CONCLUSION AND FUTURE DIRECTIONS

In this work DagOn*, a Python based tool for data intensive scientific workflows targeting production applications, has been introduced. The DagOn* programming model enables the advanced user to embed workflows in already existing Python scientific applications.

The DagOn* architecture design (Section IV) and the programming model and application lifecycle (Section V) have been described, focusing on peculiar features such as the dependency management via the workflow:// schema and the garbage collector.

The DagOn* design and development has been primarily driven by our previous experiences with FACE-IT Globus Galaxy and Swift, and by the needs of our real-world use case application for weather and marine forecast production, as shown in Section VI.

DagOn* is far from being a mature product ready for prime time, and some features still need to be implemented or reinforced as short term goals. The security, authentication and authorization on a pool of computing resources have to be centralized and managed with a single sign-on system component and a credential repository. The checkpoint and recovery workflow features have to be implemented in the next development iterations. Finally, the DagOn* Service needs improvements in the web user interface, the REST APIs, usage data analysis and resource allocation management.

The cloud task interface has to be improved in order to enforce the plug-in approach and mitigate the effects due to the leveling down of the features offered by diverse and different cloud providers and cloud management APIs.

The integration between the DagOn* workflows and the container based execution technologies is one of the most intriguing designed, but only partially implemented, features, with room for improvement. Several other possible approaches for workflow task execution in containers have already been considered in the DagOn* design. A different and more effective way to integrate DagOn* workflows and containers could be implemented and tested as a future research direction. One possible long term perspective could be running several instances of the external software component in the same container, eliminating the startup overheads.

A serious compare-and-contrast evaluation versus Swift and Parsl is in our future plans, as is the evaluation of a possible integration with Parsl.

ACKNOWLEDGMENTS

This work has been supported by the University of Napoli Parthenope, Italy (project DSTE333 "Modelling Mytilus Farming System with Enhanced Web Technologies" funded by Campania Region/Veterinary sector).
Fig. 4. Block diagram of a scientific workflow to provide operational weather/marine forecasts.

TABLE I
A PARTIAL SUMMARY OF SIMULATION PRODUCTS' DATA SIZE AND INVOLVED COMPUTING POWER

Model | Domain | ∆t [sec.] | Grid spacing [m] | Grid dimension [N. of cells] (West-East × South-North × Bottom-Top) | Output file size [Gb] (1 hour of simulation)
WRF   | d01    | 150       | 25000            | 230 × 209 × 28                                                       | 0.11
WRF   | d02    | 30        | 5000             | 361 × 336 × 28                                                       | 0.29
WRF   | d03    | 6         | 1000             | 301 × 306 × 28                                                       | 0.22
ROMS  | d03    | 30        | 200              | 1375 × 1021 × 30                                                     | 2.28

For each 7-day run, WRF (all three domains) produces 169 output files using 96 MPI processes (12 cores × 8 nodes), with an execution time of 05:05 [mm:ss] per simulated hour; ROMS produces 80 output files using 10 cores × 4 nodes, with an execution time of 07:12 [mm:ss] per simulated hour.

REFERENCES

[1] R. Lovas, G. Dózsa, P. Kacsuk, N. Podhorszki, and D. Drótos, "Workflow support for complex grid applications: Integrated and portal solutions," in Grid Computing. Springer, 2004, pp. 129–138.
[2] I. Taylor, M. Shields, I. Wang, and A. Harrison, "The Triana workflow environment: Architecture and applications," in Workflows for e-Science. Springer, 2007, pp. 320–339.
[3] K. Wolstencroft, R. Haines, D. Fellows, A. Williams, D. Withers, S. Owen, S. Soiland-Reyes, I. Dunlop, A. Nenadic, P. Fisher et al., "The Taverna workflow suite: designing and executing workflows of web services on the desktop, web or in the cloud," Nucleic Acids Research, vol. 41, no. W1, pp. W557–W561, 2013.
[4] D. Barseghian, I. Altintas, M. B. Jones, D. Crawl, N. Potter, J. Gallagher, P. Cornillon, M. Schildhauer, E. T. Borer, E. W. Seabloom et al., "Workflows and extensions to the Kepler scientific workflow system to support environmental sensor data access and analysis," Ecological Informatics, vol. 5, no. 1, pp. 42–50, 2010.
[5] S. Gesing, I. Márton, G. Birkenheuer, B. Schuller, R. Grunzke, J. Krüger, S. Breuers, D. Blunk, G. Fels, L. Packschies et al., "Workflow interoperability in a grid portal for molecular simulations," in Proceedings of the International Workshop on Science Gateways (IWSG10), 2010, pp. 44–48.
[6] E. Deelman, K. Vahi, G. Juve, M. Rynge, S. Callaghan, P. J. Maechling, R. Mayani, W. Chen, R. F. da Silva, M. Livny et al., "Pegasus, a workflow management system for science automation," Future Generation Computer Systems, vol. 46, pp. 17–35, 2015.
[7] A. Barker and J. Van Hemert, "Scientific workflow: a survey and research directions," in International Conference on Parallel Processing and Applied Mathematics. Springer, 2007, pp. 746–753.
[8] M. Cieslik and C. Mura, "PaPy: Parallel and distributed data-processing pipelines in Python," arXiv preprint arXiv:1407.4378, 2014.
[9] E. Tejedor, Y. Becerra, G. Alomar, A. Queralt, R. M. Badia, J. Torres, T. Cortes, and J. Labarta, "PyCOMPSs: Parallel computational workflows in Python," The International Journal of High Performance Computing Applications, vol. 31, no. 1, pp. 66–82, 2017.
[10] A. Jain, S. P. Ong, W. Chen, B. Medasani, X. Qu, M. Kocher, M. Brafman, G. Petretto, G.-M. Rignanese, G. Hautier et al., "FireWorks: A dynamic workflow system designed for high-throughput applications," Concurrency and Computation: Practice and Experience, vol. 27, no. 17, pp. 5037–5059, 2015.
[11] M. Rocklin, "Dask: Parallel computation with blocked algorithms and task scheduling," in Proceedings of the 14th Python in Science Conference, no. 130-136. Citeseer, 2015.
[12] L. Liu, M. Zhang, Y. Lin, and L. Qin, "A survey on workflow management and scheduling in cloud computing," in Cluster, Cloud and Grid Computing (CCGrid), 2014 14th IEEE/ACM International Symposium on. IEEE, 2014, pp. 837–846.
[13] M. Wilde, M. Hategan, J. M. Wozniak, B. Clifford, D. S. Katz, and I. Foster, "Swift: A language for distributed parallel scripting," Parallel Computing, vol. 37, no. 9, pp. 633–652, 2011.
[14] J. M. Wozniak, T. G. Armstrong, M. Wilde, D. S. Katz, E. Lusk, and I. T. Foster, "Swift/T: Large-scale application composition via distributed-memory dataflow processing," in Cluster, Cloud and Grid Computing (CCGrid), 2013 13th IEEE/ACM International Symposium on. IEEE, 2013, pp. 95–102.
[15] Y. Babuji, K. Chard, I. Foster, D. S. Katz, M. Wilde, A. Woodard, and J. Wozniak, "Parsl: Scalable parallel scripting in Python," in 10th International Workshop on Science Gateways, 2018.
[16] Y. Babuji, A. Brizius, K. Chard, I. Foster, D. Katz, M. Wilde, and J. Wozniak, "Introducing Parsl: A Python parallel scripting library," 2017.
[17] J. Goecks, A. Nekrutenko, and J. Taylor, "Galaxy: a comprehensive approach for supporting accessible, reproducible, and transparent computational research in the life sciences," Genome Biology, vol. 11, no. 8, p. R86, 2010.
[18] E. Afgan, A. Lonie, J. Taylor, K. Skala, and N. Goonasekera, "Architectural models for deploying and running virtual laboratories in the cloud," in Information and Communication Technology, Electronics and Microelectronics (MIPRO), 2016 39th International Convention on. IEEE, 2016, pp. 282–286.
[19] C. Sloggett, N. Goonasekera, and E. Afgan, "BioBlend: automating pipeline analyses within Galaxy and CloudMan," Bioinformatics, vol. 29, no. 13, pp. 1685–1686, 2013.
[20] B. Kim, T. Ali, C. Lijeron, E. Afgan, and K. Krampis, "Bio-Docklets: virtualization containers for single-step execution of NGS pipelines," GigaScience, 2017.
[21] R. K. Madduri, D. Sulakhe, L. Lacinski, B. Liu, A. Rodriguez, K. Chard, U. J. Dave, and I. T. Foster, "Experiences building Globus Genomics: a next-generation sequencing analysis service using Galaxy, Globus, and Amazon Web Services," Concurrency and Computation: Practice and Experience, vol. 26, no. 13, pp. 2266–2279, 2014.
[22] R. Montella, D. Kelly, W. Xiong, A. Brizius, J. Elliott, R. Madduri, K. Maheshwari, C. Porter, P. Vilter, M. Wilde et al., "FACE-IT: A science gateway for food security research," Concurrency and Computation: Practice and Experience, vol. 27, no. 16, pp. 4423–4436, 2015.
[23] R. Montella, A. Brizius, D. Di Luccio, C. Porter, J. Elliot, R. Madduri, D. Kelly, A. Riccio, and I. Foster, "Using the FACE-IT portal and workflow engine for operational food quality prediction and assessment: An application to mussel farms monitoring in the Bay of Napoli, Italy," Future Generation Computer Systems, 2018.
[24] E. Chianese, A. Galletti, G. Giunta, T. Landi, L. Marcellino, R. Montella, and A. Riccio, "Spatiotemporally resolved ambient particulate matter concentration by fusing observational data and ensemble chemical transport model simulations," Ecological Modelling, vol. 385, pp. 173–181, 2018.
[25] R. Montella, S. Kosta, and I. Foster, "DYNAMO: Distributed leisure yacht-carried sensor-network for atmosphere and marine data crowdsourcing applications," in Cloud Engineering (IC2E), 2018 IEEE International Conference on. IEEE, 2018, pp. 333–339.
[26] R. Montella, A. Petrosino, and V. Santopietro, "WhoAreYou (WAY): A mobile CUDA powered picture ID card recognition system," in International Conference on Image Analysis and Processing. Springer, 2017, pp. 375–382.
[27] A. B. Yoo, M. A. Jette, and M. Grondona, "SLURM: Simple Linux utility for resource management," in Workshop on Job Scheduling Strategies for Parallel Processing. Springer, 2003, pp. 44–60.
[28] P. Merle, O. Barais, J. Parpaillon, N. Plouzeau, and S. Tata, "A precise metamodel for open cloud computing interface," in 8th IEEE International Conference on Cloud Computing (CLOUD 2015), 2015, pp. 852–859.
[29] C. Zheng and D. Thain, "Integrating containers into workflows: A case study using Makeflow, Work Queue, and Docker," in Proceedings of the 8th International Workshop on Virtualization Technologies in Distributed Computing. ACM, 2015, pp. 31–38.
[30] C. Boettiger, "An introduction to Docker for reproducible research," ACM SIGOPS Operating Systems Review, vol. 49, no. 1, pp. 71–79, 2015.
[31] G. Benassai, P. Aucelli, G. Budillon, M. De Stefano, D. Di Luccio, G. Di Paola, R. Montella, L. Mucerino, M. Sica, and M. Pennetta, "Rip current evidence by hydrodynamic simulations, bathymetric surveys and UAV observation," Natural Hazards and Earth System Sciences, vol. 17, no. 9, pp. 1493–1503, 2017.
[32] W. C. Skamarock and J. B. Klemp, "A time-split nonhydrostatic atmospheric model for weather research and forecasting applications," Journal of Computational Physics, vol. 227, no. 7, pp. 3465–3485, 2008.
[33] J. G. Powers, J. B. Klemp, W. C. Skamarock, C. A. Davis, J. Dudhia, D. O. Gill, J. L. Coen, D. J. Gochis, R. Ahmadov, S. E. Peckham et al., "The weather research and forecasting model: Overview, system efforts, and future directions," Bulletin of the American Meteorological Society, vol. 98, no. 8, pp. 1717–1737, 2017.
[34] S. Saha, S. Nadiga, C. Thiaw, J. Wang, W. Wang, Q. Zhang, H. Van den Dool, H.-L. Pan, S. Moorthi, D. Behringer et al., "The NCEP climate forecast system," Journal of Climate, vol. 19, no. 15, pp. 3483–3517, 2006.
[35] H. L. Tolman, "The numerical model WAVEWATCH: a third generation model for hindcasting of wind waves on tides in shelf seas," 1989.
[36] H. Tolman, "Distributed-memory concepts in the wave model WAVEWATCH III," Parallel Computing, vol. 28, no. 1, pp. 35–52, 2002.
[37] A. F. Shchepetkin and J. C. McWilliams, "The regional oceanic modeling system (ROMS): a split-explicit, free-surface, topography-following-coordinate oceanic model," Ocean Modelling, vol. 9, no. 4, pp. 347–404, 2005.
[38] R. Montella, D. Di Luccio, P. Troiano, A. Riccio, A. Brizius, and I. Foster, "WaComM: A parallel water quality community model for pollutant transport and dispersion operational predictions," in Signal-Image Technology & Internet-Based Systems (SITIS), 2016 12th International Conference on. IEEE, 2016, pp. 717–724.
[39] J. S. Scire, F. R. Robe, M. E. Fernau, and R. J. Yamartino, "A user's guide for the CALMET meteorological model," Earth Tech, USA, vol. 37, 2000.
[40] L. Morales, F. Lang, and C. Mattar, "Mesoscale wind speed simulation using CALMET model and reanalysis information: An application to wind potential," Renewable Energy, vol. 48, pp. 57–71, 2012.
[41] S. Mailler, L. Menut, D. Khvorostyanov, M. Valari, F. Couvidat, G. Siour, S. Turquety, R. Briant, P. Tuccella, B. Bessagnet et al., "CHIMERE-2017: from urban to hemispheric chemistry-transport modeling," Geoscientific Model Development, vol. 10, no. 6, pp. 2397–2423, 2017.
[42] A. Galletti, R. Montella, L. Marcellino, A. Riccio, D. Di Luccio, A. Brizius, and I. T. Foster, "Numerical and implementation issues in food quality modeling for human diseases prevention," in HEALTHINF, 2017, pp. 526–534.
[43] D. Di Luccio, A. Galletti, L. Marcellino, A. Riccio, R. Montella, and A. Brizius, "Some remarks about a community open source Lagrangian pollutant transport and dispersion model," Procedia Computer Science, vol. 113, pp. 490–495, 2017.
[44] D. Di Luccio, G. Benassai, G. Budillon, L. Mucerino, R. Montella, and E. Pugliese Carratelli, "Wave run-up prediction and observation in a micro-tidal beach," Natural Hazards and Earth System Sciences Discussions, vol. 2017, pp. 1–21, 2017. [Online]. Available: https://www.nat-hazards-earth-syst-sci-discuss.net/nhess-2017-252/
[45] G. Agrillo, E. Chianese, A. Riccio, and A. Zinzi, "Modeling and characterization of air pollution: Perspectives and recent developments with a focus on the Campania region (southern Italy)," International Journal of Environmental Research, vol. 7, no. 4, pp. 909–916, 2013.
[46] R. Montella and I. Foster, "Using hybrid grid/cloud computing technologies for environmental data elastic storage, processing, and provisioning," in Handbook of Cloud Computing. Springer, 2010, pp. 595–618.
