CS 6240: Assignment 4
Goals: (1) Gain deeper understanding of action, transformation, and lazy execution in Spark. (2)
Implement PageRank in MapReduce and Spark.
This homework is to be completed individually (i.e., no teams). You must create all deliverables yourself
from scratch: it is not allowed to copy someone else's code or text, even if you modify it. (If you use
publicly available code/text, you need to indicate what was copied and cite the source in your report!)
Please submit your solution as a single PDF file on Gradescope (see link in Canvas) by the due date and
time shown there. During the submission process, you need to tell Gradescope on which page the
solution to each question is located. Not doing this will result in point deductions. In general, treat this
like a professional report. There will also be point deductions if the submission is not neat, e.g., it is
poorly formatted. (We want our TAs to spend their time helping you learn, not fixing messy reports or
searching for solutions.)
For late submissions you will lose one point per hour after the deadline. This HW is worth 100 points and
accounts for 15% of your overall homework score. To encourage early work, you will receive a 10-point
bonus if you submit your solution on or before the early submission deadline stated on Canvas. (Notice
that your total score cannot exceed 100 points, but the extra points would compensate for any
deductions.)
To enable the graders to run your solution, make sure your project includes a standard Makefile with
the same top-level targets (e.g., local and aws) as the one presented in class. As with all software
projects, you must include a README file briefly describing all the steps necessary to build and execute
both the standalone and the AWS Elastic MapReduce (EMR) versions of your program. This description
should include the build commands and fully describe the execution steps. This README will also be
graded, and you will be able to reuse it on all this semester's assignments with little modification
(assuming you keep your project layout the same).
You have about 2 weeks to work on this assignment. Section headers include recommended timings to
help you schedule your work. The earlier you work on this, the better.
Important Programming Reminder
As you are working on your code, commit and push changes frequently. The commit history should
show a natural progression of your code as you add features and fix bugs. Committing large, complete
chunks of code may result in significant point loss. (You may include existing code for standard tasks like
adding files to the file cache or creating a buffered file reader, but then the corresponding commit
comment must indicate the source.) If you are not sure, it is better to commit too often than not often enough.
PageRank in Spark (Week 1)
In addition to implementing a graph algorithm from scratch to better understand the BFS design pattern
and the influential PageRank algorithm, the first part of this assignment also explores the subtleties of
Spark's actions and transformations, and how they affect lazy evaluation and job submission. We will
work with synthetic data to simplify the program a little and to make it easier to create inputs of
different sizes. Thoughtful creation of synthetic data is an important skill for big-data program design,
testing, and debugging.
Recall that Spark transformations describe data manipulations, but do not trigger execution. This is the
"lazy evaluation” in Spark. Actions on the other hand force an immediate execution of all operations
needed to produce the desired result. Stated differently, transformations only define the lineage of a
result, while actions force the execution of that lineage. What will happen when an iterative program
performs both actions and transformations in a loop? What goes into the lineage after 1, 2, or more
loop iterations? And will the entire lineage be executed?
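To make the distinction concrete, here is a minimal sketch you could run in the interactive Spark shell (it assumes an existing SparkContext named sc, as spark-shell provides; the data is made up):

// Minimal sketch, assuming a SparkContext named sc (e.g., in spark-shell).
val nums = sc.parallelize(1 to 1000000)

// Transformation only: nothing is computed yet; Spark just records the lineage.
val squares = nums.map(x => x.toLong * x)

// Action: now Spark submits a job and executes the recorded transformations.
val total = squares.reduce(_ + _)
println(s"Sum of squares: $total")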
Let us find out by exploring a program that computes PageRank with dangling pages for a simple
synthetic graph. Your program should work with two data tables: Graph stores pairs (p1, p2), each
encoding a link from some page p1 to another page p2. Ranks stores pairs (p, pr), encoding the
PageRank pr for each page p. To fill these tables with data, create a graph that consists of k linear chains,
each with k pages. Number the pages from 1 to k², where k is a program parameter to control problem
size. The figure shows an example for k=3. Notice that the last page in each linear chain is a dangling
page. We will use the single-dummy-page approach to deal with dangling pages. This means that your
program also must create a single dummy page; let's give it the number 0 (zero) and add it to Ranks.
Add an edge (d, 0) for each dangling page d. Set the initial PR value for each of the k² real pages in
Ranks to 1/k²; set the initial PR value of the dummy page to 0.
[Figure: the example graph for k=3, i.e., three linear chains over pages 1 through 9, each chain ending
in a dangling page.]
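As one illustration (not a required design), the two tables could be generated directly in the driver as pair RDDs, which the next paragraph recommends. In this sketch, chain c (for c = 0, ..., k-1) consists of pages c*k+1 through c*k+k; the names spark, sc, graph, and ranks are placeholders, not prescribed by the assignment:

import org.apache.spark.sql.SparkSession

// Sketch only: builds the synthetic graph of k linear chains plus dummy page 0.
val spark = SparkSession.builder.appName("PageRankSynthetic").getOrCreate()
val sc = spark.sparkContext
val k = 3  // problem-size parameter; read it from args in the real program

// Edges within each chain: every page links to the next one in its chain.
val chainEdges = for {
  chain <- 0 until k
  pos   <- 1 until k
} yield (chain * k + pos, chain * k + pos + 1)

// Each dangling page (the last page of a chain) links to dummy page 0.
val danglingEdges = for (chain <- 0 until k) yield (chain * k + k, 0)

val graph = sc.parallelize(chainEdges ++ danglingEdges)   // Graph: (p1, p2) pairs

// Ranks: real pages start at 1/k², the dummy page 0 starts at 0.0.
val initPR = 1.0 / (k.toLong * k)
val ranks = sc.parallelize((1 to k * k).map(p => (p, initPR)) :+ (0, 0.0))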
For simplicity, we recommend you implement the program using (pair) RDDs, but you may choose to
work with Dataset instead. The following instructions assume an RDD-based implementation. Start by
exploring the PageRank Scala program included in the Spark distribution. Make sure you fully
understand what each statement is doing. Create a simple example graph and step through the
program, e.g., on paper or using the interactive Spark shell. You will realize that the example program
does not handle dangling pages, i.e., dangling pages lose their PR mass in each iteration. Can you find
other problems?
Your program will have a structure similar to the example program, but follow these requirements and
suggestions:
● You are allowed to take certain shortcuts in your program that exploit the special graph
structure. In particular, you may exploit that each node has at most 1 outgoing link. Make sure
you add a comment about this assumption in your code.
● Make k a parameter of your Spark Scala program and generate RDDs Graph and Ranks directly in
the program. There are many examples on the Web on how to create lists of records and turn
them into (pair) RDDs.
   1. Make sure you add dummy page 0 to Ranks and the corresponding k dummy edges to Graph.
   2. Initialize each PR value in Ranks to 1/k², except for page 0, whose initial PR value should be
      zero.
● Be careful when you look at the example PR program in the Spark distribution. It sets initial PR
values to 1.0, and its PR computation adds 0.15 instead of 0.15/#pages for the random jump
probability. Intuitively, they multiply each PR value by #pages. While that is a valid approach, it
is not allowed for this assignment.
● Try to ensure that Graph and Ranks have the same Partitioner to avoid shuffling for the join.
Check if the join computes exactly what you want. Does it matter if you use an inner or an outer
join in your program?
● To read out the total dangling PR mass accumulated in dummy page 0, use the lookup method
of pair RDD. Then re-distribute this mass evenly over all real pages. (One possible loop structure
is sketched after this list.)
● When debugging your program, see if the PR values add up to 1 after each iteration. Small
variations are expected, especially for large graphs, due to numerical precision issues. However,
if the PR sum significantly deviates from 1, this may indicate a bug in your program.
● Add a statement right after the end of the for-loop for the PR iterations (i.e., outside the loop)
that writes the debug string of Ranks to the log file.
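For orientation only, here is a rough, hedged sketch of one possible loop structure that continues the generation sketch above (so graph, ranks, sc, and k are the placeholders defined there). It takes the allowed shortcut that every page has at most one outgoing link, uses one common formulation of the random-jump and dangling-mass update, and leaves open exactly the questions raised above (partitioning, join flavor, what lookup triggers):

import org.apache.spark.HashPartitioner

// Sketch only: graph is RDD[(Int, Int)] (page -> its single successor),
// ranks is RDD[(Int, Double)], both from the generation sketch above.
val numPages = k.toLong * k
val partitioner = new HashPartitioner(sc.defaultParallelism)

val graphPart = graph.partitionBy(partitioner).cache()
var ranksPart = ranks.partitionBy(partitioner)

val iterations = 10   // make this a program parameter in the real program
for (i <- 1 to iterations) {
  // Each page sends its entire PR mass to its single successor (the shortcut).
  val contribs = graphPart.join(ranksPart).map { case (_, (dest, pr)) => (dest, pr) }

  // Union with zero contributions so pages without in-links (the first page of
  // each chain) are not lost; this is one of the join subtleties mentioned above.
  val summed = (contribs ++ ranksPart.mapValues(_ => 0.0))
    .reduceByKey(partitioner, _ + _)

  // lookup is an action: it triggers a job and returns the dangling PR mass
  // that accumulated in dummy page 0 during this iteration.
  val danglingMass = summed.lookup(0).headOption.getOrElse(0.0)

  // Random jump plus even redistribution of the dangling mass over the k^2
  // real pages; the dummy page is reset to 0 so its mass is not counted twice.
  ranksPart = summed.map { case (page, pr) =>
    if (page == 0) (page, 0.0)
    else (page, 0.15 / numPages + 0.85 * (pr + danglingMass / numPages))
  }.partitionBy(partitioner)
}

// Right after the loop: write the debug string (lineage) of Ranks to the log.
println(ranksPart.toDebugString)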
Now you are ready to explore the subtleties of Spark lazy evaluation. First explore the lineage of Ranks
as follows:
● Set the loop condition so that exactly 1 iteration is performed and look at the lineage for Ranks.
● Change the loop condition so that exactly 2 iterations are performed and look at the lineage for
Ranks after those 2 iterations. Did it change?
The lineage describes the job needed to compute the result of the action that triggered it. Since pair
RDD's lookup method is an action, a new job is executed in each iteration of the loop. Can you describe
in your own words what the job triggered in the i-th iteration computes? Try it.
An interesting aspect of Spark, and a reason for its high performance, is that it can re-use previously
computed results. This means that in practice, only a part of the lineage may get executed. To
understand this better, consider the following simple example program:
1. val myRDD1 = some_expensive_transformations_on_some_big_input()
2. myRDD1.collect()
3. val myRDD2 = myRDD1.some_more_transformations()
4. myRDD2.collect()
This program executes 2 jobs. The first is triggered by line 2 and it computes all steps defined by the
corresponding transformations in the lineage of myRDD1. The next job is triggered by line 4. Since
myRDD2 depends on myRDD1, all myRDD1's lineage is also included in the lineage of myRDD2. But will Spark execute the entire lineage? What if myRDD1 was still available from the earlier job triggered by
line 2? Then it would be more efficient for Spark to simply re-use the existing copy of myRDD1 and only
apply the additional transformations to it!
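As a concrete (entirely made-up) stand-in for the abstract program above, you could experiment with something like the following in the Spark shell and compare the two jobs' running times; whether and when Spark re-uses myRDD1 is exactly the question to investigate:

// Hedged sketch; sc is an existing SparkContext (e.g., in spark-shell).
val myRDD1 = sc.parallelize(1 to 10000000)
  .map(x => (x % 100, x.toLong * x))
  .reduceByKey(_ + _)                    // stands in for the "expensive" transformations
myRDD1.collect()                         // job 1: executes myRDD1's full lineage

val myRDD2 = myRDD1.mapValues(_ / 100)   // "some more transformations"
myRDD2.collect()                         // job 2: is myRDD1 recomputed or re-used?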
Use Spark textbooks and online resources to find out if Spark is smart enough to realize such RDD re-use
opportunities. Then study this empirically in your PageRank program where the lineage of Ranks in
iteration i depends on all previous (i-1) iterations:
1. Can you instrument your program with the appropriate printing or logging statements to find
out execution details for each job triggered by an action in your program?
2. See if you can find other ways to make Spark tell you which steps of an RDD lineage were
executed, and when Spark was able to avoid execution due to availability of intermediate results
from earlier executions.
3. Change the caching behavior of your program by using cache() or persist() on Ranks. Does it
affect the execution behavior of your program? Try this for small k, then for really large k (so
that Ranks might not completely fit into the combined memory of all machines in the cluster).
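For step 3, a minimal sketch of the two caching variants is shown below (again assuming a SparkContext named sc; the tiny literal data is only a placeholder for your Ranks RDD, and MEMORY_AND_DISK is just one storage level you might compare against the default):

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
val cachedRanks = sc.parallelize(Seq((0, 0.0), (1, 0.25), (2, 0.25))).cache()

// persist() lets you pick the storage level explicitly, e.g., one that spills
// partitions to disk when Ranks does not fit into combined executor memory
// (relevant for really large k).
val persistedRanks = sc.parallelize(Seq((0, 0.0), (1, 0.25), (2, 0.25)))
  .persist(StorageLevel.MEMORY_AND_DISK)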
Bonus challenge: For an optional 5-point bonus (final score cannot exceed 100), run your PageRank
program on the Twitter followership data. If you took shortcuts for the synthetic data, e.g., by exploiting
that no page has more than 1 outgoing link, you need to appropriately generalize your program to work
correctly on the Twitter data.
PageRank in MapReduce (Week 2)
Implement the PageRank program in MapReduce and run it on the synthetic graph. You may choose any
of the methods we discussed in the module and in class for handling dangling pages, including global
counters (check whether you can read their values in the Reduce phase) and order inversion. In contrast to the Spark
program, generate the synthetic graph in advance and feed it as an input file to your PageRank program.
Follow the approach from the module and store the graph as a set of vertex objects (which could be
encoded as Text), each containing the adjacency list and the PageRank value.
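One possible (not prescribed) Text encoding puts each vertex on its own line with its id, current PR value, and comma-separated adjacency list; a hedged sketch of the corresponding parse and format helpers, written in Scala for consistency with the other sketches (the Java equivalents are direct translations), is:

// Hypothetical line format: "<id>\t<pageRank>\t<succ1,succ2,...>", with an empty
// adjacency field for dangling pages. Names and format are illustrative only.
case class Vertex(id: Long, pr: Double, adjacency: Seq[Long]) {
  def toLine: String = s"$id\t$pr\t${adjacency.mkString(",")}"
}

object Vertex {
  def fromLine(line: String): Vertex = {
    val parts = line.split("\t", -1)   // -1 keeps a trailing empty adjacency field
    val adj =
      if (parts(2).isEmpty) Seq.empty[Long]
      else parts(2).split(",").map(_.toLong).toSeq
    Vertex(parts(0).toLong, parts(1).toDouble, adj)
  }
}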
Since we will work with relatively small input, make sure that your program creates at least 20 Map
tasks. You can use NLineInputFormat to achieve this.
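For the input-format part only, here is a hedged sketch of the driver-side configuration (written in Scala against the Hadoop API for consistency with the other sketches; the equivalent Java calls are identical). The input path is hypothetical, and 500 lines per split is just an example; choose a value so that your graph file yields at least 20 splits:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, NLineInputFormat}

// Sketch only: make each Map task process a fixed number of input lines so a
// small input file still produces at least 20 Map tasks.
val conf = new Configuration()
val job = Job.getInstance(conf, "PageRank iteration")

job.setInputFormatClass(classOf[NLineInputFormat])
FileInputFormat.addInputPath(job, new Path("input/graph"))  // hypothetical path
NLineInputFormat.setNumLinesPerSplit(job, 500)              // example value only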
Report
Write a brief report about your findings, answering the following questions:
1. [12 points] Show the pseudo-code for the PR program in Spark Scala. Since many Scala functions
are similar to pseudo-code, you may copy-and-paste well-designed (good variable naming!) and
well-commented Scala code fragments here. Notes: Your program must support k and the
number of PR iterations as parameters. Your program may take shortcuts to exploit the
structure of the synthetic graph, in particular that each page has at most 1 outgoing link. (Your program should work on the synthetic graphs, no matter the choice of k>0, but it does not need
to work correctly on more generally structured graphs.)
2. [10 points] Show the link to the source code for this program in your Github Classroom
repository.
3. [10 points] Run the PR program locally (not on AWS) for k=100 for 10 iterations. Report the PR
values your program computed for pages 0 (dummy), 1, 2,..., 19.
4. [19 points] Run the PR program locally (not on AWS) for k=100. Set the loop condition so that
exactly 1 iteration is performed and report the lineage for Ranks after that iteration. Change the
loop condition so that exactly 2 iterations are performed and report the lineage for Ranks after
those 2 iterations. Then change the loop condition again so that exactly 3 iterations are
performed and report the lineage for Ranks after those 3 iterations.
5. [15 points] Find out if Spark executes the complete job lineage or if it re-uses previously
computed results. Make sure you are not using cache() or persist() on the Ranks RDD. (You
may use it on the Graph RDD.) Since the PR values in RDD Ranks in iteration 10 depend on Ranks
from iteration 9, which in turn depends on Ranks from iteration 8, and so on, we want to find
out if the job triggered by the lookup action in iteration 10 runs all 10 iterations from scratch, or
if it uses Ranks from iteration 9 and simply applies one extra iteration to it.
a. Let's add a print statement as the first statement inside the loop that performs an
iteration of the PR algorithm. Use println(s"Iteration ${i}") or similar to print the value of
loop variable i. The idea is to look at the printed messages to determine what happened.
In particular, if a job executes the complete lineage, we might hope to see "Iteration 1"
when the first job is triggered, then "Iteration 1" (again) and "Iteration 2" for the second
job (because the second job includes the result of the first iteration in its lineage, i.e., a
full execution from scratch would run iterations 1 and 2), then "Iteration 1", "Iteration
2", and "Iteration 3" when the third iteration's job is triggered, and so on. But would
that really happen? To answer this question, show the lineage of Ranks after 3 iterations
and report if adding the print statement changed the lineage.
b. Remove the print statement, run 10 iterations for k=100, and look at the log file. You
should see lines like "Job ... finished: lookup at ..., took ..." that tell you the jobs
executed, the action that triggered the job (lookup), and how long it took to execute. If
Spark does not re-use previous results, the growing lineage should cause longer
computation time for jobs triggered by later iterations. On the other hand, if Spark re-
uses Ranks from the previous iteration, then each job runs only a single additional
iteration and hence job time should remain about the same, even for later iterations.
Copy these lines from the log file for all jobs executed by the lookup action in the 10
iterations. Based on the times reported, do you believe Spark re-used Ranks from the
previous iteration?
c. So far we have not asked Spark to cache() or persist() Ranks. Will this change Spark's
behavior? To find out, add ".cache()" to the command that defines Ranks in the loop.
Run your program again for 10 iterations for k=100 and look at the log file. What
changed after you added cache()? Look for lines like "Block ... stored as values in
memory" and "Found block ... locally". Report some of those lines and discuss what they