proxy: prevent head-of-line blocking with fair 16KiB framing and backpressure-aware per-connection TX queues on client and server; single-buffer writes; cleanup queues on close

This commit is contained in:
2025-09-09 21:54:53 +08:00
parent b98cd1bb22
commit e8b2d42a5f
2 changed files with 179 additions and 17 deletions

View File

@@ -93,13 +93,13 @@ function buildUdpPayload(host, port, data) {
} }
function writeMessage(socket, type, connectionId, data = Buffer.alloc(0)) { function writeMessage(socket, type, connectionId, data = Buffer.alloc(0)) {
if (!socket || socket.destroyed) return; if (!socket || socket.destroyed) return true;
const header = Buffer.allocUnsafe(9); const buf = Buffer.allocUnsafe(9 + data.length);
header.writeUInt8(type, 0); buf.writeUInt8(type, 0);
header.writeUInt32BE(connectionId >>> 0, 1); buf.writeUInt32BE(connectionId >>> 0, 1);
header.writeUInt32BE(data.length >>> 0, 5); buf.writeUInt32BE(data.length >>> 0, 5);
socket.write(header); if (data.length > 0) data.copy(buf, 9);
if (data.length > 0) socket.write(data); return socket.write(buf);
} }
function createMessageReader() { function createMessageReader() {
@@ -156,6 +156,81 @@ let nextConnId = 1;
// } // }
const udpAssocs = new Map(); const udpAssocs = new Map();
// Framing and fairness controls for local->tunnel direction
const FRAME_MAX = 16 * 1024; // 16KiB frames to improve interleaving
const BYTES_PER_TICK = 64 * 1024; // Limit per processing burst to yield event loop
// Per-connection TX queues: connectionId -> { queue: Buffer[], sending: boolean }
const txQueues = new Map();
function processTxQueue(connectionId, state) {
if (!tunnelSocket || tunnelSocket.destroyed) {
state.queue = [];
state.sending = false;
return;
}
if (state.sending) return;
state.sending = true;
let bytesThisTick = 0;
const run = () => {
if (!tunnelSocket || tunnelSocket.destroyed) {
state.queue = [];
state.sending = false;
return;
}
while (state.queue.length) {
const buf = state.queue[0];
let offset = buf._offset || 0;
while (offset < buf.length) {
const end = Math.min(offset + FRAME_MAX, buf.length);
const slice = buf.subarray(offset, end);
const ok = writeMessage(tunnelSocket, MSG_TYPES.DATA, connectionId, slice);
if (!ok) {
buf._offset = end;
tunnelSocket.once('drain', () => {
state.sending = false;
run();
});
return;
}
offset = end;
bytesThisTick += slice.length;
if (bytesThisTick >= BYTES_PER_TICK) {
buf._offset = offset;
bytesThisTick = 0;
setImmediate(() => {
state.sending = false;
run();
});
return;
}
}
delete buf._offset;
state.queue.shift();
}
state.sending = false;
};
run();
}
function enqueueToTunnel(connectionId, data) {
let state = txQueues.get(connectionId);
if (!state) {
state = { queue: [], sending: false };
txQueues.set(connectionId, state);
}
state.queue.push(data);
if (!state.sending) {
state.sending = false;
processTxQueue(connectionId, state);
}
}
function genConnId() { function genConnId() {
let id = nextConnId >>> 0; let id = nextConnId >>> 0;
do { do {
@@ -300,6 +375,7 @@ function destroyLocalTCP(connectionId) {
if (!st) return; if (!st) return;
connections.delete(connectionId); connections.delete(connectionId);
clearTimeout(st.openTimer); clearTimeout(st.openTimer);
txQueues.delete(connectionId);
try { st.localSocket.destroy(); } catch (_) {} try { st.localSocket.destroy(); } catch (_) {}
} }
@@ -329,21 +405,23 @@ function startOpenTcp(connectionId, host, port, st) {
} }
function armTcpStreaming(st) { function armTcpStreaming(st) {
// After CONNECT success, forward subsequent data // After CONNECT success, forward subsequent data with framing/backpressure
const forward = (chunk) => { const forward = (chunk) => {
if (!tunnelSocket || tunnelSocket.destroyed) return; if (!tunnelSocket || tunnelSocket.destroyed) return;
writeMessage(tunnelSocket, MSG_TYPES.DATA, st.id, chunk); enqueueToTunnel(st.id, chunk);
}; };
st.localSocket.on('data', forward); st.localSocket.on('data', forward);
st.localSocket.once('close', () => { st.localSocket.once('close', () => {
writeMessage(tunnelSocket, MSG_TYPES.CLOSE, st.id); writeMessage(tunnelSocket, MSG_TYPES.CLOSE, st.id);
connections.delete(st.id); connections.delete(st.id);
clearTimeout(st.openTimer); clearTimeout(st.openTimer);
txQueues.delete(st.id);
}); });
st.localSocket.once('error', () => { st.localSocket.once('error', () => {
writeMessage(tunnelSocket, MSG_TYPES.CLOSE, st.id); writeMessage(tunnelSocket, MSG_TYPES.CLOSE, st.id);
connections.delete(st.id); connections.delete(st.id);
clearTimeout(st.openTimer); clearTimeout(st.openTimer);
txQueues.delete(st.id);
}); });
} }

View File

@@ -66,13 +66,13 @@ const MSG_TYPES = {
}; };
function writeMessage(socket, type, connectionId, data = Buffer.alloc(0)) { function writeMessage(socket, type, connectionId, data = Buffer.alloc(0)) {
if (!socket || socket.destroyed) return; if (!socket || socket.destroyed) return true;
const header = Buffer.allocUnsafe(9); const buf = Buffer.allocUnsafe(9 + data.length);
header.writeUInt8(type, 0); buf.writeUInt8(type, 0);
header.writeUInt32BE(connectionId >>> 0, 1); buf.writeUInt32BE(connectionId >>> 0, 1);
header.writeUInt32BE(data.length >>> 0, 5); buf.writeUInt32BE(data.length >>> 0, 5);
socket.write(header); if (data.length > 0) data.copy(buf, 9);
if (data.length > 0) socket.write(data); return socket.write(buf);
} }
function createMessageReader() { function createMessageReader() {
@@ -162,6 +162,87 @@ const upstreamConns = new Map(); // connectionId -> net.Socket
// UDP association mapping: connectionId -> { udp4?: dgram.Socket, udp6?: dgram.Socket } // UDP association mapping: connectionId -> { udp4?: dgram.Socket, udp6?: dgram.Socket }
const udpAssoc = new Map(); const udpAssoc = new Map();
// Framing and fairness controls
const FRAME_MAX = 16 * 1024; // 16KiB frames to improve interleaving
const BYTES_PER_TICK = 64 * 1024; // Limit per processing burst to yield event loop
// Per-connection TX queues for upstream->tunnel direction
const txQueues = new Map(); // connectionId -> { queue: Buffer[], sending: boolean }
function processTxQueue(connectionId, state) {
if (!tunnelSocket || tunnelSocket.destroyed) {
// Drop queued data if tunnel is gone
state.queue = [];
state.sending = false;
return;
}
if (state.sending) return;
state.sending = true;
let bytesThisTick = 0;
const run = () => {
if (!tunnelSocket || tunnelSocket.destroyed) {
state.queue = [];
state.sending = false;
return;
}
while (state.queue.length) {
const buf = state.queue[0];
let offset = buf._offset || 0;
while (offset < buf.length) {
const end = Math.min(offset + FRAME_MAX, buf.length);
const slice = buf.subarray(offset, end);
const ok = writeMessage(tunnelSocket, MSG_TYPES.DATA, connectionId, slice);
if (!ok) {
// Backpressure: remember progress and resume on drain
buf._offset = end;
tunnelSocket.once('drain', () => {
// Reset flag to allow re-entry
state.sending = false;
run();
});
return;
}
offset = end;
bytesThisTick += slice.length;
if (bytesThisTick >= BYTES_PER_TICK) {
// Yield to allow other I/O
buf._offset = offset;
bytesThisTick = 0;
setImmediate(() => {
state.sending = false;
run();
});
return;
}
}
// Finished this buffer
delete buf._offset;
state.queue.shift();
}
state.sending = false;
};
run();
}
function enqueueToTunnel(connectionId, data) {
let state = txQueues.get(connectionId);
if (!state) {
state = { queue: [], sending: false };
txQueues.set(connectionId, state);
}
state.queue.push(data);
// Attempt to process immediately
if (!state.sending) {
state.sending = false; // ensure process can start
processTxQueue(connectionId, state);
}
}
function closeAllUpstreams() { function closeAllUpstreams() {
for (const [, s] of upstreamConns) { for (const [, s] of upstreamConns) {
try { s.destroy(); } catch (_) {} try { s.destroy(); } catch (_) {}
@@ -248,7 +329,8 @@ const server = tls.createServer(
}); });
upstream.on('data', (chunk) => { upstream.on('data', (chunk) => {
writeMessage(tunnelSocket, MSG_TYPES.DATA, connectionId, chunk); // Queue data with framing and backpressure-aware sending
enqueueToTunnel(connectionId, chunk);
}); });
const cleanup = () => { const cleanup = () => {
@@ -256,6 +338,8 @@ const server = tls.createServer(
if (upstreamConns.get(connectionId) === upstream) { if (upstreamConns.get(connectionId) === upstream) {
upstreamConns.delete(connectionId); upstreamConns.delete(connectionId);
} }
// Drop any pending TX data for this connection
txQueues.delete(connectionId);
writeMessage(tunnelSocket, MSG_TYPES.CLOSE, connectionId); writeMessage(tunnelSocket, MSG_TYPES.CLOSE, connectionId);
}; };