scapy로 tcp 통신을 구현해보자
목차
개요
SYN, SYN+ACK, ACK 및 FIN+ACK 를 테스트해야 하는 상황에 놓였다. scapy로 구현해보자.
방법
설명하기에 앞서, TCP 통신을 직접 조작할 경우 OS에서 보낸 적이 없는 SYN이 발생한 것에 대해 이상하게 여겨
SYN 발생 시 RST를 날려 테스트를 제대로 진행하지 못하는 상황이 발생한다. [출처]
따라서 linux의 경우 아래 명령어를 통해 방화벽으로 허용해주어야 한다. (iptables 를 아예 꺼버리면 문제가 생기니 주의) [출처]
1
2
3
4
5
6
7
# command
# iptables -A OUTPUT -p tcp --tcp-flags RST RST -s {our IP} -d {dest IP} --dport {port we're sending from} -j DROP
# example1 - 443 포트만 허용
iptables -A OUTPUT -p tcp --tcp-flags RST RST --dport 443 -j DROP
# example2 - 모든포트 허용
iptables -A OUTPUT -p tcp --tcp-flags RST RST -j DROP
윈도우에서는 테스트를 진행해보지 않아 정확한 방법을 모르겠다. 윈도우 10 사용 시 WSL을 이용하면 정상적으로 실행할 수 있으니 참고하자.
실행 환경
- OS : Ubuntu 22.04.1 LTS (WSL 2)
- python : 3.7.15
scapy 설치
scapy를 설치하기 위해서는 python 3.5 이상, 3.9 이하 버전이 필요하다.
scapy의 경우 pip를 통해 다운받아도 되고 https://pypi.org/project/scapy/
프로젝트를 클론하여 설치할 수도 있다.
1
git clone https://github.com/secdev/scapy.git
다운을 다 받고 나면 scapy 디렉토리가 생긴다.
scapy 임포트
scapy 디렉토리로 이동하지 않고 파일을 만든다.
1
touch tcp_test.py
scapy 라이브러리를 인식시켜주기 위해 아래와 같이 작성한다.
scapy 모듈의 경우 클론을 받은 디렉토리 기준으로 scapy/scapy 디렉토리에 위치하기에
아래와 같이 작성하면 정상적으로 scapy 모듈을 인식할 것이다.
1
2
3
4
5
6
7
8
9
10
from os import getcwd
from os import path as osPath
from sys import path as sysPath
SCAPY_PATH = getcwd() + "/scapy/scapy"
if osPath.exists(SCAPY_PATH):
REAL_SCAPY_PATH = osPath.dirname(osPath.abspath(SCAPY_PATH))
sysPath.append(REAL_SCAPY_PATH)
print("scapy library find [{}]".format(SCAPY_PATH))
이후 scapy 모듈을 import 해준다.
random 모듈의 경우 src port를 랜덤하게 생성해주기 위해 사용하였다.
1
2
3
4
5
from scapy.all import *
from scapy.layers.inet import IP, ICMP, TCP
from scapy.sendrecv import *
import random
소스
아래는 scapy를 이용해 tcp 3way handshake, 4way handshake 통신을 구현한 클래스 소스이다.
TCP 통신을 직접 조작해야 하기에 sudo 권한이 필요하다.
sudo python3 tcp_test.py
sudo로 진행하지 않으면 하기와 같은 에러를 맛볼 수 있다.
1
2
_socket.socket.__init__(self, family, type, proto, fileno)
PermissionError: [Errno 1] Operation not permitted
정상적으로 통신을 진행했을 때의 wireshark 캡쳐를 함께 첨부한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class TCP_test():
def __init__(self, srcip, dstip):
self.srcip = srcip
self.dstip = dstip
self.ack = 0
self.sport = random.randint(1024,65535)
self.ip = IP(src=self.srcip, dst=self.dstip)
self.lastPkt = None
# tcp connect start
def threewayhandshake(self):
# tcp_options = [("MSS", 1460), ("SAckOK", ""), ("NOP", 1), ("WScale", 7)]
# SYN = TCP(sport=self.sport, dport=443, flags="S", seq=42, window=65535, options=tcp_options)
SYN = TCP(sport=self.sport, dport=443, flags="S", seq=42, window=65535)
SYNACK = sr1(self.ip/SYN)
self.lastPkt = SYNACK
ACK = TCP(sport=self.sport, dport=443, flags="A", seq=SYNACK.ack, ack=SYNACK.seq + 1)
ls(ACK)
send(self.ip/ACK)
# tcp connect finish
def fourwayhandshake(self):
FIN = TCP(sport=self.sport, dport=443, flags="FA", seq=self.lastPkt.ack, ack=self.lastPkt.seq + 1)
FINACK = sr1(self.ip/FIN)
self.lastPkt = FINACK
ls(FINACK)
LASTACK = TCP(sport=self.sport, dport=443, flags="A", seq=FINACK.ack, ack=FINACK.seq + 1)
send(self.ip/LASTACK)
정리
소스 전문은 다음과 같다.
전문
복사해서 main 함수의 srcip
, dstip
변수를 수정하고
맨 윗줄의 파이썬 경로를 수정한 뒤 파일에 실행 권한을 주고 sudo
로 실행시키면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/python3
# from time import sleep
from os import getcwd
from os import path as osPath
from sys import path as sysPath
# scapy import를 위한 경로 설정
SCAPY_PATH = getcwd() + "/scapy/scapy"
if osPath.exists(SCAPY_PATH):
REAL_SCAPY_PATH = osPath.dirname(osPath.abspath(SCAPY_PATH))
sysPath.append(REAL_SCAPY_PATH)
print("scapy library find [{}]".format(SCAPY_PATH))
from scapy.all import *
from scapy.layers.inet import IP, ICMP, TCP
from scapy.sendrecv import *
# source port 랜덤 생성을 위한 랜덤 모듈 임포트
import random
conf.L3socket=L3RawSocket
class TCP_test():
def __init__(self, srcip, dstip):
self.srcip = srcip
self.dstip = dstip
self.ack = 0
self.sport = random.randint(1024,65535)
self.ip = IP(src=self.srcip, dst=self.dstip)
self.lastPkt = None
def threewayhandshake(self):
# tcp 옵션을 같이 보내고 싶을 때는 아래 두 주석을 해제하고 사용하면 된다.
# tcp_options = [("MSS", 1460), ("SAckOK", ""), ("NOP", 1), ("WScale", 7)]
# SYN = TCP(sport=self.sport, dport=443, flags="S", seq=42, window=65535, options=tcp_options)
SYN = TCP(sport=self.sport, dport=443, flags="S", seq=42, window=65535)
SYNACK = sr1(self.ip/SYN)
self.lastPkt = SYNACK
ACK = TCP(sport=self.sport, dport=443, flags="A", seq=SYNACK.ack, ack=SYNACK.seq + 1)
ls(ACK)
send(self.ip/ACK)
def fourwayhandshake(self):
FIN = TCP(sport=self.sport, dport=443, flags="FA", seq=self.lastPkt.ack, ack=self.lastPkt.seq + 1)
FINACK = sr1(self.ip/FIN)
self.lastPkt = FINACK
ls(FINACK)
LASTACK = TCP(sport=self.sport, dport=443, flags="A", seq=FINACK.ack, ack=FINACK.seq + 1)
send(self.ip/LASTACK)
def main():
srcip = "1.1.1.1"
dstip = "2.2.2.2"
tcpPkt = TCP_test(srcip, dstip)
tcpPkt.threewayhandshake()
tcpPkt.fourwayhandshake()
if __name__ == "__main__":
main()