Post

scapy로 tcp 통신을 구현해보자

목차


개요

SYN, SYN+ACK, ACK 및 FIN+ACK 를 테스트해야 하는 상황에 놓였다. scapy로 구현해보자.

방법

설명하기에 앞서, TCP 통신을 직접 조작할 경우 OS에서 보낸 적이 없는 SYN이 발생한 것에 대해 이상하게 여겨
SYN 발생 시 RST를 날려 테스트를 제대로 진행하지 못하는 상황이 발생한다. [출처]

따라서 linux의 경우 아래 명령어를 통해 방화벽으로 허용해주어야 한다. (iptables 를 아예 꺼버리면 문제가 생기니 주의) [출처]

1
2
3
4
5
6
7
# command
# iptables -A OUTPUT -p tcp --tcp-flags RST RST -s {our IP} -d {dest IP} --dport {port we're sending from} -j DROP

# example1 - 443 포트만 허용
iptables -A OUTPUT -p tcp --tcp-flags RST RST --dport 443 -j DROP
# example2 - 모든포트 허용
iptables -A OUTPUT -p tcp --tcp-flags RST RST -j DROP

윈도우에서는 테스트를 진행해보지 않아 정확한 방법을 모르겠다. 윈도우 10 사용 시 WSL을 이용하면 정상적으로 실행할 수 있으니 참고하자.

실행 환경

  • OS : Ubuntu 22.04.1 LTS (WSL 2)
  • python : 3.7.15

scapy 설치

scapy를 설치하기 위해서는 python 3.5 이상, 3.9 이하 버전이 필요하다.

scapy의 경우 pip를 통해 다운받아도 되고 https://pypi.org/project/scapy/
프로젝트를 클론하여 설치할 수도 있다.

1
git clone https://github.com/secdev/scapy.git

다운을 다 받고 나면 scapy 디렉토리가 생긴다.

scapy 임포트

scapy 디렉토리로 이동하지 않고 파일을 만든다.

1
touch tcp_test.py

scapy 라이브러리를 인식시켜주기 위해 아래와 같이 작성한다.
scapy 모듈의 경우 클론을 받은 디렉토리 기준으로 scapy/scapy 디렉토리에 위치하기에
아래와 같이 작성하면 정상적으로 scapy 모듈을 인식할 것이다.

1
2
3
4
5
6
7
8
9
10
from os import getcwd
from os import path as osPath
from sys import path as sysPath

SCAPY_PATH = getcwd() + "/scapy/scapy"

if osPath.exists(SCAPY_PATH):
    REAL_SCAPY_PATH = osPath.dirname(osPath.abspath(SCAPY_PATH))
    sysPath.append(REAL_SCAPY_PATH)
    print("scapy library find [{}]".format(SCAPY_PATH))

이후 scapy 모듈을 import 해준다.
random 모듈의 경우 src port를 랜덤하게 생성해주기 위해 사용하였다.

1
2
3
4
5
from scapy.all import *
from scapy.layers.inet import IP, ICMP, TCP
from scapy.sendrecv import *

import random

소스

아래는 scapy를 이용해 tcp 3way handshake, 4way handshake 통신을 구현한 클래스 소스이다.

TCP 통신을 직접 조작해야 하기에 sudo 권한이 필요하다.
sudo python3 tcp_test.py

sudo로 진행하지 않으면 하기와 같은 에러를 맛볼 수 있다.

1
2
_socket.socket.__init__(self, family, type, proto, fileno)
PermissionError: [Errno 1] Operation not permitted

정상적으로 통신을 진행했을 때의 wireshark 캡쳐를 함께 첨부한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class TCP_test():
    def __init__(self, srcip, dstip):
        self.srcip = srcip
        self.dstip = dstip
        self.ack = 0
        self.sport = random.randint(1024,65535)
        self.ip = IP(src=self.srcip, dst=self.dstip)
        self.lastPkt = None

    # tcp connect start
    def threewayhandshake(self):
        # tcp_options = [("MSS", 1460), ("SAckOK", ""), ("NOP", 1), ("WScale", 7)]
        # SYN = TCP(sport=self.sport, dport=443, flags="S", seq=42, window=65535, options=tcp_options)
        SYN = TCP(sport=self.sport, dport=443, flags="S", seq=42, window=65535)
        SYNACK = sr1(self.ip/SYN)
        self.lastPkt = SYNACK
        ACK = TCP(sport=self.sport, dport=443, flags="A", seq=SYNACK.ack, ack=SYNACK.seq + 1)
        ls(ACK)
        send(self.ip/ACK)

    # tcp connect finish
    def fourwayhandshake(self):
        FIN = TCP(sport=self.sport, dport=443, flags="FA", seq=self.lastPkt.ack, ack=self.lastPkt.seq + 1)
        FINACK = sr1(self.ip/FIN)
        self.lastPkt = FINACK
        ls(FINACK)
        LASTACK = TCP(sport=self.sport, dport=443, flags="A", seq=FINACK.ack, ack=FINACK.seq + 1)
        send(self.ip/LASTACK)

정리

소스 전문은 다음과 같다.

전문

복사해서 main 함수의 srcip, dstip 변수를 수정하고
맨 윗줄의 파이썬 경로를 수정한 뒤 파일에 실행 권한을 주고 sudo 로 실행시키면 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/python3

# from time import sleep
from os import getcwd
from os import path as osPath
from sys import path as sysPath

# scapy import를 위한 경로 설정
SCAPY_PATH = getcwd() + "/scapy/scapy"

if osPath.exists(SCAPY_PATH):
    REAL_SCAPY_PATH = osPath.dirname(osPath.abspath(SCAPY_PATH))
    sysPath.append(REAL_SCAPY_PATH)
    print("scapy library find [{}]".format(SCAPY_PATH))



from scapy.all import *
from scapy.layers.inet import IP, ICMP, TCP
from scapy.sendrecv import *

# source port 랜덤 생성을 위한 랜덤 모듈 임포트
import random


conf.L3socket=L3RawSocket


class TCP_test():
    def __init__(self, srcip, dstip):
        self.srcip = srcip
        self.dstip = dstip
        self.ack = 0
        self.sport = random.randint(1024,65535)
        self.ip = IP(src=self.srcip, dst=self.dstip)
        self.lastPkt = None

    def threewayhandshake(self):
        # tcp 옵션을 같이 보내고 싶을 때는 아래 두 주석을 해제하고 사용하면 된다.
        # tcp_options = [("MSS", 1460), ("SAckOK", ""), ("NOP", 1), ("WScale", 7)]
        # SYN = TCP(sport=self.sport, dport=443, flags="S", seq=42, window=65535, options=tcp_options)
        SYN = TCP(sport=self.sport, dport=443, flags="S", seq=42, window=65535)
        SYNACK = sr1(self.ip/SYN)
        self.lastPkt = SYNACK
        ACK = TCP(sport=self.sport, dport=443, flags="A", seq=SYNACK.ack, ack=SYNACK.seq + 1)
        ls(ACK)
        send(self.ip/ACK)

    def fourwayhandshake(self):
        FIN = TCP(sport=self.sport, dport=443, flags="FA", seq=self.lastPkt.ack, ack=self.lastPkt.seq + 1)
        FINACK = sr1(self.ip/FIN)
        self.lastPkt = FINACK
        ls(FINACK)
        LASTACK = TCP(sport=self.sport, dport=443, flags="A", seq=FINACK.ack, ack=FINACK.seq + 1)
        send(self.ip/LASTACK)


def main():
    srcip = "1.1.1.1"
    dstip = "2.2.2.2"

    tcpPkt = TCP_test(srcip, dstip)
    tcpPkt.threewayhandshake()
    tcpPkt.fourwayhandshake()


if __name__ == "__main__":
    main()
This post is licensed under CC BY 4.0 by the author.