본문 바로가기
Server-side 개발 & 트러블 슈팅/🐘 Hadoop (하둡)

[hadoop] 하둡 PageRank 알고리즘 개념과 MapReduce를 이용한 실습

by 코딩하는 동현 2025. 4. 20.

Hadoop 기반 PageRank 실습 블로그


PageRank 알고리즘 개요

PageRank는 웹페이지 간의 링크 구조를 바탕으로 페이지의 상대적인 중요도를 측정하기 위한 알고리즘으로, 구글의 공동 창업자 Larry Page와 Sergey Brin이 개발하였다. 이는 그래프 형태로 표현된 웹 구조에서 각 노드(페이지)의 중요도를 반복적으로 계산하여 수렴시키는 방식이다.

핵심 개념

  • 링크는 투표다: A가 B로 링크를 건다면, 이는 A가 B를 중요하다고 ‘투표’한 것이다.
  • 중요한 페이지로부터 받은 투표는 더 큰 가치가 있다.
  • 링크를 많이 건 페이지는 투표 가치를 나눠준다.
  • 모든 페이지의 PR 값은 반복 계산을 통해 안정화된다.
  • Damping Factor (보통 0.85): 링크를 따라갈 확률. 나머지는 무작위 이동.

행렬 기반 표현 방식

Adjacency Matrix (인접 행렬) 예:

각 행은 ‘from(출발 페이지)’, 열은 ’to(도착 페이지)’를 의미하며, 연결되어 있으면 1, 없으면 0이다

 

 

확률 행렬로 변환 예:

각 페이지가 몇 개의 out-link를 가지고 있는지 세고, 해당 링크에 중요도를 1/out-degree만큼 분배한다

 

 

 

초기 PageRank 벡터 설정

모든 페이지에 동일한 점수(예: 1/3)를 부여하고 시작합니다. 이 벡터는 행렬과 곱해지며 반복된다.

초기 PR 벡터: [1/3, 1/3, 1/3]

 

 

페이지별 PageRank 업데이트 계산

예를 들어, A에 들어오는 링크는 C밖에 없다면, 각 노드로 “들어오는” 링크들만 고려하여 점수를 누적한다.

PR(A) = PR(C) × M_A,C
      = 1/3 × 1
      = 1/3

 

 

  • B가 A에게 주는 영향력: B → A가 없으면 기여도 0
  • C가 A에게 주는 영향력: C → A가 있다면, PR(C) × (1 / C의 outlink 수)

 

반복적으로 이 계산을 하다 보면 PageRank 벡터는 수렴 상태에 도달하며, 모든 페이지의 중요도가 안정적으로 계산됩니다.

 

 

수식

PR(A) = (1-d)/N + d × Σ[ PR(i) / C(i) ]

  • d: Damping Factor (0.85)
  • N: 전체 페이지 수
  • PR(i): A로 링크를 건 페이지 i의 PageRank 값
  • C(i): 페이지 i의 Out-link 수

입력 파일 예시

입력 형식: 노드ID\t랭크정보

1	0:1:1:1:0:
2	0:0:0:0:1:
3	0:1:0:0:1:
4	1:0:1:0:0:
5	0:0:1:1:0:

코드 구조 및 주요 흐름

1. preprocess() 메서드

  • 입력 파일을 읽어 각 노드의 Out-link 수를 계산한 후, 확률 분포로 변환한다.
  • 그 결과를 HDFS에 저장한다.
BufferedReader in = new BufferedReader(...);
FSDataOutputStream out = fs.create(...);
...
while((line = in.readLine()) != null){
    ...
    tempAdjacency[i][j] = tempAdjacency[i][j] / onesInThisRow;
    stringAdjacency[i] += tempAdjacency[i][j] + ":";
}


예) 0:1:1:1:0: → 0.0:0.33:0.33:0.33:0.0:


2. Mapper (MyMapper)

  • 노드의 ID를 key로, out-link 정보를 value로 전달한다.
  • 각 이웃 노드에게 PageRank 기여값도 전달한다.

 

예: 1\t0.0:0.5:0.5:0.0:0.0: 입력 →

  • 자기 ID(1)와 현재 outlink 정보 그대로 출력
  • 각 이웃 노드(2,3 등)에게 본인의 랭크 기여도 값을 전달

출력 형태:

<1, 0.0:0.5:0.5:0.0:0.0:>  
<2, 0.5>  
<3, 0.5>

 

// Mapper 클래스: 각 노드의 outlink 정보를 이웃 노드에 전달
public static class MyMapper extends Mapper<Object, Text, IntWritable, Text> {
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // 입력 형식: "노드ID\t랭크정보"
        // 예: 1    0.2:0.3:0.5:0.0:0.0:
        String[] line = value.toString().split("\\s+");  // 공백, 탭 등을 기준으로 나눔

        // line[0] = 노드 ID (예: "1") → IntWritable 타입으로 변환
        IntWritable keyNode = new IntWritable(Integer.parseInt(line[0]));

        // key: 현재 노드 ID, value: 전체 랭크 정보 문자열
        context.write(keyNode, new Text(line[1]));

        // 랭크 정보 문자열을 ":" 기준으로 나누어 각 이웃 노드에 전파할 기여값 추출
        String[] inLinks = line[1].split(":");

        // 각 이웃 노드(i+1)에게 현재 노드가 주는 랭크 기여값(inLinks[i])을 전달
        for (int i = 0; i < inLinks.length; i++) {
            context.write(new IntWritable(i + 1), new Text(inLinks[i]));
        }
    }
}

3. Reducer (MyReducer)

  • 각 노드로 들어온 랭크 기여값을 합산
  • 다시 outlink 방향으로 재분배
  • 이전 값과 비교하여 변화가 있으면 counters++ (→ 수렴 안됨)

 

  • 입력: (이웃 노드 ID, [기여값들 + 구조정보])
  • 출력: (노드 ID, 새로운 분배 문자열)

결과적으로 finalAdjList = "0.1:0.2:0.0:0.3:0.4:" 와 같이 하나의 colon-separated 문자열로 완성됩니다.

  • 이 형식은 Mapper에서 다시 읽을 수 있도록 설계된 것입니다.
  • Reducer의 출력은 다음 iteration에서 Mapper의 입력으로 사용되기 때문에, 다음에 파싱하기 쉬운 형태(콜론 기준 분리)로 만드는 것입니다.
// Reducer 클래스: 각 노드로 들어온 PageRank 기여값을 합산하고, 새로 계산된 PR을 outlink로 분배
public static class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    protected void reduce(IntWritable key, Iterable<Text> nodeDistances, Context context)
            throws IOException, InterruptedException {

        double sum = 0.0; // 이 노드로 들어오는 PageRank 기여값을 합산
        String[] adjList = new String[5]; // 해당 노드가 가진 아웃링크 정보를 저장할 배열

        // 전달된 값 리스트를 순회: 하나는 그래프 구조 정보(콜론 포함), 나머지는 PR 기여값
        for (Text nodeDistance : nodeDistances) {
            if (nodeDistance.toString().contains(":")) {
                // 그래프 구조 (adjacency list)인 경우 분해하여 저장
                adjList = nodeDistance.toString().split(":");
            } else {
                // 랭크 기여값인 경우 sum에 누적
                sum += Double.parseDouble(nodeDistance.toString());
            }
        }

        // 유효한 아웃링크 개수 계산 (0.0이 아닌 경우만 카운트)
        int count = 0;
        for (String outlinks : adjList) {
            if (!outlinks.equals("0.0"))
                count++;
        }

        // 새로운 랭크 정보를 저장할 배열
        String[] newAdjList = new String[5];

        // outlink가 있다면, sum을 나누어 분배 / 없으면 0.0 유지
        for (int i = 0; i < adjList.length; i++) {
            if (adjList[i].equals("0.0")) {
                newAdjList[i] = "0.0";
            } else {
                double newRank = sum / count; // PR 기여값을 균등 분배
                newAdjList[i] = String.valueOf(newRank);
            }
        }

        // 최종 문자열 형식으로 변환 ("0.1:0.2:0.0:0.3:0.4:")
        String finalAdjList = "";
        for (String aList : newAdjList)
            finalAdjList = finalAdjList.concat(aList).concat(":");

        // 이전 값과 비교해 변화가 큰 경우 → 아직 수렴하지 않았음
        for (int i = 0; i < adjList.length; i++) {
            if (Math.abs(Double.parseDouble(adjList[i]) - Double.parseDouble(newAdjList[i])) > 0.0001) {
                counters++; // 반복을 계속하게 만듬
                break;
            }
        }

        // 최종 출력: key(노드 번호), value(업데이트된 랭크 분배 정보)
        context.write(key, new Text(finalAdjList));
    }
}

4. lastCalculate() 메서드

  • 마지막 PR 계산 결과 파일을 읽고 최종 PageRank 점수를 합산하여 저장한다.
for(int i = 0; i < counter; i++) {
    for(int j = 0; j < counter; j++) {
        sum[i] += tempAdjacency[i][j];
    }
    out.writeBytes((i+1) + "\t" + sum[i] + "\n");
}

5. main() 실행 흐름

  1. 입력 파일 전처리
  2. 수렴 조건 만족할 때까지 MapReduce 반복 실행
  3. 최종 결과 계산 및 저장

 

단계 설명
전처리 원래 입력 파일을 PR 분배 형식으로 가공 (preprocess)
반복 MapReduce 변화가 있는 동안 MapReduce 반복 실행하며 PR 업데이트
수렴 판단 PR 값이 충분히 변화가 작아지면 반복 종료
최종 계산 마지막 출력 파일을 기반으로 각 노드의 PageRank를 집계하여 결과 파일로 출력
// 메인 함수: 전처리 수행 → 반복 MapReduce 실행 → 최종 결과 계산
public static void main(String[] args) throws Exception {

    // 입력 파일 경로 (사용자로부터 전달받음)
    String inputPath = args[0];

    // 전처리 후 저장될 파일 경로
    String preprocessPath = "/input/preprocess.txt";

    // 최종 결과가 저장될 경로
    String finalPath = args[1];

    // 중간 출력 디렉토리의 기본 경로
    String outputPath = "/outputs/output";

    // 1. 초기 입력 파일을 전처리하여 PR 정보로 변환
    preprocess(inputPath, preprocessPath);
    inputPath = preprocessPath;

    // 2. PageRank 점수가 수렴할 때까지 반복 실행
    do {
        // Hadoop Configuration 및 Job 객체 초기화
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 파일 시스템 핸들 얻기
        FileSystem fs = FileSystem.get(conf);
        Path tmppath = new Path(outputPath + roundNumber);

        // 동일한 이름의 출력 디렉토리가 있으면 삭제
        if (fs.exists(tmppath))
            fs.delete(tmppath, true);

        // 이번 반복에서 변화 감지용 카운터 초기화
        counters = 0;

        // MapReduce 실행 (Mapper + Reducer)
        myMapReduceTask(job, inputPath, outputPath + roundNumber);

        // 다음 반복을 위해 입력 파일 경로 갱신
        inputPath = outputPath + roundNumber + "/part-r-00000";
        roundNumber++;

    } while (counters > 0); // 변화가 있다면 계속 반복

    // 3. 반복 종료 후, 최종 PageRank 값 계산 및 출력
    lastCalculate(outputPath + (roundNumber - 1) + "/part-r-00000", finalPath);
}

 

6. 최종 코드

import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class PageRank {
    private static int roundNumber = 0;
    private static int counters;

    // 입력 adjacency matrix를 읽고, 링크 수를 기반으로 rank 분배 비율을 계산하여 출력
    public static void preprocess(String inputFileName, String outputFileName) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inFile = new Path(inputFileName);
        Path outFile = new Path(outputFileName);

        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(inFile)));
        FSDataOutputStream out = fs.create(outFile);

        double[][] tempAdjacency = null;
        String[] stringAdjacency = null;
        int counter = 0;

        try {
            String line;
            while ((line = in.readLine()) != null) {
                line = (line.split("\\s+"))[1];
                int len = line.split(":").length;
                if (tempAdjacency == null || stringAdjacency == null) {
                    tempAdjacency = new double[len][len];
                    stringAdjacency = new String[len];
                }
                stringAdjacency[counter] = line;
                counter++;
            }

            for (int i = 0; i < counter; i++) {
                String[] StringtempAdjacency = stringAdjacency[i].split(":");
                int onesInThisRow = 0;
                stringAdjacency[i] = "";

                for (int j = 0; j < StringtempAdjacency.length; j++) {
                    if (StringtempAdjacency[j].equals("1"))
                        onesInThisRow++;
                    tempAdjacency[i][j] = Double.parseDouble(StringtempAdjacency[j]);
                    tempAdjacency[i][j] = tempAdjacency[i][j] / counter;
                }

                for (int j = 0; j < StringtempAdjacency.length; j++) {
                    tempAdjacency[i][j] = tempAdjacency[i][j] / (double) onesInThisRow;
                    stringAdjacency[i] = stringAdjacency[i].concat(tempAdjacency[i][j] + ":");
                }
            }

            for (int i = 0; i < counter; i++) {
                out.writeBytes((i + 1) + "\t" + stringAdjacency[i]);
                if (i != (counter - 1))
                    out.writeBytes("\n");
            }
        } catch (Exception e) {
        } finally {
            in.close();
            out.close();
        }
    }

    // 반복이 끝난 후 마지막 PR 값을 집계하여 출력
    public static void lastCalculate(String inputFileName, String outputFileName) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inFile = new Path(inputFileName);
        Path outFile = new Path(outputFileName);

        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(inFile)));
        FSDataOutputStream out = fs.create(outFile);

        double[][] tempAdjacency = null;
        String[] stringAdjacency = null;
        int counter = 0;

        try {
            String line;
            while ((line = in.readLine()) != null) {
                line = (line.split("\\s+"))[1];
                int len = line.split(":").length;
                if (tempAdjacency == null || stringAdjacency == null) {
                    tempAdjacency = new double[len][len];
                    stringAdjacency = new String[len];
                }
                stringAdjacency[counter] = line;
                counter++;
            }

            for (int i = 0; i < counter; i++) {
                String[] StringtempAdjacency = stringAdjacency[i].split(":");
                for (int j = 0; j < StringtempAdjacency.length; j++) {
                    tempAdjacency[i][j] = Double.parseDouble(StringtempAdjacency[j]);
                }
            }

            double[] sum = new double[counter];
            for (int i = 0; i < counter; i++) {
                for (int j = 0; j < counter; j++) {
                    sum[i] += tempAdjacency[i][j];
                }
            }
            for (int i = 0; i < counter; i++)
                out.writeBytes((i + 1) + "\t" + sum[i] + "\n");
        } catch (Exception e) {
        } finally {
            in.close();
            out.close();
        }
    }

    // Mapper: 각 노드에서 outlink 정보를 이웃 노드로 전달함
    public static class MyMapper extends Mapper<Object, Text, IntWritable, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\\s+");
            IntWritable keyNode = new IntWritable(Integer.parseInt(line[0]));
            context.write(keyNode, new Text(line[1]));

            String[] inLinks = line[1].split(":");
            for (int i = 0; i < inLinks.length; i++)
                context.write(new IntWritable(i + 1), new Text(inLinks[i]));
        }
    }

    // Reducer: 수신된 PR 기여값을 합산하고, outlink로 재분배
    public static class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        protected void reduce(IntWritable key, Iterable<Text> nodeDistances, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            String[] adjList = new String[5];
            for (Text nodeDistance : nodeDistances) {
                if (nodeDistance.toString().contains(":"))
                    adjList = nodeDistance.toString().split(":");
                else
                    sum += Double.parseDouble(nodeDistance.toString());
            }

            int count = 0;
            for (String outlinks : adjList) {
                if (!outlinks.equals("0.0"))
                    count++;
            }

            String[] newAdjList = new String[5];
            for (int i = 0; i < adjList.length; i++) {
                if (adjList[i].equals("0.0"))
                    newAdjList[i] = "0.0";
                else {
                    double newRank = sum / count;
                    newAdjList[i] = newRank + "";
                }
            }

            String finalAdjList = "";
            for (String aList : newAdjList)
                finalAdjList += aList + ":";

            // 수렴 확인: 기존과 차이가 크면 반복 계속함
            for (int i = 0; i < adjList.length; i++) {
                if ((Double.parseDouble(adjList[i]) - Double.parseDouble(newAdjList[i])) > 0.0001) {
                    counters++;
                    break;
                }
            }
            context.write(key, new Text(finalAdjList));
        }
    }

    // 실제 MapReduce 작업 정의
    public static void myMapReduceTask(Job job, String inputPath, String outputPath) throws Exception {
        Configuration conf = new Configuration();
        job.setJarByClass(PageRank.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.waitForCompletion(true);
    }

    // 메인 함수: 전처리 → 반복 MR → 결과 출력
    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        String preprocessPath = "/input/preprocess.txt";
        String finalPath = args[1];
        String outputPath = "/outputs/output";

        preprocess(inputPath, preprocessPath);
        inputPath = preprocessPath;

        do {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            FileSystem fs = FileSystem.get(conf);
            Path tmppath = new Path(outputPath + roundNumber);
            if (fs.exists(tmppath))
                fs.delete(tmppath, true);

            counters = 0;
            myMapReduceTask(job, inputPath, outputPath + roundNumber);
            inputPath = outputPath + roundNumber + "/part-r-00000";
            roundNumber++;

        } while (counters > 0);

        lastCalculate(outputPath + (roundNumber - 1) + "/part-r-00000", finalPath);
    }
}

실행 명령어 정리

1단계. Hadoop 클러스터 실행

start-dfs.sh
start-yarn.sh

2단계. 입력 파일 준비 및 업로드

hdfs dfs -mkdir /input/
hdfs dfs -put ~/pagerankinput.txt /input/

3단계. 자바 코드 컴파일 및 JAR 생성

hadoop com.sun.tools.javac.Main PageRank.java
jar cf PageRank.jar PageRank*.class

4단계. PageRank 실행

hadoop jar PageRank.jar PageRank /input/pagerankinput.txt /finaloutput/finaloutput.txt

5단계. 출력 결과 확인

hdfs dfs -cat /finaloutput/*

6단계. Hadoop 종료

stop-dfs.sh
stop-yarn.sh
 

 

반응형

댓글