Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ bin/
*.exe
*.json
*.log
*.token
*.token
diagnostics/
diagnostics-smoke/
365 changes: 273 additions & 92 deletions README.md

Large diffs are not rendered by default.

24 changes: 23 additions & 1 deletion client_config.json.example
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
{
"listen_addr": "127.0.0.1:1080",
"storage_type": "google",
"performance_profile": "balanced",
"refresh_rate_ms": 200,
"flush_rate_ms": 300,
"idle_poll_max_ms": 2000,
"idle_poll_step_ms": 500,
"session_idle_timeout_sec": 25,
"cleanup_file_max_age_sec": 60,
"startup_stale_max_age_sec": 20,
"storage_retry_max": 3,
"storage_retry_base_ms": 300,
"storage_op_timeout_sec": 45,
"max_payload_bytes": 786432,
"max_active_sessions": 0,
"target_metrics_top_n": 10,
"blocked_targets": [],
"low_priority_targets": [],
"session_wait_timeout_sec": 15,
"backpressure_bytes": 4194304,
"immediate_flush": false,
"cold_start_burst_ms": 10000,
"cold_start_poll_ms": 100,
"metrics_log_sec": 30,
"health_listen_addr": "127.0.0.1:18081",
"transport": {
"TargetIP": "216.239.38.120:443",
"SNI": "google.com",
"HostHeader": "www.googleapis.com",
"InsecureSkipVerify": false
}
},
"google_lanes": []
}
199 changes: 182 additions & 17 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (
"net"
"os"
"os/signal"
"path"
"strings"
"syscall"
"time"

"github.com/NullLatency/flow-driver/internal/app"
"github.com/NullLatency/flow-driver/internal/config"
"github.com/NullLatency/flow-driver/internal/httpclient"
"github.com/NullLatency/flow-driver/internal/storage"
"github.com/NullLatency/flow-driver/internal/health"
"github.com/NullLatency/flow-driver/internal/transport"
"github.com/things-go/go-socks5"
"github.com/things-go/go-socks5/statute"
Expand Down Expand Up @@ -48,23 +51,18 @@ func main() {
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
appCfg.ApplyProfile()

var backend storage.Backend
if appCfg.StorageType == "google" {
customHttpClient := httpclient.NewCustomClient(appCfg.Transport)
backend = storage.NewGoogleBackend(customHttpClient, gcPath, appCfg.GoogleFolderID)
} else {
backend, err = storage.NewLocalBackend(appCfg.LocalDir)
if err != nil {
log.Fatalf("Failed to init local storage: %v", err)
}
backend, err := app.BuildBackend(appCfg, gcPath)
if err != nil {
log.Fatalf("Failed to init storage: %v", err)
}
if err := backend.Login(ctx); err != nil {
log.Fatalf("Backend login failed: %v", err)
}

// AUTOMATION: If folder ID is missing, find or create it
if appCfg.StorageType == "google" && appCfg.GoogleFolderID == "" {
if appCfg.StorageType == "google" && len(appCfg.GoogleLanes) == 0 && appCfg.GoogleFolderID == "" {
log.Println("Zero-Config: Searching for existing Google Drive folder 'Flow-Data'...")
folderID, err := backend.FindFolder(ctx, "Flow-Data")
if err != nil {
Expand Down Expand Up @@ -100,36 +98,71 @@ func main() {
if appCfg.FlushRateMs > 0 {
engine.SetFlushRate(appCfg.FlushRateMs)
}
engine.SetIdlePollMax(appCfg.IdlePollMaxMs)
engine.SetIdlePollStep(appCfg.IdlePollStepMs)
engine.SetSessionIdleTimeout(appCfg.SessionIdleTimeoutSec)
engine.SetCleanupFileMaxAge(appCfg.CleanupFileMaxAgeSec)
engine.SetStartupStaleMaxAge(appCfg.StartupStaleMaxAgeSec)
engine.SetMaxPayloadBytes(appCfg.MaxPayloadBytes)
engine.SetBackpressureBytes(appCfg.BackpressureBytes)
engine.SetStorageOpTimeout(appCfg.StorageOpTimeoutSec)
engine.SetImmediateFlush(appCfg.ImmediateFlush)
engine.SetColdStartBurst(appCfg.ColdStartBurstMs, appCfg.ColdStartPollMs)
engine.SetMetricsLogInterval(appCfg.MetricsLogSec)
engine.SetTargetMetricsTopN(appCfg.TargetMetricsTopN)
engine.Start(ctx)
health.Start(ctx, appCfg.HealthListenAddr, engine)

listenAddr := appCfg.ListenAddr
if listenAddr == "" {
listenAddr = "127.0.0.1:1080"
}
policy := newTargetPolicy(appCfg.BlockedTargets, appCfg.LowPriorityTargets)

// Create the library SOCKS5 server wrapping our custom Google Drive Engine tunnel
server := socks5.NewServer(
socks5.WithDial(func(dc context.Context, network, addr string) (net.Conn, error) {
host, port, hasHostPort := splitTarget(addr)
if policy.blocked(host, port, addr) {
engine.RecordBlockedTarget(addr)
log.Printf("Blocked target by policy: %s", addr)
return nil, fmt.Errorf("target blocked by policy: %s", addr)
}
lowPriority := policy.lowPriority(host, port, addr)

if err := waitForSessionCapacity(dc, engine, appCfg.MaxActiveSessions, appCfg.SessionWaitTimeoutSec); err != nil {
return nil, err
}

sessionID := generateSessionID()

// Intelligently parse the address string to warn users if their browser is natively leaking DNS
host, port, err := net.SplitHostPort(addr)
if err == nil {
priorityLabel := ""
if lowPriority {
priorityLabel = " LOW-PRIORITY"
}
if hasHostPort {
if net.ParseIP(host) != nil {
log.Printf("New covert session %s targeting RAW IP %s:%s (Warning: Local DNS Leak?)", sessionID, host, port)
log.Printf("New covert session %s%s targeting RAW IP %s:%s (Warning: Local DNS Leak?)", sessionID, priorityLabel, host, port)
} else {
log.Printf("New covert session %s targeting SECURE DOMAIN %s:%s", sessionID, host, port)
log.Printf("New covert session %s%s targeting SECURE DOMAIN %s:%s", sessionID, priorityLabel, host, port)
}
} else {
log.Printf("New covert session %s targeting %s", sessionID, addr)
log.Printf("New covert session %s%s targeting %s", sessionID, priorityLabel, addr)
}

session := transport.NewSession(sessionID)
session.TargetAddr = addr
session.TargetHost = host
session.LowPriority = lowPriority
engine.AddSession(session)

// Instantly ping a blank payload so the remote end opens the actual TCP destination
session.EnqueueTx(nil)
if !lowPriority {
engine.TriggerWarmPoll()
engine.ForceFlush()
}

return transport.NewVirtualConn(session, engine), nil
}),
Expand Down Expand Up @@ -158,3 +191,135 @@ func main() {
log.Println("Shutting down client...")
cancel()
}

type targetPolicy struct {
blockedTargets []targetPattern
lowPriorityTargets []targetPattern
}

type targetPattern struct {
raw string
host string
port string
}

func newTargetPolicy(blocked, lowPriority []string) targetPolicy {
return targetPolicy{
blockedTargets: parseTargetPatterns(blocked),
lowPriorityTargets: parseTargetPatterns(lowPriority),
}
}

func parseTargetPatterns(values []string) []targetPattern {
patterns := make([]targetPattern, 0, len(values))
for _, value := range values {
raw := strings.TrimSpace(strings.ToLower(value))
if raw == "" {
continue
}
host, port, ok := splitPattern(raw)
if !ok {
host = raw
}
patterns = append(patterns, targetPattern{raw: raw, host: host, port: port})
}
return patterns
}

func splitPattern(value string) (host, port string, ok bool) {
if strings.HasPrefix(value, "[") {
host, port, err := net.SplitHostPort(value)
if err == nil {
return strings.Trim(host, "[]"), port, true
}
}
lastColon := strings.LastIndex(value, ":")
if lastColon <= 0 {
return "", "", false
}
host = value[:lastColon]
port = value[lastColon+1:]
if host == "" || port == "" {
return "", "", false
}
return strings.Trim(host, "[]"), port, true
}

func (p targetPolicy) blocked(host, port, raw string) bool {
return targetPatternsMatch(p.blockedTargets, host, port, raw)
}

func (p targetPolicy) lowPriority(host, port, raw string) bool {
return targetPatternsMatch(p.lowPriorityTargets, host, port, raw)
}

func targetPatternsMatch(patterns []targetPattern, host, port, raw string) bool {
host = strings.ToLower(strings.Trim(host, "[]"))
port = strings.ToLower(port)
raw = strings.ToLower(raw)
for _, pattern := range patterns {
if pattern.match(host, port, raw) {
return true
}
}
return false
}

func (p targetPattern) match(host, port, raw string) bool {
if p.port != "" && p.port != "*" && p.port != port {
return false
}

patternHost := p.host
if patternHost == "" {
patternHost = p.raw
}
if matched, err := path.Match(patternHost, host); err == nil && matched {
return true
}
if p.port == "" {
if matched, err := path.Match(p.raw, raw); err == nil && matched {
return true
}
}
return patternHost == host || (p.port == "" && p.raw == raw)
}

func splitTarget(addr string) (host, port string, ok bool) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return strings.ToLower(strings.Trim(addr, "[]")), "", false
}
return strings.ToLower(strings.Trim(host, "[]")), strings.ToLower(port), true
}

func waitForSessionCapacity(ctx context.Context, engine *transport.Engine, maxActive, timeoutSec int) error {
if maxActive <= 0 {
return nil
}
if engine.ActiveSessionCount() < maxActive {
return nil
}

waitTimeout := 10 * time.Second
if timeoutSec > 0 {
waitTimeout = time.Duration(timeoutSec) * time.Second
}
timer := time.NewTimer(waitTimeout)
defer timer.Stop()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return fmt.Errorf("too many active sessions: %d/%d", engine.ActiveSessionCount(), maxActive)
case <-ticker.C:
if engine.ActiveSessionCount() < maxActive {
return nil
}
}
}
}
54 changes: 54 additions & 0 deletions cmd/client/target_policy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import "testing"

func TestTargetPolicyMatchesHostAndPortPatterns(t *testing.T) {
policy := newTargetPolicy(
[]string{"*.doubleclick.net", "*:5228"},
[]string{"mtalk.google.com:*", "optimizationguide-pa.googleapis.com"},
)

tests := []struct {
name string
addr string
blocked bool
lowPriority bool
}{
{
name: "blocked host glob",
addr: "googleads.g.doubleclick.net:443",
blocked: true,
},
{
name: "blocked port glob",
addr: "push.example.com:5228",
blocked: true,
},
{
name: "low priority host with any port",
addr: "mtalk.google.com:443",
lowPriority: true,
},
{
name: "low priority host without explicit port pattern",
addr: "optimizationguide-pa.googleapis.com:443",
lowPriority: true,
},
{
name: "unmatched target",
addr: "www.youtube.com:443",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
host, port, _ := splitTarget(tt.addr)
if got := policy.blocked(host, port, tt.addr); got != tt.blocked {
t.Fatalf("blocked=%v, want %v", got, tt.blocked)
}
if got := policy.lowPriority(host, port, tt.addr); got != tt.lowPriority {
t.Fatalf("lowPriority=%v, want %v", got, tt.lowPriority)
}
})
}
}
Loading