Skip to content

Commit 25d764e

Browse files
committed
add tcp proxy tutorial
1 parent 308a472 commit 25d764e

File tree

3 files changed

+149
-0
lines changed

3 files changed

+149
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ This is a repository of all the tutorials of [The Python Code](https://www.thepy
225225
- [How to Use the Argparse Module in Python](https://www.thepythoncode.com/article/how-to-use-argparse-in-python). ([code](python-standard-library/argparse))
226226
- [How to Make a Grep Clone in Python](https://thepythoncode.com/article/how-to-make-grep-clone-in-python). ([code](python-standard-library/grep-clone))
227227
- [How to Validate Credit Card Numbers in Python](https://thepythoncode.com/article/credit-card-validation-in-python). ([code](python-standard-library/credit-card-validation))
228+
- [How to Build a TCP Proxy with Python](https://thepythoncode.com/article/building-a-tcp-proxy-with-python). ([code](python-standard-library/tcp-proxy))
228229

229230
- ### [Using APIs](https://www.thepythoncode.com/topic/using-apis-in-python)
230231
- [How to Automate your VPS or Dedicated Server Management in Python](https://www.thepythoncode.com/article/automate-veesp-server-management-in-python). ([code](general/automating-server-management))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# [How to Build a TCP Proxy with Python](https://thepythoncode.com/article/building-a-tcp-proxy-with-python)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import sys
2+
import socket
3+
import threading
4+
import time
5+
from typing import Optional, Tuple, Dict
6+
7+
class TcpProxy:
8+
def __init__(self):
9+
self._local_addr: str = ""
10+
self._local_port: int = 0
11+
self._remote_addr: str = ""
12+
self._remote_port: int = 0
13+
self._preload: bool = False
14+
self._backlog: int = 5
15+
self._chunk_size: int = 16
16+
self._timeout: int = 5
17+
self._buffer_size: int = 4096
18+
self._termination_flags: Dict[bytes, bool] = {
19+
b'220 ': True,
20+
b'331 ': True,
21+
b'230 ': True,
22+
b'530 ': True
23+
}
24+
25+
def _process_data(self, stream: bytes) -> None:
26+
#Transform data stream for analysis
27+
for offset in range(0, len(stream), self._chunk_size):
28+
block = stream[offset:offset + self._chunk_size]
29+
30+
# Format block representation
31+
bytes_view = ' '.join(f'{byte:02X}' for byte in block)
32+
text_view = ''.join(chr(byte) if 32 <= byte <= 126 else '.' for byte in block)
33+
34+
# Display formatted line
35+
print(f"{offset:04X} {bytes_view:<{self._chunk_size * 3}} {text_view}")
36+
37+
def _extract_stream(self, conn: socket.socket) -> bytes:
38+
#Extract data stream from connection
39+
accumulator = b''
40+
conn.settimeout(self._timeout)
41+
42+
try:
43+
while True:
44+
fragment = conn.recv(self._buffer_size)
45+
if not fragment:
46+
break
47+
48+
accumulator += fragment
49+
50+
# Check for protocol markers
51+
if accumulator.endswith(b'\r\n'):
52+
for flag in self._termination_flags:
53+
if flag in accumulator:
54+
return accumulator
55+
56+
except socket.timeout:
57+
pass
58+
59+
return accumulator
60+
61+
def _monitor_stream(self, direction: str, stream: bytes) -> bytes:
62+
# Monitor and decode stream content
63+
try:
64+
content = stream.decode('utf-8').strip()
65+
marker = ">>>" if direction == "in" else "<<<"
66+
print(f"{marker} {content}")
67+
except UnicodeDecodeError:
68+
print(f"{direction}: [binary content]")
69+
70+
return stream
71+
72+
def _bridge_connections(self, entry_point: socket.socket) -> None:
73+
#Establish and maintain connection bridge
74+
# Initialize exit point
75+
exit_point = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
76+
try:
77+
exit_point.connect((self._remote_addr, self._remote_port))
78+
# Handle initial remote response
79+
if self._preload:
80+
remote_data = self._extract_stream(exit_point)
81+
if remote_data:
82+
self._process_data(remote_data)
83+
processed = self._monitor_stream("out", remote_data)
84+
entry_point.send(processed)
85+
# Main interaction loop
86+
while True:
87+
# Process incoming traffic
88+
entry_data = self._extract_stream(entry_point)
89+
if entry_data:
90+
print(f"\n[>] Captured {len(entry_data)} bytes incoming")
91+
self._process_data(entry_data)
92+
processed = self._monitor_stream("in", entry_data)
93+
exit_point.send(processed)
94+
# Process outgoing traffic
95+
exit_data = self._extract_stream(exit_point)
96+
if exit_data:
97+
print(f"\n[<] Captured {len(exit_data)} bytes outgoing")
98+
self._process_data(exit_data)
99+
processed = self._monitor_stream("out", exit_data)
100+
entry_point.send(processed)
101+
# Prevent CPU saturation
102+
if not (entry_data or exit_data):
103+
time.sleep(0.1)
104+
except Exception as e:
105+
print(f"[!] Bridge error: {str(e)}")
106+
finally:
107+
print("[*] Closing bridge")
108+
entry_point.close()
109+
exit_point.close()
110+
111+
def orchestrate(self) -> None:
112+
# Orchestrate the proxy operation
113+
# Validate input
114+
if len(sys.argv[1:]) != 5:
115+
print("Usage: script.py [local_addr] [local_port] [remote_addr] [remote_port] [preload]")
116+
print("Example: script.py 127.0.0.1 8080 target.com 80 True")
117+
sys.exit(1)
118+
# Configure proxy parameters
119+
self._local_addr = sys.argv[1]
120+
self._local_port = int(sys.argv[2])
121+
self._remote_addr = sys.argv[3]
122+
self._remote_port = int(sys.argv[4])
123+
self._preload = "true" in sys.argv[5].lower()
124+
# Initialize listener
125+
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
126+
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
127+
try:
128+
listener.bind((self._local_addr, self._local_port))
129+
except socket.error as e:
130+
print(f"[!] Binding failed: {e}")
131+
sys.exit(1)
132+
listener.listen(self._backlog)
133+
print(f"[*] Service active on {self._local_addr}:{self._local_port}")
134+
# Main service loop
135+
while True:
136+
client, address = listener.accept()
137+
print(f"[+] Connection from {address[0]}:{address[1]}")
138+
bridge = threading.Thread(
139+
target=self._bridge_connections,
140+
args=(client,)
141+
)
142+
bridge.daemon = True
143+
bridge.start()
144+
145+
if __name__ == "__main__":
146+
bridge = TcpProxy()
147+
bridge.orchestrate()

0 commit comments

Comments
 (0)