본문 바로가기
Programming

[Apache Spark] Core Programming

by Deafhong 2019. 3. 18.
반응형
p.s> 개인적인 공부 및 내용 정리를 위해 파파고 및 구글을 통해 번역을 한 것이므로, 틀린 번역 내용이 있을 수도 있습니다.
이점을 감안하시고, 읽어봐주세요.


스파크 코어는 전체 프로젝트의 밑거름이 된다. 분산형 작업 파견, 스케줄링 및 기본 I/O 기능을 제공한다. 스파크는 RDD(Resilient Distributed Datasets)로 알려진 전문화된 기본 데이터 구조를 이용한다. RDD는 두 가지 방법으로 만들 수 있다. 하나는 외부 스토리지 시스템의 데이터 세트를 참조하는 것이고, 두 번째는 기존 RDD에 변환(예: 지도, 필터, 감쇠기, 결합)을 적용하는 것이다.

RDD 추상화는 언어 통합 API를 통해 노출된다. 이것은 응용 프로그램이 RDD를 조작하는 방법이 로컬 데이터 컬렉션을 조작하는 것과 유사하기 때문에 프로그래밍 복잡성을 단순화한다.

스파크 셸

스파크는 데이터를 대화식으로 분석하는 강력한 도구인 대화형 쉘을 제공한다. 그것은 Scala나 Python 언어로 이용 가능하다. 스파크의 1차 추상화는 RDD(Resilient Distributed Dataset)라고 하는 아이템의 분산 집합이다. RDD는 Hadoop 입력 형식(HDFS 파일 등)에서 생성하거나 다른 RDD를 변형하여 생성할 수 있다.

오픈 스파크 셸

다음 명령은 스파크 셸을 여는 데 사용된다.
$ spark-shell

단순 RDD 만들기
텍스트 파일에서 간단한 RDD를 만들자. 다음 명령을 사용하여 간단한 RDD를 생성한다.
scala> val inputfile = sc.textFile(“input.txt”)

위 명령의 출력은
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

스파크 RDD API는 RDD를 조작하기 위한 몇 가지 변환과 몇 가지 동작을 도입한다.

RDD 변환

RDD 변환은 포인터를 새로운 RDD로 되돌리고 당신이 RDD들 사이에 의존성을 만들 수 있게 해준다. 의존성 체인의 각 RDD는 그것의 데이터를 계산하는 기능을 가지고 있고 그것의 상위 RDD에 대한 포인터(의존성)를 가지고 있다.
스파크는 게으르기 때문에 일자리 창출과 집행을 촉발할 어떤 변화나 행동을 불러오지 않으면 아무 것도 실행되지 않는다. 다음의 워드카운트 예시를 보자.
따라서, RDD 변환은 데이터 집합이 아니라 스파크에게 데이터를 얻는 방법과 그것을 가지고 무엇을 해야 하는지를 알려주는 프로그램의 한 단계일 수 있다.

아래에 제시된 것은 RDD 변환 목록이다.
S.No
Transformations & Meaning
1
map(func)
Returns a new distributed dataset, formed by passing each element of the source through a function func.
2
filter(func)
Returns a new dataset formed by selecting those elements of the source on which func returns true.
3
flatMap(func)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
4
mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so funcmust be of type Iterator<T> ⇒ Iterator<U> when running on an RDD of type T.
5
mapPartitionsWithIndex(func)
Similar to map Partitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) ⇒ Iterator<U> when running on an RDD of type T.
6
sample(withReplacement, fraction, seed)
Sample a fraction of the data, with or without replacement, using a given random number generator seed.
7
union(otherDataset)
Returns a new dataset that contains the union of the elements in the source dataset and the argument.
8
intersection(otherDataset)
Returns a new RDD that contains the intersection of elements in the source dataset and the argument.
9
distinct([numTasks])
Returns a new dataset that contains the distinct elements of the source dataset.
10
groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note − If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
11
reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) ⇒ V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
12
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
13
sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the Boolean ascending argument.
14
join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
15
cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called group With.
16
cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
17
pipe(command, [envVars])
Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
18
coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
19
repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
20
repartitionAndSortWithinPartitions(partitioner)
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

Actions

다음 표는 값을 반환하는 조치 목록을 제공한다.
S.No
Action & Meaning
1
reduce(func)
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
2
collect()
Returns all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
3
count()
Returns the number of elements in the dataset.
4
first()
Returns the first element of the dataset (similar to take (1)).
5
take(n)
Returns an array with the first n elements of the dataset.
6
takeSample (withReplacement,num, [seed])
Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
7
takeOrdered(n, [ordering])
Returns the first n elements of the RDD using either their natural order or a custom comparator.
8
saveAsTextFile(path)
Writes the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark calls toString on each element to convert it to a line of text in the file.
9
saveAsSequenceFile(path) (Java and Scala)
Writes the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
10
saveAsObjectFile(path) (Java and Scala)
Writes the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
11
countByKey()
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
12
foreach(func)
Runs a function func on each element of the dataset. This is usually, done for side effects such as updating an Accumulator or interacting with external storage systems.
Note − modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Programming with RDD ( RDD로 프로그래밍 )

예를 들어 RDD 프로그래밍에서 소수의 RDD 변환과 조치의 구현을 보자.

Example

워드 카운트의 예를 고려해보자 - 문서에 나타나는 각 단어를 계수한다. 다음 텍스트를 입력으로 간주하여 입력으로 저장한다.

input.txt - input file.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
아래의 절차를 따라 주어진 예를 실행한다.

Open Spark-Shell ( 오픈 스파크-쉘 )

다음 명령은 스파크 셸을 열 때 사용한다. 일반적으로 스파크는 스칼라를 이용해 만들어진다. 그래서 스파크 프로그램은 스칼라 환경에서 운영된다.
$ spark-shell

만약 스파크 쉘이 성공적으로 열리면, 당신은 다음의 출력을 발견할 것이다. "Sc로 사용할 수 있는 스파크 컨텍스트" 출력의 마지막 선을 보면 스파크 컨테이너가 이름 sc로 자동으로 스파크 컨텍스트 객체가 생성된다는 것을 의미한다. 프로그램의 첫 단계를 시작하기 전에 SparkContext 객체를 생성해야 한다.

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Create an RDD ( RDD 만들기 )

먼저, 우리는 Spark-Scala API를 사용하여 입력파일을 읽고 RDD를 생성해야 한다.
다음 명령은 지정된 위치에서 파일을 읽을 때 사용한다. 여기서, 입력 파일 이름으로 새로운 RDD가 생성된다. textFile("") 메서드에서 인수로 주어지는 문자열은 입력 파일 이름의 절대 경로다. 단, 파일명만 주어지면 입력 파일이 현재 위치에 있음을 의미한다.

scala> val inputfile = sc.textFile("input.txt")

Execute Word count Transformation ( 워드 카운트 변환 실행 )

우리의 목표는 그 단어들을 한 줄로 세는 것이다. 각 행을 단어로 분할하기 위한 평면 맵을 작성한다( flatMap(line => line.split("") ).
다음, map 함수를 사용하여 값이 '1'인 키로 각 단어를 읽는다. ( map(word => (word, 1)).
마지막으로, 유사한 키의 값을 추가하여 이러한 키를 줄인다. ( reduceByKey( _ + _ ) )

다음 명령은 워드 카운트 로직을 실행하는 데 사용된다. 이것을 실행한 후, 당신은 어떤 출력도 찾을 수 없을 것이다. 왜냐하면 이것은 행동이 아니기 때문이다; 이것은 변환이다; 새로운 RDD를 가리키거나 주어진 데이터로 무엇을 해야 하는지 스파크를 알려준다.)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

Current RDD ( 현재 RDD )

RDD와 작업하는 동안, 현재 RDD에 대해 알고 싶은 경우, 다음 명령을 사용한다. 그것은 당신에게 디버깅을 위한 현재의 RDD와 그것의 의존성에 대한 설명을 보여줄 것이다.

Caching the Transformations ( 전환 캐싱 )

당신은 그것의 persist() 또는 cache() 방법을 사용하여 RDD가 지속되도록 표시할 수 있다. 그것이 행동으로 처음 계산될 때, 그것은 노드에서 기억될 것이다. 다음 명령을 사용하여 중간 변환을 메모리에 저장한다.
scala> counts.cache()

Applying the Action ( 액션 적용 )

모든 변환을 저장하듯이 어떤 동작을 적용하면 텍스트 파일로 결과가 된다. SaveAsTextFile()" 메소드를 위한 String 인수는 출력 폴더의 절대 경로다. 출력을 텍스트 파일에 저장하려면 다음 명령을 시도해 보십시오. 다음 예에서 '출력' 폴더가 현재 위치에 있다.

scala> counts.saveAsTextFile("output")

Checking the Output ( 출력 확인 )

다른 터미널을 열어 홈 디렉토리(다른 터미널에서 스파크가 실행됨)로 이동한다. 출력 디렉터리를 점검하려면 다음 명령을 사용하십시오.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1

part-00000
part-00001
_SUCCESS

다음 명령은 파트-00000 파일의 출력을 보기 위해 사용된다.
[hadoop@localhost output]$ cat part-00000

Output

(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

다음 명령은 파트-00001 파일의 출력을 보기 위해 사용된다.
[hadoop@localhost output]$ cat part-00001

Output

(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)


UN Persist the Storage ( 유엔 저장고 페르소나 )

UN-영구하기 전에 이 응용 프로그램에 사용되는 저장 공간을 보려면 브라우저에서 다음 URL을 사용하십시오.

http://localhost:4040

스파크 쉘에서 실행 중인 애플리케이션에 사용되는 저장 공간을 보여주는 다음 화면이 나타난다.


특정 RDD의 저장 공간을 UN-영구하려면 다음 명령을 사용한다.
Scala> counts.unpersist()

다음과 같은 출력물을 볼 수 있다.
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

브라우저의 저장공간 확인을 위해 다음 URL을 사용한다.
http://localhost:4040/

다음과 같은 화면이 나타난다. 스파크 쉘에서 실행 중인 어플리케이션에 사용된 저장공간을 보여준다.




반응형

'Programming' 카테고리의 다른 글

[Apache Spark] Advanced Spark Programming  (1) 2019.03.18
[Apache Spark] Deployment  (1) 2019.03.18
[Apache Spark] Installation  (0) 2019.03.18
[Apache Spark] RDD  (0) 2019.03.18
[Apache Spark] - Home  (0) 2019.03.18