@@ -33,6 +33,7 @@ use chainhook_sdk::observer::{
33
33
} ;
34
34
use chainhook_sdk:: types:: { BitcoinBlockData , BlockIdentifier } ;
35
35
use chainhook_sdk:: utils:: Context ;
36
+ use crossbeam_channel:: select;
36
37
use crossbeam_channel:: unbounded;
37
38
use dashmap:: DashMap ;
38
39
use fxhash:: FxHasher ;
@@ -132,9 +133,9 @@ impl Service {
132
133
133
134
// Sidecar channels setup
134
135
let ( observer_command_tx, observer_command_rx) = channel ( ) ;
135
- let ( block_mutator_in_tx, block_mutator_in_rx) = channel ( ) ;
136
- let ( block_mutator_out_tx, block_mutator_out_rx) = channel ( ) ;
137
- let ( chain_event_notifier_tx, chain_event_notifier_rx) = channel ( ) ;
136
+ let ( block_mutator_in_tx, block_mutator_in_rx) = crossbeam_channel :: unbounded ( ) ;
137
+ let ( block_mutator_out_tx, block_mutator_out_rx) = crossbeam_channel :: unbounded ( ) ;
138
+ let ( chain_event_notifier_tx, chain_event_notifier_rx) = crossbeam_channel :: unbounded ( ) ;
138
139
let observer_sidecar = ObserverSidecar {
139
140
bitcoin_blocks_mutator : Some ( ( block_mutator_in_tx, block_mutator_out_rx) ) ,
140
141
bitcoin_chain_event_notifier : Some ( chain_event_notifier_tx) ,
@@ -198,46 +199,27 @@ impl Service {
198
199
let config = self . config . clone ( ) ;
199
200
let cache_l2 = traversals_cache. clone ( ) ;
200
201
201
- let _ = hiro_system_kit:: thread_named ( "Sidecar block mutator" ) . spawn ( move || loop {
202
- let ( mut blocks_to_mutate, blocks_ids_to_rollback) = match block_mutator_in_rx. recv ( ) {
203
- Ok ( block) => block,
204
- Err ( e) => {
205
- error ! (
206
- ctx. expect_logger( ) ,
207
- "Error: broken channel {}" ,
208
- e. to_string( )
209
- ) ;
210
- break ;
211
- }
212
- } ;
213
- chainhook_sidecar_mutate_blocks (
214
- & mut blocks_to_mutate,
215
- & blocks_ids_to_rollback,
216
- & cache_l2,
217
- & config,
218
- & ctx,
219
- ) ;
220
- let _ = block_mutator_out_tx. send ( blocks_to_mutate) ;
221
- } ) ;
222
-
223
- let ctx = self . ctx . clone ( ) ;
224
- let config = self . config . clone ( ) ;
225
- let _ =
226
- hiro_system_kit:: thread_named ( "Chain event notification handler" ) . spawn ( move || loop {
227
- let command = match chain_event_notifier_rx. recv ( ) {
228
- Ok ( cmd) => cmd,
229
- Err ( e) => {
230
- error ! (
231
- ctx. expect_logger( ) ,
232
- "Error: broken channel {}" ,
233
- e. to_string( )
202
+ let _ = hiro_system_kit:: thread_named ( "Observer Sidecar Runloop" ) . spawn ( move || loop {
203
+ select ! {
204
+ recv( block_mutator_in_rx) -> msg => {
205
+ if let Ok ( ( mut blocks_to_mutate, blocks_ids_to_rollback) ) = msg {
206
+ chainhook_sidecar_mutate_blocks(
207
+ & mut blocks_to_mutate,
208
+ & blocks_ids_to_rollback,
209
+ & cache_l2,
210
+ & config,
211
+ & ctx,
234
212
) ;
235
- break ;
213
+ let _ = block_mutator_out_tx . send ( blocks_to_mutate ) ;
236
214
}
237
- } ;
238
-
239
- chainhook_sidecar_mutate_ordhook_db ( command, & config, & ctx)
240
- } ) ;
215
+ }
216
+ recv( chain_event_notifier_rx) -> msg => {
217
+ if let Ok ( command) = msg {
218
+ chainhook_sidecar_mutate_ordhook_db( command, & config, & ctx)
219
+ }
220
+ }
221
+ }
222
+ } ) ;
241
223
242
224
loop {
243
225
let event = match observer_event_rx. recv ( ) {
0 commit comments