리눅스

[ISP 확인] IP에 해당하는 ISP(인터넷 서비스 제공자) 확인

박히응 2025. 12. 5. 17:48

실행 목적

사용자의 ISP 파악

실제 경로 내 파일

실 사용 파일 : 대상 log, GeoLite2-ASN.mmdb, asn_lookup.py, venv

선행 조건

  1. 구성된 ubuntu 환경
  2. GeoLite2-ASN.mmdb 다운로드 필요
  3. ubuntu 환경에서 python 라이브러리 설치 불가 시 가상환경(venv) 구성
    1. apt install python3.12-venv #설치
    2. python3 -m venv venv #가상환경 구성
    3. source venv/bin/activate #가상환경 사용
    4. deactivate #가상환경 종료

실행 순서

  1. asn_lookup.py 내용 중 GeoLite2-ASN.mmdb 위치 지정

 

   2. asn_lookup.py 내용 중 대상 로그 지정

 

   3. python3 asn_lookup.py

작성 내용

asn_lookup.py

import maxminddb
import pandas as pd
import os
import time

# --- 설정 변수 ---
# GeoLite2-ASN 데이터베이스 파일 경로
DB_PATH = '/root/isp_check/GeoLite2-ASN.mmdb'

# 입력 파일 리스트
INPUT_FILES = [
    'KT_20251027.log', 'KT_20251026.log', 'KT_20251025.log', 'KT_20251024.log',
    'KT_20251023.log', 'KT_20251022.log', 'KT_20251021.log', 'KT_20251020.log',
    'SK_20251027.log', 'SK_20251026.log', 'SK_20251025.log', 'SK_20251024.log',
    'SK_20251023.log', 'SK_20251022.log', 'SK_20251021.log', 'SK_20251020.log'
]

# --- ISP 분류 함수 (키워드 보강됨) ---
def classify_isp(org_name):
    """
    조직 이름을 받아 KT, SK, LG, Other로 분류합니다.
    인수합병된 전신 기업명(Hanaro, Dacom 등)과 자회사(Skylife, HelloVision 등)를 포함합니다.
    """
    if not isinstance(org_name, str):
        return 'Other'

    org_upper = org_name.upper()

    # 1. KT 그룹 키워드
    # KORNET, QOOK, KTF, SKYLIFE(스카이라이프), HCN(현대HCN 인수)
    kt_keywords = [
        'KT', 'KOREA TELECOM', 'KORNET', 'OLLEH', 'HCN',
        'SKYLIFE', 'KTF', 'QOOK', 'K T', 'KT CORPORATION', 'KT-IDC'
    ]

    # 2. SK 그룹 키워드
    # SKB, SKT, HANARO(하나로텔레콤), DREAMLINE(드림라인), SK TELINK
    sk_keywords = [
        'SK', 'SK BROADBAND', 'SK TELECOM', 'SKT', 'SKB',
        'HANARO', 'DREAMLINE', 'SK TELINK', 'SK STOA'
    ]

    # 3. LG 그룹 키워드
    # DACOM(데이콤), POWERCOMM(파워콤), HELLOVISION(헬로비전), CJ HELLO
    lg_keywords = [
        'LG', 'LGU+', 'DACOM', 'POWERCOMM', 'UPLUS', 'LG TELECOM', 'LGT',
        'HELLOVISION', 'CJ HELLO', 'CJHELLO'
    ]

    # 분류 로직
    if any(k in org_upper for k in kt_keywords):
        return 'KT'
    elif any(k in org_upper for k in sk_keywords):
        return 'SK'
    elif any(k in org_upper for k in lg_keywords):
        return 'LG'
    else:
        return 'Other'

# --- 메인 로직 ---

try:
    reader = maxminddb.open_database(DB_PATH)
except FileNotFoundError:
    print(f"오류: ASN 데이터베이스 파일 ({DB_PATH})을 찾을 수 없습니다.")
    exit()
except AttributeError:
    reader = maxminddb.open(DB_PATH)

total_ips_processed = 0
print(f"총 {len(INPUT_FILES)}개의 파일 처리를 시작합니다.")

for input_file in INPUT_FILES:
    start_time = time.time()
    output_file = f"{input_file}_isp_result.csv"

    print(f"\n--- 파일 처리 시작: {input_file} ---")

    results = []

    try:
        # 파일 읽기 (공백 구분자)
        df = pd.read_csv(input_file, sep='\s+', engine='python', header=None, usecols=[1])

        # IP 조회
        for index, row in df.iterrows():
            ip_str = str(row[1]).strip()

            if not ip_str:
                continue

            try:
                record = reader.get(ip_str)
                asn = record.get('autonomous_system_number', '') if record else ''
                org = record.get('autonomous_system_organization', 'UNKNOWN') if record else 'UNKNOWN'

                results.append({
                    'Original_IP': ip_str,
                    'ASN': asn,
                    'Organization': org
                })
                total_ips_processed += 1

            except (ValueError, Exception):
                results.append({
                    'Original_IP': ip_str,
                    'ASN': 'ERROR',
                    'Organization': 'UNKNOWN'
                })

        # DataFrame 생성 및 결과 저장
        results_df = pd.DataFrame(results)
        results_df.to_csv(output_file, index=False, encoding='utf-8-sig')

        # --- 요약 로직 (Grouping) ---
        results_df['ISP_Group'] = results_df['Organization'].apply(classify_isp)
        group_counts = results_df['ISP_Group'].value_counts()
        total_count = len(results_df)

        # 파일 끝에 요약 추가
        with open(output_file, 'a', encoding='utf-8-sig') as f:
            f.write('\n\n')
            f.write('#ISP Summary\n')

            # LG, KT, SK, Other 순서대로 출력 (데이터 없어도 0.00%로 출력됨)
            target_isps = ['LG', 'KT', 'SK', 'Other']

            for isp in target_isps:
                count = group_counts.get(isp, 0)
                if total_count > 0:
                    percent = (count / total_count * 100)
                else:
                    percent = 0.0

                # 요청하신 포맷 적용
                f.write(f"{isp} : {percent:.2f} %\n")

        end_time = time.time()
        print(f"처리 완료. IP 수: {len(df)}, 소요 시간: {end_time - start_time:.2f}초")
        print(f"결과 저장됨: {output_file}")

    except FileNotFoundError:
        print(f"경고: 입력 파일 {input_file}을 찾을 수 없습니다. 건너뜁니다.")
    except Exception as e:
        print(f"경고: 파일 {input_file} 처리 중 오류 발생: {e}")

reader.close()
print(f"\n===== 모든 파일 처리 완료! 총 처리된 IP 수: {total_ips_processed} =====")

실행 결과

'리눅스' 카테고리의 다른 글

sed 스크립트  (0) 2025.04.10
IP 추출 스크립트  (0) 2025.04.10
멀티_커맨드  (0) 2025.04.08
[인증 기간 확인] SSL 인증 체크 스크립트  (0) 2025.04.03