본문 바로가기
Python

공격자 IP 차단 Python 소스

by ERLite 2026. 5. 28.

👍공격자 IP를 차단하는 전체 소스입니다.

main.py

import subprocess
import time
import json
import os
import urllib.request
import urllib.parse
from collections import defaultdict
from datetime import datetime, timedelta

CONFIG_FILE = "config.json"

def load_config():
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {
        "check_interval_seconds": 60,
        "block_threshold": 5,
        "time_window_minutes": 10,
        "rule_name_prefix": "IvyFirewall_Block_",
        "whitelist_ips": ["127.0.0.1", "::1"]
    }

class IvyFirewall:
    def __init__(self, config):
        self.config = config
        self.check_interval = config.get("check_interval_seconds", 60)
        self.block_threshold = config.get("block_threshold", 5)
        self.time_window = timedelta(minutes=config.get("time_window_minutes", 10))
        self.rule_prefix = config.get("rule_name_prefix", "IvyFirewall_Block_")
        self.whitelist = set(config.get("whitelist_ips", ["127.0.0.1", "::1"]))
        self.telegram_bot_token = config.get("telegram_bot_token", "")
        self.telegram_chat_id = config.get("telegram_chat_id", "")
       
        self.failed_attempts = defaultdict(list)
        self.blocked_ips = set()
        self.rule_name = self.rule_prefix + "List"
        self.rule_exists = False
       
        self._load_existing_rules()

    def _load_existing_rules(self):
        print(f"기존 방화벽 차단 규칙({self.rule_name})을 불러오는 중...")
        cmd = f'netsh advfirewall firewall show rule name="{self.rule_name}"'
        try:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, encoding='cp949', errors='ignore')
            if result.returncode == 0:
                self.rule_exists = True
                for line in result.stdout.split('\n'):
                    line_lower = line.lower()
                    if "remoteip" in line_lower or "원격 ip" in line_lower or "remote ip" in line_lower:
                        parts = line.split(':')
                        if len(parts) > 1:
                            ips_str = parts[1].strip()
                            if ips_str.lower() != 'any' and ips_str.lower() != '모두':
                                for ip in ips_str.split(','):
                                    ip = ip.strip()
                                    if ip:
                                        self.blocked_ips.add(ip)
                print(f"총 {len(self.blocked_ips)}개의 이전에 차단된 IP를 불러왔습니다.")
            else:
                print("기존 차단 규칙이 존재하지 않습니다. 새로 생성할 예정입니다.")
                self.rule_exists = False
        except Exception as e:
            print(f"기존 규칙 로드 실패: {e}")
            self.rule_exists = False

    def get_failed_logons(self):
        """이벤트 뷰어에서 최근 실패한 로그인(RDP 등) 시도를 가져옵니다."""
        minutes_to_check = max(1, self.check_interval // 60) + 1 # 여유있게 1분 추가
        time_str = (datetime.now() - timedelta(minutes=minutes_to_check)).strftime('%Y-%m-%dT%H:%M:%S')
       
        ps_command = f"""
        $ErrorActionPreference = 'SilentlyContinue'
        $events = Get-WinEvent -FilterHashtable @{{LogName='Security'; Id=4625; StartTime='{time_str}'}}
        if ($events) {{
            foreach ($event in $events) {{
                $xml = [xml]$event.ToXml()
                $ip = $xml.Event.EventData.Data | Where-Object {{ $_.Name -eq 'IpAddress' }} | Select-Object -ExpandProperty '#text'
                if ($ip -and $ip -ne '-' -and $ip -notmatch ':') {{
                    Write-Output $ip
                }}
            }}
        }}
        """
       
        try:
            # powershell 호출
            result = subprocess.run(["powershell", "-Command", ps_command], capture_output=True, text=True)
            ips = result.stdout.strip().split('\n')
            return [ip.strip() for ip in ips if ip.strip()]
        except subprocess.CalledProcessError as e:
            print(f"이벤트 로그 조회 오류: {e}")
            return []

    def send_telegram_message(self, message):
        if not self.telegram_bot_token or not self.telegram_chat_id:
            return
           
        try:
            url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
            data = urllib.parse.urlencode({
                "chat_id": self.telegram_chat_id,
                "text": message,
                "parse_mode": "HTML"
            }).encode("utf-8")
           
            req = urllib.request.Request(url, data=data)
            with urllib.request.urlopen(req, timeout=5) as response:
                pass
        except Exception as e:
            print(f"텔레그램 알림 전송 실패: {e}")

    def block_ip(self, ip):
        if ip in self.whitelist:
            return
           
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] IP 차단 진행: {ip}")
        self.blocked_ips.add(ip)
        ips_str = ",".join(sorted(list(self.blocked_ips)))
       
        try:
            if self.rule_exists:
                # 기존 규칙 업데이트
                block_cmd = f'netsh advfirewall firewall set rule name="{self.rule_name}" new remoteip="{ips_str}"'
                subprocess.run(block_cmd, shell=True, check=True, capture_output=True)
            else:
                # 새 규칙 생성
                block_cmd = f'netsh advfirewall firewall add rule name="{self.rule_name}" dir=in action=block remoteip="{ips_str}"'
                subprocess.run(block_cmd, shell=True, check=True, capture_output=True)
                self.rule_exists = True
               
            print(f"성공적으로 차단되었습니다: {ip} (총 차단 IP 개수: {len(self.blocked_ips)}개)")
           
            # 텔레그램 알림 발송
            msg = f"🚨 <b>IvyFirewall IP 차단 알림</b>\n\n" \
                  f"지속적인 로그인 실패가 감지되어 방화벽에 차단 규칙을 업데이트했습니다.\n\n" \
                  f"<b>차단된 IP:</b> <code>{ip}</code>\n" \
                  f"<b>전체 차단된 IP 목록 ({len(self.blocked_ips)}개):</b>\n<code>{ips_str}</code>"
            self.send_telegram_message(msg)
           
        except subprocess.CalledProcessError as e:
            if self.rule_exists:
                try:
                    print("기존 규칙 업데이트 실패. 규칙이 없거나 오류가 발생하여 규칙을 새로 생성합니다.")
                    block_cmd = f'netsh advfirewall firewall add rule name="{self.rule_name}" dir=in action=block remoteip="{ips_str}"'
                    subprocess.run(block_cmd, shell=True, check=True, capture_output=True)
                    self.rule_exists = True
                    print(f"성공적으로 차단되었습니다(재생성): {ip} (총 차단 IP 개수: {len(self.blocked_ips)}개)")
                   
                    msg = f"🚨 <b>IvyFirewall IP 차단 알림 (규칙 재생성)</b>\n\n" \
                          f"지속적인 로그인 실패가 감지되어 방화벽에 차단 규칙을 생성했습니다.\n\n" \
                          f"<b>차단된 IP:</b> <code>{ip}</code>\n" \
                          f"<b>전체 차단된 IP 목록 ({len(self.blocked_ips)}개):</b>\n<code>{ips_str}</code>"
                    self.send_telegram_message(msg)
                    return
                except subprocess.CalledProcessError as ex:
                    print(f"차단 실패 ({ip}): {ex}")
            else:
                print(f"차단 실패 ({ip}): {e}")
               
            if ip in self.blocked_ips:
                self.blocked_ips.remove(ip)

    def clean_old_attempts(self):
        """시간이 지난 실패 기록 삭제"""
        now = datetime.now()
        for ip in list(self.failed_attempts.keys()):
            self.failed_attempts[ip] = [ts for ts in self.failed_attempts[ip] if now - ts <= self.time_window]
            if not self.failed_attempts[ip]:
                del self.failed_attempts[ip]

    def run(self):
        print("="*50)
        print("IvyFirewall 모니터링이 시작되었습니다.")
        print(f"설정: {self.time_window.seconds // 60}분 내에 {self.block_threshold}회 실패 시 IP 차단")
        print("종료하려면 Ctrl+C를 누르세요.")
        print("="*50)
       
        while True:
            try:
                #print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 이벤트 로그 확인 중...")
                ips = self.get_failed_logons()
                now = datetime.now()
               
                for ip in ips:
                    if ip in self.blocked_ips or ip in self.whitelist:
                        continue
                       
                    self.failed_attempts[ip].append(now)
               
                self.clean_old_attempts()
               
                # 임계치 확인 및 차단
                for ip, attempts in list(self.failed_attempts.items()):
                    if len(attempts) >= self.block_threshold:
                        self.block_ip(ip)
                        del self.failed_attempts[ip] # 차단 후 기록 삭제
               
                time.sleep(self.check_interval)
               
            except KeyboardInterrupt:
                print("\n프로그램을 종료합니다.")
                break
            except Exception as e:
                print(f"예기치 않은 오류: {e}")
                time.sleep(self.check_interval)

if __name__ == "__main__":
    import ctypes
   
    # 관리자 권한 확인
    def is_admin():
        try:
            return ctypes.windll.shell32.IsUserAnAdmin()
        except:
            return False

    if not is_admin():
        print("경고: 관리자 권한으로 실행되지 않았습니다. 방화벽 규칙을 추가하려면 관리자 권한이 필요합니다.")
        print("명령 프롬프트나 PowerShell을 '관리자 권한으로 실행'한 후 다시 시도해주세요.")
        input("엔터를 누르면 종료합니다...")
        exit(1)
       
    config = load_config()
    firewall = IvyFirewall(config)
    firewall.run()

config.json

{
  "check_interval_seconds": 60,
  "block_threshold": 5,
  "time_window_minutes": 10,
  "rule_name_prefix": "IvyFirewall_Block_",
  "whitelist_ips": [
    "127.0.0.1",
    "::1"
  ],
  "telegram_bot_token": "텔레그램 토큰",
  "telegram_chat_id": "텔레그램ID"
}

main.py 소스 코드는 윈도우 환경에서 보안을 강화하기 위해 RDP(원격 데스크톱) 등의 로그인 실패 공격을 탐지하고, 자동으로 방화벽에 차단 IP를 등록하는 침입 차단 프로그램입니다.

이 프로그램의 핵심 아키텍처와 상세 동작 방식을 세부 요소별로 설명해 드리겠습니다.


1. 프로그램의 동작 흐름 개요

  1. 관리자 권한 확인: 실행 시 윈도우 방화벽을 제어해야 하므로 관리자 권한 여부를 체크합니다.
  2. 설정 로드: config.json 파일을 읽어 동작 주기, 임계값, 허용 IP(화이트리스트), 텔레그램 알림 정보 등을 파싱합니다.
  3. 기존 규칙 조회: 이미 등록된 IvyFirewall_Block_List 규칙에서 이전에 차단되었던 IP 목록을 파싱해 내부 메모리에 유지합니다.
  4. 무한 루프 감시:
    • 설정된 주기마다 윈도우 **Security 이벤트 로그(ID: 4625 - 로그인 실패)**를 PowerShell을 이용해 수집합니다.
    • 특정 IP의 실패 횟수가 시간 범위 내에서 임계치를 초과하면 해당 IP를 차단 대상으로 판단합니다.
    • 단일 방화벽 규칙(IvyFirewall_Block_List)의 원격 IP 속성 영역에 해당 IP를 추가하여 업데이트하고, 관리자에게 텔레그램 알림을 발송합니다.

2. 세부 함수 및 메서드 설명

⚙️ 설정 관리 및 초기화

  • load_config():
    • 프로그램 설정이 포함된 config.json을 읽어옵니다.
    • 파일이 없는 경우 기본값(동작 주기 60초, 10분 내 5회 실패 시 차단, 로컬 IP 화이트리스트 등)으로 설정 딕셔너리를 반환합니다.
  • IvyFirewall.__init__(self, config):
    • 각종 상태 값(시간 윈도우 계산용 timedelta, 화이트리스트 IP 집합 등)을 세팅합니다.
    • 단일 방화벽 인바운드 규칙명(self.rule_name = "IvyFirewall_Block_List")을 선언하고 기존 차단 정보를 로드하는 _load_existing_rules() 메서드를 호출합니다.

🔍 기존 차단 정보 동기화 (_load_existing_rules)

  • 단일 차단 규칙(IvyFirewall_Block_List)이 방화벽에 존재하는지 검사합니다.
  • netsh advfirewall firewall show rule 명령의 출력 텍스트 중 원격 IP 주소 필드(RemoteIP, 원격 IP, Remote IP)를 파싱하여, 기존에 이미 차단되었던 IP들을 self.blocked_ips 집합(Set)에 완전히 동기화합니다. 이로 인해 프로그램 재시작 시에도 중복 차단 작업을 하지 않습니다.

🕵️ RDP 로그인 실패 로그 감시 (get_failed_logons)

  • PowerShell의 Get-WinEvent cmdlet을 활용하여 이벤트 뷰어의 Security 로그 중 **이벤트 ID 4625(로그인 실패)**를 타겟팅합니다.
  • XML 형식의 이벤트 데이터에서 실제 로그인을 시도한 공격자의 원격 IP 주소(IpAddress)를 추출하여 리스트로 반환합니다.

🚫 핵심 차단 로직 (block_ip)

  • 허용 IP(화이트리스트) 대역은 실수로 본인이 차단되는 것을 막기 위해 사전에 걸러냅니다.
  • 새로운 차단 IP가 확보되면 기존에 차단된 IP 목록과 병합하여 쉼표(,)로 구분된 단일 문자열을 만듭니다.
  • 방화벽 규칙의 상태에 따라 분기 처리합니다.
    • 규칙이 이미 존재할 때 (set rule): netsh advfirewall firewall set rule name="IvyFirewall_Block_List" new remoteip="IP1,IP2,IP3..." 명령어로 원격 IP 속성을 일괄 업데이트합니다.
    • 규칙이 존재하지 않을 때 (add rule): netsh advfirewall firewall add rule name="IvyFirewall_Block_List" dir=in action=block remoteip="IP1,IP2,IP3..." 명령어로 단일 차단 인바운드 규칙을 새롭게 생성합니다.
    • 예외 복구(Fallback): 사용자가 방화벽 규칙을 임의로 지우는 등의 비정상적인 실패 상황이 발생하면, 업데이트 실패 시 자동으로 재생성(add rule)을 시도하여 시스템 안전성을 극대화합니다.

📨 텔레그램 실시간 알림 (send_telegram_message)

  • 공격 IP가 신규로 차단될 때 텔레그램 봇 API를 통해 관리자에게 실시간 알림 메시지를 전송합니다.
  • 메시지에는 금방 차단된 IP 정보뿐만 아니라 전체 차단된 IP 목록과 총 개수를 함께 시각화하여 한눈에 모니터링할 수 있도록 돕습니다.

🧹 메모리 최적화 및 메인 루프 (clean_old_attempts & run)

  • clean_old_attempts: 지정한 시간 범위(예: 10분)가 지난 무의미한 로그인 실패 기록들을 메모리 딕셔너리에서 주기적으로 청소하여 메모리 누수를 원천 방어합니다.
  • run: 감시 주기마다 위 프로세스를 순환하며 키보드 인터럽트(Ctrl + C) 입력 시 프로그램을 안전하게 종료시킵니다.