Getting Started

This guide demonstrates how to create a SOME/IP service using someipy that broadcasts temperature measurements every second. The complete example is available in the example applications.

Prerequisites

  • Linux (Ubuntu 22.04 or equivalent)

  • Python 3.8+

  • Network interface with multicast support

Installation

Install someipy from PyPI:

pip3 install someipy

Service Datatype Definition

Define the temperature data structure using Python dataclasses. No IDL files required.

Create temperature_msg.py:

from dataclasses import dataclass
from someipy.serialization import (
    SomeIpPayload,
    SomeIpFixedSizeArray,
    Uint8,
    Uint64,
    Float32,
)

@dataclass
class Version(SomeIpPayload):
    major: Uint8
    minor: Uint8

    def __init__(self):
        self.major = Uint8()
        self.minor = Uint8()

@dataclass
class TemperatureMsg(SomeIpPayload):
    version: Version
    timestamp: Uint64
    measurements: SomeIpFixedSizeArray

    def __init__(self):
        self.version = Version()
        self.timestamp = Uint64()
        self.measurements = SomeIpFixedSizeArray(Float32, 4)
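
To get a feeling for what this datatype looks like on the wire, the following standard-library sketch packs the same fields by hand. It assumes big-endian byte order (the SOME/IP default) and no padding between fields. Whether someipy's serialize() produces exactly these bytes is an assumption, and the names TEMPERATURE_LAYOUT and pack_temperature are made up for this illustration.

```python
import struct

# Assumed wire layout for TemperatureMsg (big-endian, no padding):
#   version.major   uint8
#   version.minor   uint8
#   timestamp       uint64
#   measurements    4 x float32
TEMPERATURE_LAYOUT = struct.Struct(">BBQ4f")


def pack_temperature(major, minor, timestamp, measurements):
    """Pack the fields by hand with the stdlib, for comparison only."""
    return TEMPERATURE_LAYOUT.pack(major, minor, timestamp, *measurements)


raw = pack_temperature(1, 0, 42, [20.0, 21.0, 22.0, 23.0])
print(len(raw))  # 26
print(raw.hex())
```

Under these assumptions the payload is 1 + 1 + 8 + 4 * 4 = 26 bytes long.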

Asyncio App Implementation

someipy is built on asyncio because a SOME/IP implementation runs several tasks concurrently: service discovery, waiting for new clients and waiting for new data.

First, set up the application's structure. An asyncio application typically has a main coroutine function that is executed using asyncio.run. The application's logic goes inside the main coroutine function async def main.

Create send_events_udp.py with the following structure:

import asyncio
import ipaddress
import logging

async def main():
    try:
        # .. our application will go here
        pass
    except asyncio.CancelledError:
        print("Application cancelled...")
    finally:
        print("Cleanup...")
    print("End main task...")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
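
The order in which the except and finally branches of this structure run can be tried out without someipy. The sketch below is a self-contained illustration: a short sleep loop stands in for the application, and the task is cancelled explicitly, which is similar to what happens to the main task when asyncio.run shuts down after Ctrl+C. The names events and run_briefly are hypothetical.

```python
import asyncio

events = []  # records the order in which the handlers run


async def main():
    try:
        while True:
            await asyncio.sleep(0.01)  # stands in for the application's work
    except asyncio.CancelledError:
        events.append("cancelled")
    finally:
        events.append("cleanup")
    events.append("end")


async def run_briefly():
    task = asyncio.create_task(main())
    await asyncio.sleep(0.05)
    task.cancel()  # request cancellation of the main task
    await task  # main() swallows the CancelledError and returns normally


asyncio.run(run_briefly())
print(events)  # ['cancelled', 'cleanup', 'end']
```

Because main catches asyncio.CancelledError, the cleanup code in finally always runs before the task ends.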

someipy Logging

The someipy logging level is configured at the beginning of the application. Any level from Python's logging module can be used, for example logging.DEBUG or logging.INFO.

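import logging

from someipy.logging import set_someipy_log_level

async def main():
    # Configure the someipy log level before anything else
    set_someipy_log_level(logging.DEBUG)
    # .. our application will go here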

Starting Service Discovery

Before defining and instantiating the SOME/IP service, a ServiceDiscoveryProtocol object has to be created and started. It receives and sends all service discovery messages on the service discovery multicast group, which is typically 224.224.224.245 on port 30490. The IP address of the network interface to use also has to be provided; this example uses localhost and passes 127.0.0.1. The object is constructed using the factory function construct_service_discovery from the module someipy.service_discovery.

Call the close() method on the service discovery object at the end of your application so that its ports are freed correctly.

from someipy.service_discovery import construct_service_discovery
from temperature_msg import TemperatureMsg

async def main():
    # Configure logging
    set_someipy_log_level(logging.INFO)

    # Service discovery configuration
    SD_MULTICAST_GROUP = "224.224.224.245"
    SD_PORT = 30490
    INTERFACE_IP = "127.0.0.1"
    service_discovery = await construct_service_discovery(
        SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP
    )

    try:
        # ...
        pass
    finally:
        print("Service Discovery close..")
        service_discovery.close()
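
A mistyped multicast group or port is easy to overlook, so it can help to validate the three settings before passing them to construct_service_discovery. The following is a minimal sketch using only the standard library; check_sd_config is a hypothetical helper and not part of someipy.

```python
import ipaddress


def check_sd_config(multicast_group, port, interface_ip):
    """Raise ValueError if the service discovery settings look wrong."""
    group = ipaddress.ip_address(multicast_group)
    if not group.is_multicast:
        raise ValueError(f"{multicast_group} is not a multicast address")
    if not 0 < port < 65536:
        raise ValueError(f"port {port} is out of range")
    # ip_address raises ValueError itself if the string is not a valid IP
    ipaddress.ip_address(interface_ip)


check_sd_config("224.224.224.245", 30490, "127.0.0.1")  # passes silently
```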

Defining the SOME/IP Service

To offer a SOME/IP service, first define a Service containing EventGroups or Methods using the ServiceBuilder. Afterwards the Service can be instantiated as a server or client instance.

This example defines a temperature_service with service ID 0x1234. It contains a single event group with ID 0x0321, which in turn contains a single event with ID 0x0123. The service has major version 1 and minor version 0:

from someipy import ServiceBuilder, EventGroup

async def main():
    # ...
    service_discovery = await construct_service_discovery(
        SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP
    )

    SAMPLE_SERVICE_ID = 0x1234
    SAMPLE_EVENTGROUP_ID = 0x0321
    SAMPLE_EVENT_ID = 0x0123

    temperature_eventgroup = EventGroup(
        id=SAMPLE_EVENTGROUP_ID, event_ids=[SAMPLE_EVENT_ID]
    )
    temperature_service = (
        ServiceBuilder()
        .with_service_id(SAMPLE_SERVICE_ID)
        .with_major_version(1)
        .with_eventgroup(temperature_eventgroup)
        .build()
    )
    # ...

Instantiating the SOME/IP Service

Once the Service is defined, it can be instantiated multiple times. To offer a Service, someipy provides the ServerServiceInstance class. To use a service as a client, it provides the ClientServiceInstance class.

Since the construction of ServerServiceInstance is not trivial, the construct_server_service_instance factory function is provided. The following information has to be passed to the function:

  • The Service object (defined above)

  • A service instance ID (0x5678 in this example)

  • An endpoint tuple consisting of IP and port on which the service is offered (127.0.0.1 and port 3000 in this example)

  • The TTL (time to live) of the service discovery offer messages (5 seconds in this example)

  • The ServiceDiscoveryProtocol object (defined above)

  • The period of the service discovery offer messages in milliseconds (2000 ms in this example)

  • The protocol of the service instance: Either TransportLayerProtocol.UDP or TransportLayerProtocol.TCP

After instantiating the Service using the construct_server_service_instance function, the returned ServerServiceInstance has to be attached to the ServiceDiscoveryProtocol object. This is needed so that the ServerServiceInstance is informed about subscriptions from clients.

Finally, the SOME/IP service can be offered using the start_offer method. When exiting your application, make sure to call the stop_offer method on the service instance.

from someipy import TransportLayerProtocol, construct_server_service_instance

async def main():
    # ...

    SAMPLE_INSTANCE_ID = 0x5678
    service_instance_temperature = await construct_server_service_instance(
        temperature_service,
        instance_id=SAMPLE_INSTANCE_ID,
        endpoint=(
            ipaddress.IPv4Address(INTERFACE_IP),
            3000,
        ),  # source IP and port of the service
        ttl=5,
        sd_sender=service_discovery,
        cyclic_offer_delay_ms=2000,
        protocol=TransportLayerProtocol.UDP
    )

    # The service instance has to be attached to the ServiceDiscoveryProtocol object, so that
    # the service instance is notified about subscriptions from other ECUs
    service_discovery.attach(service_instance_temperature)

    # Starts sending periodic SD offer messages
    service_instance_temperature.start_offer()

    # ...
    # Before exiting the app: await service_instance_temperature.stop_offer()
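
The TTL and the cyclic offer period belong together: the TTL tells clients how long an offer stays valid, and each cyclic offer refreshes it. With a TTL of 5 seconds and a period of 2000 ms, at least two offers arrive before an earlier one expires. The helper below is a hypothetical sanity check for this relationship, not a someipy function.

```python
def offers_before_expiry(ttl_s, cyclic_offer_delay_ms):
    """Number of cyclic offers sent within one TTL window."""
    if cyclic_offer_delay_ms <= 0:
        raise ValueError("cyclic_offer_delay_ms must be positive")
    return (ttl_s * 1000) // cyclic_offer_delay_ms


print(offers_before_expiry(5, 2000))  # 2
```

A result of 0 would mean the offer period is longer than the TTL, so clients could consider the service gone between two offers.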

Sending Events

Until now you have

  • defined a datatype TemperatureMsg

  • started the service discovery

  • defined a SOME/IP service called temperature_service containing a single event

  • instantiated and offered the service using a ServerServiceInstance object.

Now it is time to send events to subscribed clients. First, prepare some data: import and instantiate TemperatureMsg and fill it with values:

from someipy.serialization import Uint8, Uint64, Float32
from temperature_msg import TemperatureMsg

async def main():
    # ...
    tmp_msg = TemperatureMsg()

    tmp_msg.version.major = Uint8(1)
    tmp_msg.version.minor = Uint8(0)
    tmp_msg.measurements.data[0] = Float32(20.0)
    tmp_msg.measurements.data[1] = Float32(21.0)
    tmp_msg.measurements.data[2] = Float32(22.0)
    tmp_msg.measurements.data[3] = Float32(23.0)
    # ...

Afterwards, start an endless loop that sends data every second using the send_event method on the service instance. The send_event method takes a bytes object, which is obtained by serializing the TemperatureMsg.

async def main():
    # ...

    try:
        # Cyclically send events in an endless loop...
        while True:
            await asyncio.sleep(1)
            tmp_msg.timestamp = Uint64(tmp_msg.timestamp.value + 1)
            payload = tmp_msg.serialize()
            service_instance_temperature.send_event(
                SAMPLE_EVENTGROUP_ID, SAMPLE_EVENT_ID, payload
            )

    except asyncio.CancelledError:
        print("Stop offering service...")
        await service_instance_temperature.stop_offer()
    finally:
        print("Service Discovery close...")
        service_discovery.close()

        # ...
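
someipy wraps the payload in a SOME/IP header before it is sent, so the application never builds the header itself. For orientation only, the sketch below assembles the 16-byte header defined by the SOME/IP specification around a payload using the standard library. The function build_notification and the choice of client ID 0 are assumptions made for this illustration and say nothing about how someipy fills these fields.

```python
import struct

PROTOCOL_VERSION = 0x01
MESSAGE_TYPE_NOTIFICATION = 0x02
RETURN_CODE_OK = 0x00


def build_notification(service_id, event_id, session_id, interface_version, payload):
    """Prefix the payload with a SOME/IP notification header (illustrative)."""
    header = struct.pack(
        ">HHIHHBBBB",
        service_id,          # message ID, upper 16 bits
        event_id,            # message ID, lower 16 bits
        len(payload) + 8,    # length covers the rest of the header plus payload
        0x0000,              # client ID (assumed 0 in this sketch)
        session_id,
        PROTOCOL_VERSION,
        interface_version,   # major version of the service
        MESSAGE_TYPE_NOTIFICATION,
        RETURN_CODE_OK,
    )
    return header + payload


message = build_notification(0x1234, 0x0123, 1, 1, bytes(26))
print(len(message))  # 42
```

With a 26-byte payload the complete message is 42 bytes long and the length field holds 34.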

Network Configuration

If you are using Linux, join the service discovery multicast group on the network interface you use before starting the application. Otherwise, clients will not be able to subscribe to your SOME/IP service. This example uses 224.224.224.245 and the loopback interface; adjust the command for your project.

sudo ip addr add 224.224.224.245 dev lo autojoin
python3 send_events_udp.py
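
To check whether service discovery messages actually arrive on the multicast group, a small listener can be written with the standard library. This is a diagnostic sketch only: the function names are hypothetical, and whether the listener can share port 30490 with a running someipy application depends on the socket options someipy sets, which is not verified here.

```python
import socket


def build_membership_request(group, interface_ip):
    """Build the ip_mreq structure expected by IP_ADD_MEMBERSHIP."""
    return socket.inet_aton(group) + socket.inet_aton(interface_ip)


def listen_once(group="224.224.224.245", port=30490,
                interface_ip="127.0.0.1", timeout_s=5.0):
    """Wait for one datagram on the multicast group; return it or None."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("", port))
        sock.setsockopt(
            socket.IPPROTO_IP,
            socket.IP_ADD_MEMBERSHIP,
            build_membership_request(group, interface_ip),
        )
        sock.settimeout(timeout_s)
        try:
            data, sender = sock.recvfrom(1500)
        except socket.timeout:
            return None
        return data, sender
```

Calling listen_once() while the service is being offered should return the raw bytes of an offer message together with the sender's address, or None if nothing was seen within the timeout.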

Running Two Applications On The Same Machine

For local testing with multiple applications, assign different IP addresses:

sudo ip addr add 127.0.0.2/24 dev lo
python3 send_events_udp.py --interface_ip 127.0.0.1
python3 receive_events_udp.py --interface_ip 127.0.0.2
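
The script developed in this guide hard-codes INTERFACE_IP, so it needs a small extension to accept the --interface_ip option used above. One possible way to add it with argparse is sketched below. The option name is taken from the commands above; the default value and the parse_args helper are assumptions, and the actual example applications may handle this differently.

```python
import argparse
import ipaddress


def parse_args(argv=None):
    """Parse the command line; argv=None means use sys.argv."""
    parser = argparse.ArgumentParser(description="Offer the temperature service")
    parser.add_argument(
        "--interface_ip",
        type=ipaddress.IPv4Address,  # rejects strings that are not IPv4 addresses
        default=ipaddress.IPv4Address("127.0.0.1"),
        help="IP address of the interface used for SOME/IP and service discovery",
    )
    return parser.parse_args(argv)


args = parse_args(["--interface_ip", "127.0.0.2"])
print(args.interface_ip)  # 127.0.0.2
```

The parsed value can then replace the INTERFACE_IP constant, for example as str(args.interface_ip) when calling construct_service_discovery.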