Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 13 additions & 49 deletions lib/src/commonMain/kotlin/org/holance/ktbus/KtBus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,16 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import java.lang.reflect.Method
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
import kotlin.reflect.KParameter
import kotlin.reflect.full.callSuspend
import kotlin.reflect.full.createInstance
import kotlin.reflect.full.findAnnotation
import kotlin.reflect.full.memberFunctions
import kotlin.reflect.jvm.javaMethod
import kotlin.reflect.jvm.isAccessible
import kotlin.reflect.jvm.jvmErasure
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
Expand Down Expand Up @@ -180,20 +177,6 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {
}
}

private suspend fun invokeSuspendFunction(
method: Method,
obj: Any,
event: Any
) {
suspendCoroutine<Unit> {
try {
method.invoke(obj, event, it)
} catch (e: Exception) {
it.resumeWithException(e)
}
}
}

/** Processes a method annotated with @Subscribe */
private fun processSubscriber(
target: Any,
Expand Down Expand Up @@ -233,18 +216,11 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {
return@collect
}
try {
function.isAccessible = true
if (function.isSuspend) {
if (function.javaMethod != null) {
invokeSuspendFunction(function.javaMethod!!, target, event.data)
} else {
function.callSuspend(target, event.data)
}
function.callSuspend(target, event.data)
} else {
if (function.javaMethod != null) {
function.javaMethod?.invoke(target, event.data)
} else {
function.call(target, event.data)
}
function.call(target, event.data)
}
} catch (e: Throwable) {
logger?.e("Exception in event handler [$source]: $e")
Expand Down Expand Up @@ -330,25 +306,13 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {

var responseData: Any? = null
var responseError: Throwable? = null
function.isAccessible = true
try {
// Call the actual handler function (suspend or regular)
responseData = if (function.isSuspend) {

if (function.javaMethod != null) {
invokeSuspendFunction(
function.javaMethod!!,
target,
requestWrapper.payload
)
} else {
function.callSuspend(target, requestWrapper.payload)
}
function.callSuspend(target, requestWrapper.payload)
} else {
if (function.javaMethod != null) {
function.javaMethod?.invoke(target, requestWrapper.payload)
} else {
function.call(target, requestWrapper.payload)
}
function.call(target, requestWrapper.payload)
}
// Basic check: Handler should return non-null for success
if (responseData == null) {
Expand Down Expand Up @@ -395,7 +359,7 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {
*
* - **Sticky Event Management:** Post will remove sticky event automatically.
*
* @sample org.holance.ktbus.samples.PostSamples
* @sample org.holance.samples.ktbus.PostSamples
*/
fun post(event: Any, channel: String = DefaultChannel) {
runBlocking {
Expand Down Expand Up @@ -488,7 +452,7 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {
* @throws IllegalArgumentException if the event is an internal wrapper type (RequestWrapper or ResponseWrapper).
* @throws IllegalArgumentException if the event is a generic type.
*
* @sample org.holance.ktbus.samples.PostSamples
* @sample org.holance.samples.ktbus.PostSamples
*/
suspend fun postAsync(
event: Any,
Expand Down Expand Up @@ -519,7 +483,7 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {
* @throws RequestException If the request handler encounters an error or returns null data.
* @throws IllegalArgumentException if the event is a unit or any type.
* @throws IllegalArgumentException if the event is a generic type.
* @sample org.holance.ktbus.samples.RequestSample.sendRequest
* @sample org.holance.samples.ktbus.RequestSample.sendRequest
*/
inline fun <reified T : Any, reified R : Any> request(
request: T,
Expand Down Expand Up @@ -552,7 +516,7 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {
* @throws IllegalArgumentException if the request or response type is a unit or any type.
* @throws IllegalArgumentException if the request or response type is a generic type.
*
* @sample org.holance.ktbus.samples.RequestSample.sendRequestAsync
* @sample org.holance.samples.ktbus.RequestSample.sendRequestAsync
*
*/
suspend inline fun <reified T : Any, reified R : Any> requestAsync(
Expand Down Expand Up @@ -633,7 +597,7 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {
* @see Subscribe
* @see RequestHandler
* @see unsubscribe
* @sample org.holance.ktbus.samples.SubscribeSample
* @sample org.holance.samples.ktbus.SubscribeSample
*/
fun subscribe(target: Any) {
if (subscriptions.containsKey(target)) {
Expand Down Expand Up @@ -667,7 +631,7 @@ class KtBus(val config: KtBusConfig = KtBusConfig()) {
* Also removes its handlers from the central registry.
*
* @param target The object instance to unsubscribe.
* @sample org.holance.ktbus.samples.SubscribeSample
* @sample org.holance.samples.ktbus.SubscribeSample
*/
fun unsubscribe(target: Any) {
// 1. Cancel collector jobs
Expand Down
22 changes: 22 additions & 0 deletions lib/src/commonMain/kotlin/org/holance/samples/ktbus/PostSamples.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.holance.samples.ktbus

import org.holance.ktbus.KtBus

class PostSamples {
val bus = KtBus.getDefault()
fun postEvent() {
bus.post(Event1("Hello, world!"))
}

fun postEventWithChannel() {
bus.post(Event1("Hello, world!"), "myChannel")
}

suspend fun postEventAsync() {
bus.postAsync(Event1("Hello, world!"))
}

suspend fun postEventWithChannelAsync() {
bus.postAsync(Event1("Hello, world!"), "myChannel")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.holance.samples.ktbus

import org.holance.ktbus.KtBus
import org.holance.ktbus.RequestHandler

class RequestSample {
data class MyRequest(val message: String)
data class MyResponse(val result: String)

class HandleRequest {
val bus = KtBus.getDefault()
fun setup() {
bus.subscribe(this)
}

fun tearDown() {
bus.unsubscribe(this)
}

@RequestHandler
fun handleRequest(request: MyRequest): MyResponse {
// Process the request and return a response
return MyResponse("Processed: ${request.message}")
}
}

fun sendRequest() {
val bus = KtBus.getDefault()
val request = MyRequest("Hello, KtBus!")
val response = bus.request<MyRequest, MyResponse>(request)
println("Response: ${response.result}")
}

suspend fun sendRequestAsync() {
val bus = KtBus.getDefault()
val request = MyRequest("Hello, KtBus!")
val response = bus.requestAsync<MyRequest, MyResponse>(request)
println("Response: ${response.result}")
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.holance.samples.ktbus

data class Event1(val message: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.holance.samples.ktbus

import org.holance.ktbus.*

class SubscribeSample {
val bus = KtBus.getDefault()

fun setup() {
bus.subscribe(this)
}

fun tearDown() {
bus.unsubscribe(this)
}

@Subscribe
fun onEvent(event: Event1) {
println("Received event: ${event.message}")
}

@Subscribe(channel = "myChannel")
fun onEventFromMyChannel(event: Event1) {
println("Received event from myChannel: ${event.message}")
}

@Subscribe(scope = DispatcherTypes.IO)
fun onEventOnIO(event: Event1) {
println("Received event and process on IO thread: ${event.message}")
}
}

6 changes: 4 additions & 2 deletions lib/src/commonTest/kotlin/org/holance/ktbus/RequestTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
data class Add1Request(val value: Int)
data class Multiply2Request(val value: Int)
Expand Down Expand Up @@ -59,7 +60,7 @@ class KtBusRequestTests {
}
runBlocking {
val result = withTimeoutOrNull<Boolean>(20.seconds) {
while (result1.size < iteration || result2.size < iteration) {
while (result2.size < iteration) {
delay(10)
}
return@withTimeoutOrNull true
Expand Down Expand Up @@ -161,7 +162,8 @@ class KtBusRequestTests {
}

@RequestHandler(scope = DispatcherTypes.IO)
fun onEvent2(event: Multiply2Request): Resp {
suspend fun onEvent2(event: Multiply2Request): Resp {
delay(10.milliseconds)
return Resp(event.value * 2)
}

Expand Down