APM

>Agent Skill

@robotis-git/create-example

skillproductivity

Create new publish/subscribe examples for ROS2 message types. Use when the user wants to add a new example, create pub/sub for a new message type, or add trajectory/sensor/geometry message examples.

pythonapi-design
apm::install
$apm install @robotis-git/create-example
apm::skill.md
---
name: create-example
description: Create new publish/subscribe examples for ROS2 message types. Use when the user wants to add a new example, create pub/sub for a new message type, or add trajectory/sensor/geometry message examples.
---

# Create Example

This skill guides you through creating new publish/subscribe examples for the zenoh-ros2-sdk.

## When to Use

- User wants to add a new example for a message type
- User wants to create publisher/subscriber for a new ROS2 message
- Adding support for trajectory, sensor, geometry, or other message types

## Project Structure

Examples are in `examples/` with numbered filenames:
- Odd numbers: publishers (e.g., `11_publish_*.py`)
- Even numbers: subscribers (e.g., `12_subscribe_*.py`)

## 1. Determine Next Example Numbers

Check existing examples in `examples/` directory. Use the next available odd number for publisher, next even for subscriber.

## 2. Look Up Message Definition

Find the ROS2 message definition. Key resources:
- https://docs.ros2.org/foxy/api/{package}/msg/{MessageName}.html
- Common packages: `std_msgs`, `geometry_msgs`, `sensor_msgs`, `trajectory_msgs`, `nav_msgs`

## 3. Publisher Template

Create `examples/{NN}_publish_{msg_name}.py`:

```python
#!/usr/bin/env python3
"""
{NN} - Publish {MessageName} Messages

Demonstrates how to publish {package}/msg/{MessageName} messages to a ROS2 topic.
{Brief description of what this message type is used for.}
"""
import time
import numpy as np

from zenoh_ros2_sdk import ROS2Publisher, get_message_class


def main():
    print("{NN} - Publish {MessageName} Messages")
    print("Publishing to /{topic_name} topic...\n")

    # Get message classes for easy object creation
    Header = get_message_class("std_msgs/msg/Header")
    Time = get_message_class("builtin_interfaces/msg/Time")
    {MessageName} = get_message_class("{package}/msg/{MessageName}")
    # Add nested message classes as needed

    if not all([Header, Time, {MessageName}]):
        print("Error: Failed to get message classes")
        return

    # Create publisher
    pub = ROS2Publisher(
        topic="/{topic_name}",
        msg_type="{package}/msg/{MessageName}"
    )

    try:
        print("Publishing {MessageName} messages...")
        print("Press Ctrl+C to stop\n")

        counter = 0
        while True:
            # Get current time
            now = time.time()
            sec = int(now)
            nanosec = int((now - sec) * 1e9)

            # Create header with timestamp
            header = Header(
                stamp=Time(sec=sec, nanosec=nanosec),
                frame_id="base_link"
            )

            # Create and populate message fields
            # Note: Arrays need to be numpy arrays for proper serialization

            # Publish message - pass fields as kwargs matching message structure
            pub.publish(
                header=header,
                # ... other fields
            )

            print(f"Published {MessageName} {counter}")
            counter += 1
            time.sleep(0.1)  # 10 Hz

    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        pub.close()
        print("Publisher closed")


if __name__ == "__main__":
    main()
```

## 4. Subscriber Template

Create `examples/{NN+1}_subscribe_{msg_name}.py`:

```python
#!/usr/bin/env python3
"""
{NN+1} - Subscribe to {MessageName} Messages

Demonstrates how to subscribe to a ROS2 topic and receive {package}/msg/{MessageName} messages.
{Brief description of what this message type is used for.}
"""
import time

from zenoh_ros2_sdk import ROS2Subscriber


def main():
    print("{NN+1} - Subscribe to {MessageName} Messages")
    print("Subscribing to /{topic_name} topic...\n")

    message_count = [0]  # Use list to allow modification in nested function

    def on_message(msg):
        """Callback function called when a {MessageName} message is received."""
        message_count[0] += 1

        # Extract information from the message
        timestamp = msg.header.stamp
        frame_id = msg.header.frame_id
        # ... extract other fields

        # Display received data
        print(f"\n--- {MessageName} Message #{message_count[0]} ---")
        print(f"Timestamp: {timestamp.sec}.{timestamp.nanosec:09d}")
        print(f"Frame ID: {frame_id}")
        # ... print other fields

    # Create subscriber
    sub = ROS2Subscriber(
        topic="/{topic_name}",
        msg_type="{package}/msg/{MessageName}",
        callback=on_message
    )

    try:
        print("Waiting for {MessageName} messages... Press Ctrl+C to stop")
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        sub.close()
        print("Subscriber closed")


if __name__ == "__main__":
    main()
```

## 5. Update README

Add entries to `examples/README.md`:

```markdown
### [`{NN}_publish_{msg_name}.py`]({NN}_publish_{msg_name}.py)
Demonstrates how to publish `{package}/msg/{MessageName}` messages. {Description}.

**Usage:**
\```bash
python3 examples/{NN}_publish_{msg_name}.py
\```

### [`{NN+1}_subscribe_{msg_name}.py`]({NN+1}_subscribe_{msg_name}.py)
Demonstrates how to subscribe to `{package}/msg/{MessageName}` messages. {Description}.

**Usage:**
\```bash
python3 examples/{NN+1}_subscribe_{msg_name}.py
\```
```

## 6. Key Patterns

### Arrays
Use numpy arrays for proper CDR serialization:
```python
positions = np.array([0.0, 0.1, 0.2], dtype=np.float64)
```

### Nested Messages
Get nested message classes and construct them:
```python
Point = get_message_class("trajectory_msgs/msg/JointTrajectoryPoint")
Duration = get_message_class("builtin_interfaces/msg/Duration")

point = Point(
    positions=np.array([0.0, 0.1], dtype=np.float64),
    time_from_start=Duration(sec=1, nanosec=0)
)
```

### Common Topic Names
- `/joint_states` - JointState
- `/cmd_vel` - Twist
- `/joint_trajectory` - JointTrajectory
- `/odom` - Odometry
- `/tf` - TransformStamped