Asynchronous programming is essential for building efficient, scalable connectors. This guide covers Tokio patterns, concurrency strategies, and best practices for Strike48 connector development.
Why Async?
Strike48 connectors are inherently I/O-bound:
- Waiting for network responses
- Executing external tools
- Reading/writing files
- Database queries
Async programming allows handling thousands of concurrent operations without blocking threads.
Tokio Basics
The Tokio Runtime
Tokio is the async runtime used by Strike48 SDK:
#[tokio::main]
async fn main() -> Result<()> {
// Tokio runtime is automatically initialized
run_connector().await
}
Async Functions
// Async function definition
async fn scan_target(target: &str) -> Result<ScanResult> {
// Async operations with .await
let result = perform_scan(target).await?;
Ok(result)
}
// Calling async functions
#[tokio::main]
async fn main() {
let result = scan_target("192.168.1.1").await;
}
Futures and .await
use tokio::time::{sleep, Duration};
async fn delayed_scan(target: &str) -> Result<ScanResult> {
// Wait for 1 second (non-blocking)
sleep(Duration::from_secs(1)).await;
// Perform the scan
scan(target).await
}
Concurrency Patterns
Running Tasks in Parallel
Use tokio::spawn for concurrent execution:
use tokio::task;
async fn scan_multiple_targets(targets: Vec<String>) -> Result<Vec<ScanResult>> {
let mut handles = Vec::new();
// Spawn tasks for parallel execution
for target in targets {
let handle = task::spawn(async move {
scan_target(&target).await
});
handles.push(handle);
}
// Wait for all tasks to complete
let mut results = Vec::new();
for handle in handles {
match handle.await {
Ok(Ok(result)) => results.push(result),
Ok(Err(e)) => tracing::warn!("Scan failed: {}", e),
Err(e) => tracing::error!("Task panicked: {}", e),
}
}
Ok(results)
}
Using join! for Fixed Concurrency
When you have a fixed number of async operations:
use tokio::join;
async fn perform_recon(target: &str) -> Result<ReconResult> {
// Run multiple scans concurrently
let (port_scan, service_scan, vuln_scan) = join!(
scan_ports(target),
detect_services(target),
check_vulnerabilities(target),
);
Ok(ReconResult {
ports: port_scan?,
services: service_scan?,
vulnerabilities: vuln_scan?,
})
}
Using try_join! for Early Exit
Stop all operations if any fails:
use tokio::try_join;
async fn validate_and_scan(target: &str) -> Result<ScanResult> {
// All must succeed, or early exit
let (dns_result, ping_result, scan_result) = try_join!(
resolve_dns(target),
ping_target(target),
scan_ports(target),
)?;
Ok(ScanResult {
dns: dns_result,
ping: ping_result,
ports: scan_result,
})
}
Using select! for Racing Operations
Execute multiple operations, use the first to complete:
use tokio::select;
use tokio::time::{sleep, Duration};
async fn scan_with_timeout(target: &str, timeout: u64) -> Result<ScanResult> {
select! {
result = scan_ports(target) => {
result
}
_ = sleep(Duration::from_secs(timeout)) => {
Err(ConnectorError::Timeout { duration: timeout })
}
}
}
Concurrency Limiting
Using Semaphore
Limit concurrent operations to prevent resource exhaustion:
use tokio::sync::Semaphore;
use std::sync::Arc;
pub struct Scanner {
semaphore: Arc<Semaphore>,
}
impl Scanner {
pub fn new(max_concurrent: usize) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(max_concurrent)),
}
}
pub async fn scan_targets(&self, targets: Vec<String>) -> Vec<ScanResult> {
let mut handles = Vec::new();
for target in targets {
let semaphore = self.semaphore.clone();
let handle = tokio::spawn(async move {
// Acquire permit before scanning
let _permit = semaphore.acquire().await.unwrap();
// Only max_concurrent scans run at once
scan_target(&target).await
});
handles.push(handle);
}
// Collect results
let mut results = Vec::new();
for handle in handles {
if let Ok(Ok(result)) = handle.await {
results.push(result);
}
}
results
}
}
// Usage
let scanner = Scanner::new(10); // Max 10 concurrent scans
let results = scanner.scan_targets(targets).await;
Using FuturesUnordered
Process results as they complete:
use futures::stream::{FuturesUnordered, StreamExt};
async fn scan_targets_streaming(targets: Vec<String>) -> Result<Vec<ScanResult>> {
let mut futures = FuturesUnordered::new();
// Create futures for all targets
for target in targets {
futures.push(scan_target(target));
}
// Process results as they complete
let mut results = Vec::new();
while let Some(result) = futures.next().await {
match result {
Ok(scan_result) => {
tracing::info!("Scan completed: {}", scan_result.target);
results.push(scan_result);
}
Err(e) => tracing::warn!("Scan failed: {}", e),
}
}
Ok(results)
}
Channels for Communication
Using mpsc for Producer-Consumer
use tokio::sync::mpsc;
async fn scan_with_progress(
targets: Vec<String>,
) -> (mpsc::Receiver<ScanProgress>, tokio::task::JoinHandle<Result<Vec<ScanResult>>>) {
let (tx, rx) = mpsc::channel(100);
let handle = tokio::spawn(async move {
let mut results = Vec::new();
for (i, target) in targets.iter().enumerate() {
// Send progress update
let _ = tx.send(ScanProgress {
current: i + 1,
total: targets.len(),
target: target.clone(),
}).await;
// Perform scan
match scan_target(target).await {
Ok(result) => results.push(result),
Err(e) => tracing::warn!("Scan failed for {}: {}", target, e),
}
}
Ok(results)
});
(rx, handle)
}
// Usage
let (mut progress_rx, scan_handle) = scan_with_progress(targets).await;
// Receive progress updates
tokio::spawn(async move {
while let Some(progress) = progress_rx.recv().await {
println!("Progress: {}/{} - {}", progress.current, progress.total, progress.target);
}
});
// Wait for completion
let results = scan_handle.await??;
Using broadcast for Fan-Out
use tokio::sync::broadcast;
pub struct ScanCoordinator {
event_tx: broadcast::Sender<ScanEvent>,
}
impl ScanCoordinator {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(100);
Self { event_tx: tx }
}
pub fn subscribe(&self) -> broadcast::Receiver<ScanEvent> {
self.event_tx.subscribe()
}
pub async fn scan_target(&self, target: String) -> Result<ScanResult> {
// Notify start
let _ = self.event_tx.send(ScanEvent::Started(target.clone()));
// Perform scan
let result = match perform_scan(&target).await {
Ok(r) => {
let _ = self.event_tx.send(ScanEvent::Completed(target.clone()));
r
}
Err(e) => {
let _ = self.event_tx.send(ScanEvent::Failed(target.clone(), e.to_string()));
return Err(e);
}
};
Ok(result)
}
}
// Multiple subscribers can listen
let coordinator = ScanCoordinator::new();
let mut subscriber1 = coordinator.subscribe();
let mut subscriber2 = coordinator.subscribe();
tokio::spawn(async move {
while let Ok(event) = subscriber1.recv().await {
println!("Subscriber 1: {:?}", event);
}
});
tokio::spawn(async move {
while let Ok(event) = subscriber2.recv().await {
println!("Subscriber 2: {:?}", event);
}
});
Blocking Operations
Never Block the Async Runtime
❌ Bad: Blocking the runtime
async fn bad_example() {
// This blocks the entire async runtime!
std::thread::sleep(Duration::from_secs(1));
}
✅ Good: Use async sleep
async fn good_example() {
// Non-blocking sleep
tokio::time::sleep(Duration::from_secs(1)).await;
}
spawn_blocking for CPU-Intensive Work
use tokio::task;
async fn parse_large_file(path: &str) -> Result<Vec<Record>> {
let path = path.to_string();
// Run CPU-intensive work in a thread pool
let records = task::spawn_blocking(move || {
// This runs on a dedicated thread pool
std::fs::read_to_string(&path)
.map(|content| parse_records(&content))
})
.await??;
Ok(records)
}
Wrapping Sync Code
async fn execute_nmap(target: &str) -> Result<String> {
let target = target.to_string();
// Run sync process in blocking thread pool
let output = tokio::task::spawn_blocking(move || {
std::process::Command::new("nmap")
.arg(&target)
.output()
})
.await??;
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
Timeouts and Cancellation
Adding Timeouts
use tokio::time::{timeout, Duration};
async fn scan_with_timeout(target: &str) -> Result<ScanResult> {
match timeout(Duration::from_secs(300), scan_target(target)).await {
Ok(result) => result,
Err(_) => Err(ConnectorError::Timeout { duration: 300 }),
}
}
Graceful Cancellation
use tokio::sync::watch;
use tokio::select;
pub struct CancellableScanner {
cancel_tx: watch::Sender<bool>,
}
impl CancellableScanner {
pub fn new() -> Self {
let (tx, _) = watch::channel(false);
Self { cancel_tx: tx }
}
pub async fn scan_targets(
&self,
targets: Vec<String>,
) -> Result<Vec<ScanResult>> {
let mut cancel_rx = self.cancel_tx.subscribe();
let mut results = Vec::new();
for target in targets {
select! {
result = scan_target(&target) => {
results.push(result?);
}
_ = cancel_rx.changed() => {
tracing::info!("Scan cancelled");
break;
}
}
}
Ok(results)
}
pub fn cancel(&self) {
let _ = self.cancel_tx.send(true);
}
}
State Management
Sharing State with Arc
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct ScannerState {
results: Arc<RwLock<Vec<ScanResult>>>,
stats: Arc<RwLock<ScanStats>>,
}
impl ScannerState {
pub fn new() -> Self {
Self {
results: Arc::new(RwLock::new(Vec::new())),
stats: Arc::new(RwLock::new(ScanStats::default())),
}
}
pub async fn add_result(&self, result: ScanResult) {
let mut results = self.results.write().await;
results.push(result);
let mut stats = self.stats.write().await;
stats.total_scans += 1;
}
pub async fn get_stats(&self) -> ScanStats {
self.stats.read().await.clone()
}
}
// Usage: Clone and share across tasks
let state = ScannerState::new();
for target in targets {
let state = state.clone();
tokio::spawn(async move {
let result = scan_target(&target).await.unwrap();
state.add_result(result).await;
});
}
Using Mutex vs RwLock
use tokio::sync::{Mutex, RwLock};
// Use Mutex for short, exclusive access
let counter = Arc::new(Mutex::new(0));
{
let mut count = counter.lock().await;
*count += 1;
} // Lock released
// Use RwLock for many readers, few writers
let cache = Arc::new(RwLock::new(HashMap::new()));
// Many concurrent readers
let data = cache.read().await;
let value = data.get(&key);
// Exclusive writer
let mut data = cache.write().await;
data.insert(key, value);
Stream Processing
Processing Items as They Arrive
use futures::stream::{self, StreamExt};
async fn process_scan_stream(targets: Vec<String>) -> Result<()> {
// Create stream of targets
let stream = stream::iter(targets)
.map(|target| async move {
scan_target(&target).await
})
.buffer_unordered(10); // Process up to 10 concurrently
// Process results as they complete
tokio::pin!(stream);
while let Some(result) = stream.next().await {
match result {
Ok(scan_result) => {
tracing::info!("Processed: {}", scan_result.target);
// Handle result
}
Err(e) => tracing::warn!("Scan failed: {}", e),
}
}
Ok(())
}
Transforming Streams
use futures::stream::{self, StreamExt, TryStreamExt};
async fn scan_and_filter(targets: Vec<String>) -> Result<Vec<ScanResult>> {
stream::iter(targets)
.map(Ok)
.try_for_each_concurrent(5, |target| async move {
scan_target(&target).await
})
.await?;
// Or with transformations
let results: Vec<_> = stream::iter(targets)
.then(|target| scan_target(target))
.filter_map(|result| async move { result.ok() })
.filter(|result| async move { !result.open_ports.is_empty() })
.collect()
.await;
Ok(results)
}
Background Tasks
Long-Running Background Task
use tokio::sync::mpsc;
pub struct BackgroundScanner {
tx: mpsc::Sender<String>,
}
impl BackgroundScanner {
pub fn start() -> Self {
let (tx, mut rx) = mpsc::channel::<String>(100);
// Spawn background task
tokio::spawn(async move {
while let Some(target) = rx.recv().await {
match scan_target(&target).await {
Ok(result) => {
tracing::info!("Background scan completed: {}", target);
// Store result somewhere
}
Err(e) => {
tracing::error!("Background scan failed: {}", e);
}
}
}
});
Self { tx }
}
pub async fn queue_scan(&self, target: String) -> Result<()> {
self.tx.send(target).await
.map_err(|_| ConnectorError::ChannelClosed)?;
Ok(())
}
}
Periodic Tasks
use tokio::time::{interval, Duration};
async fn periodic_cleanup() {
let mut interval = interval(Duration::from_secs(3600));
loop {
interval.tick().await;
// Run cleanup every hour
if let Err(e) = cleanup_old_results().await {
tracing::error!("Cleanup failed: {}", e);
}
}
}
// Start background task
tokio::spawn(periodic_cleanup());
Best Practices
✅ Do
- Use async for I/O operations - Network, filesystem, database
- Limit concurrency - Use semaphores to prevent resource exhaustion
- Use spawn_blocking for CPU work - Don't block the runtime
- Handle cancellation - Support graceful shutdown
- Add timeouts - Prevent hanging operations
- Use channels for communication - Don't share mutable state directly
- Process streams incrementally - Don't buffer everything in memory
- Clone Arc, not data - Share ownership efficiently
❌ Don't
- Don't use std::thread::sleep - Use tokio::time::sleep
- Don't hold locks across .await - Causes deadlocks
- Don't spawn unlimited tasks - Use bounded concurrency
- Don't panic in async code - Use Result for errors
- Don't block without spawn_blocking - Blocks entire runtime
- Don't ignore task join handles - Tasks can panic silently
Common Pitfalls
Holding Locks Across Await
❌ Bad: Deadlock risk
async fn bad() {
let mut data = mutex.lock().await;
expensive_async_operation().await; // Lock held during await!
data.insert("key", "value");
}
✅ Good: Release lock before await
async fn good() {
let value = {
let data = mutex.lock().await;
data.get("key").cloned()
}; // Lock released
let result = expensive_async_operation().await;
{
let mut data = mutex.lock().await;
data.insert("key", result);
} // Lock released
}
Spawning Too Many Tasks
❌ Bad: Unbounded concurrency
async fn bad(targets: Vec<String>) {
let mut handles = Vec::new();
for target in targets {
// Could spawn thousands of tasks!
handles.push(tokio::spawn(scan(target)));
}
// ...
}
✅ Good: Bounded concurrency
async fn good(targets: Vec<String>) {
let semaphore = Arc::new(Semaphore::new(10));
let mut handles = Vec::new();
for target in targets {
let sem = semaphore.clone();
handles.push(tokio::spawn(async move {
let _permit = sem.acquire().await;
scan(target).await
}));
}
// ...
}
Next Steps
- Building Your First Connector - Apply async patterns
- Error Handling - Handle async errors
- Testing Connectors - Test async code