-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathcontext.go
More file actions
297 lines (241 loc) · 7.46 KB
/
context.go
File metadata and controls
297 lines (241 loc) · 7.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package raiden
import (
"context"
"encoding/json"
"errors"
"net/http"
"reflect"
"time"
"github.com/sev-2/raiden/pkg/client/net"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/trace"
)
type (
// The `Context` interface defines a set of methods that can be implemented by a struct to provide a
// context for handling HTTP requests in the Raiden framework.
Context interface {
Ctx() context.Context
SetCtx(ctx context.Context)
Config() *Config
SendRpc(Rpc) error
ExecuteRpc(Rpc) (any, error)
SendJson(data any) error
SendError(message string) error
SendErrorWithCode(statusCode int, err error) error
RequestContext() *fasthttp.RequestCtx
Span() trace.Span
SetSpan(span trace.Span)
Tracer() trace.Tracer
NewJobCtx() (JobContext, error)
Write(data []byte)
WriteError(err error)
Set(key string, value any)
Get(key string) any
GetParam(key string) any
GetQuery(key string) string
Publish(ctx context.Context, provider PubSubProviderType, topic string, message []byte) error
HttpRequest(method string, url string, body []byte, headers map[string]string, timeout time.Duration) (*http.Response, error)
HttpRequestAndBind(method string, url string, body []byte, headers map[string]string, timeout time.Duration, response any) error
ResolveLibrary(key any) error
RegisterLibraries(key map[string]any)
}
// The `Ctx` struct is a struct that implements the `Context` interface in the Raiden framework. It
// embeds the `context.Context` and `*fasthttp.RequestCtx` types, which provide the context and request
// information for handling HTTP requests. Additionally, it has fields for storing the configuration
// (`config`), span (`span`), and tracer (`tracer`) for tracing and monitoring purposes.
Ctx struct {
context.Context
*fasthttp.RequestCtx
config *Config
span trace.Span
tracer trace.Tracer
jobChan chan JobParams
data map[string]any
pubSub PubSub
libraryRegistry map[string]any
}
)
func NewCtx(config *Config, tracer trace.Tracer, jobChan chan JobParams) Ctx {
return Ctx{
config: config,
tracer: tracer,
jobChan: jobChan,
}
}
func (c *Ctx) Config() *Config {
return c.config
}
func (c *Ctx) SendRpc(rpc Rpc) error {
rs, err := c.ExecuteRpc(rpc)
if err != nil {
return err
}
return c.SendJson(rs)
}
func (c *Ctx) ExecuteRpc(rpc Rpc) (any, error) {
return ExecuteRpc(c, rpc)
}
func (c *Ctx) RequestContext() *fasthttp.RequestCtx {
return c.RequestCtx
}
func (c *Ctx) Span() trace.Span {
return c.span
}
func (c *Ctx) SetSpan(span trace.Span) {
c.span = span
}
func (c *Ctx) Tracer() trace.Tracer {
return c.tracer
}
func (c *Ctx) SetJobChan(jobChan chan JobParams) {
c.jobChan = jobChan
}
func (c *Ctx) RegisterLibraries(key map[string]any) {
c.libraryRegistry = key
}
func (c *Ctx) NewJobCtx() (JobContext, error) {
if c.jobChan != nil {
jobCtx := newJobCtx(c.config, c.pubSub, c.jobChan, make(JobData))
spanCtx := trace.SpanContextFromContext(c.Context)
jobCtx.SetContext(trace.ContextWithSpanContext(context.Background(), spanCtx))
return jobCtx, nil
}
return nil, errors.New(("event channel not available, enable scheduler to use this feature"))
}
func (c *Ctx) Ctx() context.Context {
return c.Context
}
func (c *Ctx) SetCtx(ctx context.Context) {
c.Context = ctx
}
func (c *Ctx) Get(key string) any {
if c.data == nil {
c.data = make(map[string]any)
}
return c.data[key]
}
func (c *Ctx) GetParam(key string) any {
return c.RequestContext().UserValue(key)
}
func (c *Ctx) GetQuery(key string) string {
return string(c.RequestContext().QueryArgs().Peek(key))
}
func (c *Ctx) Set(key string, value any) {
if c.data == nil {
c.data = make(map[string]any)
}
c.data[key] = value
}
func (c *Ctx) Publish(ctx context.Context, provider PubSubProviderType, topic string, message []byte) error {
if c.pubSub == nil {
return errors.New("unable to publish because pubsub not initialize")
}
return c.pubSub.Publish(ctx, provider, topic, message)
}
// The `SendJson` function is a method of the `Ctx` struct in the Raiden framework. It is responsible
// for sending a JSON response to the client.
func (c *Ctx) SendJson(data any) error {
c.Response.Header.SetContentType("application/json")
byteData, err := json.Marshal(data)
if err != nil {
return err
}
c.Write(byteData)
return nil
}
func (c *Ctx) SendError(message string) error {
return &ErrorResponse{
Message: message,
StatusCode: fasthttp.StatusInternalServerError,
}
}
func (c *Ctx) SendErrorWithCode(statusCode int, err error) error {
return &ErrorResponse{
Message: err.Error(),
StatusCode: statusCode,
Code: fasthttp.StatusMessage(statusCode),
}
}
func (c *Ctx) HttpRequestAndBind(method string, url string, body []byte, headers map[string]string, timeout time.Duration, response any) error {
if reflect.TypeOf(response).Kind() != reflect.Ptr {
return errors.New("response payload must be pointer")
}
byteData, err := net.SendRequest(method, url, body, timeout, func(req *http.Request) error {
currentHeaders := req.Header.Clone()
if len(headers) > 0 {
for k, v := range headers {
currentHeaders.Set(k, v)
}
}
req.Header = currentHeaders
return nil
}, nil)
if err != nil {
return err
}
return json.Unmarshal(byteData, response)
}
func (c *Ctx) HttpRequest(method string, url string, body []byte, headers map[string]string, timeout time.Duration) (response *http.Response, err error) {
_, errResponse := net.SendRequest(method, url, body, timeout, func(req *http.Request) error {
currentHeaders := req.Header.Clone()
if len(headers) > 0 {
for k, v := range headers {
currentHeaders.Set(k, v)
}
}
req.Header = currentHeaders
return nil
}, func(resp *http.Response) error {
response = resp
return nil
})
if errResponse != nil {
err = errResponse
}
return
}
// The `WriteError` function is a method of the `Ctx` struct in the Raiden framework. It is responsible
// for writing an error response to the HTTP response body.
func (c *Ctx) WriteError(err error) {
c.Response.Header.SetContentType("application/json")
if errResponse, ok := err.(*ErrorResponse); ok {
responseByte, errMarshall := json.Marshal(errResponse)
if errMarshall == nil {
c.Response.SetStatusCode(errResponse.StatusCode)
c.Response.AppendBody(responseByte)
return
}
err = errMarshall
}
c.Response.SetStatusCode(fasthttp.StatusInternalServerError)
c.Response.AppendBodyString(err.Error())
}
// The `Write` function is a method of the `Ctx` struct in the Raiden framework. It is responsible for
// writing the response body to the HTTP response.
func (c *Ctx) Write(data []byte) {
c.Response.SetStatusCode(fasthttp.StatusOK)
c.Response.AppendBody(data)
}
func (c *Ctx) ResolveLibrary(key any) error {
keyVal := reflect.ValueOf(key)
if keyVal.Kind() != reflect.Ptr {
return errors.New("key must be a pointer")
}
typeOfKey := reflect.TypeOf(key).Elem()
val, exists := c.libraryRegistry[typeOfKey.Name()]
if !exists {
return errors.New("lib not initialized")
}
valReflect := reflect.ValueOf(val)
// If the stored value is a pointer but key expects a non-pointer, dereference it
if valReflect.Kind() == reflect.Ptr && valReflect.Elem().Type() == typeOfKey {
keyVal.Elem().Set(valReflect.Elem()) // Set the dereferenced value
return nil
}
// If the types match directly, assign the value
if valReflect.Type().AssignableTo(typeOfKey) {
keyVal.Elem().Set(valReflect)
return nil
}
return errors.New("lib not initialized")
}