Skip to content

Commit c194114

Browse files
feat: add data chunck server side
1 parent a939f8a commit c194114

5 files changed

Lines changed: 210 additions & 45 deletions

File tree

common/helper.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package common
22

33
import (
4+
"bytes"
45
"crypto/sha256"
56
"encoding/base64"
7+
"errors"
68
"io"
79
"net/http"
810
"strings"
@@ -13,17 +15,31 @@ import (
1315
)
1416

1517
func ReadProtoBody(body io.ReadCloser, message proto.Message) error {
16-
data, err := io.ReadAll(body)
17-
if err != nil {
18-
return err
19-
}
2018
defer body.Close()
2119

22-
// Decode into a map
23-
if err = proto.Unmarshal(data, message); err != nil {
24-
return err
20+
// Stream read into a buffer to support chunked uploads without requiring Content-Length.
21+
var buf bytes.Buffer
22+
tmp := make([]byte, 64*1024) // 64KB chunks to avoid large allocations
23+
for {
24+
n, err := body.Read(tmp)
25+
if n > 0 {
26+
if _, werr := buf.Write(tmp[:n]); werr != nil {
27+
return werr
28+
}
29+
}
30+
if errors.Is(err, io.EOF) {
31+
break
32+
}
33+
if err != nil {
34+
return err
35+
}
2536
}
26-
return nil
37+
38+
if buf.Len() == 0 {
39+
return io.ErrUnexpectedEOF
40+
}
41+
42+
return proto.Unmarshal(buf.Bytes(), message)
2743
}
2844

2945
func SendProtoResponse(w http.ResponseWriter, data proto.Message) {

common/helper_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package common
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"strings"
7+
"testing"
8+
9+
"google.golang.org/protobuf/proto"
10+
)
11+
12+
func TestReadProtoBodyChunked(t *testing.T) {
13+
largeConfig := strings.Repeat("config", 30000) // ~180KB to force multiple reads
14+
expected := &Backend{
15+
Type: BackendType_XRAY,
16+
Config: largeConfig,
17+
Users: []*User{
18+
{Email: "a@example.com"},
19+
},
20+
KeepAlive: 5,
21+
}
22+
23+
data, err := proto.Marshal(expected)
24+
if err != nil {
25+
t.Fatalf("failed to marshal backend: %v", err)
26+
}
27+
28+
reader := io.NopCloser(bytes.NewReader(data))
29+
30+
var decoded Backend
31+
if err = ReadProtoBody(reader, &decoded); err != nil {
32+
t.Fatalf("ReadProtoBody returned error: %v", err)
33+
}
34+
35+
if decoded.GetConfig() != expected.GetConfig() {
36+
t.Fatalf("config mismatch: got %d chars, want %d chars", len(decoded.GetConfig()), len(expected.GetConfig()))
37+
}
38+
39+
if decoded.GetKeepAlive() != expected.GetKeepAlive() {
40+
t.Fatalf("keepalive mismatch: got %d, want %d", decoded.GetKeepAlive(), expected.GetKeepAlive())
41+
}
42+
43+
if got := decoded.GetUsers(); len(got) != len(expected.GetUsers()) || got[0].GetEmail() != expected.GetUsers()[0].GetEmail() {
44+
t.Fatalf("users mismatch: got %+v, want %+v", got, expected.GetUsers())
45+
}
46+
}

controller/rest/user.go

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package rest
22

33
import (
4+
"bytes"
5+
"errors"
6+
"fmt"
47
"io"
58
"log"
69
"net/http"
@@ -10,66 +13,94 @@ import (
1013
"github.com/pasarguard/node/common"
1114
)
1215

13-
func (s *Service) SyncUser(w http.ResponseWriter, r *http.Request) {
14-
body, err := io.ReadAll(r.Body)
15-
if err != nil {
16-
http.Error(w, "Failed to read request body", http.StatusBadRequest)
17-
return
16+
const (
17+
requestChunkSize = 64 * 1024 // 64KB streaming chunks
18+
)
19+
20+
func readRequestBody(body io.ReadCloser) ([]byte, error) {
21+
defer body.Close()
22+
23+
var buf bytes.Buffer
24+
tmp := make([]byte, requestChunkSize)
25+
for {
26+
n, err := body.Read(tmp)
27+
if n > 0 {
28+
if _, werr := buf.Write(tmp[:n]); werr != nil {
29+
return nil, werr
30+
}
31+
}
32+
if errors.Is(err, io.EOF) {
33+
break
34+
}
35+
if err != nil {
36+
return nil, err
37+
}
1838
}
19-
defer r.Body.Close()
2039

21-
user := &common.User{}
22-
if err = proto.Unmarshal(body, user); err != nil {
23-
http.Error(w, "Failed to decode user", http.StatusBadRequest)
24-
return
40+
if buf.Len() == 0 {
41+
return nil, io.ErrUnexpectedEOF
2542
}
2643

27-
if user == nil {
28-
http.Error(w, "no user received", http.StatusBadRequest)
29-
return
44+
return buf.Bytes(), nil
45+
}
46+
47+
func decodeUsersPayload(data []byte) ([]*common.User, error) {
48+
if len(data) == 0 {
49+
return nil, io.ErrUnexpectedEOF
3050
}
3151

32-
log.Printf("Got user: %v", user.GetEmail())
52+
// First try a Users envelope for batch updates.
53+
users := &common.Users{}
54+
if err := proto.Unmarshal(data, users); err == nil && len(users.GetUsers()) > 0 {
55+
return users.GetUsers(), nil
56+
}
3357

34-
if err = s.Backend().SyncUser(r.Context(), user); err != nil {
35-
log.Printf("Error syncing user: %v", err)
36-
http.Error(w, err.Error(), http.StatusInternalServerError)
37-
return
58+
// Fallback to single user payload.
59+
user := &common.User{}
60+
if err := proto.Unmarshal(data, user); err == nil && user.GetEmail() != "" {
61+
return []*common.User{user}, nil
3862
}
3963

40-
response, _ := proto.Marshal(&common.Empty{})
64+
return nil, fmt.Errorf("failed to decode user payload")
65+
}
4166

42-
w.Header().Set("Content-Type", "application/x-protobuf")
43-
if _, err = w.Write(response); err != nil {
44-
http.Error(w, "Failed to write response", http.StatusInternalServerError)
67+
func (s *Service) SyncUser(w http.ResponseWriter, r *http.Request) {
68+
body, err := readRequestBody(r.Body)
69+
if err != nil {
70+
http.Error(w, fmt.Sprintf("Failed to read request body: %v", err), http.StatusBadRequest)
4571
return
4672
}
47-
}
4873

49-
func (s *Service) SyncUsers(w http.ResponseWriter, r *http.Request) {
50-
body, err := io.ReadAll(r.Body)
74+
users, err := decodeUsersPayload(body)
5175
if err != nil {
52-
http.Error(w, "Failed to read request body", http.StatusBadRequest)
76+
http.Error(w, fmt.Sprintf("Failed to decode user payload: %v", err), http.StatusBadRequest)
5377
return
5478
}
55-
defer r.Body.Close()
5679

80+
for _, user := range users {
81+
log.Printf("Got user: %v", user.GetEmail())
82+
83+
if err = s.Backend().SyncUser(r.Context(), user); err != nil {
84+
log.Printf("Error syncing user: %v", err)
85+
http.Error(w, err.Error(), http.StatusInternalServerError)
86+
return
87+
}
88+
}
89+
90+
common.SendProtoResponse(w, &common.Empty{})
91+
}
92+
93+
func (s *Service) SyncUsers(w http.ResponseWriter, r *http.Request) {
5794
users := &common.Users{}
58-
if err = proto.Unmarshal(body, users); err != nil {
59-
http.Error(w, "Failed to decode user", http.StatusBadRequest)
95+
if err := common.ReadProtoBody(r.Body, users); err != nil {
96+
http.Error(w, fmt.Sprintf("Failed to decode user payload: %v", err), http.StatusBadRequest)
6097
return
6198
}
6299

63-
if err = s.Backend().SyncUsers(r.Context(), users.GetUsers()); err != nil {
100+
if err := s.Backend().SyncUsers(r.Context(), users.GetUsers()); err != nil {
64101
http.Error(w, err.Error(), http.StatusInternalServerError)
65102
return
66103
}
67104

68-
response, _ := proto.Marshal(&common.Empty{})
69-
70-
w.Header().Set("Content-Type", "application/x-protobuf")
71-
if _, err = w.Write(response); err != nil {
72-
http.Error(w, "Failed to write response", http.StatusInternalServerError)
73-
return
74-
}
105+
common.SendProtoResponse(w, &common.Empty{})
75106
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package rest
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"testing"
7+
8+
"google.golang.org/protobuf/proto"
9+
10+
"github.com/pasarguard/node/common"
11+
)
12+
13+
func TestReadRequestBodyChunked(t *testing.T) {
14+
payload := bytes.Repeat([]byte("abc123"), 50000) // ~300KB
15+
reader := io.NopCloser(bytes.NewReader(payload))
16+
17+
data, err := readRequestBody(reader)
18+
if err != nil {
19+
t.Fatalf("readRequestBody failed: %v", err)
20+
}
21+
22+
if len(data) != len(payload) {
23+
t.Fatalf("payload length mismatch: got %d, want %d", len(data), len(payload))
24+
}
25+
26+
if !bytes.Equal(data[:32], payload[:32]) || !bytes.Equal(data[len(data)-32:], payload[len(payload)-32:]) {
27+
t.Fatalf("payload contents mismatch")
28+
}
29+
}
30+
31+
func TestDecodeUsersPayloadSingleUser(t *testing.T) {
32+
user := &common.User{Email: "single@example.com"}
33+
data, err := proto.Marshal(user)
34+
if err != nil {
35+
t.Fatalf("failed to marshal user: %v", err)
36+
}
37+
38+
users, err := decodeUsersPayload(data)
39+
if err != nil {
40+
t.Fatalf("decodeUsersPayload returned error: %v", err)
41+
}
42+
43+
if len(users) != 1 || users[0].GetEmail() != user.GetEmail() {
44+
t.Fatalf("decoded users mismatch: %+v", users)
45+
}
46+
}
47+
48+
func TestDecodeUsersPayloadUsersEnvelope(t *testing.T) {
49+
usersMsg := &common.Users{
50+
Users: []*common.User{
51+
{Email: "a@example.com"},
52+
{Email: "b@example.com"},
53+
},
54+
}
55+
data, err := proto.Marshal(usersMsg)
56+
if err != nil {
57+
t.Fatalf("failed to marshal users: %v", err)
58+
}
59+
60+
users, err := decodeUsersPayload(data)
61+
if err != nil {
62+
t.Fatalf("decodeUsersPayload returned error: %v", err)
63+
}
64+
65+
if len(users) != len(usersMsg.GetUsers()) {
66+
t.Fatalf("expected %d users, got %d", len(usersMsg.GetUsers()), len(users))
67+
}
68+
}

controller/rpc/service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@ func StartGRPCListener(tlsConfig *tls.Config, addr string, cfg *config.Config) (
2929

3030
creds := credentials.NewTLS(tlsConfig)
3131

32+
const maxGRPCMessageSize = 64 * 1024 * 1024 // 64MB to support large Backend and Users payloads
33+
3234
// Create the gRPC server with conditional middleware
3335
grpcServer := grpc.NewServer(
3436
grpc.Creds(creds),
3537
grpc.UnaryInterceptor(ConditionalMiddleware(s)),
3638
grpc.StreamInterceptor(ConditionalStreamMiddleware(s)),
39+
grpc.MaxRecvMsgSize(maxGRPCMessageSize),
40+
grpc.MaxSendMsgSize(maxGRPCMessageSize),
3741
)
3842

3943
// Register the service

0 commit comments

Comments
 (0)