Hello Dataflow!

Grupodot

By: Alvaro Ronderos

MapReduce

The MapReduce concept has been used many times within Google in response to the need to process large volumes of information in parallel. Jeffrey Dean and Sanjay Ghemawat presented this model in a paper published by Google in 2004 [1], which influenced later developments such as Hadoop. Even though the MapReduce model cannot be applied to every data processing problem, its success is based on its ability to handle significantly large volumes of information (reaching terabytes or petabytes) and reduce them through transformation and relevant data collection, which is done by two functions called Map() and Reduce(). To achieve this, the model relies on distributed systems and parallel computation.

Map():

This function receives a collection of (key, value) pairs, processes them, and generates a set of intermediate pairs.
map (in_key, in_value) -> list(out_key, intermediate_value)

Reduce():

This function combines the intermediate values, grouping them by key and producing a set of combined values (generally one per key).
reduce (out_key, list(intermediate_value)) -> list(out_value)
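
As a concrete illustration of these two signatures, the following is a small, self-contained Java sketch (plain Java collections only, not Hadoop or Dataflow code; the word-count scenario and all names are merely illustrative) of how Map() and Reduce() cooperate to count words:

import java.util.*;

public class MapReduceSketch {

  // map(in_key, in_value) -> list(out_key, intermediate_value)
  // Emits one ("word", 1) pair for every word occurrence in the line.
  static List<Map.Entry<String, Integer>> map(String docId, String line) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String word : line.split("\\s+")) {
      pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
    }
    return pairs;
  }

  // reduce(out_key, list(intermediate_value)) -> list(out_value)
  // Combines all of the intermediate values for one key into a single total.
  static List<Integer> reduce(String word, List<Integer> counts) {
    int total = 0;
    for (int c : counts) {
      total += c;
    }
    return Collections.singletonList(total);
  }

  public static void main(String[] args) {
    System.out.println(map("doc1", "the cat sat on the mat")); // [the=1, cat=1, sat=1, on=1, the=1, mat=1]
    System.out.println(reduce("the", Arrays.asList(1, 1)));    // [2]
  }
}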

The following image illustrates the transformation process of the MapReduce model:

Example of MapReduce. Taken from http://research.google.com/archive/mapreduce-osdi04-slides/index-auto-0007.html

A decade later, thanks to the Internet, we live in times where large volumes of information are managed (Big Data) and need to be processed as simply and reliably as possible. Nowadays, systems must be designed not only to process large volumes of stored information (batch); there are also cases where information flows through the network (streaming) and must be processed in real time (dealing with the latency and delays introduced by the processing itself), with results reported as quickly as possible, as in the case of score counts for online games or the number of transactions running on a system over time.

In response to these Big Data processing needs, Google, through its cloud platform, offers a new model as an evolution of MapReduce called DataFlow. This model was published by Google in a 2015 paper [2], and its framework provides an SDK for Java that integrates with several Google Cloud components (Compute, Storage, Big Data and Services), allowing developments focused on processing large amounts of information either in batch (Google Storage) or via streaming (Google Pub/Sub).

Dataflow integration. Taken from http://www.itnews.com.au/news/google-puts-cloud-dataflow-into-public-beta-402841

Hello World Dataflow:

The basic example using the DataFlow framework can be found here. In this article we are going to analyze that example, but first we need to take the following into account:

  1. Configure the gcloud project
  2. Create a firewall rule for the project allowing TCP connections on port 12345; otherwise the example will generate an error of this type:

    java.lang.RuntimeException: java.io.IOException: DEADLINE_EXCEEDED: (zmq) RPC timed out

    To create the rule, you must access Networking > Firewall rules > Create a firewall rule (an equivalent gcloud command is sketched right after this list). After creating it, the firewall rules list should show the newly created rule:

    Firewall rules for the project in gcloud. Source: the author.

  3. Install Google Cloud SDK.
  4. Download the example source code to be executed with Maven or with Eclipse; in this case the example will be executed with the Eclipse plugin.
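
For reference, the firewall rule from step 2 can also be created from the command line with gcloud (the rule name dataflow-12345 is only an illustrative choice, and the exact flags may vary between gcloud versions):

gcloud compute firewall-rules create dataflow-12345 --allow tcp:12345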

MinimalWordCount

This example is the simplest one in the project, and it shows the basic elements of DataFlow in action: Pipelines, PCollections, ParDo, and reading and writing files from Storage.

In the path src/main/java/com/google/cloud/dataflow/examples/ of the project you can find the file MinimalWordCount.java which, with the comments removed, should look like this:

MinimalWordCount class. Source: the author.
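
For readers who cannot make out the screenshot, the following is a sketch of roughly what the class looks like with the comments removed, assembled from the snippets analyzed below (the package and import paths correspond to the 1.x Dataflow SDK and are stated here as an assumption; the line numbers mentioned in the steps below refer to the original file in the SDK, not to this sketch):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.values.KV;

public class MinimalWordCount {
  public static void main(String[] args) {
    // Pipeline options: blocking runner, project id and staging bucket.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowPipelineRunner.class);
    options.setProject("SET_YOUR_PROJECT_ID_HERE");
    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");

    Pipeline p = Pipeline.create(options);

    // Read every line of the Shakespeare texts stored in Google Storage.
    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
     // Split each line into words, dropping empty strings.
     .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
       private static final long serialVersionUID = 0;
       @Override
       public void processElement(ProcessContext c) {
         for (String word : c.element().split("[^a-zA-Z']+")) {
           if (!word.isEmpty()) {
             c.output(word);
           }
         }
       }
     }))
     // Count the occurrences of each distinct word.
     .apply(Count.<String>perElement())
     // Format each KV<word, count> as "word: count".
     .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
       private static final long serialVersionUID = 0;
       @Override
       public String apply(KV<String, Long> element) {
         return element.getKey() + ": " + element.getValue();
       }
     }))
     // Write the formatted results to the output bucket.
     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run();
  }
}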

  1. In line 77, SET_YOUR_PROJECT_ID_HERE must be replaced with the ID of the project, which can be seen on the home page of the Google developers console.
  2. In line 78, gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY must be replaced with the name of the bucket created to store the elements DataFlow requires to operate. In this case the bucket gdot-dataflow and the staging folder were created, so the value used for the replacement was gs://gdot-dataflow/staging.
    Bucket for data storage. Source: the author.

    In the staging folder, all of the files DataFlow requires to execute the program will be stored; at the moment it is empty, but once the MinimalWordCount class is executed this folder will contain all of the files required by Dataflow, as can be seen in the following figure:

    Staging folder of the gdot-dataflow bucket. Source: the author.

  3. In line 98, gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX must be replaced with the address of the bucket where the results are to be written; in this case gs://gdot-dataflow/results/ was used. Since the results folder had not been created within the bucket, the program will create it automatically once it is executed.

Stages for the MinimalWordCount program

In line 79 the Pipeline is instantiated with the options specified in the first lines of the program:

 Pipeline p = Pipeline.create(options);

Among the options that can be set on a Pipeline when it is instantiated are numWorkers (the number of Compute Engine instances used to execute the program, three by default) and network (where the Compute Engine machines are placed). For the complete list, go here.
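
As a rough sketch (assuming the setters exposed by DataflowPipelineOptions in this version of the SDK; the values are only illustrative), these options could be set on the options object before Pipeline.create(options) is called:

 options.setNumWorkers(3);       // Compute Engine instances used to execute the program (three by default)
 options.setNetwork("default");  // network where the Compute Engine machines are placed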

In line 80, the program uses the Text I/O API to read all of the lines from the texts located at gs://dataflow-samples/shakespeare/* and store them in a PCollection where every element is a line of text:

 p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))

If we want to know which texts are stored in gs://dataflow-samples/shakespeare/*, we can open Cloud Shell and execute this command:

gsutil ls gs://dataflow-samples/shakespeare/

This command lists the texts found in the folder. To see each one of them, we can use the cat command:

gsutil cat gs://dataflow-samples/shakespeare/kinglear.txt

Example of the cat command on the kinglear.txt file in the shakespeare folder. Source: the author.

From line 81 to 90, the ParDo transformation is applied to the collection of lines obtained in the first stage:

  .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
     private static final long serialVersionUID = 0;
     @Override
     public void processElement(ProcessContext c) {
       for (String word : c.element().split("[^a-zA-Z']+")) {
         if (!word.isEmpty()) {
           c.output(word);
         }
       }
     }
  }))

The ParDo transformation receives as a parameter an object of type DoFn, which must specify the types of the input and output elements; in this case DoFn<String, String> indicates that the input element is a String (TextIO.Read produces a PCollection of type String) and that the transformation produces output elements of type String.

In the processElement method, the content of each line of the input collection is split into words; each word is then iterated over and, if the String is not empty, it is sent to the output PCollection through the statement c.output(word).
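
To make the two type parameters more tangible, here is a hypothetical DoFn (not part of the example) whose output type differs from its input type: it replaces each line with its length, so the resulting PCollection contains Integer elements:

  .apply(ParDo.named("LineLengths").of(new DoFn<String, Integer>() {
     private static final long serialVersionUID = 0;
     @Override
     public void processElement(ProcessContext c) {
       // The input element is a String line; the output element is its Integer length.
       c.output(c.element().length());
     }
  }))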

In line 91, all of the repeated elements are grouped, counting the occurrences of each one through the Count function:

.apply(Count.<String>perElement())

The function Count.<String>perElement() takes the PCollection from the previous stage and returns another one where each key is a distinct word and the value is the number of occurrences of that word in the original PCollection.
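
In terms of types, this stage can be pictured as the following illustrative fragment (the helper method name is hypothetical and not part of the example):

 // Takes the PCollection of words produced by ExtractWords and returns one
 // KV per distinct word, where the Long value is its number of occurrences.
 static PCollection<KV<String, Long>> countWords(PCollection<String> words) {
   return words.apply(Count.<String>perElement());
 }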

From line 118 to 123, a new stage is added to the Pipeline flow, where the PCollection is formatted:

.apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
     private static final long serialVersionUID = 0;
     @Override
     public String apply(KV<String, Long> element) {
       return element.getKey() + ": " + element.getValue();
     }
}))

The apply function of the SimpleFunction class receives each KV element (an immutable key/value pair) of the PCollection produced in the previous stage and returns a String in "Key: Value" format, which is added to the new PCollection.

In line 98, the results from the previous stage (the PCollection of formatted Strings) are stored:

.apply(TextIO.Write.to("gs://gdot-dataflow/results/"));

DataFlow produces a number of output files; i.e., the final collection is not stored in a single output file, due to the parallel processing nature of the ParDo transformation.

To produce a single file, a new stage would be needed in which the files produced by the Pipeline are combined.
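
Alternatively (an assumption based on the withoutSharding() option that TextIO.Write offers in this version of the SDK, and a hypothetical file name), the write stage can be forced to produce a single output file:

 .apply(TextIO.Write.to("gs://gdot-dataflow/results/wordcounts.txt").withoutSharding());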

In line 99 the program is instructed to execute the Pipeline:

p.run();

When the MinimalWordCount class is executed, the Eclipse console will show a trace similar to the one below:

Trace in Eclipse for the MinimalWordCount class. Source: the author.

In the Compute Engine section of the Google console, you can see how DataFlow creates and launches the instances needed to execute the program, according to the number set in the Pipeline creation options, meaning that the parallel processing will be carried out on the three machines. Once the execution is done, DataFlow stops and deletes the instances deployed to process the information.

Compute Engine’s instances. DataFlow example. Source: the author.

In the Cloud Dataflow section of the Google developers console, the list of executed Jobs can be seen (each Job represents a DataFlow execution); by accessing the executed job, the following information can be found:

Pipeline flow for the MinimalWordCount program. Source: the author.

On the left, each of the stages can be seen and, on the right, relevant information about the process: Summary shows the data related to the executed Job, such as its ID, how long it ran on DataFlow (two minutes and fifteen seconds), whether errors or warnings occurred, and the type of Job (Batch or Streaming); Job Log shows the log of performed activities; and Step reports the number of elements added at each DataFlow stage.

Through each Step, DataFlow changes the number of elements: the 172,948 text lines extracted from the Shakespeare texts become 945,845 elements once each line is split into words, and these are then grouped into a PCollection of 32,786 elements.

In this basic example, the 172,948 text lines are extracted and transformed into a collection of 32,786 elements, stored in several files within the path gs://gdot-dataflow/results/.

Next is the list of files generated as a result of the DataFlow process:

Results of the program stored in the path gs://gdot-dataflow/results/. Source: the author.
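
As with the input texts earlier, the generated output files can also be listed from Cloud Shell with gsutil:

gsutil ls gs://gdot-dataflow/results/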

Each file contains entries such as:

decreased: 1
'shall': 2
War: 4
empress': 14
Provost: 99
stoops: 6

And that is all the information related to the DataFlow basic example, where in around two minutes more than 945k words were extracted and transformed before being stored; that is why the author considers DataFlow a framework with a bright future in the Cloud Computing arena.

Till next time!

References:

[1]  MapReduce: Simplified Data Processing on Large Clusters http://research.google.com/archive/mapreduce.html
[2] The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf.
[3] What is Google Cloud Dataflow? https://cloud.google.com/dataflow/what-is-google-cloud-dataflow