
Commit b1b77c5
refactor BigDataSpark/w1/wikipedia
1 parent 4829faf

File tree: 6 files changed, +144 −142 lines

courses/BigDataSpark/w1/wikipedia/build.sbt (+23 −18)
@@ -1,29 +1,34 @@
 course := "bigdata"
 assignment := "wikipedia"

-scalaVersion := "3.1.0"
+// HAS TO BE: sbt 1.7.0, spark 3.2.0, Java 11. Does not work otherwise.
+scalaVersion := "3.4.2"

 scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")

 libraryDependencies ++= Seq(
-  "org.scalameta" %% "munit" % "0.7.26" % Test,
-  excludes(("org.apache.spark" %% "spark-core" % "3.2.0").cross(CrossVersion.for3Use2_13)),
-  excludes(("org.apache.spark" %% "spark-sql" % "3.2.0").cross(CrossVersion.for3Use2_13))
+  "org.scalameta" %% "munit" % "1.0.0" % Test,
+  excludes(
+    ("org.apache.spark" %% "spark-core" % "3.2.0").cross(CrossVersion.for3Use2_13)
+  ),
+  excludes(
+    ("org.apache.spark" %% "spark-sql" % "3.2.0").cross(CrossVersion.for3Use2_13)
+  )
 )

-//netty-all replaces all these excludes
+// netty-all replaces all these excludes
 def excludes(m: ModuleID): ModuleID =
-  m.exclude("io.netty", "netty-common").
-    exclude("io.netty", "netty-handler").
-    exclude("io.netty", "netty-transport").
-    exclude("io.netty", "netty-buffer").
-    exclude("io.netty", "netty-codec").
-    exclude("io.netty", "netty-resolver").
-    exclude("io.netty", "netty-transport-native-epoll").
-    exclude("io.netty", "netty-transport-native-unix-common").
-    exclude("javax.xml.bind", "jaxb-api").
-    exclude("jakarta.xml.bind", "jaxb-api").
-    exclude("javax.activation", "activation").
-    exclude("jakarta.annotation", "jakarta.annotation-api").
-    exclude("javax.annotation", "javax.annotation-api")
+  m.exclude("io.netty", "netty-common")
+    .exclude("io.netty", "netty-handler")
+    .exclude("io.netty", "netty-transport")
+    .exclude("io.netty", "netty-buffer")
+    .exclude("io.netty", "netty-codec")
+    .exclude("io.netty", "netty-resolver")
+    .exclude("io.netty", "netty-transport-native-epoll")
+    .exclude("io.netty", "netty-transport-native-unix-common")
+    .exclude("javax.xml.bind", "jaxb-api")
+    .exclude("jakarta.xml.bind", "jaxb-api")
+    .exclude("javax.activation", "activation")
+    .exclude("jakarta.annotation", "jakarta.annotation-api")
+    .exclude("javax.annotation", "javax.annotation-api")

 // Without forking, ctrl-c doesn't actually fully stop Spark
 run / fork := true
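As the comment says, a single netty-all dependency can replace the per-module excludes. A minimal sketch of that alternative, hedged: the version below is an assumption and would need to match the netty release Spark 3.2.0 was built against.

// Sketch (assumption): depend on the aggregate netty artifact instead of
// excluding each netty module pulled in transitively by Spark.
libraryDependencies += "io.netty" % "netty-all" % "4.1.68.Final"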
courses/BigDataSpark/w1/wikipedia/project/build.properties (+1 −1)

@@ -1 +1 @@
-sbt.version=1.6.1
+sbt.version=1.7.0

courses/BigDataSpark/w1/wikipedia/src/main/scala/wikipedia/WikipediaData.scala (+5 −4)
@@ -4,11 +4,12 @@ import scala.io.Source
 import scala.io.Codec

 object WikipediaData:
-
   private[wikipedia] def lines: List[String] =
     Option(getClass.getResourceAsStream("/wikipedia/wikipedia-grading.dat")) match
-      case None => sys.error(
-        "Please download the dataset as explained in the assignment instructions")
+      case None =>
+        sys.error(
+          "Please download the dataset as explained in the assignment instructions"
+        )
       case Some(resource) =>
         Source
           .fromInputStream(resource)(Codec.UTF8)
@@ -19,5 +20,5 @@ object WikipediaData:
     val subs = "</title><text>"
     val i = line.indexOf(subs)
     val title = line.substring(14, i)
-    val text = line.substring(i + subs.length, line.length-16)
+    val text = line.substring(i + subs.length, line.length - 16)
     WikipediaArticle(title, text)
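The hard-coded offsets 14 and line.length - 16 encode the dataset's exact line framing around the title and text. For illustration, a hedged, offset-free equivalent that derives the boundaries from the tags themselves; parseByTags is a hypothetical helper, and the framing <page><title>…</title><text>…</text></page> is an assumption (the real lines may carry extra padding, which is what the fixed offsets absorb):

// Hypothetical sketch: locate tag boundaries instead of hard-coding offsets.
def parseByTags(line: String): WikipediaArticle =
  val titleStart = line.indexOf("<title>") + "<title>".length
  val titleEnd   = line.indexOf("</title>", titleStart)
  val textStart  = line.indexOf("<text>", titleEnd) + "<text>".length
  val textEnd    = line.indexOf("</text>", textStart)
  WikipediaArticle(line.substring(titleStart, titleEnd), line.substring(textStart, textEnd))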

courses/BigDataSpark/w1/wikipedia/src/main/scala/wikipedia/WikipediaRanking.scala (+16 −35)
@@ -4,7 +4,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext.*
 import org.apache.log4j.{Logger, Level}
-
 import org.apache.spark.rdd.RDD
 import scala.util.Properties.isWin

@@ -17,10 +16,7 @@ case class WikipediaArticle(title: String, text: String):
   def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)

 object WikipediaRanking extends WikipediaRankingInterface:
-  // Reduce Spark logging verbosity
-  Logger
-    .getLogger("org.apache.spark")
-    .setLevel(Level.ERROR)
+  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) // Reduce Spark log verbosity

   if isWin then
     System.setProperty(
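The if isWin branch is cut off by the hunk boundary. In this assignment family the call usually points Hadoop at a bundled winutils directory so Spark can start on Windows; a sketch under that assumption (property name and path are illustrative, not taken from this commit):

// Sketch (assumption): Hadoop on Windows needs winutils.exe, located via hadoop.home.dir.
if isWin then
  System.setProperty(
    "hadoop.home.dir",
    System.getProperty("user.dir") + "\\winutils\\hadoop-3.3.1"
  )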
@@ -46,21 +42,17 @@ object WikipediaRanking extends WikipediaRankingInterface:
     "Groovy"
   )

-  val conf: SparkConf = // TODO
-    new SparkConf()
-      .setMaster("local[4]")
-      .setAppName("MyApp")
-
+  val conf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("MyApp") // TODO
   val sc: SparkContext = new SparkContext(conf) // TODO

   // Hint: use a combination of `sc.parallelize`,
   // `WikipediaData.lines` and `WikipediaData.parse`
   val wikiRdd: RDD[WikipediaArticle] = // TODO
     sc.parallelize(WikipediaData.lines map WikipediaData.parse, 4)

-  /** Returns the number of articles on which the language `lang` occurs. Hint1:
-   *  consider using method `aggregate` on RDD[T]. Hint2: consider using method
-   *  `mentionsLanguage` on `WikipediaArticle`
+  /** Returns the number of articles on which the language `lang` occurs. Hint1: consider
+   *  using method `aggregate` on RDD[T]. Hint2: consider using method `mentionsLanguage`
+   *  on `WikipediaArticle`
    */
   def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = // TODO
     val fun = (count: Int, article: WikipediaArticle) =>
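The hunk ends mid-definition. A minimal sketch of how the aggregate-based count suggested by the hints could continue; this is an assumption about the elided lines, with fun serving as aggregate's seqOp and plain addition as the combiner:

// Sketch (assumption): count matching articles within each partition,
// then sum the per-partition counts.
def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
  val fun = (count: Int, article: WikipediaArticle) =>
    if article.mentionsLanguage(lang) then count + 1 else count
  rdd.aggregate(0)(fun, _ + _)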
@@ -79,22 +71,18 @@
     langs: List[String],
     rdd: RDD[WikipediaArticle]
   ): List[(String, Int)] = // TODO
-    val langRanks =
-      for lang <- langs
-      yield (lang, occurrencesOfLang(lang, rdd))
-
+    val langRanks = for lang <- langs yield (lang, occurrencesOfLang(lang, rdd))
     langRanks.sortBy(-_._2)

-  /* Compute an inverted index of the set of articles, mapping each language
-   * to the Wikipedia pages in which it occurs.
+  /* Compute an inverted index of the set of articles, mapping each language to the
+   * Wikipedia pages in which it occurs.
    */
   def makeIndex(
     langs: List[String],
     rdd: RDD[WikipediaArticle]
   ): RDD[(String, Iterable[WikipediaArticle])] = // TODO
     val fun = (article: WikipediaArticle) =>
-      for lang <- langs filter article.mentionsLanguage
-      yield (lang, article)
+      for lang <- langs filter article.mentionsLanguage yield (lang, article)

     rdd // flatMap, for/yield, filter, groupByKey are LAZY
       .flatMap(fun) // RDD[(String, WikipediaArticle)]
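The pipeline is cut off here; given the declared result type RDD[(String, Iterable[WikipediaArticle])], the natural finishing step is a groupByKey. A hedged sketch of the presumed tail:

// Sketch (assumption): group the (lang, article) pairs per language; everything
// stays lazy until an action such as collect forces evaluation.
rdd
  .flatMap(fun)  // RDD[(String, WikipediaArticle)]
  .groupByKey()  // RDD[(String, Iterable[WikipediaArticle])]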
@@ -129,8 +117,7 @@
     rdd: RDD[WikipediaArticle]
   ): List[(String, Int)] = // TODO
     val fun = (article: WikipediaArticle) =>
-      for lang <- langs filter article.mentionsLanguage
-      yield (lang, 1)
+      for lang <- langs filter article.mentionsLanguage yield (lang, 1)

     rdd
       .flatMap(fun)
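Between this hunk and the next, the middle of the rankLangsReduceByKey pipeline is elided. A sketch of a plausible full chain, hedged as an assumption consistent with the method's name and the .collect / .toList tail shown below:

// Sketch (assumption): sum the (lang, 1) pairs per key, order descending.
rdd
  .flatMap(fun)        // RDD[(String, Int)]
  .reduceByKey(_ + _)  // one (lang, total) pair per language
  .sortBy(-_._2)       // most-mentioned language first
  .collect             // LAZY up to here!
  .toList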
@@ -139,30 +126,24 @@
       .collect // LAZY up to here!
       .toList

-  def main(args: Array[String]): Unit =
-
-    /* Languages ranked according to (1) */
-    val langsRanked: List[(String, Int)] =
+  def main(args: Array[String]): Unit =
+    val langsRanked: List[(String, Int)] = /* Languages ranked according to (1) */
       timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

-    /* An inverted index mapping languages to wikipedia pages
-     * on which they appear */
-    def index: RDD[(String, Iterable[WikipediaArticle])] =
-      makeIndex(langs, wikiRdd)
+    /* An inverted index mapping languages to wikipedia pages on which they appear */
+    def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

     /* Languages ranked according to (2), using the inverted index */
     val langsRanked2: List[(String, Int)] =
       timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

-    /* Languages ranked according to (3) */
-    val langsRanked3: List[(String, Int)] =
+    val langsRanked3: List[(String, Int)] = /* Languages ranked according to (3) */
       timed(
         "Part 3: ranking using reduceByKey",
         rankLangsReduceByKey(langs, wikiRdd)
       )

-    /* Output the speed of each ranking */
-    println(timing)
+    println(timing) /* Output the speed of each ranking */
     sc.stop()

   val timing = new StringBuffer
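timed itself does not appear in this diff. For context, a hedged sketch of the shape such a helper typically has in this assignment series (an assumption, not this commit's code):

// Sketch (assumption): evaluate `code` once, record its wall-clock time in
// `timing` under `label`, and pass the result through.
def timed[T](label: String, code: => T): T =
  val start = System.currentTimeMillis()
  val result = code
  val stop = System.currentTimeMillis()
  timing.append(s"Processing $label took ${stop - start} ms.\n")
  result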

courses/BigDataSpark/w1/wikipedia/src/main/scala/wikipedia/WikipediaRankingInterface.scala (+16 −18)
@@ -4,25 +4,23 @@ import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext.*
 import org.apache.spark.rdd.RDD

-/**
- * The interface used by the grading infrastructure. Do not change signatures
- * or your submission will fail with a NoSuchMethodError.
- */
+/** The interface used by the grading infrastructure. Do not change signatures or your
+ *  submission will fail with a NoSuchMethodError.
+ */
 trait WikipediaRankingInterface:
-  def makeIndex(langs: List[String],
-                rdd: RDD[WikipediaArticle])
-      : RDD[(String, Iterable[WikipediaArticle])]
-  def occurrencesOfLang(lang: String,
-                        rdd: RDD[WikipediaArticle])
-      : Int
-  def rankLangs(langs: List[String],
-                rdd: RDD[WikipediaArticle])
-      : List[(String, Int)]
-  def rankLangsReduceByKey(langs: List[String],
-                           rdd: RDD[WikipediaArticle])
-      : List[(String, Int)]
-  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])])
-      : List[(String, Int)]
+  def makeIndex(
+    langs: List[String],
+    rdd: RDD[WikipediaArticle]
+  ): RDD[(String, Iterable[WikipediaArticle])]
+  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int
+  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)]
+  def rankLangsReduceByKey(
+    langs: List[String],
+    rdd: RDD[WikipediaArticle]
+  ): List[(String, Int)]
+  def rankLangsUsingIndex(
+    index: RDD[(String, Iterable[WikipediaArticle])]
+  ): List[(String, Int)]
   def langs: List[String]
   def sc: SparkContext
   def wikiRdd: RDD[WikipediaArticle]
