👍공격자 IP를 차단하는 전체 소스입니다.
main.py
import subprocess
import time
import json
import os
import urllib.request
import urllib.parse
from collections import defaultdict
from datetime import datetime, timedelta
CONFIG_FILE = "config.json"
def load_config():
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {
"check_interval_seconds": 60,
"block_threshold": 5,
"time_window_minutes": 10,
"rule_name_prefix": "IvyFirewall_Block_",
"whitelist_ips": ["127.0.0.1", "::1"]
}
class IvyFirewall:
def __init__(self, config):
self.config = config
self.check_interval = config.get("check_interval_seconds", 60)
self.block_threshold = config.get("block_threshold", 5)
self.time_window = timedelta(minutes=config.get("time_window_minutes", 10))
self.rule_prefix = config.get("rule_name_prefix", "IvyFirewall_Block_")
self.whitelist = set(config.get("whitelist_ips", ["127.0.0.1", "::1"]))
self.telegram_bot_token = config.get("telegram_bot_token", "")
self.telegram_chat_id = config.get("telegram_chat_id", "")
self.failed_attempts = defaultdict(list)
self.blocked_ips = set()
self.rule_name = self.rule_prefix + "List"
self.rule_exists = False
self._load_existing_rules()
def _load_existing_rules(self):
print(f"기존 방화벽 차단 규칙({self.rule_name})을 불러오는 중...")
cmd = f'netsh advfirewall firewall show rule name="{self.rule_name}"'
try:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, encoding='cp949', errors='ignore')
if result.returncode == 0:
self.rule_exists = True
for line in result.stdout.split('\n'):
line_lower = line.lower()
if "remoteip" in line_lower or "원격 ip" in line_lower or "remote ip" in line_lower:
parts = line.split(':')
if len(parts) > 1:
ips_str = parts[1].strip()
if ips_str.lower() != 'any' and ips_str.lower() != '모두':
for ip in ips_str.split(','):
ip = ip.strip()
if ip:
self.blocked_ips.add(ip)
print(f"총 {len(self.blocked_ips)}개의 이전에 차단된 IP를 불러왔습니다.")
else:
print("기존 차단 규칙이 존재하지 않습니다. 새로 생성할 예정입니다.")
self.rule_exists = False
except Exception as e:
print(f"기존 규칙 로드 실패: {e}")
self.rule_exists = False
def get_failed_logons(self):
"""이벤트 뷰어에서 최근 실패한 로그인(RDP 등) 시도를 가져옵니다."""
minutes_to_check = max(1, self.check_interval // 60) + 1 # 여유있게 1분 추가
time_str = (datetime.now() - timedelta(minutes=minutes_to_check)).strftime('%Y-%m-%dT%H:%M:%S')
ps_command = f"""
$ErrorActionPreference = 'SilentlyContinue'
$events = Get-WinEvent -FilterHashtable @{{LogName='Security'; Id=4625; StartTime='{time_str}'}}
if ($events) {{
foreach ($event in $events) {{
$xml = [xml]$event.ToXml()
$ip = $xml.Event.EventData.Data | Where-Object {{ $_.Name -eq 'IpAddress' }} | Select-Object -ExpandProperty '#text'
if ($ip -and $ip -ne '-' -and $ip -notmatch ':') {{
Write-Output $ip
}}
}}
}}
"""
try:
# powershell 호출
result = subprocess.run(["powershell", "-Command", ps_command], capture_output=True, text=True)
ips = result.stdout.strip().split('\n')
return [ip.strip() for ip in ips if ip.strip()]
except subprocess.CalledProcessError as e:
print(f"이벤트 로그 조회 오류: {e}")
return []
def send_telegram_message(self, message):
if not self.telegram_bot_token or not self.telegram_chat_id:
return
try:
data = urllib.parse.urlencode({
"chat_id": self.telegram_chat_id,
"text": message,
"parse_mode": "HTML"
}).encode("utf-8")
req = urllib.request.Request(url, data=data)
with urllib.request.urlopen(req, timeout=5) as response:
pass
except Exception as e:
print(f"텔레그램 알림 전송 실패: {e}")
def block_ip(self, ip):
if ip in self.whitelist:
return
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] IP 차단 진행: {ip}")
self.blocked_ips.add(ip)
ips_str = ",".join(sorted(list(self.blocked_ips)))
try:
if self.rule_exists:
# 기존 규칙 업데이트
block_cmd = f'netsh advfirewall firewall set rule name="{self.rule_name}" new remoteip="{ips_str}"'
subprocess.run(block_cmd, shell=True, check=True, capture_output=True)
else:
# 새 규칙 생성
block_cmd = f'netsh advfirewall firewall add rule name="{self.rule_name}" dir=in action=block remoteip="{ips_str}"'
subprocess.run(block_cmd, shell=True, check=True, capture_output=True)
self.rule_exists = True
print(f"성공적으로 차단되었습니다: {ip} (총 차단 IP 개수: {len(self.blocked_ips)}개)")
# 텔레그램 알림 발송
msg = f"🚨 <b>IvyFirewall IP 차단 알림</b>\n\n" \
f"지속적인 로그인 실패가 감지되어 방화벽에 차단 규칙을 업데이트했습니다.\n\n" \
f"<b>차단된 IP:</b> <code>{ip}</code>\n" \
f"<b>전체 차단된 IP 목록 ({len(self.blocked_ips)}개):</b>\n<code>{ips_str}</code>"
self.send_telegram_message(msg)
except subprocess.CalledProcessError as e:
if self.rule_exists:
try:
print("기존 규칙 업데이트 실패. 규칙이 없거나 오류가 발생하여 규칙을 새로 생성합니다.")
block_cmd = f'netsh advfirewall firewall add rule name="{self.rule_name}" dir=in action=block remoteip="{ips_str}"'
subprocess.run(block_cmd, shell=True, check=True, capture_output=True)
self.rule_exists = True
print(f"성공적으로 차단되었습니다(재생성): {ip} (총 차단 IP 개수: {len(self.blocked_ips)}개)")
msg = f"🚨 <b>IvyFirewall IP 차단 알림 (규칙 재생성)</b>\n\n" \
f"지속적인 로그인 실패가 감지되어 방화벽에 차단 규칙을 생성했습니다.\n\n" \
f"<b>차단된 IP:</b> <code>{ip}</code>\n" \
f"<b>전체 차단된 IP 목록 ({len(self.blocked_ips)}개):</b>\n<code>{ips_str}</code>"
self.send_telegram_message(msg)
return
except subprocess.CalledProcessError as ex:
print(f"차단 실패 ({ip}): {ex}")
else:
print(f"차단 실패 ({ip}): {e}")
if ip in self.blocked_ips:
self.blocked_ips.remove(ip)
def clean_old_attempts(self):
"""시간이 지난 실패 기록 삭제"""
now = datetime.now()
for ip in list(self.failed_attempts.keys()):
self.failed_attempts[ip] = [ts for ts in self.failed_attempts[ip] if now - ts <= self.time_window]
if not self.failed_attempts[ip]:
del self.failed_attempts[ip]
def run(self):
print("="*50)
print("IvyFirewall 모니터링이 시작되었습니다.")
print(f"설정: {self.time_window.seconds // 60}분 내에 {self.block_threshold}회 실패 시 IP 차단")
print("종료하려면 Ctrl+C를 누르세요.")
print("="*50)
while True:
try:
#print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 이벤트 로그 확인 중...")
ips = self.get_failed_logons()
now = datetime.now()
for ip in ips:
if ip in self.blocked_ips or ip in self.whitelist:
continue
self.failed_attempts[ip].append(now)
self.clean_old_attempts()
# 임계치 확인 및 차단
for ip, attempts in list(self.failed_attempts.items()):
if len(attempts) >= self.block_threshold:
self.block_ip(ip)
del self.failed_attempts[ip] # 차단 후 기록 삭제
time.sleep(self.check_interval)
except KeyboardInterrupt:
print("\n프로그램을 종료합니다.")
break
except Exception as e:
print(f"예기치 않은 오류: {e}")
time.sleep(self.check_interval)
if __name__ == "__main__":
import ctypes
# 관리자 권한 확인
def is_admin():
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
if not is_admin():
print("경고: 관리자 권한으로 실행되지 않았습니다. 방화벽 규칙을 추가하려면 관리자 권한이 필요합니다.")
print("명령 프롬프트나 PowerShell을 '관리자 권한으로 실행'한 후 다시 시도해주세요.")
input("엔터를 누르면 종료합니다...")
exit(1)
config = load_config()
firewall = IvyFirewall(config)
firewall.run()
config.json
{
"check_interval_seconds": 60,
"block_threshold": 5,
"time_window_minutes": 10,
"rule_name_prefix": "IvyFirewall_Block_",
"whitelist_ips": [
"127.0.0.1",
"::1"
],
"telegram_bot_token": "텔레그램 토큰",
"telegram_chat_id": "텔레그램ID"
}
main.py 소스 코드는 윈도우 환경에서 보안을 강화하기 위해 RDP(원격 데스크톱) 등의 로그인 실패 공격을 탐지하고, 자동으로 방화벽에 차단 IP를 등록하는 침입 차단 프로그램입니다.
이 프로그램의 핵심 아키텍처와 상세 동작 방식을 세부 요소별로 설명해 드리겠습니다.
1. 프로그램의 동작 흐름 개요
- 관리자 권한 확인: 실행 시 윈도우 방화벽을 제어해야 하므로 관리자 권한 여부를 체크합니다.
- 설정 로드: config.json 파일을 읽어 동작 주기, 임계값, 허용 IP(화이트리스트), 텔레그램 알림 정보 등을 파싱합니다.
- 기존 규칙 조회: 이미 등록된 IvyFirewall_Block_List 규칙에서 이전에 차단되었던 IP 목록을 파싱해 내부 메모리에 유지합니다.
- 무한 루프 감시:
- 설정된 주기마다 윈도우 **Security 이벤트 로그(ID: 4625 - 로그인 실패)**를 PowerShell을 이용해 수집합니다.
- 특정 IP의 실패 횟수가 시간 범위 내에서 임계치를 초과하면 해당 IP를 차단 대상으로 판단합니다.
- 단일 방화벽 규칙(IvyFirewall_Block_List)의 원격 IP 속성 영역에 해당 IP를 추가하여 업데이트하고, 관리자에게 텔레그램 알림을 발송합니다.
2. 세부 함수 및 메서드 설명
⚙️ 설정 관리 및 초기화
- load_config():
- 프로그램 설정이 포함된 config.json을 읽어옵니다.
- 파일이 없는 경우 기본값(동작 주기 60초, 10분 내 5회 실패 시 차단, 로컬 IP 화이트리스트 등)으로 설정 딕셔너리를 반환합니다.
- IvyFirewall.__init__(self, config):
- 각종 상태 값(시간 윈도우 계산용 timedelta, 화이트리스트 IP 집합 등)을 세팅합니다.
- 단일 방화벽 인바운드 규칙명(self.rule_name = "IvyFirewall_Block_List")을 선언하고 기존 차단 정보를 로드하는 _load_existing_rules() 메서드를 호출합니다.
🔍 기존 차단 정보 동기화 (_load_existing_rules)
- 단일 차단 규칙(IvyFirewall_Block_List)이 방화벽에 존재하는지 검사합니다.
- netsh advfirewall firewall show rule 명령의 출력 텍스트 중 원격 IP 주소 필드(RemoteIP, 원격 IP, Remote IP)를 파싱하여, 기존에 이미 차단되었던 IP들을 self.blocked_ips 집합(Set)에 완전히 동기화합니다. 이로 인해 프로그램 재시작 시에도 중복 차단 작업을 하지 않습니다.
🕵️ RDP 로그인 실패 로그 감시 (get_failed_logons)
- PowerShell의 Get-WinEvent cmdlet을 활용하여 이벤트 뷰어의 Security 로그 중 **이벤트 ID 4625(로그인 실패)**를 타겟팅합니다.
- XML 형식의 이벤트 데이터에서 실제 로그인을 시도한 공격자의 원격 IP 주소(IpAddress)를 추출하여 리스트로 반환합니다.
🚫 핵심 차단 로직 (block_ip)
- 허용 IP(화이트리스트) 대역은 실수로 본인이 차단되는 것을 막기 위해 사전에 걸러냅니다.
- 새로운 차단 IP가 확보되면 기존에 차단된 IP 목록과 병합하여 쉼표(,)로 구분된 단일 문자열을 만듭니다.
- 방화벽 규칙의 상태에 따라 분기 처리합니다.
- 규칙이 이미 존재할 때 (set rule): netsh advfirewall firewall set rule name="IvyFirewall_Block_List" new remoteip="IP1,IP2,IP3..." 명령어로 원격 IP 속성을 일괄 업데이트합니다.
- 규칙이 존재하지 않을 때 (add rule): netsh advfirewall firewall add rule name="IvyFirewall_Block_List" dir=in action=block remoteip="IP1,IP2,IP3..." 명령어로 단일 차단 인바운드 규칙을 새롭게 생성합니다.
- 예외 복구(Fallback): 사용자가 방화벽 규칙을 임의로 지우는 등의 비정상적인 실패 상황이 발생하면, 업데이트 실패 시 자동으로 재생성(add rule)을 시도하여 시스템 안전성을 극대화합니다.
📨 텔레그램 실시간 알림 (send_telegram_message)
- 공격 IP가 신규로 차단될 때 텔레그램 봇 API를 통해 관리자에게 실시간 알림 메시지를 전송합니다.
- 메시지에는 금방 차단된 IP 정보뿐만 아니라 전체 차단된 IP 목록과 총 개수를 함께 시각화하여 한눈에 모니터링할 수 있도록 돕습니다.
🧹 메모리 최적화 및 메인 루프 (clean_old_attempts & run)
- clean_old_attempts: 지정한 시간 범위(예: 10분)가 지난 무의미한 로그인 실패 기록들을 메모리 딕셔너리에서 주기적으로 청소하여 메모리 누수를 원천 방어합니다.
- run: 감시 주기마다 위 프로세스를 순환하며 키보드 인터럽트(Ctrl + C) 입력 시 프로그램을 안전하게 종료시킵니다.
'Python' 카테고리의 다른 글
| Python 유명한 라이브러리 소개 및 웹 크롤링 (0) | 2026.05.20 |
|---|---|
| 윈도우 서버(또는 PC)에 대한 무차별 대입 공격(Brute-force)을 방어 Python 소스 (0) | 2026.05.13 |
| Uvicorn ASGI(Asynchronous Server Gateway Interface) 서버 사용방법 (0) | 2026.05.13 |
| Python에서 GUI제작용 tkinter 라이브러리 사용예제 나열 (0) | 2026.05.11 |
| Pillow(PIL Fork) 이미지 처리 라이브러리 사용법 (0) | 2026.05.11 |