Conversation
| return 1 | ||
| } | ||
|
|
||
| func (c *Client) CallStream(opname string, req, resp Message, handleStream func(client *StreamClient) error) error { |
server.go
Outdated
| } | ||
|
|
||
| func (s *Server) handleRequest(wi *serverWorkItem, pendingResponses chan<- *serverWorkItem, stopCh <-chan struct{}) { | ||
| func (s *Server) handleRequest(wi *serverWorkItem, pendingResponses chan<- *serverWorkItem, stream *StreamServer, stopCh <-chan struct{}) { |
client.go
Outdated
| // | ||
| // This saves memory and CPU resources. | ||
|
|
||
| s := NewStreamClient(sid, xxhash.Sum64String(opname), c.opts.Codec, in, func(id uint32, m *request, resp chan error) { |
| continue | ||
| } | ||
|
|
||
| var stream *StreamServer |
server.go
Outdated
| in := make(chan *request, 1) | ||
| streamID := binary.BigEndian.Uint32(wi.streamID[:]) | ||
| s.inStreamMsg[streamID] = in | ||
| stream = NewServerStream(streamID, s.opts.Codec, in, func(id uint32, m *response, resp chan error) { |
| var request = strconv.Itoa(t.N / 2) | ||
| var response = "" | ||
|
|
||
| err := c.CallStream("stream", &request, &response, func(client *StreamClient) error { |
stream_test.go
Outdated
| t.ResetTimer() | ||
| t.ReportAllocs() | ||
|
|
||
| err := client.SendMsg(request) |
There was a problem hiding this comment.
invalid operation: client (variable of type *invalid type) has no field or method SendMsg
| var r string | ||
|
|
||
| for i := 0; i < t.N/2; i++ { | ||
| err = client.RecvMsg(&r) |
There was a problem hiding this comment.
invalid operation: client (variable of type *invalid type) has no field or method RecvMsg
| reply := opinfo.ReplyType() | ||
|
|
||
| if err = handler(nil, args, reply); err != nil { | ||
| if err = handler(&Context{stream}, args, reply); err != nil { |
There was a problem hiding this comment.
github.com/thesyncim/exposed.Context composite literal uses unkeyed fields
| ReleaseResponse(rawResp) | ||
| releaseClientWorkItem(wi) | ||
| c.decPendingRequests() | ||
| c.getError(err) |
There was a problem hiding this comment.
Error return value of c.getError is not checked
| ReleaseResponse(rawResp) | ||
| releaseClientWorkItem(wi) | ||
| c.decPendingRequests() | ||
| c.getError(err) |
There was a problem hiding this comment.
Error return value of c.getError is not checked
| ReleaseResponse(rawResp) | ||
| releaseClientWorkItem(wi) | ||
| c.decPendingRequests() | ||
| c.getError(err) |
There was a problem hiding this comment.
Error return value of c.getError is not checked
| codec encoding.Codec | ||
|
|
||
| inMessages chan *request | ||
| errOutCh chan error |
|
|
||
| type StreamClient struct { | ||
| ID uint32 | ||
| isServer bool |
stream.go
Outdated
| codec encoding.Codec | ||
|
|
||
| inMessages <-chan *response | ||
| errOutCh chan error |
| pendingResponsesLock sync.Mutex | ||
|
|
||
| incomingStreamMsg sync.Map | ||
| incomingStreamMsgLock sync.Mutex |
server.go
Outdated
| } | ||
|
|
||
| var ( | ||
| typeUnarycall = []byte{byte(0)} |
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if i == 99 { |
make stream thread safe improve and reorganize code
| close(stopCh) | ||
| inStreamMsg.Range(func(key, value interface{}) bool { | ||
| panic("das") | ||
| close(value.(chan *request)) |
| conn.Close() | ||
| inStreamMsg.Range(func(key, value interface{}) bool { | ||
| panic("das") | ||
| close(value.(chan *request)) |
| streamMessage = packetControl(1) | ||
| streamStart = packetControl(2) | ||
| ///todo implement | ||
| streamClose = packetControl(3) |
| streamStart = packetControl(2) | ||
| ///todo implement | ||
| streamClose = packetControl(3) | ||
| streamCloseRead = packetControl(4) |
| ///todo implement | ||
| streamClose = packetControl(3) | ||
| streamCloseRead = packetControl(4) | ||
| streamCloseWrite = packetControl(5) |
| inMessages <-chan *response | ||
| serverOutMessages chan<- WorkItem | ||
|
|
||
| errOutCh chan error |
| codec encoding.Codec | ||
|
|
||
| inMessages <-chan *response | ||
| serverOutMessages chan<- WorkItem |
| reqID [4]byte | ||
| type serverUnaryWorkItem struct { | ||
| ctx *exposedCtx | ||
| startStream bool |
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if i == 99 { |
| }) | ||
| sid := c.nextStreamID() | ||
| inStream, _ := c.incomingStreamMsg.LoadOrStore(sid, make(chan *response, 100)) | ||
| var in chan *response |
There was a problem hiding this comment.
should merge variable declaration with assignment on next line
Codecov Report
@@ Coverage Diff @@
## master #9 +/- ##
===========================================
- Coverage 66.84% 53.08% -13.77%
===========================================
Files 9 10 +1
Lines 926 1217 +291
===========================================
+ Hits 619 646 +27
- Misses 239 487 +248
- Partials 68 84 +16
Continue to review full report at Codecov.
|
No description provided.