Compare commits

...

11 Commits

Author SHA1 Message Date
d3ebb245df Update .gitreview for unmaintained/2023.1
Change-Id: I014edd55d698b1f05190acf34806a484b360da43
2024-11-14 10:35:40 +00:00
0ac4dc3ae2 Update TOX_CONSTRAINTS_FILE for stable/2023.1
Update the URL to the upper-constraints file to point to the redirect
rule on releases.openstack.org so that anyone working on this branch
will switch to the correct upper-constraints list automatically when
the requirements repository branches.

Until the requirements repository has as stable/2023.1 branch, tests will
continue to use the upper-constraints list on master.

Change-Id: Idca37464ce6fe79bf822be0f4c55baf95ebe4ef2
2023-03-02 13:26:40 +00:00
099eedf120 Update .gitreview for stable/2023.1
Change-Id: Id28f62454164f6a9bb7a0741520742f61b82ba65
2023-03-02 13:26:39 +00:00
Rodolfo Alonso Hernandez
16a2cd127d Remove CLI warning message
This warning was added in [1]. This patch is partially reverting it
but we keep the release note. This warning message could be a bit
intrusive for those users still using it. In any case, the
recommendation to move to OSC is something already known and
publicly requested.

[1]https://review.opendev.org/c/openstack/python-neutronclient/+/862371

Change-Id: I0c2fba3e45a9de1eba09efcf8919661a855c7e89
2023-02-14 08:30:20 +01:00
elajkat
68cbf56f9c Move network trunk commands from python-neutronclient
The depends-on patch adds trunk commands to OSC, as we can
long consider trunk operations as core Networking operations.

Change-Id: Ie557a5d541cf117d20f3f2b548620a74dbadb383
Depends-On: https://review.opendev.org/c/openstack/python-openstackclient/+/869447
Related-Bug: #1999774
2023-01-16 14:18:27 +01:00
elajkat
33f1c89a84 Tox4: add allowlist_externals where necessary
With tox4 allowlist_externals is more strictly checked, so
fix it where necessary and fix pylint version.

Depends-On: https://review.opendev.org/c/zuul/zuul-jobs/+/866943
Related-Bug: #1999558

Change-Id: Id115a436b95b3ede5a1f3102b4bb9e3ade75c970
2023-01-16 08:34:36 +00:00
Elvira García
776e360e35 Fix help sentence in network log create --enable
As the documentation states [1], the default in network log objects is
to be enabled, not disabled.

[1] https://docs.openstack.org/neutron/latest/admin/config-logging.html

Change-Id: I13e9d1132fc38104e6e85d9c8442bc7506adc2fd
2022-11-25 15:32:46 +01:00
elajkat
f67af3d9be Add warning and reno for SDK
On the 2023.1 (Antelope) PTG we discussed the status of the python
binding (SDK) code in python-neutronclient and decided to not allow new
features to this repo (see [1]), and make users to use openstacksdk.
Let's add a warning log message and a releasenote to make it visible.

[1]: https://etherpad.opendev.org/p/neutron-antelope-ptg#L163

Change-Id: I03317179bd0d30a69b91eef6e451b8e40eb28191
2022-10-21 16:40:38 +02:00
ec84aff516 Switch to 2023.1 Python3 unit tests and generic template name
This is an automatically generated patch to ensure unit testing
is in place for all the of the tested runtimes for antelope. Also,
updating the template name to generic one.

See also the PTI in governance [1].

[1]: https://governance.openstack.org/tc/reference/project-testing-interface.html

Change-Id: I026505ff0d277fd4f15329ed26a5cecf1d573f68
2022-09-14 09:26:48 +00:00
f060429cfc Update master for stable/zed
Add file to the reno documentation build to show release notes for
stable/zed.

Use pbr instruction to increment the minor version number
automatically so that master versions are higher than the versions on
stable/zed.

Sem-Ver: feature
Change-Id: Ie6a0efba406cd26606168ce5b6dbc8ee7ae759ed
2022-09-09 11:48:19 +00:00
Pedro Henrique
7467c710f6 Add support to floating ip port forwarding
To extend Horizon to allow users to create
port forwarding rules for their floating ips,
we need to extend this client to allow it,
as Horizon uses this client.

This patch is the one of a series of patches
to implement floating ip port forwarding with
port ranges.

The specification is defined in:
https://github.com/openstack/neutron-specs/blob/master/specs/wallaby/port-forwarding-port-ranges.rst

Implements: blueprint floatingips-portforwarding-ranges
Related-Bug: #1885921
Change-Id: I3f616dba5e2ebe301cf6ce4bed8c2e6e4da2da9b
2022-07-15 10:06:03 -03:00
17 changed files with 57 additions and 1282 deletions

View File

@@ -2,3 +2,4 @@
host=review.opendev.org
port=29418
project=openstack/python-neutronclient.git
defaultbranch=unmaintained/2023.1

View File

@@ -1,7 +1,7 @@
- project:
templates:
- openstack-cover-jobs
- openstack-python3-zed-jobs
- openstack-python3-jobs
- publish-openstack-docs-pti
- check-requirements
- lib-forward-testing-python3

View File

@@ -4,3 +4,4 @@
openstackdocstheme>=2.2.0 # Apache-2.0
reno>=3.1.0 # Apache-2.0
sphinx>=2.0.0,!=2.1.0 # BSD
cliff>=3.4.0 # Apache-2.0

View File

@@ -1,16 +0,0 @@
=============
network trunk
=============
A **network trunk** is a container to group logical ports from different
networks and provide a single trunked vNIC for servers. It consists of
one parent port which is a regular VIF and multiple subports which allow
the server to connect to more networks.
Network v2
.. autoprogram-cliff:: openstack.neutronclient.v2
:command: network subport list
.. autoprogram-cliff:: openstack.neutronclient.v2
:command: network trunk *

View File

@@ -58,11 +58,11 @@ def _get_common_parser(parser):
enable_group.add_argument(
'--enable',
action='store_true',
help=_('Enable this log (default is disabled)'))
help=_('Enable this log'))
enable_group.add_argument(
'--disable',
action='store_true',
help=_('Disable this log'))
help=_('Disable this log (default is enabled)'))
return parser

View File

@@ -1,393 +0,0 @@
# Copyright 2016 ZTE Corporation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Network trunk and subports action implementations"""
import logging
from osc_lib.cli import format_columns
from osc_lib.cli import parseractions
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from neutronclient._i18n import _
from neutronclient.osc import utils as nc_osc_utils
from neutronclient.osc.v2 import utils as v2_utils
LOG = logging.getLogger(__name__)
TRUNK = 'trunk'
TRUNKS = 'trunks'
SUB_PORTS = 'sub_ports'
class CreateNetworkTrunk(command.ShowOne):
"""Create a network trunk for a given project"""
def get_parser(self, prog_name):
parser = super(CreateNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'name',
metavar='<name>',
help=_("Name of the trunk to create")
)
parser.add_argument(
'--description',
metavar='<description>',
help=_("A description of the trunk")
)
parser.add_argument(
'--parent-port',
metavar='<parent-port>',
required=True,
help=_("Parent port belonging to this trunk (name or ID)")
)
parser.add_argument(
'--subport',
metavar='<port=,segmentation-type=,segmentation-id=>',
action=parseractions.MultiKeyValueAction, dest='add_subports',
optional_keys=['segmentation-id', 'segmentation-type'],
required_keys=['port'],
help=_("Subport to add. Subport is of form "
"\'port=<name or ID>,segmentation-type=,segmentation-ID=\' "
"(--subport) option can be repeated")
)
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
'--enable',
action='store_true',
default=True,
help=_("Enable trunk (default)")
)
admin_group.add_argument(
'--disable',
action='store_true',
help=_("Disable trunk")
)
nc_osc_utils.add_project_owner_option_to_parser(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs_for_trunk(self.app.client_manager,
parsed_args)
body = {TRUNK: attrs}
obj = client.create_trunk(body)
columns = _get_columns(obj[TRUNK])
data = osc_utils.get_dict_properties(obj[TRUNK], columns,
formatters=_formatters)
return columns, data
class DeleteNetworkTrunk(command.Command):
"""Delete a given network trunk"""
def get_parser(self, prog_name):
parser = super(DeleteNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
nargs="+",
help=_("Trunk(s) to delete (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
result = 0
for trunk in parsed_args.trunk:
try:
trunk_id = _get_id(client, trunk, TRUNK)
client.delete_trunk(trunk_id)
except Exception as e:
result += 1
LOG.error(_("Failed to delete trunk with name "
"or ID '%(trunk)s': %(e)s"),
{'trunk': trunk, 'e': e})
if result > 0:
total = len(parsed_args.trunk)
msg = (_("%(result)s of %(total)s trunks failed "
"to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ListNetworkTrunk(command.Lister):
"""List all network trunks"""
def get_parser(self, prog_name):
parser = super(ListNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'--long',
action='store_true',
default=False,
help=_("List additional fields in output")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
data = client.list_trunks()
headers = (
'ID',
'Name',
'Parent Port',
'Description'
)
columns = (
'id',
'name',
'port_id',
'description'
)
if parsed_args.long:
headers += (
'Status',
'State',
'Created At',
'Updated At',
)
columns += (
'status',
'admin_state_up',
'created_at',
'updated_at'
)
return (headers,
(osc_utils.get_dict_properties(
s, columns,
formatters=_formatters,
) for s in data[TRUNKS]))
class SetNetworkTrunk(command.Command):
"""Set network trunk properties"""
def get_parser(self, prog_name):
parser = super(SetNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Trunk to modify (name or ID)")
)
parser.add_argument(
'--name',
metavar="<name>",
help=_("Set trunk name")
)
parser.add_argument(
'--description',
metavar='<description>',
help=_("A description of the trunk")
)
parser.add_argument(
'--subport',
metavar='<port=,segmentation-type=,segmentation-id=>',
action=parseractions.MultiKeyValueAction, dest='set_subports',
optional_keys=['segmentation-id', 'segmentation-type'],
required_keys=['port'],
help=_("Subport to add. Subport is of form "
"\'port=<name or ID>,segmentation-type=,segmentation-ID=\'"
"(--subport) option can be repeated")
)
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
'--enable',
action='store_true',
help=_("Enable trunk")
)
admin_group.add_argument(
'--disable',
action='store_true',
help=_("Disable trunk")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
attrs = _get_attrs_for_trunk(self.app.client_manager, parsed_args)
body = {TRUNK: attrs}
try:
client.update_trunk(trunk_id, body)
except Exception as e:
msg = (_("Failed to set trunk '%(t)s': %(e)s")
% {'t': parsed_args.trunk, 'e': e})
raise exceptions.CommandError(msg)
if parsed_args.set_subports:
subport_attrs = _get_attrs_for_subports(self.app.client_manager,
parsed_args)
try:
client.trunk_add_subports(trunk_id, subport_attrs)
except Exception as e:
msg = (_("Failed to add subports to trunk '%(t)s': %(e)s")
% {'t': parsed_args.trunk, 'e': e})
raise exceptions.CommandError(msg)
class ShowNetworkTrunk(command.ShowOne):
"""Show information of a given network trunk"""
def get_parser(self, prog_name):
parser = super(ShowNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Trunk to display (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
obj = client.show_trunk(trunk_id)
columns = _get_columns(obj[TRUNK])
data = osc_utils.get_dict_properties(obj[TRUNK], columns,
formatters=_formatters)
return columns, data
class ListNetworkSubport(command.Lister):
"""List all subports for a given network trunk"""
def get_parser(self, prog_name):
parser = super(ListNetworkSubport, self).get_parser(prog_name)
parser.add_argument(
'--trunk',
required=True,
metavar="<trunk>",
help=_("List subports belonging to this trunk (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
data = client.trunk_get_subports(trunk_id)
headers = ('Port', 'Segmentation Type', 'Segmentation ID')
columns = ('port_id', 'segmentation_type', 'segmentation_id')
return (headers,
(osc_utils.get_dict_properties(
s, columns,
) for s in data[SUB_PORTS]))
class UnsetNetworkTrunk(command.Command):
"""Unset subports from a given network trunk"""
def get_parser(self, prog_name):
parser = super(UnsetNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Unset subports from this trunk (name or ID)")
)
parser.add_argument(
'--subport',
metavar="<subport>",
required=True,
action='append', dest='unset_subports',
help=_("Subport to delete (name or ID of the port) "
"(--subport) option can be repeated")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs_for_subports(self.app.client_manager, parsed_args)
trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
client.trunk_remove_subports(trunk_id, attrs)
_formatters = {
'admin_state_up': v2_utils.AdminStateColumn,
'sub_ports': format_columns.ListDictColumn,
}
def _get_columns(item):
return tuple(sorted(list(item.keys())))
def _get_attrs_for_trunk(client_manager, parsed_args):
attrs = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if parsed_args.enable:
attrs['admin_state_up'] = True
if parsed_args.disable:
attrs['admin_state_up'] = False
if 'parent_port' in parsed_args and parsed_args.parent_port is not None:
port_id = _get_id(client_manager.neutronclient,
parsed_args.parent_port, 'port')
attrs['port_id'] = port_id
if 'add_subports' in parsed_args and parsed_args.add_subports is not None:
attrs[SUB_PORTS] = _format_subports(client_manager,
parsed_args.add_subports)
# "trunk set" command doesn't support setting project.
if 'project' in parsed_args and parsed_args.project is not None:
identity_client = client_manager.identity
project_id = nc_osc_utils.find_project(
identity_client,
parsed_args.project,
parsed_args.project_domain,
).id
attrs['tenant_id'] = project_id
return attrs
def _format_subports(client_manager, subports):
attrs = []
for subport in subports:
subport_attrs = {}
if subport.get('port'):
port_id = _get_id(client_manager.neutronclient,
subport['port'], 'port')
subport_attrs['port_id'] = port_id
if subport.get('segmentation-id'):
try:
subport_attrs['segmentation_id'] = int(
subport['segmentation-id'])
except ValueError:
msg = (_("Segmentation-id '%s' is not an integer") %
subport['segmentation-id'])
raise exceptions.CommandError(msg)
if subport.get('segmentation-type'):
subport_attrs['segmentation_type'] = subport['segmentation-type']
attrs.append(subport_attrs)
return attrs
def _get_attrs_for_subports(client_manager, parsed_args):
attrs = {}
if 'set_subports' in parsed_args and parsed_args.set_subports is not None:
attrs[SUB_PORTS] = _format_subports(client_manager,
parsed_args.set_subports)
if ('unset_subports' in parsed_args and
parsed_args.unset_subports is not None):
subports_list = []
for subport in parsed_args.unset_subports:
port_id = _get_id(client_manager.neutronclient,
subport, 'port')
subports_list.append({'port_id': port_id})
attrs[SUB_PORTS] = subports_list
return attrs
def _get_id(client, id_or_name, resource):
return client.find_resource(resource, str(id_or_name))['id']

View File

@@ -1,87 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from unittest import mock
from oslo_utils import uuidutils
class FakeTrunk(object):
"""Fake one or more trunks."""
@staticmethod
def create_one_trunk(attrs=None):
"""Create a fake trunk.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A Dictionary with id, name, description, admin_state_up, port_id,
sub_ports, status and project_id
"""
attrs = attrs or {}
# Set default attributes.
trunk_attrs = {
'id': 'trunk-id-' + uuidutils.generate_uuid(dashed=False),
'name': 'trunk-name-' + uuidutils.generate_uuid(dashed=False),
'description': '',
'port_id': 'port-' + uuidutils.generate_uuid(dashed=False),
'admin_state_up': True,
'project_id': 'project-id-' +
uuidutils.generate_uuid(dashed=False),
'status': 'ACTIVE',
'sub_ports': [{'port_id': 'subport-' +
uuidutils.generate_uuid(dashed=False),
'segmentation_type': 'vlan',
'segmentation_id': 100}],
}
# Overwrite default attributes.
trunk_attrs.update(attrs)
return copy.deepcopy(trunk_attrs)
@staticmethod
def create_trunks(attrs=None, count=2):
"""Create multiple fake trunks.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of routers to fake
:return:
A list of dictionaries faking the trunks
"""
trunks = []
for i in range(0, count):
trunks.append(FakeTrunk.create_one_trunk(attrs))
return trunks
@staticmethod
def get_trunks(trunks=None, count=2):
"""Get an iterable Mock object with a list of faked trunks.
If trunks list is provided, then initialize the Mock object with the
list. Otherwise create one.
:param List trunks:
A list of FakeResource objects faking trunks
:param int count:
The number of trunks to fake
:return:
An iterable Mock object with side_effect set to a list of faked
trunks
"""
if trunks is None:
trunks = FakeTrunk.create_trunks(count)
return mock.Mock(side_effect=trunks)

View File

@@ -1,769 +0,0 @@
# Copyright 2016 ZTE Corporation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import copy
from unittest import mock
from unittest.mock import call
from osc_lib.cli import format_columns
from osc_lib import exceptions
from osc_lib.tests import utils as tests_utils
import testtools
from neutronclient.osc.v2.trunk import network_trunk as trunk
from neutronclient.osc.v2 import utils as v2_utils
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.trunk import fakes
def _get_id(client, id_or_name, resource):
return id_or_name
class TestCreateNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# The new trunk created
_trunk = fakes.FakeTrunk.create_one_trunk()
columns = (
'admin_state_up',
'description',
'id',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
)
def get_data(self):
return (
v2_utils.AdminStateColumn(self._trunk['admin_state_up']),
self._trunk['description'],
self._trunk['id'],
self._trunk['name'],
self._trunk['port_id'],
self._trunk['project_id'],
self._trunk['status'],
format_columns.ListDictColumn(self._trunk['sub_ports']),
)
def setUp(self):
super(TestCreateNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.create_trunk = mock.Mock(
return_value={trunk.TRUNK: self._trunk})
self.data = self.get_data()
# Get the command object to test
self.cmd = trunk.CreateNetworkTrunk(self.app, self.namespace)
def test_create_no_options(self):
arglist = []
verifylist = []
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_create_default_options(self):
arglist = [
"--parent-port", self._trunk['port_id'],
self._trunk['name'],
]
verifylist = [
('parent_port', self._trunk['port_id']),
('name', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.neutronclient.create_trunk.assert_called_once_with({
trunk.TRUNK: {'name': self._trunk['name'],
'admin_state_up': self._trunk['admin_state_up'],
'port_id': self._trunk['port_id']}
})
self.assertEqual(self.columns, columns)
self.assertItemEqual(self.data, data)
def test_create_full_options(self):
self._trunk['description'] = 'foo description'
self.data = self.get_data()
subport = self._trunk['sub_ports'][0]
arglist = [
"--disable",
"--description", self._trunk['description'],
"--parent-port", self._trunk['port_id'],
"--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('name', self._trunk['name']),
('description', self._trunk['description']),
('parent_port', self._trunk['port_id']),
('add_subports', [{
'port': subport['port_id'],
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
('disable', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.neutronclient.create_trunk.assert_called_once_with({
trunk.TRUNK: {'name': self._trunk['name'],
'description': self._trunk['description'],
'admin_state_up': False,
'sub_ports': [subport],
'port_id': self._trunk['port_id']}
})
self.assertEqual(self.columns, columns)
self.assertItemEqual(self.data, data)
def test_create_trunk_with_subport_invalid_segmentation_id_fail(self):
subport = self._trunk['sub_ports'][0]
arglist = [
"--parent-port", self._trunk['port_id'],
"--subport", "port=%(port)s,segmentation-type=%(seg_type)s,"
"segmentation-id=boom" % {
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('name', self._trunk['name']),
('parent_port', self._trunk['port_id']),
('add_subports', [{
'port': subport['port_id'],
'segmentation-id': 'boom',
'segmentation-type': subport['segmentation_type']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual("Segmentation-id 'boom' is not an integer",
str(e))
def test_create_network_trunk_subports_without_optional_keys(self):
subport = copy.copy(self._trunk['sub_ports'][0])
# Pop out the segmentation-id and segmentation-type
subport.pop('segmentation_type')
subport.pop('segmentation_id')
arglist = [
'--parent-port', self._trunk['port_id'],
'--subport', 'port=%(port)s' % {'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('name', self._trunk['name']),
('parent_port', self._trunk['port_id']),
('add_subports', [{
'port': subport['port_id']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.neutronclient.create_trunk.assert_called_once_with({
trunk.TRUNK: {'name': self._trunk['name'],
'admin_state_up': True,
'sub_ports': [subport],
'port_id': self._trunk['port_id']}
})
self.assertEqual(self.columns, columns)
self.assertItemEqual(self.data, data)
def test_create_network_trunk_subports_without_required_key_fail(self):
subport = self._trunk['sub_ports'][0]
arglist = [
'--parent-port', self._trunk['port_id'],
'--subport', 'segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type']},
self._trunk['name'],
]
verifylist = [
('name', self._trunk['name']),
('parent_port', self._trunk['port_id']),
('add_subports', [{
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
]
with testtools.ExpectedException(argparse.ArgumentTypeError):
self.check_parser(self.cmd, arglist, verifylist)
class TestDeleteNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# The trunk to be deleted.
_trunks = fakes.FakeTrunk.create_trunks(count=2)
def setUp(self):
super(TestDeleteNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.delete_trunk = mock.Mock(return_value=None)
# Get the command object to test
self.cmd = trunk.DeleteNetworkTrunk(self.app, self.namespace)
def test_delete_trunk(self):
arglist = [
self._trunks[0]['name'],
]
verifylist = [
('trunk', [self._trunks[0]['name']]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.neutronclient.delete_trunk.assert_called_once_with(
self._trunks[0]['name'])
self.assertIsNone(result)
def test_delete_trunk_multiple(self):
arglist = []
verifylist = []
for t in self._trunks:
arglist.append(t['name'])
verifylist = [
('trunk', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for t in self._trunks:
calls.append(call(t['name']))
self.neutronclient.delete_trunk.assert_has_calls(calls)
self.assertIsNone(result)
def test_delete_trunk_multiple_with_exception(self):
arglist = [
self._trunks[0]['name'],
'unexist_trunk',
]
verifylist = [
('trunk',
[self._trunks[0]['name'], 'unexist_trunk']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
get_mock_result = [self._trunks[0], exceptions.CommandError]
trunk._get_id = (
mock.Mock(side_effect=get_mock_result)
)
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual('1 of 2 trunks failed to delete.', str(e))
self.neutronclient.delete_trunk.assert_called_once_with(
self._trunks[0]
)
class TestShowNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# The trunk to set.
_trunk = fakes.FakeTrunk.create_one_trunk()
columns = (
'admin_state_up',
'description',
'id',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
v2_utils.AdminStateColumn(_trunk['admin_state_up']),
_trunk['description'],
_trunk['id'],
_trunk['name'],
_trunk['port_id'],
_trunk['project_id'],
_trunk['status'],
format_columns.ListDictColumn(_trunk['sub_ports']),
)
def setUp(self):
super(TestShowNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.show_trunk = mock.Mock(
return_value={trunk.TRUNK: self._trunk})
# Get the command object to test
self.cmd = trunk.ShowNetworkTrunk(self.app, self.namespace)
def test_show_no_options(self):
arglist = []
verifylist = []
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_show_all_options(self):
arglist = [
self._trunk['id'],
]
verifylist = [
('trunk', self._trunk['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.neutronclient.show_trunk.assert_called_once_with(
self._trunk['id'])
self.assertEqual(self.columns, columns)
self.assertItemEqual(self.data, data)
class TestListNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# Create trunks to be listed.
_trunks = fakes.FakeTrunk.create_trunks(
{'created_at': '2001-01-01 00:00:00',
'updated_at': '2001-01-01 00:00:00'}, count=3)
columns = (
'ID',
'Name',
'Parent Port',
'Description'
)
columns_long = columns + (
'Status',
'State',
'Created At',
'Updated At'
)
data = []
for t in _trunks:
data.append((
t['id'],
t['name'],
t['port_id'],
t['description']
))
data_long = []
for t in _trunks:
data_long.append((
t['id'],
t['name'],
t['port_id'],
t['description'],
t['status'],
v2_utils.AdminStateColumn(t['admin_state_up']),
'2001-01-01 00:00:00',
'2001-01-01 00:00:00',
))
def setUp(self):
super(TestListNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.list_trunks = mock.Mock(
return_value={trunk.TRUNKS: self._trunks})
# Get the command object to test
self.cmd = trunk.ListNetworkTrunk(self.app, self.namespace)
def test_trunk_list_no_option(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_trunks.assert_called_once_with()
self.assertEqual(self.columns, columns)
self.assertListItemEqual(self.data, list(data))
def test_trunk_list_long(self):
arglist = [
'--long',
]
verifylist = [
('long', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_trunks.assert_called_once_with()
self.assertEqual(self.columns_long, columns)
self.assertListItemEqual(self.data_long, list(data))
class TestSetNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# Create trunks to be listed.
_trunk = fakes.FakeTrunk.create_one_trunk()
columns = (
'admin_state_up',
'id',
'name',
'description',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
v2_utils.AdminStateColumn(_trunk['admin_state_up']),
_trunk['id'],
_trunk['name'],
_trunk['description'],
_trunk['port_id'],
_trunk['project_id'],
_trunk['status'],
format_columns.ListDictColumn(_trunk['sub_ports']),
)
def setUp(self):
super(TestSetNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.update_trunk = mock.Mock(
return_value={trunk.TRUNK: self._trunk})
self.neutronclient.trunk_add_subports = mock.Mock(
return_value=self._trunk)
# Get the command object to test
self.cmd = trunk.SetNetworkTrunk(self.app, self.namespace)
def _test_set_network_trunk_attr(self, attr, value):
arglist = [
'--%s' % attr, value,
self._trunk[attr],
]
verifylist = [
(attr, value),
('trunk', self._trunk[attr]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
attr: value,
}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk[attr], {trunk.TRUNK: attrs})
self.assertIsNone(result)
def test_set_network_trunk_name(self):
self._test_set_network_trunk_attr('name', 'trunky')
def test_test_set_network_trunk_description(self):
self._test_set_network_trunk_attr('description', 'description')
def test_set_network_trunk_admin_state_up_disable(self):
arglist = [
'--disable',
self._trunk['name'],
]
verifylist = [
('disable', True),
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'admin_state_up': False,
}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: attrs})
self.assertIsNone(result)
def test_set_network_trunk_admin_state_up_enable(self):
arglist = [
'--enable',
self._trunk['name'],
]
verifylist = [
('enable', True),
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'admin_state_up': True,
}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: attrs})
self.assertIsNone(result)
def test_set_network_trunk_nothing(self):
arglist = [self._trunk['name'], ]
verifylist = [('trunk', self._trunk['name']), ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: attrs})
self.assertIsNone(result)
def test_set_network_trunk_subports(self):
subport = self._trunk['sub_ports'][0]
arglist = [
'--subport', 'port=%(port)s,segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{
'port': subport['port_id'],
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.neutronclient.trunk_add_subports.assert_called_once_with(
self._trunk['name'], {'sub_ports': [subport]}
)
self.assertIsNone(result)
def test_set_network_trunk_subports_without_optional_keys(self):
subport = copy.copy(self._trunk['sub_ports'][0])
# Pop out the segmentation-id and segmentation-type
subport.pop('segmentation_type')
subport.pop('segmentation_id')
arglist = [
'--subport', 'port=%(port)s' % {'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{
'port': subport['port_id']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.neutronclient.trunk_add_subports.assert_called_once_with(
self._trunk['name'], {'sub_ports': [subport]}
)
self.assertIsNone(result)
def test_set_network_trunk_subports_without_required_key_fail(self):
subport = self._trunk['sub_ports'][0]
arglist = [
'--subport', 'segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type']},
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
]
with testtools.ExpectedException(argparse.ArgumentTypeError):
self.check_parser(self.cmd, arglist, verifylist)
self.neutronclient.trunk_add_subports.assert_not_called()
def test_set_trunk_attrs_with_exception(self):
arglist = [
'--name', 'reallylongname',
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('name', 'reallylongname'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.neutronclient.update_trunk = (
mock.Mock(side_effect=exceptions.CommandError)
)
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual(
"Failed to set trunk '%s': " % self._trunk['name'],
str(e))
attrs = {'name': 'reallylongname'}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: attrs})
self.neutronclient.trunk_add_subports.assert_not_called()
def test_set_trunk_add_subport_with_exception(self):
arglist = [
'--subport', 'port=invalid_subport',
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{'port': 'invalid_subport'}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.neutronclient.trunk_add_subports = (
mock.Mock(side_effect=exceptions.CommandError)
)
with testtools.ExpectedException(exceptions.CommandError) as e:
self.cmd.take_action(parsed_args)
self.assertEqual(
"Failed to add subports to trunk '%s': " % self._trunk['name'],
str(e))
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: {}})
self.neutronclient.trunk_add_subports.assert_called_once_with(
self._trunk['name'],
{'sub_ports': [{'port_id': 'invalid_subport'}]}
)
class TestListNetworkSubport(test_fakes.TestNeutronClientOSCV2):
_trunk = fakes.FakeTrunk.create_one_trunk()
_subports = _trunk['sub_ports']
columns = (
'Port',
'Segmentation Type',
'Segmentation ID',
)
data = []
for s in _subports:
data.append((
s['port_id'],
s['segmentation_type'],
s['segmentation_id'],
))
def setUp(self):
super(TestListNetworkSubport, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.trunk_get_subports = mock.Mock(
return_value={trunk.SUB_PORTS: self._subports})
# Get the command object to test
self.cmd = trunk.ListNetworkSubport(self.app, self.namespace)
def test_subport_list(self):
arglist = [
'--trunk', self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.neutronclient.trunk_get_subports.assert_called_once_with(
self._trunk['name'])
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
class TestUnsetNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
_trunk = fakes.FakeTrunk.create_one_trunk()
columns = (
'admin_state_up',
'id',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
v2_utils.AdminStateColumn(_trunk['admin_state_up']),
_trunk['id'],
_trunk['name'],
_trunk['port_id'],
_trunk['project_id'],
_trunk['status'],
format_columns.ListDictColumn(_trunk['sub_ports']),
)
def setUp(self):
super(TestUnsetNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.trunk_remove_subports = mock.Mock(
return_value=None)
# Get the command object to test
self.cmd = trunk.UnsetNetworkTrunk(self.app, self.namespace)
def test_unset_network_trunk_subport(self):
subport = self._trunk['sub_ports'][0]
arglist = [
"--subport", subport['port_id'],
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('unset_subports', [subport['port_id']]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.neutronclient.trunk_remove_subports.assert_called_once_with(
self._trunk['name'],
{trunk.SUB_PORTS: [{'port_id': subport['port_id']}]}
)
self.assertIsNone(result)
def test_unset_subport_no_arguments_fail(self):
arglist = [
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)

View File

@@ -515,6 +515,8 @@ class Client(ClientBase):
router_path = "/routers/%s"
floatingips_path = "/floatingips"
floatingip_path = "/floatingips/%s"
port_forwardings_path = "/floatingips/%s/port_forwardings"
port_forwarding_path = "/floatingips/%s/port_forwardings/%s"
security_groups_path = "/security-groups"
security_group_path = "/security-groups/%s"
security_group_rules_path = "/security-group-rules"
@@ -1019,6 +1021,32 @@ class Client(ClientBase):
"""Deletes the specified floatingip."""
return self.delete(self.floatingip_path % (floatingip))
def show_port_forwarding(self, floatingip, portforwarding):
"""Fetches information of a certain portforwarding"""
return self.get(self.port_forwarding_path % (floatingip,
portforwarding))
def list_port_forwardings(self, floatingip, retrieve_all=True, **_params):
"""Fetches a list of all portforwardings for a floatingip."""
return self.list('port_forwardings',
self.port_forwardings_path % floatingip, retrieve_all,
**_params)
def create_port_forwarding(self, floatingip, body=None):
"""Creates a new portforwarding."""
return self.post(self.port_forwardings_path % floatingip, body=body)
def delete_port_forwarding(self, floatingip, portforwarding):
"""Deletes the specified portforwarding."""
return self.delete(self.port_forwarding_path % (floatingip,
portforwarding))
def update_port_forwarding(self, floatingip, portforwarding, body=None):
"""Updates a portforwarding."""
return self.put(self.port_forwarding_path % (floatingip,
portforwarding),
body=body)
def create_security_group(self, body=None):
"""Creates a new security group."""
return self.post(self.security_groups_path, body=body)

View File

@@ -0,0 +1,4 @@
---
features:
- |
Add support to floating ip port forwarding.

View File

@@ -0,0 +1,7 @@
---
prelude: >
Openstack community decided to use one SDK for its services, that is
in ``openstacksdk`` repository. To avoid duplication, sooner or later the
python binding code in ``python-neutronclient`` will be deprecated, and
``Neutron`` team decided on the ``2023.1 (Antelope)`` PTG to not allow
new features\' bindings implemented here.

View File

@@ -6,6 +6,7 @@
:maxdepth: 1
unreleased
zed
yoga
xena
wallaby

View File

@@ -0,0 +1,6 @@
========================
Zed Series Release Notes
========================
.. release-notes::
:branch: stable/zed

View File

@@ -33,13 +33,6 @@ openstack.cli.extension =
neutronclient = neutronclient.osc.plugin
openstack.neutronclient.v2 =
network_subport_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkSubport
network_trunk_create = neutronclient.osc.v2.trunk.network_trunk:CreateNetworkTrunk
network_trunk_delete = neutronclient.osc.v2.trunk.network_trunk:DeleteNetworkTrunk
network_trunk_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkTrunk
network_trunk_set = neutronclient.osc.v2.trunk.network_trunk:SetNetworkTrunk
network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk
network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk
sfc_flow_classifier_create = neutronclient.osc.v2.sfc.sfc_flow_classifier:CreateSfcFlowClassifier
sfc_flow_classifier_delete = neutronclient.osc.v2.sfc.sfc_flow_classifier:DeleteSfcFlowClassifier
sfc_flow_classifier_list = neutronclient.osc.v2.sfc.sfc_flow_classifier:ListSfcFlowClassifier

13
tox.ini
View File

@@ -1,7 +1,7 @@
[tox]
envlist = py39,pep8
minversion = 3.18.0
skipsdist = True
skipsdist = False
ignore_basepython_conflict = True
[testenv]
@@ -12,18 +12,17 @@ setenv = VIRTUAL_ENV={envdir}
LC_ALL=C
PYTHONWARNINGS=default::DeprecationWarning
usedevelop = True
install_command = pip install {opts} {packages}
deps = -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master}
deps = -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/2023.1}
-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
# Delete bytecodes from normal directories before running tests.
# Note that bytecodes in dot directories will not be deleted
# to keep bytecodes of python modules installed into virtualenvs.
commands = sh -c "find . -type d -name '.?*' -prune -o \
commands = bash -c "find . -type d -name '.?*' -prune -o \
\( -type d -name '__pycache__' -o -type f -name '*.py[co]' \) \
-print0 | xargs -0 rm -rf"
stestr run {posargs}
allowlist_externals = sh
allowlist_externals = bash
[testenv:pep8]
commands =
@@ -52,7 +51,7 @@ commands =
[testenv:docs]
deps =
-c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master}
-c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/2023.1}
-r{toxinidir}/doc/requirements.txt
commands = sphinx-build -W -b html doc/source doc/build/html
@@ -67,7 +66,7 @@ commands =
[testenv:releasenotes]
deps =
-c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master}
-c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/2023.1}
-r{toxinidir}/doc/requirements.txt
commands = sphinx-build -a -E -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html