Skip to content

Commit e11dc90

Browse files
marvin test
1 parent 102a2b0 commit e11dc90

1 file changed

Lines changed: 183 additions & 0 deletions

File tree

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Smoke tests for firewall rules on VPC public IPs."""
19+
20+
from nose.plugins.attrib import attr
21+
22+
from marvin.cloudstackTestCase import cloudstackTestCase
23+
from marvin.lib.base import Account, FireWallRule, Network, NetworkOffering, PublicIPAddress, VPC, VpcOffering
24+
from marvin.lib.common import get_domain, get_zone, list_publicIP
25+
from marvin.lib.utils import cleanup_resources, wait_until
26+
27+
28+
class TestVpcFirewallRules(cloudstackTestCase):
29+
30+
@classmethod
31+
def setUpClass(cls):
32+
cls.testClient = super(TestVpcFirewallRules, cls).getClsTestClient()
33+
cls.apiclient = cls.testClient.getApiClient()
34+
cls.services = cls.testClient.getParsedTestDataConfig()
35+
cls.zone = get_zone(cls.apiclient, cls.testClient.getZoneForTests())
36+
cls.domain = get_domain(cls.apiclient)
37+
cls._cleanup = []
38+
39+
cls.account = Account.create(
40+
cls.apiclient,
41+
cls.services["account"],
42+
domainid=cls.domain.id
43+
)
44+
cls._cleanup.append(cls.account)
45+
46+
cls.services["vpc_offering"]["supportedservices"] = "Vpn,Dhcp,Dns,SourceNat,Lb,UserData,StaticNat,NetworkACL,PortForwarding,Firewall"
47+
cls.services["vpc_offering"]["serviceProviderList"] = {
48+
"Vpn": "VpcVirtualRouter",
49+
"Dhcp": "VpcVirtualRouter",
50+
"Dns": "VpcVirtualRouter",
51+
"SourceNat": "VpcVirtualRouter",
52+
"Lb": "VpcVirtualRouter",
53+
"UserData": "VpcVirtualRouter",
54+
"StaticNat": "VpcVirtualRouter",
55+
"NetworkACL": "VpcVirtualRouter",
56+
"PortForwarding": "VpcVirtualRouter",
57+
"Firewall": "VpcVirtualRouter"
58+
}
59+
60+
cls.vpc_offering = VpcOffering.create(
61+
cls.apiclient,
62+
cls.services["vpc_offering"]
63+
)
64+
cls.vpc_offering.update(cls.apiclient, state="Enabled")
65+
cls._cleanup.append(cls.vpc_offering)
66+
67+
network_offering = NetworkOffering.list(
68+
cls.apiclient,
69+
name="DefaultIsolatedNetworkOfferingForVpcNetworks"
70+
)
71+
cls.assertTrue(network_offering is not None and len(network_offering) > 0,
72+
"No VPC tier network offering found")
73+
cls.network_offering = network_offering[0]
74+
75+
cls.services["vpc"]["cidr"] = "10.20.30.0/24"
76+
cls.vpc = VPC.create(
77+
cls.apiclient,
78+
cls.services["vpc"],
79+
vpcofferingid=cls.vpc_offering.id,
80+
zoneid=cls.zone.id,
81+
account=cls.account.name,
82+
domainid=cls.account.domainid
83+
)
84+
cls._cleanup.append(cls.vpc)
85+
86+
cls.tier = Network.create(
87+
cls.apiclient,
88+
services={"name": "vpc-fw-tier", "displaytext": "vpc-fw-tier"},
89+
accountid=cls.account.name,
90+
domainid=cls.account.domainid,
91+
networkofferingid=cls.network_offering.id,
92+
zoneid=cls.zone.id,
93+
vpcid=cls.vpc.id,
94+
gateway="10.20.30.1",
95+
netmask="255.255.255.0"
96+
)
97+
cls._cleanup.append(cls.tier)
98+
99+
@classmethod
100+
def tearDownClass(cls):
101+
try:
102+
cleanup_resources(cls.apiclient, cls._cleanup)
103+
except Exception as e:
104+
raise Exception("Warning: Exception during cleanup: %s" % e)
105+
106+
def setUp(self):
107+
self.apiclient = self.testClient.getApiClient()
108+
self.cleanup = []
109+
110+
def tearDown(self):
111+
cleanup_resources(self.apiclient, self.cleanup)
112+
113+
def _wait_for_firewall_rule(self, rule_id):
114+
rules = FireWallRule.list(self.apiclient, id=rule_id, listall=True)
115+
if rules and len(rules) == 1:
116+
return True, rules[0]
117+
return False, None
118+
119+
@attr(tags=["advanced", "advancedns", "smoke"], required_hardware="false")
120+
def test_01_create_firewall_rule_on_vpc_public_ip(self):
121+
"""Verify firewall rule can be created and listed on a dedicated VPC public IP."""
122+
public_ip = PublicIPAddress.create(
123+
self.apiclient,
124+
zoneid=self.zone.id,
125+
accountid=self.account.name,
126+
domainid=self.account.domainid,
127+
vpcid=self.vpc.id
128+
)
129+
self.cleanup.append(public_ip)
130+
131+
firewall_rule = FireWallRule.create(
132+
self.apiclient,
133+
ipaddressid=public_ip.ipaddress.id,
134+
protocol="tcp",
135+
cidrlist=["0.0.0.0/0"],
136+
startport=19090,
137+
endport=19090,
138+
vpcid=self.vpc.id
139+
)
140+
self.cleanup.insert(0, firewall_rule)
141+
142+
result, listed_rule = wait_until(2, 10, self._wait_for_firewall_rule, firewall_rule.id)
143+
self.assertTrue(result, "Firewall rule was not listed for the VPC public IP")
144+
self.assertEqual(listed_rule.id, firewall_rule.id)
145+
self.assertEqual(listed_rule.ipaddressid, public_ip.ipaddress.id)
146+
self.assertEqual(listed_rule.vpcid, self.vpc.id)
147+
self.assertEqual(listed_rule.protocol.lower(), "tcp")
148+
self.assertEqual(int(listed_rule.startport), 19090)
149+
self.assertEqual(int(listed_rule.endport), 19090)
150+
151+
@attr(tags=["advanced", "advancedns", "smoke"], required_hardware="false")
152+
def test_02_create_firewall_rule_on_vpc_source_nat_ip(self):
153+
"""Verify firewall rule can be created and listed on the VPC source NAT IP."""
154+
source_nat_ips = list_publicIP(
155+
self.apiclient,
156+
vpcid=self.vpc.id,
157+
listall=True,
158+
issourcenat=True
159+
)
160+
self.assertTrue(source_nat_ips is not None and len(source_nat_ips) > 0,
161+
"No source NAT IP found for the VPC")
162+
source_nat_ip = source_nat_ips[0]
163+
164+
firewall_rule = FireWallRule.create(
165+
self.apiclient,
166+
ipaddressid=source_nat_ip.id,
167+
protocol="tcp",
168+
cidrlist=["0.0.0.0/0"],
169+
startport=19443,
170+
endport=19443,
171+
vpcid=self.vpc.id
172+
)
173+
self.cleanup.append(firewall_rule)
174+
175+
result, listed_rule = wait_until(2, 10, self._wait_for_firewall_rule, firewall_rule.id)
176+
self.assertTrue(result, "Firewall rule was not listed for the VPC source NAT IP")
177+
self.assertEqual(listed_rule.id, firewall_rule.id)
178+
self.assertEqual(listed_rule.ipaddressid, source_nat_ip.id)
179+
self.assertEqual(listed_rule.vpcid, self.vpc.id)
180+
self.assertEqual(listed_rule.protocol.lower(), "tcp")
181+
self.assertEqual(int(listed_rule.startport), 19443)
182+
self.assertEqual(int(listed_rule.endport), 19443)
183+

0 commit comments

Comments
 (0)