@@ -735,120 +735,121 @@ void check_lag_and_throttle(time_t time_now) {
735735 }
736736 }
737737
738- // Replication lag above threshold, find an entity to throttle
739- mysql_mutex_lock (&LOCK_global_write_statistics);
740- if (global_write_statistics_map.size () == 0 ) {
741- // no stats collected so far
742- mysql_mutex_unlock (&LOCK_global_write_statistics);
743- sql_print_information (" [Write Throttling]No write statistics." );
744- return ;
745- }
746-
747- // write_stats_frequency may be updated dynamically. Caching it for the
748- // logic below
749- ulong write_stats_frequency_cached = write_stats_frequency;
750- if (write_stats_frequency_cached == 0 ) {
751- return ;
752- }
753-
754- // Find latest write_statistics time bucket that is complete.
755- // Example - For write_stats_frequency=6s, At t=8s, this method should
756- // return write_stats bucket for stats between t=0 to t=6 i.e. bucket_key=0
757- // and not bucket_key=6 which is incomplete and only has 2s worth of
758- // write_stats data;
759- int time_bucket_key = time_now - (time_now % write_stats_frequency_cached) -
760- write_stats_frequency_cached;
761- auto latest_write_stats_iter = global_write_statistics_map.begin ();
762-
763- // For testing purpose, force use the latest write stats bucket for culprit
764- // analysis
765- bool dbug_skip_last_complete_bucket_check = false ;
766- DBUG_EXECUTE_IF (" dbug.skip_last_complete_bucket_check" ,
767- { dbug_skip_last_complete_bucket_check = true ; });
768-
769- if (!dbug_skip_last_complete_bucket_check) {
770- if (latest_write_stats_iter->first != time_bucket_key) {
771- // move to the second from front time bucket
772- ++latest_write_stats_iter;
773- }
774- if (latest_write_stats_iter == global_write_statistics_map.end () ||
775- latest_write_stats_iter->first != time_bucket_key) {
776- // no complete write statistics bucket for analysis
777- // reset currently monitored entity, if any, as there's been a
778- // significant gap in time since we last did culprit analysis. It is
779- // outdated.
780- currently_monitored_entity.resetHits ();
781- if (currently_monitored_entity.dimension > WTR_DIM_UNKNOWN ) {
782- sql_print_information (
783- " [Write Throttling]Resetting monitored entity hit count. "
784- " dim:%s, name:%s, count:%u" ,
785- WRITE_STATS_TYPE_STRING [currently_monitored_entity.dimension ]
786- .c_str (),
787- currently_monitored_entity.name .c_str (),
788- currently_monitored_entity.hits );
789- } else {
790- sql_print_information (
791- " [Write Throttling]Resetting monitored entity hit count. "
792- " Entity has been previously reset." );
793- }
794- mysql_mutex_unlock (&LOCK_global_write_statistics);
795- return ;
796- }
797- }
798- TIME_BUCKET_STATS &latest_write_stats = latest_write_stats_iter->second ;
799-
800- std::vector<enum_wtr_dimension> &dimensions =
801- write_throttle_permissible_dimensions_in_order;
802-
803738 bool is_fallback_entity_set = false ;
804739 std::pair<std::string, enum_wtr_dimension> fallback_entity;
805740 bool is_entity_to_throttle_set = false ;
806741 std::pair<std::string, enum_wtr_dimension> entity_to_throttle;
807742
808- for (auto dim_iter = dimensions.begin (); dim_iter != dimensions.end ();
809- ++dim_iter) {
810- enum_wtr_dimension dim = *dim_iter;
811- auto &dim_stats = latest_write_stats[dim];
812- std::pair<std::string, std::string> top_entities =
813- get_top_two_entities (dim_stats);
814-
815- // Set the fallback entity as the top entity in the highest cardinality
816- // dimension This entity is throttled if there is no conclusive entity
817- // causing the lag.
818- if (!is_fallback_entity_set && !dim_stats.empty ()) {
819- fallback_entity = std::make_pair (top_entities.first , dim);
820- is_fallback_entity_set = true ;
743+ // Scope for LOCK_global_write_statistics.
744+ {
745+ // Replication lag above threshold, find an entity to throttle
746+ MUTEX_LOCK (guard, &LOCK_global_write_statistics);
747+ if (global_write_statistics_map.size () == 0 ) {
748+ // no stats collected so far
749+ sql_print_information (" [Write Throttling]No write statistics." );
750+ return ;
821751 }
822752
823- // For testing purpose, skip to throttle fallback entity
824- bool dbug_simulate_fallback_sql_throttling = false ;
825- DBUG_EXECUTE_IF (" dbug.simulate_fallback_sql_throttling" ,
826- { dbug_simulate_fallback_sql_throttling = true ; });
827-
828- if (dim_stats.empty () || dbug_simulate_fallback_sql_throttling) {
829- // move on to the next dimension
830- continue ;
831- } else if (dim_stats.size () == 1 ) {
832- // throttle the first entity
833- entity_to_throttle = std::make_pair (top_entities.first , dim);
834- is_entity_to_throttle_set = true ;
835- break ;
836- } else {
837- // compare the top two entities in this dimension
838- auto first_bytes_written =
839- dim_stats[top_entities.first ].binlog_bytes_written ;
840- auto second_bytes_written =
841- dim_stats[top_entities.second ].binlog_bytes_written ;
842- if (first_bytes_written >
843- second_bytes_written * write_throttle_min_ratio) {
844- // first entity can be throttled
753+ // write_stats_frequency may be updated dynamically. Caching it for the
754+ // logic below
755+ ulong write_stats_frequency_cached = write_stats_frequency;
756+ if (write_stats_frequency_cached == 0 ) {
757+ return ;
758+ }
759+
760+ // Find latest write_statistics time bucket that is complete.
761+ // Example - For write_stats_frequency=6s, At t=8s, this method should
762+ // return write_stats bucket for stats between t=0 to t=6 i.e.
763+ // bucket_key=0 and not bucket_key=6 which is incomplete and only has 2s
764+ // worth of write_stats data;
765+ int time_bucket_key = time_now -
766+ (time_now % write_stats_frequency_cached) -
767+ write_stats_frequency_cached;
768+ auto latest_write_stats_iter = global_write_statistics_map.begin ();
769+
770+ // For testing purpose, force use the latest write stats bucket for
771+ // culprit analysis
772+ bool dbug_skip_last_complete_bucket_check = false ;
773+ DBUG_EXECUTE_IF (" dbug.skip_last_complete_bucket_check" ,
774+ { dbug_skip_last_complete_bucket_check = true ; });
775+
776+ if (!dbug_skip_last_complete_bucket_check) {
777+ if (latest_write_stats_iter->first != time_bucket_key) {
778+ // move to the second from front time bucket
779+ ++latest_write_stats_iter;
780+ }
781+ if (latest_write_stats_iter == global_write_statistics_map.end () ||
782+ latest_write_stats_iter->first != time_bucket_key) {
783+ // no complete write statistics bucket for analysis
784+ // reset currently monitored entity, if any, as there's been a
785+ // significant gap in time since we last did culprit analysis. It is
786+ // outdated.
787+ currently_monitored_entity.resetHits ();
788+ if (currently_monitored_entity.dimension > WTR_DIM_UNKNOWN ) {
789+ sql_print_information (
790+ " [Write Throttling]Resetting monitored entity hit count. "
791+ " dim:%s, name:%s, count:%u" ,
792+ WRITE_STATS_TYPE_STRING [currently_monitored_entity.dimension ]
793+ .c_str (),
794+ currently_monitored_entity.name .c_str (),
795+ currently_monitored_entity.hits );
796+ } else {
797+ sql_print_information (
798+ " [Write Throttling]Resetting monitored entity hit count. "
799+ " Entity has been previously reset." );
800+ }
801+ return ;
802+ }
803+ }
804+ TIME_BUCKET_STATS &latest_write_stats = latest_write_stats_iter->second ;
805+
806+ std::vector<enum_wtr_dimension> &dimensions =
807+ write_throttle_permissible_dimensions_in_order;
808+
809+ for (auto dim_iter = dimensions.begin (); dim_iter != dimensions.end ();
810+ ++dim_iter) {
811+ enum_wtr_dimension dim = *dim_iter;
812+ auto &dim_stats = latest_write_stats[dim];
813+ std::pair<std::string, std::string> top_entities =
814+ get_top_two_entities (dim_stats);
815+
816+ // Set the fallback entity as the top entity in the highest cardinality
817+ // dimension This entity is throttled if there is no conclusive entity
818+ // causing the lag.
819+ if (!is_fallback_entity_set && !dim_stats.empty ()) {
820+ fallback_entity = std::make_pair (top_entities.first , dim);
821+ is_fallback_entity_set = true ;
822+ }
823+
824+ // For testing purpose, skip to throttle fallback entity
825+ bool dbug_simulate_fallback_sql_throttling = false ;
826+ DBUG_EXECUTE_IF (" dbug.simulate_fallback_sql_throttling" ,
827+ { dbug_simulate_fallback_sql_throttling = true ; });
828+
829+ if (dim_stats.empty () || dbug_simulate_fallback_sql_throttling) {
830+ // move on to the next dimension
831+ continue ;
832+ } else if (dim_stats.size () == 1 ) {
833+ // throttle the first entity
845834 entity_to_throttle = std::make_pair (top_entities.first , dim);
846835 is_entity_to_throttle_set = true ;
847836 break ;
837+ } else {
838+ // compare the top two entities in this dimension
839+ auto first_bytes_written =
840+ dim_stats[top_entities.first ].binlog_bytes_written ;
841+ auto second_bytes_written =
842+ dim_stats[top_entities.second ].binlog_bytes_written ;
843+ if (first_bytes_written >
844+ second_bytes_written * write_throttle_min_ratio) {
845+ // first entity can be throttled
846+ entity_to_throttle = std::make_pair (top_entities.first , dim);
847+ is_entity_to_throttle_set = true ;
848+ break ;
849+ }
848850 }
849851 }
850852 }
851- mysql_mutex_unlock (&LOCK_global_write_statistics);
852853
853854 if (is_entity_to_throttle_set) {
854855 // throttle the culprit entity if set
0 commit comments