diff --git a/proxy-client.js b/proxy-client.js index e27336b..9c70245 100644 --- a/proxy-client.js +++ b/proxy-client.js @@ -93,13 +93,13 @@ function buildUdpPayload(host, port, data) { } function writeMessage(socket, type, connectionId, data = Buffer.alloc(0)) { - if (!socket || socket.destroyed) return; - const header = Buffer.allocUnsafe(9); - header.writeUInt8(type, 0); - header.writeUInt32BE(connectionId >>> 0, 1); - header.writeUInt32BE(data.length >>> 0, 5); - socket.write(header); - if (data.length > 0) socket.write(data); + if (!socket || socket.destroyed) return true; + const buf = Buffer.allocUnsafe(9 + data.length); + buf.writeUInt8(type, 0); + buf.writeUInt32BE(connectionId >>> 0, 1); + buf.writeUInt32BE(data.length >>> 0, 5); + if (data.length > 0) data.copy(buf, 9); + return socket.write(buf); } function createMessageReader() { @@ -156,6 +156,81 @@ let nextConnId = 1; // } const udpAssocs = new Map(); +// Framing and fairness controls for local->tunnel direction +const FRAME_MAX = 16 * 1024; // 16KiB frames to improve interleaving +const BYTES_PER_TICK = 64 * 1024; // Limit per processing burst to yield event loop +// Per-connection TX queues: connectionId -> { queue: Buffer[], sending: boolean } +const txQueues = new Map(); + +function processTxQueue(connectionId, state) { + if (!tunnelSocket || tunnelSocket.destroyed) { + state.queue = []; + state.sending = false; + return; + } + if (state.sending) return; + state.sending = true; + + let bytesThisTick = 0; + + const run = () => { + if (!tunnelSocket || tunnelSocket.destroyed) { + state.queue = []; + state.sending = false; + return; + } + while (state.queue.length) { + const buf = state.queue[0]; + let offset = buf._offset || 0; + + while (offset < buf.length) { + const end = Math.min(offset + FRAME_MAX, buf.length); + const slice = buf.subarray(offset, end); + const ok = writeMessage(tunnelSocket, MSG_TYPES.DATA, connectionId, slice); + if (!ok) { + buf._offset = end; + tunnelSocket.once('drain', () => { + state.sending = false; + run(); + }); + return; + } + offset = end; + bytesThisTick += slice.length; + if (bytesThisTick >= BYTES_PER_TICK) { + buf._offset = offset; + bytesThisTick = 0; + setImmediate(() => { + state.sending = false; + run(); + }); + return; + } + } + + delete buf._offset; + state.queue.shift(); + } + + state.sending = false; + }; + + run(); +} + +function enqueueToTunnel(connectionId, data) { + let state = txQueues.get(connectionId); + if (!state) { + state = { queue: [], sending: false }; + txQueues.set(connectionId, state); + } + state.queue.push(data); + if (!state.sending) { + state.sending = false; + processTxQueue(connectionId, state); + } +} + function genConnId() { let id = nextConnId >>> 0; do { @@ -300,6 +375,7 @@ function destroyLocalTCP(connectionId) { if (!st) return; connections.delete(connectionId); clearTimeout(st.openTimer); + txQueues.delete(connectionId); try { st.localSocket.destroy(); } catch (_) {} } @@ -329,21 +405,23 @@ function startOpenTcp(connectionId, host, port, st) { } function armTcpStreaming(st) { - // After CONNECT success, forward subsequent data + // After CONNECT success, forward subsequent data with framing/backpressure const forward = (chunk) => { if (!tunnelSocket || tunnelSocket.destroyed) return; - writeMessage(tunnelSocket, MSG_TYPES.DATA, st.id, chunk); + enqueueToTunnel(st.id, chunk); }; st.localSocket.on('data', forward); st.localSocket.once('close', () => { writeMessage(tunnelSocket, MSG_TYPES.CLOSE, st.id); connections.delete(st.id); clearTimeout(st.openTimer); + txQueues.delete(st.id); }); st.localSocket.once('error', () => { writeMessage(tunnelSocket, MSG_TYPES.CLOSE, st.id); connections.delete(st.id); clearTimeout(st.openTimer); + txQueues.delete(st.id); }); } diff --git a/proxy-server.js b/proxy-server.js index e83c1ec..9b2308d 100644 --- a/proxy-server.js +++ b/proxy-server.js @@ -66,13 +66,13 @@ const MSG_TYPES = { }; function writeMessage(socket, type, connectionId, data = Buffer.alloc(0)) { - if (!socket || socket.destroyed) return; - const header = Buffer.allocUnsafe(9); - header.writeUInt8(type, 0); - header.writeUInt32BE(connectionId >>> 0, 1); - header.writeUInt32BE(data.length >>> 0, 5); - socket.write(header); - if (data.length > 0) socket.write(data); + if (!socket || socket.destroyed) return true; + const buf = Buffer.allocUnsafe(9 + data.length); + buf.writeUInt8(type, 0); + buf.writeUInt32BE(connectionId >>> 0, 1); + buf.writeUInt32BE(data.length >>> 0, 5); + if (data.length > 0) data.copy(buf, 9); + return socket.write(buf); } function createMessageReader() { @@ -162,6 +162,87 @@ const upstreamConns = new Map(); // connectionId -> net.Socket // UDP association mapping: connectionId -> { udp4?: dgram.Socket, udp6?: dgram.Socket } const udpAssoc = new Map(); +// Framing and fairness controls +const FRAME_MAX = 16 * 1024; // 16KiB frames to improve interleaving +const BYTES_PER_TICK = 64 * 1024; // Limit per processing burst to yield event loop +// Per-connection TX queues for upstream->tunnel direction +const txQueues = new Map(); // connectionId -> { queue: Buffer[], sending: boolean } + +function processTxQueue(connectionId, state) { + if (!tunnelSocket || tunnelSocket.destroyed) { + // Drop queued data if tunnel is gone + state.queue = []; + state.sending = false; + return; + } + if (state.sending) return; + state.sending = true; + + let bytesThisTick = 0; + + const run = () => { + if (!tunnelSocket || tunnelSocket.destroyed) { + state.queue = []; + state.sending = false; + return; + } + while (state.queue.length) { + const buf = state.queue[0]; + let offset = buf._offset || 0; + + while (offset < buf.length) { + const end = Math.min(offset + FRAME_MAX, buf.length); + const slice = buf.subarray(offset, end); + const ok = writeMessage(tunnelSocket, MSG_TYPES.DATA, connectionId, slice); + if (!ok) { + // Backpressure: remember progress and resume on drain + buf._offset = end; + tunnelSocket.once('drain', () => { + // Reset flag to allow re-entry + state.sending = false; + run(); + }); + return; + } + offset = end; + bytesThisTick += slice.length; + if (bytesThisTick >= BYTES_PER_TICK) { + // Yield to allow other I/O + buf._offset = offset; + bytesThisTick = 0; + setImmediate(() => { + state.sending = false; + run(); + }); + return; + } + } + + // Finished this buffer + delete buf._offset; + state.queue.shift(); + } + + state.sending = false; + }; + + run(); +} + +function enqueueToTunnel(connectionId, data) { + let state = txQueues.get(connectionId); + if (!state) { + state = { queue: [], sending: false }; + txQueues.set(connectionId, state); + } + state.queue.push(data); + // Attempt to process immediately + if (!state.sending) { + state.sending = false; // ensure process can start + processTxQueue(connectionId, state); + } +} + function closeAllUpstreams() { for (const [, s] of upstreamConns) { try { s.destroy(); } catch (_) {} @@ -248,7 +329,8 @@ const server = tls.createServer( }); upstream.on('data', (chunk) => { - writeMessage(tunnelSocket, MSG_TYPES.DATA, connectionId, chunk); + // Queue data with framing and backpressure-aware sending + enqueueToTunnel(connectionId, chunk); }); const cleanup = () => { @@ -256,6 +338,8 @@ const server = tls.createServer( if (upstreamConns.get(connectionId) === upstream) { upstreamConns.delete(connectionId); } + // Drop any pending TX data for this connection + txQueues.delete(connectionId); writeMessage(tunnelSocket, MSG_TYPES.CLOSE, connectionId); };