Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ build/
.run
target/
Thumbs.db
run-app.sh
run-app.sh

### Eclipse ###
*.metadata
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# [Oracle Database Transactional Event Queues](https://www.oracle.com/database/advanced-queuing/) support for Spring Cloud Stream

This version of the binder supports Spring Boot 3+/Spring framework 6+.
This version of the binder supports Spring Boot 4+/Spring framework 7+.

For more information on queuing within Oracle Database, see the official documentation for [Transactional Event Queues](https://www.oracle.com/database/advanced-queuing/).

Expand Down
32 changes: 19 additions & 13 deletions database/spring-cloud-stream-binder-oracle-txeventq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<oracle-database.version>23.26.0.0.0</oracle-database.version>
<oracle-database.version>23.26.1.0.0</oracle-database.version>
<oracle.database.security.version>21.20.0.0</oracle.database.security.version>
<org.springframework.cloud.version>4.3.0</org.springframework.cloud.version>
<spring.boot.version>3.5.9</spring.boot.version>
<spring.framework.version>6.2.15</spring.framework.version>
<org.springframework.cloud.version>5.0.1</org.springframework.cloud.version>
<spring.boot.version>4.0.2</spring.boot.version>
<spring.framework.version>7.0.3</spring.framework.version>
<testcontainers.version>1.21.4</testcontainers.version>
<build-helper-maven-plugin.version>3.6.0</build-helper-maven-plugin.version>
<maven-javadoc-plugin.version>3.7.0</maven-javadoc-plugin.version>
Expand All @@ -73,13 +73,17 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-jms</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc11</artifactId>
Expand All @@ -98,7 +102,7 @@
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>6.4.1</version>
<version>7.0.2</version>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
Expand All @@ -108,23 +112,23 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.16.1</version>
<version>1.21.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.14.0</version>
<version>3.20.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.messaging</groupId>
<artifactId>aqapi-jakarta</artifactId>
<version>23.3.1.0</version>
<version>23.8.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.1</version>
</dependency>
<groupId>tools.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>com.oracle.database.security</groupId>
<artifactId>oraclepki</artifactId>
Expand Down Expand Up @@ -210,6 +214,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.5.4</version>
<configuration>
<includes>
<include>**/*IT.java</include>
Expand Down Expand Up @@ -292,6 +297,7 @@
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.4</version>
<executions>
<execution>
<id>deploy</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<txeventq.streambinder.version>0.17.0</txeventq.streambinder.version>
<spring.boot.version>3.5.9</spring.boot.version>
<spring.boot.version>4.0.2</spring.boot.version>
<testcontainers.version>1.21.4</testcontainers.version>
</properties>

Expand All @@ -67,6 +67,27 @@
<scope>test</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>7.0.3</version> <!-- Matches your spring-core version -->
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.oracle.database.spring</groupId>
<artifactId>oracle-spring-boot-starter-ucp</artifactId>
<version>26.0.1</version>
</dependency>
<dependency>
<groupId>com.oracle.database.spring</groupId>
<artifactId>oracle-spring-boot-starter-aqjms</artifactId>
<version>26.0.0</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
Expand All @@ -86,4 +107,17 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version> <!-- Recommended version for Java 21 -->
<configuration>
<argLine>-XX:+EnableDynamicAgentLoading</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Copyright (c) 2024, 2026, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.cloud.stream.binder.sample;

Expand All @@ -10,4 +10,5 @@ public class TxEventQSampleApp {
public static void main(String[] args) {
SpringApplication.run(TxEventQSampleApp.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) 2024, 2026, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.

package com.oracle.database.spring.cloud.stream.binder.sample;

import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -15,14 +18,21 @@ public WordSupplier(String phrase) {

@Override
public String get() {
int i = idx.getAndAccumulate(words.length, (x, y) -> {
if (x < words.length - 1) {
return x + 1;
int currentIdx = idx.getAndIncrement();

// Check if we still have words to send
if (currentIdx < words.length) {
// If this was the very last word, mark us as done
if (currentIdx == words.length - 1) {
done.set(true);
}
done.set(true);
return 0;
});
return words[i];
return words[currentIdx];
}

done.set(true);

// Returning null tells Spring Cloud Stream to skip this poll
return null;
}

public boolean done() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
// Copyright (c) 2024, 2026, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.cloud.stream.binder.sample;

import java.time.Duration;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.stream.binding.BindingsLifecycleController;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.oracle.OracleContainer;
import org.testcontainers.utility.MountableFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;

@SpringBootTest
@Testcontainers
Expand All @@ -31,6 +37,22 @@ public static void setUp() throws Exception {
oracleContainer.copyFileToContainer(MountableFile.forClasspathResource("init.sql"), "/tmp/init.sql");
oracleContainer.execInContainer("sqlplus", "sys / as sysdba", "@/tmp/init.sql");
}

@TestConfiguration
static class Config {
@Bean
public javax.sql.DataSource dataSource() throws java.sql.SQLException {
oracle.ucp.jdbc.PoolDataSource pds = oracle.ucp.jdbc.PoolDataSourceFactory.getPoolDataSource();
pds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
pds.setURL(oracleContainer.getJdbcUrl());
pds.setUser(oracleContainer.getUsername());
pds.setPassword(oracleContainer.getPassword());
pds.setInitialPoolSize(1);
pds.setMinPoolSize(1);
pds.setMaxPoolSize(3);
return pds;
}
}

@DynamicPropertySource
static void properties(DynamicPropertyRegistry registry) {
Expand All @@ -44,16 +66,29 @@ static void properties(DynamicPropertyRegistry registry) {

@Autowired
BindingsLifecycleController lifecycleController;

/**
* This ensures the JMS Poller stops BEFORE the UCP DataSource closes.
*/
@AfterEach
void stopBindings() {
if (lifecycleController != null) {
lifecycleController.queryStates().forEach(state -> {
String bindingName = (String) state.get("bindingName");
lifecycleController.stop(bindingName);
});
}
}

@Test
void processStream() throws InterruptedException {
// Process all words from the word supplier message stream.
do {
Thread.sleep(100);
} while (!wordSupplier.done());

// Shutdown all messaging beans.
lifecycleController.queryStates().forEach((state) ->
lifecycleController.stop((String) state.get("bindingName")));
await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(Duration.ofSeconds(2))
.until(() -> {
// Check if the supplier is done OR if we've reached
// a target number of "Consumed" logs.
return wordSupplier.done();
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
** TxEventQ Support for Spring Cloud Stream
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Copyright (c) 2023, 2026 Oracle and/or its affiliates.
**
** This file has been modified by Oracle Corporation.
**
Expand Down Expand Up @@ -50,8 +50,8 @@
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.core.retry.RetryTemplate;


public class JMSMessageChannelBinder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
** TxEventQ Support for Spring Cloud Stream
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Copyright (c) 2023, 2026 Oracle and/or its affiliates.
**
** This file has been modified by Oracle Corporation.
*/
Expand Down Expand Up @@ -38,7 +38,7 @@
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.jms.JndiConnectionFactoryAutoConfiguration;
import org.springframework.boot.jms.autoconfigure.JndiConnectionFactoryAutoConfiguration;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
Expand Down
Loading