main.py 소스
import subprocess
import time
import json
import os
import urllib.request
import urllib.parse
from collections import defaultdict
from datetime import datetime, timedelta
CONFIG_FILE = "config.json"
def load_config():
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {
"check_interval_seconds": 60,
"block_threshold": 5,
"time_window_minutes": 10,
"rule_name_prefix": "IvyFirewall_Block_",
"whitelist_ips": ["127.0.0.1", "::1"]
}
class IvyFirewall:
def __init__(self, config):
self.config = config
self.check_interval = config.get("check_interval_seconds", 60)
self.block_threshold = config.get("block_threshold", 5)
self.time_window = timedelta(minutes=config.get("time_window_minutes", 10))
self.rule_prefix = config.get("rule_name_prefix", "IvyFirewall_Block_")
self.whitelist = set(config.get("whitelist_ips", ["127.0.0.1", "::1"]))
self.telegram_bot_token = config.get("telegram_bot_token", "")
self.telegram_chat_id = config.get("telegram_chat_id", "")
self.failed_attempts = defaultdict(list)
self.blocked_ips = set()
self._load_existing_rules()
def _load_existing_rules(self):
print("기존 방화벽 차단 규칙을 불러오는 중...")
cmd = 'netsh advfirewall firewall show rule name=all'
try:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, encoding='cp949', errors='ignore')
for line in result.stdout.split('\n'):
if self.rule_prefix in line:
# Rule Name: IvyFirewall_Block_192.168.1.1
parts = line.split(self.rule_prefix)
if len(parts) > 1:
ip = parts[1].strip()
self.blocked_ips.add(ip)
print(f"총 {len(self.blocked_ips)}개의 이전에 차단된 IP를 불러왔습니다.")
except Exception as e:
print(f"기존 규칙 로드 실패: {e}")
def get_failed_logons(self):
"""이벤트 뷰어에서 최근 실패한 로그인(RDP 등) 시도를 가져옵니다."""
minutes_to_check = max(1, self.check_interval // 60) + 1 # 여유있게 1분 추가
time_str = (datetime.now() - timedelta(minutes=minutes_to_check)).strftime('%Y-%m-%dT%H:%M:%S')
ps_command = f"""
$ErrorActionPreference = 'SilentlyContinue'
$events = Get-WinEvent -FilterHashtable @{{LogName='Security'; Id=4625; StartTime='{time_str}'}}
if ($events) {{
foreach ($event in $events) {{
$xml = [xml]$event.ToXml()
$ip = $xml.Event.EventData.Data | Where-Object {{ $_.Name -eq 'IpAddress' }} | Select-Object -ExpandProperty '#text'
if ($ip -and $ip -ne '-' -and $ip -notmatch ':') {{
Write-Output $ip
}}
}}
}}
"""
try:
# powershell 호출
result = subprocess.run(["powershell", "-Command", ps_command], capture_output=True, text=True)
ips = result.stdout.strip().split('\n')
return [ip.strip() for ip in ips if ip.strip()]
except subprocess.CalledProcessError as e:
print(f"이벤트 로그 조회 오류: {e}")
return []
def send_telegram_message(self, message):
if not self.telegram_bot_token or not self.telegram_chat_id:
return
try:
data = urllib.parse.urlencode({
"chat_id": self.telegram_chat_id,
"text": message,
"parse_mode": "HTML"
}).encode("utf-8")
req = urllib.request.Request(url, data=data)
with urllib.request.urlopen(req, timeout=5) as response:
pass
except Exception as e:
print(f"텔레그램 알림 전송 실패: {e}")
def block_ip(self, ip):
if ip in self.whitelist:
return
rule_name = f"{self.rule_prefix}{ip}"
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] IP 차단 진행: {ip}")
# 방화벽 인바운드 차단 규칙 추가
block_cmd = f'netsh advfirewall firewall add rule name="{rule_name}" dir=in action=block remoteip="{ip}"'
try:
subprocess.run(block_cmd, shell=True, check=True, capture_output=True)
print(f"성공적으로 차단되었습니다: {ip}")
self.blocked_ips.add(ip)
# 텔레그램 알림 발송
msg = f"🚨 <b>IvyFirewall IP 차단 알림</b>\n\n" \
f"지속적인 로그인 실패가 감지되어 방화벽에 차단 규칙을 등록했습니다.\n\n" \
f"<b>차단된 IP:</b> <code>{ip}</code>"
self.send_telegram_message(msg)
except subprocess.CalledProcessError as e:
print(f"차단 실패 ({ip}): {e}")
def clean_old_attempts(self):
"""시간이 지난 실패 기록 삭제"""
now = datetime.now()
for ip in list(self.failed_attempts.keys()):
self.failed_attempts[ip] = [ts for ts in self.failed_attempts[ip] if now - ts <= self.time_window]
if not self.failed_attempts[ip]:
del self.failed_attempts[ip]
def run(self):
print("="*50)
print("IvyFirewall 모니터링이 시작되었습니다.")
print(f"설정: {self.time_window.seconds // 60}분 내에 {self.block_threshold}회 실패 시 IP 차단")
print("종료하려면 Ctrl+C를 누르세요.")
print("="*50)
while True:
try:
#print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 이벤트 로그 확인 중...")
ips = self.get_failed_logons()
now = datetime.now()
for ip in ips:
if ip in self.blocked_ips or ip in self.whitelist:
continue
self.failed_attempts[ip].append(now)
self.clean_old_attempts()
# 임계치 확인 및 차단
for ip, attempts in list(self.failed_attempts.items()):
if len(attempts) >= self.block_threshold:
self.block_ip(ip)
del self.failed_attempts[ip] # 차단 후 기록 삭제
time.sleep(self.check_interval)
except KeyboardInterrupt:
print("\n프로그램을 종료합니다.")
break
except Exception as e:
print(f"예기치 않은 오류: {e}")
time.sleep(self.check_interval)
if __name__ == "__main__":
import ctypes
# 관리자 권한 확인
def is_admin():
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
if not is_admin():
print("경고: 관리자 권한으로 실행되지 않았습니다. 방화벽 규칙을 추가하려면 관리자 권한이 필요합니다.")
print("명령 프롬프트나 PowerShell을 '관리자 권한으로 실행'한 후 다시 시도해주세요.")
input("엔터를 누르면 종료합니다...")
exit(1)
config = load_config()
firewall = IvyFirewall(config)
firewall.run()
config.json 환경설정
{
"check_interval_seconds": 60,
"block_threshold": 5,
"time_window_minutes": 10,
"rule_name_prefix": "IvyFirewall_Block_",
"whitelist_ips": [
"127.0.0.1",
"::1"
],
"telegram_bot_token": "텔레그램 토큰 입력",
"telegram_chat_id": "텔레그램 챗 ID 입력"
}
이 프로그램은 리눅스의 fail2ban과 유사하게 윈도우 서버(또는 PC)에 대한 무차별 대입 공격(Brute-force)을 방어하기 위해 만들어진 자동화된 방화벽 IP 차단 도구입니다. 주로 RDP(원격 데스크톱) 등의 로그인 실패 이벤트를 감지하여 일정 횟수 이상 실패한 공격자의 IP를 윈도우 방화벽에 차단 등록하고 텔레그램으로 알림을 보냅니다.
전체적인 로직과 각 주요 함수별 상세 분석은 다음과 같습니다.
1. 설정 및 초기화
- load_config() 함수
- 동일한 폴더에 있는 config.json 파일에서 설정을 읽어옵니다. 파일이 없을 경우 기본값을 제공합니다.
- 주요 설정값:
- check_interval_seconds: 로그 확인 주기 (기본 60초)
- block_threshold: 차단 임계치 (기본 5회 실패 시 차단)
- time_window_minutes: 실패 횟수를 누적하는 시간 창 (기본 10분 내에 임계치 도달 시 차단)
- rule_name_prefix: 방화벽 규칙 이름의 접두사 (기본 IvyFirewall_Block_)
- whitelist_ips: 차단하지 않을 안전한 IP 목록
- telegram_bot_token, telegram_chat_id: 알림 발송용 텔레그램 봇 정보
- IvyFirewall.__init__()
- 클래스를 초기화하며 설정을 변수에 저장합니다.
- self.failed_attempts: 각 IP별로 로그인 실패 시간(timestamp)을 리스트로 기록하는 딕셔너리입니다.
- self.blocked_ips: 현재 차단된 IP들을 기억하는 Set 자료형으로 중복 차단을 방지합니다.
- 초기화 시 _load_existing_rules()를 호출하여 기존에 차단해둔 규칙을 불러옵니다.
2. 핵심 동작 메서드
- _load_existing_rules()
- 프로그램이 재시작되었을 때, 이전에 차단했던 내역이 날아가지 않도록 윈도우 방화벽을 조회합니다.
- netsh advfirewall firewall show rule name=all 명령을 실행하여 IvyFirewall_Block_ 접두사를 가진 규칙들을 찾고 해당 IP들을 self.blocked_ips에 등록합니다.
- get_failed_logons() (가장 중요한 모니터링 기능)
- 파워셸(PowerShell) 을 호출하여 윈도우 **이벤트 뷰어(Security 로그)**를 뒤집니다.
- Event ID 4625는 윈도우에서 **'로그인 실패'**를 의미하는 고유 코드입니다. (보통 잘못된 비밀번호 입력이나 RDP 해킹 시도 시 기록됨)
- 최근 확인 주기 시간 동안 발생한 4625 이벤트를 필터링하고 XML 형태로 파싱 한 후, 공격자의 IpAddress만 추출하여 리스트로 반환합니다.
- clean_old_attempts()
- 시간이 오래된(설정된 time_window_minutes가 지난) 로그인 실패 기록은 삭제해 줍니다.
- 예를 들어, 10분에 5번 실패가 기준일 때, 1시간 전에 4번 실패했던 기록 때문에 억울하게 차단되는 일이 없도록 메모리를 청소하는 역할입니다.
- block_ip(ip)
- 조건이 충족되어 IP를 차단해야 할 때 호출됩니다.
- 먼저 whitelist에 있는 IP인지 검사하여, 자기 자신이나 관리자의 IP가 차단되는 사고를 막습니다.
- netsh advfirewall firewall add rule ... action=block 명령어를 시스템 쉘로 직접 실행하여 윈도우 방화벽 인바운드 규칙에 해당 IP를 영구 차단합니다.
- 차단 성공 후 send_telegram_message()를 호출하여 관리자에게 알림을 보냅니다.
- send_telegram_message(message)
- 파이썬의 내장 urllib 모듈을 사용하여 Telegram API로 HTTP POST 요청을 보냅니다. 외부 라이브러리(requests 등) 의존성을 없애기 위해 기본 라이브러리를 잘 활용했습니다.
3. 메인 루프 (프로그램 실행 흐름)
- run()
- 무한 루프(while True:)를 돌면서 프로그램이 계속 실행됩니다.
- 흐름 순서:
- get_failed_logons()를 호출해 실패한 IP들을 가져옵니다.
- 이미 차단되었거나 화이트리스트인 IP를 제외하고 self.failed_attempts에 실패 시간을 추가합니다.
- clean_old_attempts()로 유효기간이 지난 기록을 지웁니다.
- 각 IP별 누적 실패 횟수를 확인하여 block_threshold(기본 5회)를 넘겼다면 block_ip()를 호출해 차단합니다.
- time.sleep()으로 설정된 대기시간(기본 60초)만큼 쉬었다가 다시 1번으로 돌아갑니다.
4. 프로그램 진입점 (if __name__ == "__main__":)
- 관리자 권한 체크 (is_admin())
- 이 프로그램은 윈도우 방화벽 규칙을 조작해야 하므로 반드시 최고 관리자 권한이 필요합니다.
- ctypes.windll.shell32.IsUserAnAdmin() 윈도우 API를 통해 권한을 검사하고, 관리자 권한이 없으면 안내 메시지와 함께 실행을 즉시 중단합니다.
💡 총평 및 요약
아주 깔끔하고 직관적으로 작성된 윈도우용 보안 스크립트입니다. 외부 라이브러리 의존성을 최소화하고 윈도우의 기본 기능(Get-WinEvent, netsh)을 파이썬의 subprocess로 잘 연결하여 구현하였습니다. 메모리 누수를 막기 위한 과거 기록 청소(clean_old_attempts)나 기존 룰 로딩 기능(_load_existing_rules) 등 안정적으로 백그라운드에서 동작하기 위한 필수 예외 처리가 잘 갖춰져 있습니다.
'Python' 카테고리의 다른 글
| 공격자 IP 차단 Python 소스 (0) | 2026.05.28 |
|---|---|
| Python 유명한 라이브러리 소개 및 웹 크롤링 (0) | 2026.05.20 |
| Uvicorn ASGI(Asynchronous Server Gateway Interface) 서버 사용방법 (0) | 2026.05.13 |
| Python에서 GUI제작용 tkinter 라이브러리 사용예제 나열 (0) | 2026.05.11 |
| Pillow(PIL Fork) 이미지 처리 라이브러리 사용법 (0) | 2026.05.11 |