Monday, May 8, 2017
Deploy a Scala job to a Spark Cluster using SBT
Environment
- sbt 0.13.8
- Scala 2.11.6
- Spark 1.2.1
- Ubuntu Linux 14.10 (Debian-based)
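If you are unsure which versions are installed on your machine, they can be checked from the shell. This is just a quick sketch; the exact flags can vary slightly between releases:

scala -version
sbt sbtVersion
$SPARK_HOME/bin/spark-submit --version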
Scala Program
This simple Scala program analyzes a local file on my system, counts the number of lines containing the letter "a" and the number of lines containing the letter "b", and prints both totals to the console.
/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/craig/spark/README.md" // Should be some file on your system
    // Constructor arguments: master, application name, Spark home, and the jar(s) that sbt will build
    val sc = new SparkContext("local", "Simple App", "$SPARK_HOME",
      List("target/scala-2.11/simple-project_2.11-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
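Since the jar is launched through spark-submit later in this walkthrough (which supplies the master URL and the application jar itself), an equivalent way to construct the context is through a SparkConf, the style the Spark quick start recommends for spark-submit. This is only a minimal sketch of that variant, not the version used in the rest of this post:

/*** SimpleApp.scala (alternative: let spark-submit supply the master and jar) ***/
import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/home/craig/spark/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple App")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    sc.stop()
  }
}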
Directory Structure
I've created the following directory structure in my home directory:
mkdir -p ~/apps/simple/src/main/scala
The Scala program above will be saved as SimpleApp.scala under the "src/main/scala" directory.
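The full path to the source file is therefore:

~/apps/simple/src/main/scala/SimpleApp.scala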
Build Script
Create a build script called simple.sbt under the "~/apps/simple/" directory:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
The Scala, Spark, and Hadoop version numbers above should be replaced with the specific versions you are running.
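As an aside, the %% operator appends the project's Scala binary version to the artifact name, so the spark-core dependency above resolves to spark-core_2.11. Spelling it out by hand is equivalent (a sketch assuming the same versions as above):

// Equivalent to the %% form: the Scala binary suffix is written explicitly.
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.2.1"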
Building the Program
Navigate to ~/apps/simple and type "sbt" in a terminal session to start the interactive sbt shell:
craig@spark:~/apps/simple$ sbt
[info] Set current project to Simple Project (in build file:/home/craig/apps/simple/)
>
Type "package" at the prompt:
> package
[info] Updating {file:/home/craig/apps/simple/}simple...
[info] Resolving jline#jline;2.12.1 ...
[info] Done updating.
[info] Compiling 1 Scala source to /home/craig/apps/simple/target/scala-2.11/classes...
[info] Packaging /home/craig/apps/simple/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 18 s, completed Mar 24, 2015 11:15:18 AM
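The same packaging step can also be run non-interactively, which is handy in scripts (assuming sbt is on your PATH):

craig@spark:~/apps/simple$ sbt clean package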
My directory structure now looks like this:
craig@spark:~/apps/simple$ tree
.
├── simple.sbt
├── src
│   └── main
│       └── scala
│           └── SimpleApp.scala
└── target
    ├── resolution-cache
    │   ├── reports
    │   │   ├── ivy-report.css
    │   │   ├── ivy-report.xsl
    │   │   ├── simple-project-simple-project_2.11-compile-internal.xml
    │   │   ├── simple-project-simple-project_2.11-compile.xml
    │   │   ├── simple-project-simple-project_2.11-docs.xml
    │   │   ├── simple-project-simple-project_2.11-optional.xml
    │   │   ├── simple-project-simple-project_2.11-plugin.xml
    │   │   ├── simple-project-simple-project_2.11-pom.xml
    │   │   ├── simple-project-simple-project_2.11-provided.xml
    │   │   ├── simple-project-simple-project_2.11-runtime-internal.xml
    │   │   ├── simple-project-simple-project_2.11-runtime.xml
    │   │   ├── simple-project-simple-project_2.11-scala-tool.xml
    │   │   ├── simple-project-simple-project_2.11-sources.xml
    │   │   ├── simple-project-simple-project_2.11-test-internal.xml
    │   │   └── simple-project-simple-project_2.11-test.xml
    │   └── simple-project
    │       └── simple-project_2.11
    │           └── 1.0
    │               ├── resolved.xml.properties
    │               └── resolved.xml.xml
    ├── scala-2.11
    │   ├── classes
    │   │   ├── SimpleApp$$anonfun$1.class
    │   │   ├── SimpleApp$$anonfun$2.class
    │   │   ├── SimpleApp.class
    │   │   └── SimpleApp$.class
    │   └── simple-project_2.11-1.0.jar
    └── streams
        ├── compile
        │   ├── compile
        │   │   └── $global
        │   │       └── streams
        │   │           └── out
        │   ├── compileIncremental
        │   │   └── $global
        │   │       └── streams
        │   │           ├── export
        │   │           └── out
        │   ├── copyResources
        │   │   └── $global
        │   │       └── streams
        │   │           ├── copy-resources
        │   │           └── out
        │   ├── dependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── externalDependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── $global
        │   │   └── $global
        │   │       └── discoveredMainClasses
        │   │           └── data
        │   ├── incCompileSetup
        │   │   └── $global
        │   │       └── streams
        │   │           └── inc_compile_2.11
        │   ├── internalDependencyClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── mainClass
        │   │   └── $global
        │   │       └── streams
        │   │           └── out
        │   ├── managedClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   ├── packageBin
        │   │   └── $global
        │   │       └── streams
        │   │           ├── inputs
        │   │           ├── out
        │   │           └── output
        │   ├── unmanagedClasspath
        │   │   └── $global
        │   │       └── streams
        │   │           └── export
        │   └── unmanagedJars
        │       └── $global
        │           └── streams
        │               └── export
        └── $global
            ├── dependencyPositions
            │   └── $global
            │       └── streams
            │           └── update_cache_2.11
            │               ├── input_dsp
            │               └── output_dsp
            ├── $global
            │   └── $global
            │       └── streams
            │           └── out
            ├── ivyConfiguration
            │   └── $global
            │       └── streams
            │           └── out
            ├── ivySbt
            │   └── $global
            │       └── streams
            │           └── out
            ├── projectDescriptors
            │   └── $global
            │       └── streams
            │           └── out
            └── update
                └── $global
                    └── streams
                        ├── out
                        └── update_cache_2.11
                            ├── inputs
                            └── output
73 directories, 50 files
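The only artifact we actually deploy is target/scala-2.11/simple-project_2.11-1.0.jar; everything else under target/ is sbt's own bookkeeping (the Ivy resolution cache and task streams).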
Running the Program
Inside your working directory (~/apps/simple), execute this command:
$SPARK_HOME/bin/spark-submit --class "SimpleApp" --master local[8] target/scala-2.11/simple-project_2.11-1.0.jar
Successful output on my workstation looks like this:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/24 11:20:37 INFO SecurityManager: Changing view acls to: craig
15/03/24 11:20:37 INFO SecurityManager: Changing modify acls to: craig
15/03/24 11:20:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(craig); users with modify permissions: Set(craig)
15/03/24 11:20:38 INFO Slf4jLogger: Slf4jLogger started
15/03/24 11:20:38 INFO Remoting: Starting remoting
15/03/24 11:20:38 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.4.15:49222]
15/03/24 11:20:38 INFO Utils: Successfully started service 'sparkDriver' on port 49222.
15/03/24 11:20:38 INFO SparkEnv: Registering MapOutputTracker
15/03/24 11:20:38 INFO SparkEnv: Registering BlockManagerMaster
15/03/24 11:20:38 INFO DiskBlockManager: Created local directory at /tmp/spark-91e5f424-b1e6-4d51-a010-a4e0ac788725/spark-2d07771d-f3ad-4c70-bb5b-1329be484b4f
15/03/24 11:20:38 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/24 11:20:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/24 11:20:38 INFO HttpFileServer: HTTP File server directory is /tmp/spark-cf38f85d-efd0-4941-b4f6-d245b9d8380c/spark-f903d7b0-c67c-48f0-9eb0-4a3279903d2a
15/03/24 11:20:38 INFO HttpServer: Starting HTTP Server
15/03/24 11:20:39 INFO Utils: Successfully started service 'HTTP file server' on port 50465.
15/03/24 11:20:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/24 11:20:39 INFO SparkUI: Started SparkUI at http://10.0.4.15:4040
15/03/24 11:20:39 INFO SparkContext: Added JAR target/scala-2.11/simple-project_2.11-1.0.jar at http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar with timestamp 1427221239211
15/03/24 11:20:39 INFO Executor: Starting executor ID <driver> on host localhost
15/03/24 11:20:39 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.0.4.15:49222/user/HeartbeatReceiver
15/03/24 11:20:39 INFO NettyBlockTransferService: Server created on 54739
15/03/24 11:20:39 INFO BlockManagerMaster: Trying to register BlockManager
15/03/24 11:20:39 INFO BlockManagerMasterActor: Registering block manager localhost:54739 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 54739)
15/03/24 11:20:39 INFO BlockManagerMaster: Registered BlockManager
15/03/24 11:20:39 INFO MemoryStore: ensureFreeSpace(180608) called with curMem=0, maxMem=278019440
15/03/24 11:20:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 176.4 KB, free 265.0 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(25432) called with curMem=180608, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.8 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:54739 (size: 24.8 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/24 11:20:40 INFO SparkContext: Created broadcast 0 from textFile at SimpleApp.scala:9
15/03/24 11:20:40 INFO FileInputFormat: Total input paths to process : 1
15/03/24 11:20:40 INFO SparkContext: Starting job: count at SimpleApp.scala:10
15/03/24 11:20:40 INFO DAGScheduler: Got job 0 (count at SimpleApp.scala:10) with 2 output partitions (allowLocal=false)
15/03/24 11:20:40 INFO DAGScheduler: Final stage: Stage 0(count at SimpleApp.scala:10)
15/03/24 11:20:40 INFO DAGScheduler: Parents of final stage: List()
15/03/24 11:20:40 INFO DAGScheduler: Missing parents: List()
15/03/24 11:20:40 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:10), which has no missing parents
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(2720) called with curMem=206040, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.7 KB, free 264.9 MB)
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(1950) called with curMem=208760, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1950.0 B, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54739 (size: 1950.0 B, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/24 11:20:40 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/03/24 11:20:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:10)
15/03/24 11:20:40 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/24 11:20:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/24 11:20:40 INFO Executor: Fetching http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar with timestamp 1427221239211
15/03/24 11:20:40 INFO Utils: Fetching http://10.0.4.15:50465/jars/simple-project_2.11-1.0.jar to /tmp/spark-4055924d-8bde-4216-9932-526612364a63/spark-ac26b090-87ab-4f55-b0b9-3c8571d43307/fetchFileTemp507742889710724975.tmp
15/03/24 11:20:40 INFO Executor: Adding file:/tmp/spark-4055924d-8bde-4216-9932-526612364a63/spark-ac26b090-87ab-4f55-b0b9-3c8571d43307/simple-project_2.11-1.0.jar to class loader
15/03/24 11:20:40 INFO CacheManager: Partition rdd_1_0 not found, computing it
15/03/24 11:20:40 INFO HadoopRDD: Input split: file:/home/craig/spark/README.md:0+1814
15/03/24 11:20:40 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/24 11:20:40 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/24 11:20:40 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/24 11:20:40 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/24 11:20:40 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(6208) called with curMem=210710, maxMem=278019440
15/03/24 11:20:40 INFO MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 6.1 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added rdd_1_0 in memory on localhost:54739 (size: 6.1 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block rdd_1_0
15/03/24 11:20:40 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2326 bytes result sent to driver
15/03/24 11:20:40 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1361 bytes)
15/03/24 11:20:40 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/03/24 11:20:40 INFO CacheManager: Partition rdd_1_1 not found, computing it
15/03/24 11:20:40 INFO HadoopRDD: Input split: file:/home/craig/spark/README.md:1814+1815
15/03/24 11:20:40 INFO MemoryStore: ensureFreeSpace(5400) called with curMem=216918, maxMem=278019440
15/03/24 11:20:40 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 260 ms on localhost (1/2)
15/03/24 11:20:40 INFO MemoryStore: Block rdd_1_1 stored as values in memory (estimated size 5.3 KB, free 264.9 MB)
15/03/24 11:20:40 INFO BlockManagerInfo: Added rdd_1_1 in memory on localhost:54739 (size: 5.3 KB, free: 265.1 MB)
15/03/24 11:20:40 INFO BlockManagerMaster: Updated info of block rdd_1_1
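The command above runs the job against a local master. To deploy the same jar to an actual Spark standalone cluster, point --master at your cluster's master URL instead. The host name, port, and memory setting below are placeholders for your own environment, not values from this walkthrough:

$SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master spark://your-master-host:7077 \
  --executor-memory 1G \
  target/scala-2.11/simple-project_2.11-1.0.jar

Note that the hard-coded "local" master in the SparkContext constructor takes precedence over the --master flag, so the SparkConf-based variant sketched earlier is a better fit for a cluster run.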