Skip to content

Commit 9c9e2a5

Browse files
committed
feat: introduce stream-indexing flag
1 parent 1c3a5dd commit 9c9e2a5

File tree

3 files changed

+28
-7
lines changed

3 files changed

+28
-7
lines changed

components/ordhook-cli/src/cli/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,9 @@ struct StartCommand {
345345
/// Check blocks integrity
346346
#[clap(long = "check-blocks-integrity")]
347347
pub block_integrity_check: bool,
348+
/// Stream indexing to observers
349+
#[clap(long = "stream-indexing")]
350+
pub stream_indexing_to_observers: bool,
348351
}
349352

350353
#[derive(Subcommand, PartialEq, Clone, Debug)]
@@ -758,7 +761,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
758761

759762
let mut service = Service::new(config, ctx.clone());
760763
return service
761-
.run(predicates, None, cmd.block_integrity_check)
764+
.run(predicates, None, cmd.block_integrity_check, cmd.stream_indexing_to_observers)
762765
.await;
763766
}
764767
},

components/ordhook-core/src/service/mod.rs

+23-5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::db::{
2323
find_last_block_inserted, find_missing_blocks, run_compaction,
2424
update_sequence_metadata_with_block,
2525
};
26+
use crate::ord::chain;
2627
use crate::scan::bitcoin::process_block_with_predicates;
2728
use crate::service::http_api::start_predicate_api_server;
2829
use crate::service::observers::{
@@ -33,7 +34,7 @@ use crate::service::observers::{
3334
use crate::service::runloops::start_bitcoin_scan_runloop;
3435
use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookOccurrencePayload;
3536
use chainhook_sdk::chainhooks::types::{
36-
BitcoinChainhookSpecification, ChainhookFullSpecification, ChainhookSpecification,
37+
BitcoinChainhookSpecification, ChainhookConfig, ChainhookFullSpecification, ChainhookSpecification
3738
};
3839
use chainhook_sdk::observer::{
3940
start_event_observer, BitcoinBlockDataCached, DataHandlerEvent, EventObserverConfig,
@@ -63,17 +64,33 @@ impl Service {
6364

6465
pub async fn run(
6566
&mut self,
66-
predicates: Vec<BitcoinChainhookSpecification>,
67+
observer_specs: Vec<BitcoinChainhookSpecification>,
6768
predicate_activity_relayer: Option<
6869
crossbeam_channel::Sender<BitcoinChainhookOccurrencePayload>,
6970
>,
7071
check_blocks_integrity: bool,
72+
stream_indexing_to_observers: bool,
7173
) -> Result<(), String> {
7274
let mut event_observer_config = self.config.get_event_observer_config();
7375

76+
let block_post_processor = if stream_indexing_to_observers && !observer_specs.is_empty() {
77+
let mut chainhook_config: ChainhookConfig = ChainhookConfig::new();
78+
let specs = observer_specs.clone();
79+
for mut observer_spec in specs.into_iter() {
80+
observer_spec.enabled = true;
81+
let spec = ChainhookSpecification::Bitcoin(observer_spec);
82+
chainhook_config.register_specification(spec)?;
83+
}
84+
event_observer_config.chainhook_config = Some(chainhook_config);
85+
let block_tx = start_observer_forwarding(&event_observer_config, &self.ctx);
86+
Some(block_tx)
87+
} else {
88+
None
89+
};
90+
7491
// Catch-up with chain tip
7592
let chain_tip_height = self
76-
.catch_up_with_chain_tip(false, check_blocks_integrity)
93+
.catch_up_with_chain_tip(false, check_blocks_integrity, block_post_processor)
7794
.await?;
7895
info!(
7996
self.ctx.expect_logger(),
@@ -98,7 +115,7 @@ impl Service {
98115
// 2) catch-up outdated observers by dispatching replays
99116
let (chainhook_config, outdated_observers) =
100117
create_and_consolidate_chainhook_config_with_predicates(
101-
predicates,
118+
observer_specs,
102119
chain_tip_height,
103120
predicate_activity_relayer.is_some(),
104121
&self.config,
@@ -446,6 +463,7 @@ impl Service {
446463
&mut self,
447464
rebuild_from_scratch: bool,
448465
compact_and_check_rocksdb_integrity: bool,
466+
block_post_processor: Option<crossbeam_channel::Sender<BitcoinBlockData>>,
449467
) -> Result<u64, String> {
450468
{
451469
if compact_and_check_rocksdb_integrity {
@@ -515,7 +533,7 @@ impl Service {
515533
)?;
516534
}
517535
}
518-
self.update_state(None).await
536+
self.update_state(block_post_processor).await
519537
}
520538

521539
pub async fn update_state(

components/ordhook-sdk-js/src/ordinals_indexer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl OrdinalsIndexingRunloop {
159159
match cmd {
160160
IndexerCommand::StreamBlocks => {
161161
// We start the service as soon as the start() method is being called.
162-
let future = service.catch_up_with_chain_tip(false, true);
162+
let future = service.catch_up_with_chain_tip(false, true, None);
163163
let _ = hiro_system_kit::nestable_block_on(future).expect("unable to start indexer");
164164
let future = service.start_event_observer(observer_sidecar);
165165
let (command_tx, event_rx) =

0 commit comments

Comments
 (0)