@@ -7,7 +7,8 @@ use std::{
 };
 use chainhook_sdk::{
     indexer::bitcoin::{
-        build_http_client, download_block_with_retry, retrieve_block_hash_with_retry, try_download_block_bytes_with_retry, parse_fetched_block, download_block, parse_downloaded_block,
+        build_http_client, download_block, download_block_with_retry, parse_downloaded_block,
+        parse_fetched_block, retrieve_block_hash_with_retry, try_download_block_bytes_with_retry,
     },
     types::{
         BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData,
@@ -21,18 +22,21 @@ use rand::{thread_rng, Rng};

 use rocksdb::DB;
 use rusqlite::{Connection, OpenFlags, ToSql, Transaction};
-use tokio::task::JoinSet;
 use std::io::Cursor;
 use std::io::{Read, Write};
 use threadpool::ThreadPool;
+use tokio::task::JoinSet;

 use chainhook_sdk::{
     indexer::bitcoin::BitcoinBlockFullBreakdown, observer::BitcoinConfig, utils::Context,
 };

-use crate::{hord::{self, HordConfig}, config::Config};
 use crate::hord::{new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block};
 use crate::ord::{height::Height, sat::Sat};
+use crate::{
+    config::Config,
+    hord::{self, HordConfig},
+};

 fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf {
     let mut destination_path = base_dir.clone();
@@ -973,6 +977,10 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
                 };
                 let _ = block_data_tx.send(res);
             });
+            // TODO: remove this join?
+            if block_height >= ordinal_computing_height {
+                let _ = retrieve_block_data_pool.join();
+            }
         }
         let res = retrieve_block_data_pool.join();
         res
@@ -994,6 +1002,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
                     block_data,
                 )));
             });
+            if block_height >= ordinal_computing_height {
+                let _ = compress_block_data_pool.join();
+            }
         }
         let res = compress_block_data_pool.join();
        res
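Both hunks above apply the same throttling trick: once `block_height` reaches `ordinal_computing_height`, the loop calls `join()` on the threadpool after each submission, collapsing the fan-out so blocks finish one at a time and arrive roughly in height order for the sequential ordinal computation. A minimal sketch of the pattern, assuming the `threadpool` crate and illustrative heights, with a stand-in for the download work:

```rust
use threadpool::ThreadPool;

fn main() {
    // Illustrative threshold; the real value comes from the hord config.
    let ordinal_computing_height: u64 = 100;
    let pool = ThreadPool::new(8);

    for block_height in 90u64..110 {
        pool.execute(move || {
            // Stand-in for download_block_with_retry + send on the channel.
            println!("fetched block #{block_height}");
        });
        if block_height >= ordinal_computing_height {
            // Serialize once ordinal processing begins: wait for this job
            // (and any stragglers) before submitting the next height.
            pool.join();
        }
    }
    pool.join();
}
```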
@@ -1018,10 +1029,15 @@ pub async fn fetch_and_cache_blocks_in_hord_db(

             // Should we start looking for inscription data in blocks?
             if raw_block.height as u64 >= ordinal_computing_height {
-                if cursor == 0 {
+                if (cursor as u64) < ordinal_computing_height {
                     cursor = raw_block.height;
                 }
-                ctx.try_log(|logger| slog::info!(logger, "Queueing compacted block #{block_height}",));
+                ctx.try_log(|logger| {
+                    slog::info!(
+                        logger,
+                        "Queueing compacted block #{block_height} (#{cursor})",
+                    )
+                });
                 // Does processing a block allow us
                 // to process more blocks present in the inbox?
                 inbox.insert(raw_block.height, raw_block);
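The `cursor` fix above matters because of the inbox pattern this function relies on: worker threads hand blocks back in arbitrary order, while ordinal computation must consume them strictly by height. Blocks ahead of the cursor are parked in `inbox`, and each arrival drains every consecutive height that is now ready. With `cursor == 0` as the only sentinel, a cursor below `ordinal_computing_height` was never reset; comparing against the threshold fixes the initialization. A reduced sketch of the drain loop, where a `u64` payload stands in for the block and the starting height is illustrative:

```rust
use std::collections::BTreeMap;

/// Apply every consecutive block starting at `cursor` that is already
/// waiting in the inbox, advancing the cursor as heights are consumed.
fn drain_in_order(
    inbox: &mut BTreeMap<u64, u64>,
    cursor: &mut u64,
    apply: &mut impl FnMut(u64, u64),
) {
    while let Some(block) = inbox.remove(cursor) {
        apply(*cursor, block);
        *cursor += 1;
    }
}

fn main() {
    let mut inbox = BTreeMap::new();
    let mut cursor = 767_430; // illustrative first ordinal height
    // Heights arrive out of order from the threadpool.
    for height in [767_432u64, 767_430, 767_431] {
        inbox.insert(height, height);
        drain_in_order(&mut inbox, &mut cursor, &mut |h, _block| {
            println!("processed block #{h}");
        });
    }
}
```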
@@ -1871,7 +1887,6 @@ pub async fn rebuild_rocks_db(
     let number_of_blocks_to_process = end_block - start_block + 1;
     let (block_req_lim, block_process_lim) = (128, 128);

-
     let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(block_req_lim);
     let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
     let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(block_process_lim);
@@ -1902,7 +1917,7 @@ pub async fn rebuild_rocks_db(

     let _ = hiro_system_kit::thread_named("Block data compression")
         .spawn(move || {
-        while let Ok(Some(block_bytes)) = block_data_rx.recv() {
+            while let Ok(Some(block_bytes)) = block_data_rx.recv() {
                 let block_compressed_tx_moved = block_compressed_tx.clone();
                 compress_block_data_pool.execute(move || {
                     let block_data = parse_downloaded_block(block_bytes).unwrap();
@@ -1919,43 +1934,46 @@ pub async fn rebuild_rocks_db(
             let res = compress_block_data_pool.join();
             res
         })
-            .expect("unable to spawn thread");
+        .expect("unable to spawn thread");

-    let blocks_db_rw =
-        open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
+    let blocks_db_rw = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
     let cloned_ctx = ctx.clone();
     let ingestion_thread = hiro_system_kit::thread_named("Block data ingestion")
         .spawn(move || {
             let mut blocks_stored = 0;
             let mut num_writes = 0;
-
-            while let Ok(Some((block_height, compacted_block, _raw_block))) = block_compressed_rx.recv() {
+
+            while let Ok(Some((block_height, compacted_block, _raw_block))) =
+                block_compressed_rx.recv()
+            {
                 insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &cloned_ctx);
                 blocks_stored += 1;
                 num_writes += 1;
-
+
                 // In the context of ordinals, we're constrained to process blocks sequentially.
                 // Blocks are processed by a threadpool and could be coming out of order.
                 // Inbox the block for later if the current block is not the one we should be
                 // processing.
-
+
                 // Should we start looking for inscription data in blocks?
-                cloned_ctx.try_log(|logger| slog::info!(logger, "Storing compacted block #{block_height}",));
-
+                cloned_ctx.try_log(|logger| {
+                    slog::info!(logger, "Storing compacted block #{block_height}",)
+                });
+
                 if blocks_stored == number_of_blocks_to_process {
                     cloned_ctx.try_log(|logger| {
                         slog::info!(
                             logger,
                             "Local block storage successfully seeded with #{blocks_stored} blocks"
                         )
                     });
-
+
                     // match guard.report().build() {
                     //     Ok(report) => {
                     //         ctx.try_log(|logger| {
                     //             slog::info!(logger, "Generating report");
                     //         });
-
+
                     //         let file = std::fs::File::create("hord-perf.svg").unwrap();
                     //         report.flamegraph(file).unwrap();
                     //     }
@@ -1966,7 +1984,7 @@ pub async fn rebuild_rocks_db(
                     //         }
                     //     }
                 }
-
+
                 if num_writes % 128 == 0 {
                     cloned_ctx.try_log(|logger| {
                         slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)");
@@ -1979,20 +1997,20 @@ pub async fn rebuild_rocks_db(
                         num_writes = 0;
                     }
                 }
-
+
                 if let Err(e) = blocks_db_rw.flush() {
                     cloned_ctx.try_log(|logger| {
                         slog::error!(logger, "{}", e.to_string());
                     });
                 }
                 ()
-        }).expect("unable to spawn thread");
+        })
+        .expect("unable to spawn thread");

     while let Some(res) = set.join_next().await {
         let block = res.unwrap().unwrap();

-        let _ = block_data_tx
-            .send(Some(block));
+        let _ = block_data_tx.send(Some(block));

         if let Some(block_height) = block_heights.pop_front() {
             let config = moved_config.clone();
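Taken together, `rebuild_rocks_db` is a three-stage pipeline: async downloads feed a bounded crossbeam channel, a `ThreadPool` compresses blocks in parallel, and a single named thread writes compacted blocks sequentially. A self-contained sketch of that shape, with stand-in payloads (`Vec<u8>` for raw block bytes, `u64` for a compacted block) and plain `std::thread` in place of `hiro_system_kit::thread_named`; `None` is the shutdown signal, matching the `while let Ok(Some(..))` loops above:

```rust
use std::thread;
use threadpool::ThreadPool;

fn main() {
    let (block_data_tx, block_data_rx) = crossbeam_channel::bounded::<Option<Vec<u8>>>(128);
    let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded::<Option<u64>>(128);

    // Stage 2: parallel compression, fed by the bounded download channel.
    let compression = thread::spawn(move || {
        let pool = ThreadPool::new(4);
        while let Ok(Some(block_bytes)) = block_data_rx.recv() {
            let tx = block_compressed_tx.clone();
            pool.execute(move || {
                // Stand-in for parse_downloaded_block + block compaction.
                let _ = tx.send(Some(block_bytes.len() as u64));
            });
        }
        pool.join();
        // Dropping the last sender closes the channel and stops ingestion.
        drop(block_compressed_tx);
    });

    // Stage 3: sequential ingestion into the store.
    let ingestion = thread::spawn(move || {
        while let Ok(Some(compacted)) = block_compressed_rx.recv() {
            // Stand-in for insert_entry_in_blocks + periodic flush.
            println!("stored compacted block ({compacted} bytes)");
        }
    });

    // Stage 1: the downloader; here, four fake blocks then shutdown.
    for height in 0..4u64 {
        let _ = block_data_tx.send(Some(vec![0u8; height as usize + 1]));
    }
    let _ = block_data_tx.send(None);

    compression.join().unwrap();
    ingestion.join().unwrap();
}
```

The bounded channels provide backpressure in both directions: downloads stall when compression falls behind, and compression stalls when the sequential writer does.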