Skip to content

USBEndpoint_8ut_8cpp

github-actions edited this page Mar 15, 2026 · 3 revisions

title: src/USB/USBEndpoint.ut.cpp


src/USB/USBEndpoint.ut.cpp

Namespaces

Name
aasdk
aasdk::usb
aasdk::usb::ut

Classes

Name
class aasdk::usb::ut::USBEndpointUnitTest

Source code

/*
*  This file is part of aasdk library project.
*  Copyright (C) 2018 f1x.studio (Michal Szwaj)
*
*  aasdk is free software: you can redistribute it and/or modify
*  it under the terms of the GNU General Public License as published by
*  the Free Software Foundation; either version 3 of the License, or
*  (at your option) any later version.

*  aasdk is distributed in the hope that it will be useful,
*  but WITHOUT ANY WARRANTY; without even the implied warranty of
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*  GNU General Public License for more details.
*
*  You should have received a copy of the GNU General Public License
*  along with aasdk. If not, see <http://www.gnu.org/licenses/>.
*/

#include <gtest/gtest.h>
#include <aasdk/USB/UT/USBWrapper.mock.hpp>
#include <aasdk/USB/UT/USBEndpoint.mock.hpp>
#include <aasdk/USB/UT/USBEndpointPromiseHandler.mock.hpp>
#include <aasdk/Error/Error.hpp>
#include <aasdk/USB/USBEndpoint.hpp>


namespace aasdk
{
namespace usb
{
namespace ut
{

using ::testing::_;
using ::testing::DoAll;
using ::testing::Return;
using ::testing::SaveArg;

// Common fixture for the USBEndpoint tests.
//
// It owns the mocked USB wrapper, the io_service, a device handle that points at a
// dummy object, and a promise whose resolve/reject handlers forward to
// promiseHandlerMock_ so each test can set expectations on the outcome.
class USBEndpointUnitTest : public testing::Test
{
protected:
    USBEndpointUnitTest()
      // The handle aliases dummyDeviceHandle_; the deleter is a no-op, so nothing is freed.
      : deviceHandle_(reinterpret_cast<libusb_device_handle*>(&dummyDeviceHandle_), [](auto*) {})
      , promise_(IUSBEndpoint::Promise::defer(ioService_))
    {
        // Route both promise outcomes to the mock handler.
        promise_->then([this](auto value) { promiseHandlerMock_.onResolve(value); },
                       [this](const auto& failure) { promiseHandlerMock_.onReject(failure); });
    }

    // Declaration order matters: the constructor's initializers use dummyDeviceHandle_
    // and ioService_, which therefore have to be declared before deviceHandle_ and promise_.
    USBWrapperMock usbWrapperMock_;
    boost::asio::io_service ioService_;
    USBWrapperMock::DummyDeviceHandle dummyDeviceHandle_;
    DeviceHandle deviceHandle_;
    USBEndpointPromiseHandlerMock promiseHandlerMock_;
    IUSBEndpoint::Promise::Pointer promise_;
};

// Requests a control transfer on an endpoint created with address 0x01 and expects
// the promise to be rejected with USB_INVALID_TRANSFER_METHOD, never resolved.
TEST_F(USBEndpointUnitTest, USBEndpoint_ControlTransferForNonControlEndpoint)
{
    common::Data payload(10, 0);
    auto endpoint = USBEndpoint::Pointer(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_, 0x01));

    // One rejection carrying the expected error code; no resolution.
    EXPECT_CALL(promiseHandlerMock_, onResolve(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onReject(error::Error(error::ErrorCode::USB_INVALID_TRANSFER_METHOD)));

    endpoint->controlTransfer(common::DataBuffer(payload), 0, std::move(promise_));

    // The promise was deferred onto ioService_, so run it before the test ends.
    ioService_.run();
}

// Requests a bulk transfer on an endpoint created without an explicit endpoint
// address and expects the promise to be rejected with USB_INVALID_TRANSFER_METHOD.
TEST_F(USBEndpointUnitTest, USBEndpoint_BulkTransferForControlEndpoint)
{
    common::Data payload(10, 0);
    auto endpoint = USBEndpoint::Pointer(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_));

    // One rejection carrying the expected error code; no resolution.
    EXPECT_CALL(promiseHandlerMock_, onResolve(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onReject(error::Error(error::ErrorCode::USB_INVALID_TRANSFER_METHOD)));

    endpoint->bulkTransfer(common::DataBuffer(payload), 0, std::move(promise_));

    // The promise was deferred onto ioService_, so run it before the test ends.
    ioService_.run();
}

// Requests an interrupt transfer on an endpoint created without an explicit endpoint
// address and expects the promise to be rejected with USB_INVALID_TRANSFER_METHOD.
TEST_F(USBEndpointUnitTest, USBEndpoint_InterruptTransferForControlEndpoint)
{
    common::Data payload(10, 0);
    auto endpoint = USBEndpoint::Pointer(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_));

    // One rejection carrying the expected error code; no resolution.
    EXPECT_CALL(promiseHandlerMock_, onResolve(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onReject(error::Error(error::ErrorCode::USB_INVALID_TRANSFER_METHOD)));

    endpoint->interruptTransfer(common::DataBuffer(payload), 0, std::move(promise_));

    // The promise was deferred onto ioService_, so run it before the test ends.
    ioService_.run();
}

// Makes the mocked wrapper return nullptr from allocTransfer during a control
// transfer and expects the promise to be rejected with USB_TRANSFER_ALLOCATION.
TEST_F(USBEndpointUnitTest, USBEndpoint_ControlTransferAllocationFailed)
{
    common::Data payload(10, 0);
    auto endpoint = USBEndpoint::Pointer(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_));

    // Promise outcome: rejected with the allocation error, never resolved.
    EXPECT_CALL(promiseHandlerMock_, onResolve(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onReject(error::Error(error::ErrorCode::USB_TRANSFER_ALLOCATION)));
    // Simulated failure: the single allocation attempt yields no transfer.
    EXPECT_CALL(usbWrapperMock_, allocTransfer(0)).WillOnce(Return(nullptr));

    endpoint->controlTransfer(common::DataBuffer(payload), 0, std::move(promise_));

    // The promise was deferred onto ioService_, so run it before the test ends.
    ioService_.run();
}

// Makes the mocked wrapper return nullptr from allocTransfer during a bulk transfer
// (endpoint address 0x01) and expects rejection with USB_TRANSFER_ALLOCATION.
TEST_F(USBEndpointUnitTest, USBEndpoint_BulkTransferAllocationFailed)
{
    common::Data payload(10, 0);
    auto endpoint = USBEndpoint::Pointer(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_, 0x01));

    // Promise outcome: rejected with the allocation error, never resolved.
    EXPECT_CALL(promiseHandlerMock_, onResolve(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onReject(error::Error(error::ErrorCode::USB_TRANSFER_ALLOCATION)));
    // Simulated failure: the single allocation attempt yields no transfer.
    EXPECT_CALL(usbWrapperMock_, allocTransfer(0)).WillOnce(Return(nullptr));

    endpoint->bulkTransfer(common::DataBuffer(payload), 0, std::move(promise_));

    // The promise was deferred onto ioService_, so run it before the test ends.
    ioService_.run();
}

// Makes the mocked wrapper return nullptr from allocTransfer during an interrupt
// transfer (endpoint address 0x01) and expects rejection with USB_TRANSFER_ALLOCATION.
TEST_F(USBEndpointUnitTest, USBEndpoint_InterruptTransferAllocationFailed)
{
    common::Data payload(10, 0);
    auto endpoint = USBEndpoint::Pointer(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_, 0x01));

    // Promise outcome: rejected with the allocation error, never resolved.
    EXPECT_CALL(promiseHandlerMock_, onResolve(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onReject(error::Error(error::ErrorCode::USB_TRANSFER_ALLOCATION)));
    // Simulated failure: the single allocation attempt yields no transfer.
    EXPECT_CALL(usbWrapperMock_, allocTransfer(0)).WillOnce(Return(nullptr));

    endpoint->interruptTransfer(common::DataBuffer(payload), 0, std::move(promise_));

    // The promise was deferred onto ioService_, so run it before the test ends.
    ioService_.run();
}

// Drives a single bulk transfer through the mocked wrapper and completes it by hand.
//
// Phase 1 starts the transfer and captures the completion callback and user data
// passed to fillBulkTransfer. Phase 2 invokes that callback with
// LIBUSB_TRANSFER_COMPLETED and the full buffer length, then expects the transfer
// to be freed and the promise to resolve with that length.
TEST_F(USBEndpointUnitTest, USBEndpoint_BulkTransfer)
{
    const uint8_t endpointAddress = 0x55;
    USBEndpoint::Pointer usbEndpoint(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_, endpointAddress));

    libusb_transfer transfer;
    EXPECT_CALL(usbWrapperMock_, allocTransfer(0)).WillOnce(Return(&transfer));

    // Starts as nullptr so a fillBulkTransfer call that never happens is detected
    // below instead of invoking an indeterminate function pointer.
    libusb_transfer_cb_fn transferCallback = nullptr;
    common::Data data(1000, 0);
    common::DataBuffer buffer(data);
    EXPECT_CALL(usbWrapperMock_, fillBulkTransfer(&transfer, _, endpointAddress, buffer.data, buffer.size, _, _, _))
            .WillOnce(DoAll(SaveArg<5>(&transferCallback), SaveArg<6>(&transfer.user_data)));
    EXPECT_CALL(usbWrapperMock_, submitTransfer(&transfer));

    usbEndpoint->bulkTransfer(common::DataBuffer(data), 0, std::move(promise_));
    ioService_.run();
    ioService_.reset();

    // Abort cleanly if the endpoint never handed over a completion callback.
    ASSERT_TRUE(transferCallback != nullptr);

    // Simulate a successful completion of the whole buffer.
    transfer.actual_length = buffer.size;
    transfer.status = LIBUSB_TRANSFER_COMPLETED;
    transferCallback(&transfer);

    EXPECT_CALL(usbWrapperMock_, freeTransfer(&transfer));
    EXPECT_CALL(promiseHandlerMock_, onReject(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onResolve(buffer.size));
    ioService_.run();
}

// Runs attemptsCount bulk transfers back to back on the same endpoint, reusing one
// libusb_transfer object, and expects each of them to be submitted, freed and
// resolved with the full buffer length, with no rejection at any point.
TEST_F(USBEndpointUnitTest, USBEndpoint_MultipleBulkTransfers)
{
    const uint8_t endpointAddress = 0x55;
    USBEndpoint::Pointer usbEndpoint(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_, endpointAddress));

    libusb_transfer transfer;
    EXPECT_CALL(usbWrapperMock_, allocTransfer(0)).WillRepeatedly(Return(&transfer));

    const size_t attemptsCount = 1000;

    libusb_transfer_cb_fn transferCallback = nullptr;

    EXPECT_CALL(usbWrapperMock_, submitTransfer(&transfer)).Times(attemptsCount);
    EXPECT_CALL(usbWrapperMock_, freeTransfer(&transfer)).Times(attemptsCount);
    EXPECT_CALL(promiseHandlerMock_, onReject(_)).Times(0);

    for(size_t i = 0; i < attemptsCount; ++i)
    {
        // NOTE(review): the size does not depend on i, so every iteration uses the
        // same length; `10000 + i` may have been intended - confirm before changing.
        common::Data data(10000 + attemptsCount, 0);
        common::DataBuffer buffer(data);

        // Forget the callback captured by the previous iteration so that a missing
        // fillBulkTransfer call cannot go unnoticed by reusing a stale pointer.
        transferCallback = nullptr;
        EXPECT_CALL(usbWrapperMock_, fillBulkTransfer(&transfer, _, endpointAddress, buffer.data, buffer.size, _, _, _))
                .WillOnce(DoAll(SaveArg<5>(&transferCallback), SaveArg<6>(&transfer.user_data)));
        EXPECT_CALL(promiseHandlerMock_, onResolve(buffer.size)).Times(1);

        // Reset the shared transfer to a non-completed state before reuse.
        transfer.actual_length = 0;
        transfer.status = LIBUSB_TRANSFER_ERROR;

        // Each attempt needs its own promise; wire it to the shared handler mock.
        auto promise = IUSBEndpoint::Promise::defer(ioService_);
        promise->then(std::bind(&USBEndpointPromiseHandlerMock::onResolve, &promiseHandlerMock_, std::placeholders::_1),
                      std::bind(&USBEndpointPromiseHandlerMock::onReject, &promiseHandlerMock_, std::placeholders::_1));

        usbEndpoint->bulkTransfer(common::DataBuffer(data), 0, std::move(promise));
        ioService_.run();
        ioService_.reset();

        // Abort cleanly if this attempt did not hand over a completion callback.
        ASSERT_TRUE(transferCallback != nullptr);

        // Simulate a successful completion of the whole buffer.
        transfer.actual_length = buffer.size;
        transfer.status = LIBUSB_TRANSFER_COMPLETED;
        transferCallback(&transfer);
        ioService_.run();
        ioService_.reset();
    }
}

// Drives a single control transfer through the mocked wrapper and completes it by hand.
//
// Phase 1 starts the transfer and captures the completion callback and user data
// passed to fillControlTransfer. Phase 2 invokes that callback with
// LIBUSB_TRANSFER_COMPLETED and the full buffer length, then expects the transfer
// to be freed and the promise to resolve with that length.
TEST_F(USBEndpointUnitTest, USBEndpoint_ControlTransfer)
{
    USBEndpoint::Pointer usbEndpoint(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_));

    libusb_transfer transfer;
    EXPECT_CALL(usbWrapperMock_, allocTransfer(0)).WillOnce(Return(&transfer));

    // Starts as nullptr so a fillControlTransfer call that never happens is detected
    // below instead of invoking an indeterminate function pointer.
    libusb_transfer_cb_fn transferCallback = nullptr;
    common::Data data(100, 0);
    common::DataBuffer buffer(data);
    EXPECT_CALL(usbWrapperMock_, fillControlTransfer(&transfer, _, buffer.data, _, _, _))
            .WillOnce(DoAll(SaveArg<3>(&transferCallback), SaveArg<4>(&transfer.user_data)));
    EXPECT_CALL(usbWrapperMock_, submitTransfer(&transfer));

    usbEndpoint->controlTransfer(common::DataBuffer(data), 0, std::move(promise_));
    ioService_.run();
    ioService_.reset();

    // Abort cleanly if the endpoint never handed over a completion callback.
    ASSERT_TRUE(transferCallback != nullptr);

    // Simulate a successful completion of the whole buffer.
    transfer.actual_length = buffer.size;
    transfer.status = LIBUSB_TRANSFER_COMPLETED;
    transferCallback(&transfer);

    EXPECT_CALL(usbWrapperMock_, freeTransfer(&transfer));
    EXPECT_CALL(promiseHandlerMock_, onReject(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onResolve(buffer.size));
    ioService_.run();
}

// Drives a single interrupt transfer through the mocked wrapper and completes it by hand.
//
// Phase 1 starts the transfer and captures the completion callback and user data
// passed to fillInterruptTransfer. Phase 2 invokes that callback with
// LIBUSB_TRANSFER_COMPLETED and the full buffer length, then expects the transfer
// to be freed and the promise to resolve with that length.
TEST_F(USBEndpointUnitTest, USBEndpoint_InterruptTransfer)
{
    const uint8_t endpointAddress = 0x35;
    USBEndpoint::Pointer usbEndpoint(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_, endpointAddress));

    libusb_transfer transfer;
    EXPECT_CALL(usbWrapperMock_, allocTransfer(0)).WillOnce(Return(&transfer));

    // Starts as nullptr so a fillInterruptTransfer call that never happens is detected
    // below instead of invoking an indeterminate function pointer.
    libusb_transfer_cb_fn transferCallback = nullptr;
    common::Data data(150, 0);
    common::DataBuffer buffer(data);
    EXPECT_CALL(usbWrapperMock_, fillInterruptTransfer(&transfer, _, endpointAddress, buffer.data, buffer.size, _, _, _))
            .WillOnce(DoAll(SaveArg<5>(&transferCallback), SaveArg<6>(&transfer.user_data)));
    EXPECT_CALL(usbWrapperMock_, submitTransfer(&transfer));

    usbEndpoint->interruptTransfer(common::DataBuffer(data), 0, std::move(promise_));
    ioService_.run();
    ioService_.reset();

    // Abort cleanly if the endpoint never handed over a completion callback.
    ASSERT_TRUE(transferCallback != nullptr);

    // Simulate a successful completion of the whole buffer.
    transfer.actual_length = buffer.size;
    transfer.status = LIBUSB_TRANSFER_COMPLETED;
    transferCallback(&transfer);

    EXPECT_CALL(usbWrapperMock_, freeTransfer(&transfer));
    EXPECT_CALL(promiseHandlerMock_, onReject(_)).Times(0);
    EXPECT_CALL(promiseHandlerMock_, onResolve(buffer.size));
    ioService_.run();
}

// Drives a single bulk transfer through the mocked wrapper and finishes it with
// LIBUSB_TRANSFER_CANCELLED.
//
// Phase 1 starts the transfer and captures the completion callback and user data
// passed to fillBulkTransfer. Phase 2 invokes that callback with the cancelled
// status, then expects the transfer to be freed and the promise to be rejected
// with OPERATION_ABORTED and never resolved.
TEST_F(USBEndpointUnitTest, USBEndpoint_BulkTransferFailed)
{
    const uint8_t endpointAddress = 0x55;
    USBEndpoint::Pointer usbEndpoint(std::make_shared<USBEndpoint>(usbWrapperMock_, ioService_, deviceHandle_, endpointAddress));

    libusb_transfer transfer;
    EXPECT_CALL(usbWrapperMock_, allocTransfer(0)).WillOnce(Return(&transfer));

    // Starts as nullptr so a fillBulkTransfer call that never happens is detected
    // below instead of invoking an indeterminate function pointer.
    libusb_transfer_cb_fn transferCallback = nullptr;
    common::Data data(10, 0);
    common::DataBuffer buffer(data);
    EXPECT_CALL(usbWrapperMock_, fillBulkTransfer(&transfer, _, endpointAddress, buffer.data, buffer.size, _, _, _))
            .WillOnce(DoAll(SaveArg<5>(&transferCallback), SaveArg<6>(&transfer.user_data)));
    EXPECT_CALL(usbWrapperMock_, submitTransfer(&transfer));

    usbEndpoint->bulkTransfer(common::DataBuffer(data), 0, std::move(promise_));
    ioService_.run();
    ioService_.reset();

    // Abort cleanly if the endpoint never handed over a completion callback.
    ASSERT_TRUE(transferCallback != nullptr);

    // Simulate the transfer being cancelled.
    transfer.actual_length = buffer.size;
    transfer.status = LIBUSB_TRANSFER_CANCELLED;
    transferCallback(&transfer);

    EXPECT_CALL(usbWrapperMock_, freeTransfer(&transfer));
    EXPECT_CALL(promiseHandlerMock_, onReject(error::Error(error::ErrorCode::OPERATION_ABORTED))).Times(1);
    EXPECT_CALL(promiseHandlerMock_, onResolve(_)).Times(0);
    ioService_.run();
}

}
}
}

Updated on 2026-03-15 at 09:02:41 +0000

Clone this wiki locally