create-example
skillCreate new publish/subscribe examples for ROS2 message types. Use when the user wants to add a new example, create pub/sub for a new message type, or add trajectory/sensor/geometry message examples.
apm::install
apm install @robotis-git/create-exampleapm::skill.md
---
name: create-example
description: Create new publish/subscribe examples for ROS2 message types. Use when the user wants to add a new example, create pub/sub for a new message type, or add trajectory/sensor/geometry message examples.
---
# Create Example
This skill guides you through creating new publish/subscribe examples for the zenoh-ros2-sdk.
## When to Use
- User wants to add a new example for a message type
- User wants to create publisher/subscriber for a new ROS2 message
- Adding support for trajectory, sensor, geometry, or other message types
## Project Structure
Examples are in `examples/` with numbered filenames:
- Odd numbers: publishers (e.g., `11_publish_*.py`)
- Even numbers: subscribers (e.g., `12_subscribe_*.py`)
## 1. Determine Next Example Numbers
Check existing examples in `examples/` directory. Use the next available odd number for publisher, next even for subscriber.
## 2. Look Up Message Definition
Find the ROS2 message definition. Key resources:
- https://docs.ros2.org/foxy/api/{package}/msg/{MessageName}.html
- Common packages: `std_msgs`, `geometry_msgs`, `sensor_msgs`, `trajectory_msgs`, `nav_msgs`
## 3. Publisher Template
Create `examples/{NN}_publish_{msg_name}.py`:
```python
#!/usr/bin/env python3
"""
{NN} - Publish {MessageName} Messages
Demonstrates how to publish {package}/msg/{MessageName} messages to a ROS2 topic.
{Brief description of what this message type is used for.}
"""
import time
import numpy as np
from zenoh_ros2_sdk import ROS2Publisher, get_message_class
def main():
print("{NN} - Publish {MessageName} Messages")
print("Publishing to /{topic_name} topic...\n")
# Get message classes for easy object creation
Header = get_message_class("std_msgs/msg/Header")
Time = get_message_class("builtin_interfaces/msg/Time")
{MessageName} = get_message_class("{package}/msg/{MessageName}")
# Add nested message classes as needed
if not all([Header, Time, {MessageName}]):
print("Error: Failed to get message classes")
return
# Create publisher
pub = ROS2Publisher(
topic="/{topic_name}",
msg_type="{package}/msg/{MessageName}"
)
try:
print("Publishing {MessageName} messages...")
print("Press Ctrl+C to stop\n")
counter = 0
while True:
# Get current time
now = time.time()
sec = int(now)
nanosec = int((now - sec) * 1e9)
# Create header with timestamp
header = Header(
stamp=Time(sec=sec, nanosec=nanosec),
frame_id="base_link"
)
# Create and populate message fields
# Note: Arrays need to be numpy arrays for proper serialization
# Publish message - pass fields as kwargs matching message structure
pub.publish(
header=header,
# ... other fields
)
print(f"Published {MessageName} {counter}")
counter += 1
time.sleep(0.1) # 10 Hz
except KeyboardInterrupt:
print("\nInterrupted by user")
finally:
pub.close()
print("Publisher closed")
if __name__ == "__main__":
main()
```
## 4. Subscriber Template
Create `examples/{NN+1}_subscribe_{msg_name}.py`:
```python
#!/usr/bin/env python3
"""
{NN+1} - Subscribe to {MessageName} Messages
Demonstrates how to subscribe to a ROS2 topic and receive {package}/msg/{MessageName} messages.
{Brief description of what this message type is used for.}
"""
import time
from zenoh_ros2_sdk import ROS2Subscriber
def main():
print("{NN+1} - Subscribe to {MessageName} Messages")
print("Subscribing to /{topic_name} topic...\n")
message_count = [0] # Use list to allow modification in nested function
def on_message(msg):
"""Callback function called when a {MessageName} message is received."""
message_count[0] += 1
# Extract information from the message
timestamp = msg.header.stamp
frame_id = msg.header.frame_id
# ... extract other fields
# Display received data
print(f"\n--- {MessageName} Message #{message_count[0]} ---")
print(f"Timestamp: {timestamp.sec}.{timestamp.nanosec:09d}")
print(f"Frame ID: {frame_id}")
# ... print other fields
# Create subscriber
sub = ROS2Subscriber(
topic="/{topic_name}",
msg_type="{package}/msg/{MessageName}",
callback=on_message
)
try:
print("Waiting for {MessageName} messages... Press Ctrl+C to stop")
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print("\nInterrupted by user")
finally:
sub.close()
print("Subscriber closed")
if __name__ == "__main__":
main()
```
## 5. Update README
Add entries to `examples/README.md`:
```markdown
### [`{NN}_publish_{msg_name}.py`]({NN}_publish_{msg_name}.py)
Demonstrates how to publish `{package}/msg/{MessageName}` messages. {Description}.
**Usage:**
\```bash
python3 examples/{NN}_publish_{msg_name}.py
\```
### [`{NN+1}_subscribe_{msg_name}.py`]({NN+1}_subscribe_{msg_name}.py)
Demonstrates how to subscribe to `{package}/msg/{MessageName}` messages. {Description}.
**Usage:**
\```bash
python3 examples/{NN+1}_subscribe_{msg_name}.py
\```
```
## 6. Key Patterns
### Arrays
Use numpy arrays for proper CDR serialization:
```python
positions = np.array([0.0, 0.1, 0.2], dtype=np.float64)
```
### Nested Messages
Get nested message classes and construct them:
```python
Point = get_message_class("trajectory_msgs/msg/JointTrajectoryPoint")
Duration = get_message_class("builtin_interfaces/msg/Duration")
point = Point(
positions=np.array([0.0, 0.1], dtype=np.float64),
time_from_start=Duration(sec=1, nanosec=0)
)
```
### Common Topic Names
- `/joint_states` - JointState
- `/cmd_vel` - Twist
- `/joint_trajectory` - JointTrajectory
- `/odom` - Odometry
- `/tf` - TransformStamped