Practical DevOps for Big Data/Quality Optimisation
Introduction
As discussed in Section 1, recent years have seen a steep rise in data generation worldwide, together with the development and widespread adoption of several software projects targeting the Big Data paradigm. Many companies now engage in Big Data analytics as part of their core business activities; nonetheless, there are no tools or techniques to support the design of the underlying hardware configuration backing such systems. The DICE optimisation tool (codename D-SPACE4Cloud) is a software tool that supports this task. It helps software architects and system operators in the capacity planning of shared Hadoop Cloud clusters.
Motivations
Nowadays, the adoption of data intensive applications has moved from experimental projects to mission-critical, enterprise-wide deployments that provide competitive advantage and business innovation. Deploying and setting up a big data cluster are time-consuming activities. Initially, MapReduce jobs were meant to run on dedicated clusters. Data intensive applications have since evolved, and large queries submitted by different users need to be performed on shared clusters, possibly with some guarantees on their duration. Capacity allocation thus becomes one of the most important aspects. Determining the optimal number of nodes in a cluster shared among multiple users performing heterogeneous tasks is an important and difficult problem.
Existing solutions
To the best of the authors' knowledge, no available tool offers such features and is specifically developed for the DICE reference technologies.
How the tool works
D-SPACE4Cloud is a novel tool implementing a battery of optimization and prediction techniques, integrated so as to efficiently assess several alternative resource configurations and determine the minimum cost cluster deployment satisfying Quality of Service (QoS) constraints. In a nutshell, the tool implements a search space exploration able to determine the optimal virtual machine (VM) type and the number of instance replicas for a set of concurrent applications. The underlying optimization problem has been shown to be NP-hard and is solved heuristically, whereas application execution time or throughput is estimated via queueing network (QN) or Stochastic Well Formed Net (SWN) models.
The optimization process
Two main analyses can be conducted using D-SPACE4Cloud:
- Public Cloud Analysis: In this scenario, the software architect or the system operator wants to analyse the case in which the whole big data cluster is provisioned on a public cloud. The first consequence of this choice is that the virtualized resources (i.e., VMs) that can be used to provision the cluster can be considered practically infinite. This also means that, under the common assumption that rejecting a job has a much higher cost than leasing VMs, the tool does not apply any job rejection policy in this case. Consequently, the concurrency level for each job, i.e., the number of concurrent users running the data intensive application, can be set arbitrarily high, since it is always (theoretically) possible to provision a cluster able to handle the load. In this scenario, a system operator may want to know which machine type to select, and how many instances of it, in order to execute the application with a certain level of concurrency, that is, with several similar applications running at the same time in the cluster. She/he might also like to know which cloud provider is cheaper, considering that providers also have different pricing models.
- Private Cloud Analysis: In this case the cluster is provisioned in house, which radically changes the problem. The resources available to build a cluster are generally limited by the hardware at hand. Therefore, the resource provisioning problem has to contemplate the possibility of exhausting the computational capacity (memory and CPUs) before being able to provision a cluster capable of satisfying a certain concurrency level and deadlines. In such a situation, the software architect or the system operator could consider two sub-scenarios:
- Allowing job rejection, that is, considering the possibility of rejecting a certain number of jobs (consequently lowering the concurrency level), i.e., introducing an Admission Control (AC) mechanism. In this case, since the overall capacity is limited, the system reacts to excess load by rejecting jobs; this has an impact on the execution costs, as it is reasonable to assume that pecuniary penalties are associated with rejection.
- Denying job rejection, that is imposing that a certain concurrency level must be respected. This translates into a strong constraint for the problem that may not be satisfiable with the resources at hand.
In this scenario, D-SPACE4Cloud estimates the total capacity of an in-house cluster required to support the execution of data intensive applications while minimising the Total Cost of Ownership (TCO) of the system, which includes physical server acquisition costs, electricity costs and, possibly, penalties for job rejections.
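The cost components listed above can be illustrated with a minimal sketch. It is not the tool's actual cost model: the `PrivateCloudPlan` class, its field names and the per-job penalty are assumptions introduced here only to show how acquisition, electricity and rejection penalties might be combined when comparing candidate configurations.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class PrivateCloudPlan:
    """Hypothetical description of one candidate in-house configuration."""
    servers: int                   # physical servers to acquire
    server_cost: float             # acquisition cost per server
    energy_cost_per_server: float  # electricity cost per server over the horizon
    rejected_jobs: int             # jobs refused by admission control
    rejection_penalty: float       # assumed penalty paid per rejected job


def total_cost_of_ownership(plan: PrivateCloudPlan) -> float:
    """Sum acquisition costs, electricity costs and rejection penalties."""
    acquisition = plan.servers * plan.server_cost
    electricity = plan.servers * plan.energy_cost_per_server
    penalties = plan.rejected_jobs * plan.rejection_penalty
    return acquisition + electricity + penalties


def cheapest(plans: Iterable[PrivateCloudPlan]) -> PrivateCloudPlan:
    """Return the candidate plan with the lowest total cost."""
    return min(plans, key=total_cost_of_ownership)
```

With such a model, a plan that rejects a few jobs can turn out cheaper than one that buys an extra server, which is the trade-off the admission control sub-scenario exposes.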
The Architecture
D-SPACE4Cloud is a distributed software system able to exploit multi-core architectures to execute the optimization in parallel. It encompasses different modules that communicate by means of RESTful interfaces or SSH, following the Service Oriented Architecture (SOA) paradigm. In particular, it features a presentation layer (an Eclipse plug-in), an orchestration service (referred to as frontend) and a horizontally scalable optimization service (referred to as backend), which makes use of third-party services such as an RDBMS, simulators and mathematical solvers. The tool implements an optimization mechanism that efficiently explores the space of possible configurations, henceforth referred to as the Solution space. Figure 1 depicts the main elements of the D-SPACE4Cloud architecture that come into play in the optimization scenario. The Eclipse plug-in allows the software architect to specify the input models and performance constraints, and transforms the input DICE UML diagrams into the input performance models for the performance solver (GreatSPN or JMT). The frontend exposes a graphical interface designed to facilitate the download of the optimization results (which are computed through batch jobs), while the backend implements a strategy aimed at identifying the minimum cost deployment. Multiple DTSMs are provided as input, one for each VM type considered as a candidate deployment. VMs can be associated with different cloud providers. D-SPACE4Cloud identifies the VM type and the corresponding number of instances that fulfil the performance constraints and minimize costs. The tool also takes as input a DDSM model, which is updated with the final solution found and can be automatically deployed through the DICER tool. Moreover, the tool requires as input a description of the execution environment (list of providers, list of VM types or a description of the computational power available in house) and the performance constraints.
Input files and parameters can be specified by the user through a wizard. D-SPACE4Cloud exploits the model-to-model transformation mechanism implemented within the DICE Simulation tool to generate a suitable performance model, i.e., SWN or QN to be used to predict the expected execution time for Hadoop MapReduce or Spark DIAs or cluster utilization for Storm. The Initial Solution Builder generates a starting solution for the problem using a Mixed Integer Non-linear Programming (MINLP) formulation where the job duration is expressed by means of a convex function: Figure 1 provides further details. The fast MINLP model is exploited to determine the most cost effective VM type for all applications. Yet the quality of the returned solution can still be improved since the MINLP problem is just an approximate model. For this reason, a more precise QN or SWN is adopted to get a more accurate execution time assessment for each application: the increased accuracy leaves room for further cost reduction. However, since QN or SWN simulations are time-consuming, the space of possible cluster configurations has to be explored in the most efficient way, avoiding evaluating unpromising configurations.
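The role of the initial solution can be sketched as follows. This is not the MINLP formulation used by the tool: as a simplified stand-in, it assumes that the job duration on `n` VMs of a given type follows the convex function `a / n + b`, where `a`, `b` and the hourly price are hypothetical profile values, and it picks the VM type whose smallest deadline-compliant cluster is cheapest.

```python
import math
from typing import Dict, Optional, Tuple

# Hypothetical per-VM-type profile: (a, b, hourly_price), where the job
# duration with n VMs is approximated by the convex function a / n + b.
Profile = Tuple[float, float, float]


def min_vms_for_deadline(a: float, b: float, deadline: float) -> Optional[int]:
    """Smallest n with a / n + b <= deadline, or None if no n can meet it."""
    if deadline <= b:
        return None  # the non-parallelisable part alone reaches the deadline
    return max(1, math.ceil(a / (deadline - b)))


def initial_solution(profiles: Dict[str, Profile],
                     deadline: float) -> Optional[Tuple[str, int, float]]:
    """Return (vm_type, n_vms, cost) for the cheapest feasible VM type."""
    best = None
    for vm_type, (a, b, price) in profiles.items():
        n = min_vms_for_deadline(a, b, deadline)
        if n is None:
            continue  # this VM type cannot meet the deadline at all
        cost = n * price
        if best is None or cost < best[2]:
            best = (vm_type, n, cost)
    return best
```

Because the duration model is only approximate, the VM count returned here is a starting point that a more accurate (and slower) simulation-based evaluation can then refine.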
In the light of such considerations, a heuristic approach has been adopted and a component called Parallel LS Optimizer has been devised. Internally, it implements a parallel Hill Climbing (HC) technique to optimize the number of replicas of the assigned resource for each application; the goal is to find the minimum number of resources that fulfils the QoS requirements. This procedure is applied independently, and in parallel, to all application classes and terminates when a further reduction in the number of replicas would lead to an infeasible solution. In particular, HC is a local-search-based procedure that operates on the current solution by performing a change (more often referred to as a move) in the structure of the solution, in such a way that the newly generated solution could possibly show an improved objective value. If the move is successful, it is applied again to the new solution, and the process is repeated until no further improvement is possible. The HC algorithm stops when a local optimum is found; however, if the objective to optimize is convex, HC is able to find the global optimum of the problem. This is the case for the considered cost function, which depends linearly on the number of VMs in the cluster, since the VM type to use is fixed in the first phase of the optimization process based on MINLP techniques. In other words, the joint selection of the VM type and the number of VMs is NP-hard, but when the type of VM is fixed in the first phase, HC obtains the final solution for all classes in polynomial time. The HC move consists in increasing/decreasing the number of VMs and changing the VM type for each DIA. This task is executed in parallel where possible, considering SLAs and available resources (in the case of a private cloud) as constraints. Parallelism is particularly important, as each solution spawned during the optimization is evaluated via simulation, which is a time-consuming task.
Therefore, in order to make the DICE optimization tool usable, we focused on increasing the level of parallelism as much as possible. The final minimum cost configuration found is then returned to the D-SPACE4Cloud Eclipse plug-in and is translated into the DDSM which is ready for the TOSCA Model to Text transformation and deployment implemented by the DICER tool.
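The hill climbing step on the number of replicas can be sketched as below. This is an illustrative approximation, not the Parallel LS Optimizer itself: `is_feasible` is a hypothetical callback standing in for a QN/SWN simulation that checks the deadline, `max_vms` is an assumed safety bound, and a thread pool is used only to mimic the independent, per-class parallel search.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict


def hill_climb_replicas(start: int,
                        is_feasible: Callable[[int], bool],
                        max_vms: int = 1000) -> int:
    """Find a locally minimal feasible VM count, starting from `start`."""
    current = start
    # Move up while the starting point violates the QoS constraint.
    while not is_feasible(current):
        if current >= max_vms:
            raise ValueError("no feasible configuration within max_vms")
        current += 1
    # Move down while one VM fewer still satisfies the constraint.
    while current > 1 and is_feasible(current - 1):
        current -= 1
    return current


def optimise_all(starts: Dict[str, int],
                 evaluators: Dict[str, Callable[[int], bool]]) -> Dict[str, int]:
    """Run one independent hill climb per application class in parallel."""
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(hill_climb_replicas, n, evaluators[name])
                   for name, n in starts.items()}
        return {name: future.result() for name, future in futures.items()}
```

Since each feasibility check corresponds to a costly simulation in the real tool, the number of calls to `is_feasible` is the quantity worth keeping small; a good initial solution shortens both loops.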
Open Challenges
Even if D-SPACE4Cloud implements an efficient parallel search, simulation is still time-consuming. The development of an ad-hoc simulator, which significantly speeds up the design space exploration, is on-going. In this way, we expect that optimization time can be reduced from hours (in the current implementation) to minutes. Moreover, modelling complex application DAGs also requires significant effort. The development of a log processor, which automatically builds the required DICE UML models from application logs (once an early DIA code implementation is available) to simplify this process, is also on-going.
Application Domain
D-SPACE4Cloud supports the capacity planning process of shared Hadoop Cloud clusters running MapReduce or Spark applications with deadline guarantees or Storm topologies with guarantees on cluster utilization.
A test case – NETF Fraud detection application
D-SPACE4Cloud has been used by Netfective Technology with the aim to evaluate the cost impact of the implementation of some privacy mechanisms on the taxpayers' data. As discussed in Section 20, the Cassandra databases are filled with taxpayers’ details, historical tax declarations, tax payments, and so on.
To achieve anonymization, the first privacy mechanism applied is masking (see, e.g., Vieria, M.; Madeira, H. “Towards a Security Benchmark for database Management Systems.”, in DSN 2015 Proceedings). For that purpose, a dictionary table has been introduced, which implements a one-to-one mapping between a clear ID and a masked ID. In other words, the Cassandra tables store masked IDs for the taxpayers, while the clear ID is available only in the dictionary table, which has restricted access. As an example, Figure 2 shows the three reference queries we considered during the DICE project, i.e., Query 1, 5 and 7, and an example of an anonymised query. Query 1 accesses two tables to perform its analysis: Declare and TaxPayer. It measures the difference between the incomes earned by a taxpayer during two successive years. This is carried out to detect fraudsters by comparing the two incomes according to some criteria. For instance, if the income received in a given year is 20% lower (this percentage can be set as a parameter) than the one received the previous year, then the taxpayer is suspect. Since incomes are stored in the table Declare, Query 1 executes two joins: the first to make sure that the two tax declarations relate to the same taxpayer, and the second to obtain the full set of information about her/him. The result of this query is saved and used by other queries by passing the expected arguments, such as the percentage of income decrease and the number of years to be taken into consideration. Query 5 involves only the table Declare. This table contains all the information needed to know every individual income and other credentials helpful to justify the amount of tax to be paid. Query 7 involves three tables: TaxPayer, TaxDeclaration, and Signatory. Each tax return must be signed by the taxpayers before they submit it to the fiscal agency.
Query 3 is derived from Query 1 and adds a join to read the clear IDs. Similarly, Query 6 and Query 8 are derived from Query 5 and Query 7 respectively (but are omitted here for simplicity). The second privacy mechanism considered is encryption with AES at 128 and 256 bit. In this case, the IDs and sensitive data stored in the Cassandra tables are encrypted, and decryption is performed contextually while running, e.g., Query 1.
Query 1:
SELECT tp.id, , tp.gender, tp.birthdate, tp.birthdepartment, tp.birthcommune, d1.taxdeclaration, d1.declarationdate, d1.income, d2.taxdeclaration AS D2TAXDECLARATION, d2.declarationdate AS D2DECLARATIONDATE, d2.income AS D2INCOME FROM Declare d1 INNER JOIN Declare d2 ON d1.taxpayer = d2.taxpayer INNER JOIN taxpayer tp ON d1.taxpayer = tp.id
Query 5:
SELECT * FROM Declare d1
Query 7:
SELECT tp.id,s.location, td.roomcount FROM taxdeclaration td, signatory s, taxpayer tp WHERE s.taxpayer = tp.id AND s.taxdeclaration = td.id
Query 3:
SELECT dic.und_id, , tp.gender, tp.birthdate, tp.birthdepartment, tp.birthcommune, d1.taxdeclaration,d1.declarationdate, d1.income, d2.taxdeclaration AS D2TAXDECLARATION,d2.declarationdate AS D2DECLARATIONDATE, d2.income AS D2INCOME FROM Declare d1 INNER JOIN Declare d2 ON d1.taxpayer = d2.taxpayer INNER JOIN taxpayer tp ON d1.taxpayer = tp.id INNER JOIN dictionary dic ON dic.id=tp.id
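The effect of the extra dictionary join in Query 3 can be reproduced on a toy scale. The sketch below uses an in-memory SQLite database purely as a stand-in for the Spark and Cassandra setup of the case study; the two-column table layouts and the sample rows are invented for illustration, and the reading of `dictionary.id` as the masked ID and `und_id` as the clear ID is inferred from Query 3.

```python
import sqlite3

# In-memory stand-in for the case-study tables (layout and data are hypothetical).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE taxpayer (id TEXT PRIMARY KEY, gender TEXT);
    CREATE TABLE dictionary (id TEXT PRIMARY KEY, und_id TEXT UNIQUE);
    INSERT INTO taxpayer VALUES ('m-001', 'F'), ('m-002', 'M');
    INSERT INTO dictionary VALUES ('m-001', 'TP-4711'), ('m-002', 'TP-0815');
""")

# Without the dictionary, a query only ever sees masked IDs.
masked = conn.execute(
    "SELECT tp.id, tp.gender FROM taxpayer tp ORDER BY tp.id"
).fetchall()

# With access to the restricted dictionary table, one extra join
# recovers the clear IDs, as Query 3 does with respect to Query 1.
clear = conn.execute("""
    SELECT dic.und_id, tp.gender
    FROM taxpayer tp
    INNER JOIN dictionary dic ON dic.id = tp.id
    ORDER BY tp.id
""").fetchall()
```

The additional join is the source of the extra work, and hence of the extra cost, that the experiments on masking quantify.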
The queries reported in Figure 2 have been implemented in Spark 2.0 and run on Microsoft Azure HDInsight on D12v2 VMs with 8 cores and 28 GB of memory. The number of executors per node (virtual machine) was either two or four. We set the maximum executor memory to 8 GB, the same as the driver memory, while executors were assigned two or four cores. The number of nodes varied between 3 and 12. Overall, we ran experiments on up to 48 cores. Runs were performed with the default HDInsight configuration. This performance campaign was carried out to identify the query profiles. In the following, the baseline Query 5 and its anonymised versions are compared under the same configuration.
Here, we present our approach to model Spark applications for performance evaluation using UML diagrams. The Spark application is composed of transformations and actions. D-SPACE4Cloud interprets the UML activity diagram as the DAG of the Spark application, i.e., an activity diagram represents the execution workflow over the application RDDs. A Spark application manages one or more RDDs and therefore, our UML activity diagram accepts several initial and final nodes. Each stage shows the operations executed for each RDD.
Figure 3 shows the activity diagram for Query 8, which consists of 8 transformations. The UML fork and join nodes are also supported, following standard UML modelling semantics. The activity nodes (actions, forks and joins) in the UML activity diagram are grouped by UML partitions (e.g., Transformation and Action).
A UML profile needs to be applied to each activity and partition node in our UML representation. A UML profile is a set of stereotypes that can be applied to UML model elements to extend their semantics.
The Spark profile relies heavily on the standard MARTE profile. This is because MARTE offers the GQAM sub-profile, a complete framework for quantitative analysis that is specialized for performance analysis and therefore matches our purposes. Moreover, MARTE offers the NFP and VSL sub-profiles. The NFP sub-profile describes the non-functional properties of a system, performance in our case. The VSL sub-profile provides a concrete textual language for specifying the values of metrics, constraints, properties, and parameters, related to performance in our particular case.
VSL expressions are used in Spark-profiled models with two main goals: (i) to specify the values of the NFP in the model (i.e., to specify the input parameters) and (ii) to specify the metric/s that will be computed for the current model (i.e., to specify the output results). An example VSL expression for a host demand tagged value of type NFP Duration is:
(expr=$mapT1, unit=ms, statQ=mean, source=est)
This expression specifies that map1 in Figure 3 demands $mapT1 milliseconds of processing time on average (statQ=mean) and will be obtained from an estimation in the real system (source=est). $mapT1 is a variable that can be set with concrete values during the analysis of the model.
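To make the structure of such an expression concrete, the following sketch splits a flat tuple into its fields and binds the variable to a value. It is not a VSL parser: it handles only flat `(key=value, ...)` tuples like the one above, and the function names and the binding value are hypothetical.

```python
def parse_vsl_tuple(text: str) -> dict:
    """Split a flat '(key=value, ...)' tuple into a dict of strings."""
    inner = text.strip()
    if not (inner.startswith("(") and inner.endswith(")")):
        raise ValueError("expected a parenthesised tuple")
    fields = {}
    for item in inner[1:-1].split(","):
        key, _, value = item.partition("=")
        fields[key.strip()] = value.strip()
    return fields


def bind_variables(fields: dict, bindings: dict) -> dict:
    """Replace an expr such as '$mapT1' with its bound value, if any."""
    bound = dict(fields)
    expr = bound.get("expr", "")
    if expr.startswith("$") and expr[1:] in bindings:
        bound["expr"] = bindings[expr[1:]]
    return bound
```

For instance, parsing the expression above and binding `mapT1` to a measured mean yields a concrete host demand in milliseconds that an analysis can consume.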
The initialization of an RDD is described by an initial node in the UML activity diagram. The stereotype SparkWorkloadEvent is used for labelling the initial node. This stereotype captures the essential details of the creation of the RDD. Mainly, the initialization is represented by two parameters. The first one is the sparkPopulation tag. It corresponds to the number of chunks into which the input data is divided when generating the RDD structure. Transformations (SparkMap) and actions (SparkReduce) have independent stereotypes because they are conceptually different, but they inherit from the SparkOperation stereotype and, indirectly, from the MARTE::GQAM::GaStep stereotype, since they are computational steps. In fact, they share the parallelism, or number of concurrent tasks per operation, which is specified by the tag numTasks of the SparkOperation stereotype. Each task has an associated execution time denoted by the tag hostDemand, an attribute inherited from GaStep. The tag OpType specifies the type of Spark operation (i.e., an enumeration SparkOperation = {transformation, action}) when the SparkOperation stereotype is used in UML diagrams. The concept of scheduling in Spark is captured by the stereotype SparkScenario. For simplicity, in this first version we only support a Spark cluster deployed with a static assignment of resources to Spark jobs (e.g., YARN or standalone modes) and an internal FIFO policy (task scheduler). Therefore, the number of CPU cores and the memory are statically assigned to the application at launch time. This configuration is reflected in the tags nAssignedCores, nAssignedMemory and sparkDefaultParallelism. They represent, respectively, the number of computational cores and the amount of memory assigned by the scheduler to the current application, and the default parallelism of the cluster configuration.
The attribute sparkDefaultParallelism specifies the default number of partitions in an RDD when the sparkPopulation tag of SparkWorkloadEvent is not defined. It also determines the number of partitions returned in an RDD by transformations and actions like count, join or reduceByKey when the value of numTasks is not explicitly set by the user.
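The fallback rule just described can be written down in a few lines. The function names below are hypothetical; only the tag names sparkPopulation, numTasks and sparkDefaultParallelism come from the profile, and `None` is used here to represent a tag the modeller left undefined.

```python
from typing import Optional


def rdd_partitions(spark_population: Optional[int],
                   spark_default_parallelism: int) -> int:
    """Partitions of a new RDD: sparkPopulation if set, else the default."""
    if spark_population is None:
        return spark_default_parallelism
    return spark_population


def operation_tasks(num_tasks: Optional[int],
                    spark_default_parallelism: int) -> int:
    """Tasks of an operation: numTasks if set, else the default."""
    if num_tasks is None:
        return spark_default_parallelism
    return num_tasks
```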
The stereotype SparkScenario inherits from MARTE::GQAM::GaScenario. It gathers the rest of the contextual information of the application; for instance, the response time or throughput that will be computed by the simulation. The SparkScenario stereotype is applied to the activity diagram.
Each UML partition is mapped to a computational resource in the UML deployment diagram following the scheduling policy defined for the topology. Figure 4 shows the deployment diagram, which complements the previous activity diagram. Each computational resource is stereotyped as SparkNode (equivalent to GaExecHost) and defines its resource multiplicity, i.e., number of cores. This stereotype is inherited from MARTE GQAM.
Finally, the SparkNode stereotype is applied to the computational devices in the UML deployment diagram. It represents a resource in the Spark cluster where the tasks are run. The main attributes are nCores and Memory. The first tag corresponds to the number of available CPUs in the device. The second tag is a boolean that indicates whether the partitions of the RDD fit in the memory of the server or must be stored in a temporary file. The SparkNode stereotype inherits from DICE::DTSM::Core::CoreComputationNode and is equivalent to the GaExecHost stereotype from MARTE. The Spark profile has been implemented for the Papyrus Modelling environment for Eclipse.
Experimental Results
As an illustrative example, here we report the cost impact analysis of the privacy mechanism for Queries 5 and 6 on the 1.5 million entry dataset with an 85s initial deadline. The deadline was iteratively decreased by 20s in order to consider 10 optimization instances. The results are reported in Figure 5. From the experimental results one can see that above 45s and for 20s no extra costs are incurred, while for the 25s and 15s deadlines the cost overhead due to the masking technique is between 50 and 66%. Deadlines lower than 15s turned out to be too strict, and D-SPACE4Cloud did not find any feasible solution. Overall, these results provide a strong point in favour of the optimization procedure implemented in D-SPACE4Cloud, as they show that making the right deployment choice can lead to substantial savings throughout the application lifecycle; moreover, software architects can take more informed decisions and evaluate a priori the impact of different QoS levels on cloud operation costs.
Figure 6 reports the results for Query 1 and 3, when the 10 million entry dataset is considered for performance profiling. The initial deadline is set to 3,500s, which is the maximum execution time of both queries registered on the real cluster, and is iteratively reduced by 500s.
The results show that cost overhead due to masking technique is between 33 and 40%, and no extra costs are incurred for deadlines larger than 2500s. Deadlines lower than 1500s were too strict and no feasible solutions were found.
Further experiments targeted encryption. We selected the 10 million entry dataset and evaluated Query 7, unencrypted and encrypted with AES 256 bit, for which we registered the largest performance degradation, namely 5%. In this way the results we achieve are conservative. The initial deadline was set to 80s and then iteratively reduced by 5s.
Results are reported in Figure 7 which shows that 50% cost overhead is achieved at 40s, which is also the minimum deadline that can be supported by the system (otherwise no feasible solution can be found).
Finally, we report the results we achieved by considering the largest dataset, i.e., 30 million, for Query 5. For performance profiling we considered the configuration at 13 nodes which registered the largest performance degradation. 80s was set as initial deadline, which then was iteratively reduced by 20s. Figure 8 reports the cost for AES 256 bit encryption.
The experiment shows that the cost overhead due to encryption is at most 13%, while below 40 seconds D-SPACE4Cloud could not find any feasible solution. Figure 9 reports the results for AES 128 bit encryption for Query 5; the runs were taken using 13 nodes. Here we considered 88,000 milliseconds as the initial deadline, since it is the largest value measured on the real system during the performance degradation experiments.
On average, encryption causes only an 8% overhead on cost; a larger overhead is obtained when the deadline is set to 48s. This is due to an anomalous behaviour of the D-SPACE4Cloud tool, which identifies a locally optimal solution and stops its hill climbing algorithm.
Although we performed many experiments, here we report only the most significant ones. From the above experiments we conclude that masking causes more overhead than encryption, while there is no significant difference between 128 and 256 bit encryption. The cost overhead due to anonymization was found to be around 66%, while that due to encryption was only 50% in the very worst case.
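The overhead percentages quoted in this section are read here as relative cost increases of the privacy-enabled deployment over the baseline at the same deadline; that reading is an assumption, as the text does not give the formula. Under it, the computation is a one-liner, sketched below with a hypothetical function name.

```python
def cost_overhead(baseline_cost: float, protected_cost: float) -> float:
    """Relative cost increase of the privacy-enabled run over the baseline."""
    if baseline_cost <= 0:
        raise ValueError("baseline cost must be positive")
    return (protected_cost - baseline_cost) / baseline_cost
```

As a purely hypothetical illustration, if the baseline query needs 2 VMs and its masked version needs 3 VMs of the same type to meet the same deadline, the overhead is 0.5, i.e., 50%.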
Conclusion
Three privacy mechanisms have been considered for the NETF case study. In general, for the NETF case study, anonymization causes more overhead than encryption, and the two encryption techniques differ negligibly in their effect on system performance and cost. The NETF benchmark shows a cost impact of around 66% due to masking, while encryption increases the cost by around 50%.