This commit is contained in:
anlicheng 2025-07-02 15:55:07 +08:00
parent 9ec12be0f7
commit a8a441adc1
5 changed files with 35 additions and 249 deletions

1
Cargo.lock generated
View File

@ -282,6 +282,7 @@ name = "modbus_agent"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"bytes",
"futures", "futures",
"tokio", "tokio",
"tokio-modbus", "tokio-modbus",

View File

@ -10,3 +10,4 @@ tokio-serial = "5.4.1"
futures = "0.3.31" futures = "0.3.31"
byteorder = "1.5.0" byteorder = "1.5.0"
tokio-util = "0.7.15" tokio-util = "0.7.15"
bytes = "1.10.1"

View File

@ -1,15 +1,6 @@
use std::io::{Cursor, Read}; use std::io::{Cursor, Read};
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
#[derive(Debug)]
pub(crate) struct SerialConfig {
pub port: Vec<u8>, // Port/binary变长二进制数据
pub baudrate: u32, // Baudrate:32
pub stopbits: u8, // Stopbits:8
pub parity: u8, // Parity:8
pub timeout: u32, // Timeout:32
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Request { pub(crate) struct Request {
pub packet_id: u32, pub packet_id: u32,
@ -18,27 +9,6 @@ pub(crate) struct Request {
pub cnt: u16, pub cnt: u16,
} }
pub(crate) fn parse_serial_config(data: &[u8]) -> std::io::Result<SerialConfig> {
let mut cursor = Cursor::new(data);
let port_len = cursor.read_u8()?;
let mut port = vec![0u8; port_len as usize];
cursor.read_exact(&mut port)?;
let baudrate = cursor.read_u32::<BigEndian>()?;
let stopbits = cursor.read_u8()?;
let parity = cursor.read_u8()?;
let timeout = cursor.read_u32::<BigEndian>()?;
Ok(SerialConfig {
port,
baudrate,
stopbits,
parity,
timeout,
})
}
pub(crate) fn parse_request(data: &[u8]) -> std::io::Result<Request> { pub(crate) fn parse_request(data: &[u8]) -> std::io::Result<Request> {
let mut cursor = Cursor::new(data); let mut cursor = Cursor::new(data);

View File

@ -1,113 +1,52 @@
mod codec; mod codec;
mod modbus_client;
use tokio::net::{UnixListener, UnixStream}; use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
//use tokio_util::codec::{Framed, LinesCodec}; use bytes::{BytesMut, BufMut};
use futures::SinkExt;
use tokio::io::AsyncReadExt;
//use tokio::io::AsyncReadExt;
use std::io::{self, Read, Write};
use std::process; use std::process;
use crate::modbus_client::ModbusClient;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() -> io::Result<()> {
let stdin = io::stdin(); // 标准输入输出句柄
let mut stdin = stdin.lock(); let mut stdin = io::stdin();
let mut stdout = io::stdout(); let mut stdout = io::stdout();
let d = vec![
0x05, // port_len: 5
b'C', b'O', b'M', b'1', 0x00, // Port: "COM1" + null终止符假设
0x00, 0x00, 0x25, 0x80, // baudrate: 9600 (大端)
0x01, // stopbits: 1
0x00, // parity: 0
0x00, 0x00, 0x03, 0xE8, // timeout: 1000 (大端)
];
let c = codec::parse_serial_config(&d).unwrap();
println!("{:?}", c);
let s = String::from_utf8(c.port.clone()).unwrap();
println!("port: {}", s);
println!("port: {:?}", c.port);
let mut client: Option<ModbusClient> = None;
loop { loop {
// 1. 读取4字节长度前缀 (big-endian) // 先读 2 字节长度
let mut len_bytes = [0u8; 2]; let mut len_buf = [0u8; 2];
if stdin.read_exact(&mut len_bytes).is_err() { stdin.read_exact(&mut len_buf).await?;
// 读取失败或EOF退出
process::exit(-1);
}
let length = u16::from_be_bytes(len_bytes) as usize; let packet_len = u16::from_be_bytes(len_buf) as usize;
// 2. 读取实际数据
let mut data = vec![0u8; length];
if stdin.read_exact(&mut data).is_err() {
process::exit(-1);
}
match data[0] { let mut packet = vec![0u8; packet_len];
// 根据长度再读数据
stdin.read_exact(&mut packet).await?;
// 模拟业务逻辑: 回显收到的数据 + 前缀
match packet[0] {
0x01 => { 0x01 => {
let config = codec::parse_serial_config(&data[1..]).unwrap(); let request = codec::parse_request(&packet[1..]).unwrap();
println!("{:?}", config); let response = [0x01];
client = Option::from(ModbusClient::new(config)); reply(&mut stdout, request.packet_id, &response).await?;
}, }
0x02 => {
let request = codec::parse_request(&data[1..]).unwrap();
println!("request {:?}", request);
if let Some(client) = client.as_mut() {
match client.request(request.slave_id, request.address, request.cnt).await {
Ok(vec) => {
}
Err(e) => {
}
}
}
},
_ => { _ => {
process::exit(-1);
} }
} }
} }
// let data_str = String::from_utf8(data).unwrap_or_else(|_| {
// eprintln!("Invalid UTF-8 data");
// process::exit(-1);
// });
//
// // 3. 处理数据并生成响应
// let response = format!("Processed: {}", data_str);
// let response_bytes = response.as_bytes();
//
// // 4. 写入响应(长度前缀 + 数据)
// stdout.write_all(&(response_bytes.len() as u32).to_be_bytes())?;
// stdout.write_all(response_bytes)?;
// stdout.flush()?; // 确保立即发送
} }
async fn handle_client(mut stream: UnixStream) -> Result<(), Box<dyn std::error::Error>> { async fn reply(stdout: &mut io::Stdout, packet_id: u32, response: &[u8]) -> io::Result<()> {
loop { let mut write_buf = BytesMut::with_capacity(2 + 4 + response.len());
let len = stream.read_u16().await.unwrap();
let mut buf = vec![0u8; len as usize];
match stream.read_exact(&mut buf).await { let len = 4 + response.len();
Ok(bytes) => { write_buf.put_u16(len as u16);
let str = String::from_utf8(buf).unwrap(); write_buf.put_u32(packet_id);
println!("read bytes: {}", str) write_buf.put_slice(&response);
},
Err(err) => { // 写回 stdout
println!("read data get error: {}", err) stdout.write_all(&write_buf).await?;
} stdout.flush().await?;
}
} Ok(())
} }

View File

@ -1,125 +0,0 @@
use crate::codec::SerialConfig;
use tokio_serial::{SerialPortBuilderExt, SerialStream};
use tokio_modbus::{
client::rtu,
client::Context,
slave::Slave,
prelude::*,
};
pub (crate) struct ModbusClient {
stream: SerialStream
}
impl ModbusClient {
pub fn new(serial_config: SerialConfig) -> Self {
let port_name = String::from_utf8(serial_config.port).unwrap();
let parity = match serial_config.parity {
0x00 => tokio_serial::Parity::None,
0x01 => tokio_serial::Parity::Odd,
0x02 => tokio_serial::Parity::Even,
_ => tokio_serial::Parity::None
};
let stopbits = match serial_config.stopbits {
0x01 => tokio_serial::StopBits::One,
0x02 => tokio_serial::StopBits::Two,
_ => tokio_serial::StopBits::One
};
let builder = tokio_serial::new(port_name, serial_config.baudrate)
.data_bits(tokio_serial::DataBits::Eight)
.stop_bits(stopbits)
.parity(parity)
.timeout(std::time::Duration::from_millis(serial_config.timeout as u64));
// 2. 建立串口连接
let stream = builder.open_native_async().unwrap();
Self {
stream
}
}
pub async fn request(&mut self, slave_id: u8, address: u16, cnt: u16) -> Result<Vec<u16>, Box<dyn std::error::Error>> {
let slave = Slave(slave_id);
let mut ctx = rtu::attach_slave(&mut self.stream, slave);
// 根据 address 范围执行不同操作
match address {
// 保持寄存器Holding Registers: 40000-49999
40000..=49999 => {
let modbus_addr = address - 40000; // 转换为 Modbus 协议地址0-9999
let response = ctx.read_holding_registers(modbus_addr, cnt).await?;
println!("Holding Register {}: {:?}", modbus_addr, response);
match response {
Ok(bytes) => {
Ok(bytes)
}
Err(e) => {
Err(e.into())
}
}
}
// 输入寄存器Input Registers: 30000-39999
30000..=39999 => {
let modbus_addr = address - 30000;
let response = ctx.read_input_registers(modbus_addr, cnt).await?;
println!("Input Register {}: {:?}", modbus_addr, response);
match response {
Ok(bytes) => {
Ok(bytes)
}
Err(e) => {
Err(e.into())
}
}
}
// 离散输入Discrete Inputs: 10000-19999
10000..=19999 => {
let modbus_addr = address - 10000;
let response = ctx.read_discrete_inputs(modbus_addr, 1).await?;
println!("Discrete Input {}: {:?}", modbus_addr, response);
match response {
Ok(bytes) => {
Ok(Self::bool_to_u16(bytes[0]))
}
Err(e) => {
Err(e.into())
}
}
}
// 线圈Coils: 0-9999
0..=9999 => {
let response = ctx.read_coils(address, 1).await?;
println!("Coil {}: {:?}", address, response);
match response {
Ok(bytes) => {
Ok(Self::bool_to_u16(bytes[0]))
}
Err(e) => {
Err(e.into())
}
}
}
_ => {
return Err("Invalid Modbus address".into());
}
}
}
pub fn bool_to_u16(val: bool) -> Vec<u16> {
if val {
vec![1]
} else {
vec![0]
}
}
}