diff --git a/src/consume.rs b/src/consume.rs index d512f9a..3cf9993 100644 --- a/src/consume.rs +++ b/src/consume.rs @@ -9,53 +9,61 @@ use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; use std::time::Duration; use uuid::Uuid; +pub struct ConsumeConfig<'a> { + pub topic: &'a BrokerAndTopic, + pub partition: Option, + pub filter: &'a Option, + pub num_messages: Option, + pub offset: Option, + pub last: Option, + pub timestamp: Option, + pub key: bool, + pub terse: bool, + pub kafka_config: Option>, +} + #[allow(clippy::too_many_arguments)] -pub fn consume( - topic: &BrokerAndTopic, - partition: Option, - filter: &Option, - num_messages: Option, - offset: Option, - last: Option, - timestamp: Option, - kafka_config: Option>, -) { +pub fn consume(config: &ConsumeConfig) { debug!( "Listening to topic: {} partition {:?} on broker {}:{}, filtering {}", - topic.topic, - partition, - topic.host, - topic.port, - filter.as_deref().unwrap_or("none") + config.topic.topic, + config.partition, + config.topic.host, + config.topic.port, + config.filter.as_deref().unwrap_or("none") ); - let mut config = ClientConfig::new(); - config.set("group.id", Uuid::new_v4().to_string()); - config.set("bootstrap.servers", topic.broker()); + let mut client_config = ClientConfig::new(); + client_config.set("group.id", Uuid::new_v4().to_string()); + client_config.set("bootstrap.servers", config.topic.broker()); - if let Some(kafka_options) = kafka_config { + if let Some(kafka_options) = &config.kafka_config { for option in kafka_options { println!( "Setting Kafka config option {}={}", option.key, option.value ); - config.set(&option.key, &option.value); + client_config.set(&option.key, &option.value); } } - let consumer: BaseConsumer = config.create().expect("Base creation failed"); + let consumer: BaseConsumer = client_config.create().expect("Base creation failed"); let start: Option; let (low_watermark, high_watermark) = consumer - .fetch_watermarks(&topic.topic, partition.unwrap_or(0), Duration::from_secs(1)) - .unwrap_or_else(|_| panic!("Failed to get watermarks for topic {}", topic.topic)); + .fetch_watermarks( + &config.topic.topic, + config.partition.unwrap_or(0), + Duration::from_secs(1), + ) + .unwrap_or_else(|_| panic!("Failed to get watermarks for topic {}", config.topic.topic)); let num_messages_on_topic = high_watermark - low_watermark; - if let Some(_timestamp) = timestamp { + if let Some(_timestamp) = config.timestamp { let mut tpl = TopicPartitionList::new(); tpl.add_partition_offset( - &topic.topic, - partition.unwrap_or(0), + &config.topic.topic, + config.partition.unwrap_or(0), Offset::Offset(_timestamp), ) .expect("Can't add partition to consumer with timestamp"); @@ -68,13 +76,13 @@ pub fn consume( .expect("No topics found for timestamp") .offset(), ); - } else if let Some(_offset) = offset { + } else if let Some(_offset) = config.offset { assert!( _offset >= low_watermark && _offset <= high_watermark, "offset ({_offset:?}) must be between high ({high_watermark}) and low({low_watermark}) watermarks" ); start = Some(Offset::Offset(_offset)); - } else if let Some(last_num_messages) = last { + } else if let Some(last_num_messages) = config.last { assert!( last_num_messages <= num_messages_on_topic, "Cannot consume {last_num_messages:?} messages from a topic which only has {num_messages_on_topic} messages" @@ -87,26 +95,26 @@ pub fn consume( if let Some(_start) = start { info!("Starting at {_start:?}"); let mut tpl = TopicPartitionList::new(); - tpl.add_partition_offset(&topic.topic, partition.unwrap_or(0), _start) + tpl.add_partition_offset(&config.topic.topic, config.partition.unwrap_or(0), _start) .unwrap_or_else(|_| panic!("Failed to set partition offset to {:?}", _start)); consumer.assign(&tpl).expect("Failed to assign to topic"); } consumer - .subscribe(&[&topic.topic]) - .unwrap_or_else(|_| panic!("Failed to subscribe to topic {}", topic.topic)); + .subscribe(&[&config.topic.topic]) + .unwrap_or_else(|_| panic!("Failed to subscribe to topic {}", config.topic.topic)); let mut counter = 0; for message in consumer.iter() { match message { Ok(message) => { - if partition.is_some() && Some(message.partition()) != partition { + if config.partition.is_some() && Some(message.partition()) != config.partition { continue; } match message.payload() { Some(p) => { - if let Some(f) = filter { + if let Some(f) = config.filter { if let Some(schema_id) = get_schema_id(p) && schema_id != f.to_bytes() { @@ -114,9 +122,22 @@ pub fn consume( } debug!("Message has no schema id, ignoring filter") } - + print!("[partition={}", message.partition()); + if config.key { + print!(" key={:?}", message.key().unwrap_or(b"")); + } + print!("] "); match deserialize_message(p) { - Ok(d) => println!("{d:?}"), + Ok(d) => { + if config.terse { + let schema = get_schema_id(p) + .and_then(|s| str::from_utf8(s).ok()) + .unwrap_or("invalid"); + println!("{schema} ({} bytes)", p.len()) + } else { + println!("{d:?}") + } + } Err(e) => error!("Failed to deserialize message: {e:?}"), } } @@ -132,7 +153,7 @@ pub fn consume( } } counter += 1; - if Some(counter) == num_messages || Some(counter) == last { + if Some(counter) == config.num_messages || Some(counter) == config.last { println!("Reached {} messages, exiting", counter); break; } diff --git a/src/main.rs b/src/main.rs index 9c54818..fa13210 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ mod sniff; use crate::cli_utils::BrokerAndOptionalTopic; use crate::cli_utils::KafkaOption; +use crate::consume::ConsumeConfig; use crate::count::count; use crate::howl::{EventMessageConfig, HowlConfig, howl}; use crate::sniff::sniff; @@ -45,7 +46,13 @@ enum Commands { /// Print last x messages on topic #[arg(short, long, conflicts_with_all = ["offset","timestamp","messages","filter"])] last: Option, - // Additonal command line arguments + /// Show message key + #[arg(long, action=clap::ArgAction::SetTrue)] + key: bool, + /// Print using terse format (just schema ID and length) + #[arg(long, action=clap::ArgAction::SetTrue)] + terse: bool, + /// Additonal Kafka options #[arg(short = 'X', long)] kafka_config: Option>, }, @@ -126,17 +133,21 @@ async fn main() { offset, last, timestamp, + key, + terse, kafka_config, - } => consume::consume( - &topic, + } => consume::consume(&ConsumeConfig { + topic: &topic, partition, - &filter, - messages, + filter: &filter, + num_messages: messages, offset, last, timestamp, + key, + terse, kafka_config, - ), + }), Commands::Sniff { broker, kafka_config,