Skip to content

Commit 5ca70a9

Browse files
authored
fix: Remove action timeouts everywhere (#368)
Changes * Remove action timeouts everywhere * Update action handling collectors so that they work properly with this setup Why? As per internal discussion Trials Performed * Tests in the QA document * Tried restarting and cancelling actions during various steps of the action flow
1 parent 919e200 commit 5ca70a9

21 files changed

+527
-993
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ tokio-stream = "0.1.15"
2222
tokio-util = { version = "0.7", features = ["codec", "time"] }
2323

2424
[profile.dev]
25-
opt-level = 1
25+
opt-level = 0
2626
debug = true
27-
strip = true
27+
strip = false
2828
panic = "unwind"
2929

3030
[profile.release]

tools/utils/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ serde_json = { workspace = true }
2727
tokio = { workspace = true }
2828
tokio-stream = { workspace = true }
2929
tokio-util = { workspace = true }
30+
structopt = "0.3.26"
3031
uplink = { path = "../../uplink" }

tools/utils/src/wait_and_send.rs

+95-37
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use futures_util::SinkExt;
22
use serde::{Deserialize, Serialize};
3-
use std::thread::sleep;
43
use std::time::{Duration, SystemTime, UNIX_EPOCH};
4+
use structopt::StructOpt;
55
use tokio::net::TcpStream;
6+
use tokio::select;
67
use tokio_stream::StreamExt;
78
use tokio_util::codec::{Framed, LinesCodec};
89
use uplink::Action;
@@ -18,46 +19,103 @@ struct Response {
1819
errors: Vec<String>,
1920
}
2021

22+
#[derive(StructOpt, Debug)]
23+
pub struct CommandLine {
24+
#[structopt(short = "s", default_value = "Completed")]
25+
pub final_status: String,
26+
#[structopt(short = "w", default_value = "3")]
27+
pub wait_time: u64,
28+
#[structopt(short = "p")]
29+
pub port: String,
30+
}
31+
32+
async fn respond<'a>(
33+
framed: &'a mut Framed<TcpStream, LinesCodec>,
34+
idx: &mut u32,
35+
action_id: &str,
36+
state: &str,
37+
progress: u8,
38+
) {
39+
*idx += 1;
40+
let response = Response {
41+
stream: "action_status".to_string(),
42+
sequence: *idx,
43+
timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
44+
id: action_id.to_string(),
45+
state: state.to_string(),
46+
progress,
47+
errors: vec![],
48+
};
49+
let resp = serde_json::to_string(&response).unwrap();
50+
println!("Sending: {resp}");
51+
framed.send(resp).await.unwrap();
52+
}
53+
54+
struct ActionState {
55+
id: String,
56+
response_counter: u32,
57+
}
58+
2159
#[tokio::main]
2260
async fn main() {
23-
let final_state = std::env::args().nth(1).unwrap_or_else(|| {
24-
println!("Using default value \"Completed\"");
25-
"Completed".to_string()
26-
});
61+
let args: CommandLine = StructOpt::from_args();
2762
let port = std::env::args().nth(2).unwrap_or_else(|| "127.0.0.1:5050".to_string());
28-
let stream = TcpStream::connect(port).await.unwrap();
63+
let mut stream = TcpStream::connect(port.as_str()).await.unwrap();
2964
let mut framed = Framed::new(stream, LinesCodec::new());
30-
async fn respond<'a>(
31-
framed: &'a mut Framed<TcpStream, LinesCodec>,
32-
idx: &mut u32,
33-
action_id: &str,
34-
state: &str,
35-
progress: u8,
36-
) {
37-
*idx += 1;
38-
let response = Response {
39-
stream: "action_status".to_string(),
40-
sequence: *idx,
41-
timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
42-
id: action_id.to_string(),
43-
state: state.to_string(),
44-
progress,
45-
errors: vec![],
46-
};
47-
let resp = serde_json::to_string(&response).unwrap();
48-
println!("Sending: {resp}");
49-
framed.send(resp).await.unwrap();
50-
}
51-
let mut idx = 0;
65+
66+
let mut curr_action: Option<ActionState> = None;
67+
5268
loop {
53-
let action_s = framed.next().await.unwrap().unwrap();
54-
println!("Received: {action_s}");
55-
let action = serde_json::from_str::<Action>(action_s.as_str()).unwrap();
56-
sleep(Duration::from_secs(3));
57-
respond(&mut framed, &mut idx, action.action_id.as_str(), "Working", 33).await;
58-
sleep(Duration::from_secs(3));
59-
respond(&mut framed, &mut idx, action.action_id.as_str(), "Working", 66).await;
60-
sleep(Duration::from_secs(3));
61-
respond(&mut framed, &mut idx, action.action_id.as_str(), final_state.as_str(), 100).await;
69+
select! {
70+
action_t = framed.next() => {
71+
let action_s = if let Some(Ok(action_s)) = action_t {
72+
action_s
73+
} else {
74+
tokio::time::sleep(Duration::from_secs(1)).await;
75+
if let Ok(s) = TcpStream::connect(port.as_str()).await {
76+
stream = s;
77+
framed = Framed::new(stream, LinesCodec::new());
78+
}
79+
continue;
80+
};
81+
println!("Received: {action_s}");
82+
let action = match serde_json::from_str::<Action>(action_s.as_str()) {
83+
Err(e) => {
84+
println!("invalid payload: {e}");
85+
continue;
86+
}
87+
Ok(s) => s,
88+
};
89+
if curr_action.is_some() {
90+
let curr_action_ref = curr_action.as_mut().unwrap();
91+
if action.name == "cancel_action" {
92+
respond(&mut framed, &mut curr_action_ref.response_counter, action.action_id.as_str(), "Completed", 100).await;
93+
respond(&mut framed, &mut curr_action_ref.response_counter, curr_action_ref.id.as_str(), "Failed", 100).await;
94+
curr_action = None;
95+
} else {
96+
respond(&mut framed, &mut curr_action_ref.response_counter, action.action_id.as_str(), "Failed", 100).await;
97+
}
98+
} else {
99+
curr_action = Some(ActionState {
100+
id: action.action_id.clone(),
101+
response_counter: 0,
102+
});
103+
let curr_action = curr_action.as_mut().unwrap();
104+
respond(&mut framed, &mut curr_action.response_counter, action.action_id.as_str(), "ReceivedByClient", 0).await;
105+
}
106+
}
107+
_ = tokio::time::sleep(Duration::from_secs(args.wait_time)), if curr_action.is_some() => {
108+
let curr_action_ref = curr_action.as_mut().unwrap();
109+
if curr_action_ref.response_counter == 2 {
110+
respond(&mut framed, &mut curr_action_ref.response_counter, curr_action_ref.id.as_str(), args.final_status.as_str(), 100).await;
111+
} else {
112+
let progress= (33 * curr_action_ref.response_counter) as u8;
113+
respond(&mut framed, &mut curr_action_ref.response_counter, curr_action_ref.id.as_str(), "Working", progress).await;
114+
}
115+
if curr_action_ref.response_counter == 3 {
116+
curr_action = None;
117+
}
118+
}
119+
}
62120
}
63121
}

0 commit comments

Comments
 (0)