dbcsr_transformations.F Source File


Source Code

# 1 "/__w/dbcsr/dbcsr/src/ops/dbcsr_transformations.F" 1
!--------------------------------------------------------------------------------------------------!
! Copyright (C) by the DBCSR developers group - All rights reserved                                !
! This file is part of the DBCSR library.                                                          !
!                                                                                                  !
! For information on the license, see the LICENSE file.                                            !
! For further information please visit https://dbcsr.cp2k.org                                      !
! SPDX-License-Identifier: GPL-2.0+                                                                !
!--------------------------------------------------------------------------------------------------!

MODULE dbcsr_transformations
   !! DBCSR transformations
   USE dbcsr_array_types, ONLY: array_data, &
                                array_hold, &
                                array_i1d_obj, &
                                array_release
   USE dbcsr_block_access, ONLY: dbcsr_get_block_p, &
                                 dbcsr_put_block
   USE dbcsr_block_operations, ONLY: dbcsr_block_conjg, &
                                     dbcsr_block_partial_copy, &
                                     dbcsr_block_real_neg, &
                                     dbcsr_block_transpose, &
                                     dbcsr_data_clear, &
                                     dbcsr_data_copy, &
                                     dbcsr_data_set
   USE dbcsr_data_methods, ONLY: &
      dbcsr_data_clear_pointer, dbcsr_data_ensure_size, dbcsr_data_get_memory_type, &
      dbcsr_data_get_size, dbcsr_data_get_size_referenced, dbcsr_data_get_type, dbcsr_data_hold, &
      dbcsr_data_init, dbcsr_data_new, dbcsr_data_release, dbcsr_data_set_pointer, &
      dbcsr_data_set_size_referenced, dbcsr_get_data, dbcsr_type_1d_to_2d
   USE dbcsr_data_operations, ONLY: dbcsr_copy_sort_data, &
                                    dbcsr_switch_data_area
   USE dbcsr_dist_methods, ONLY: &
      dbcsr_distribution_col_dist, dbcsr_distribution_hold, dbcsr_distribution_local_cols, &
      dbcsr_distribution_local_rows, dbcsr_distribution_mp, dbcsr_distribution_ncols, &
      dbcsr_distribution_nlocal_cols, dbcsr_distribution_nlocal_rows, dbcsr_distribution_nrows, &
      dbcsr_distribution_release, dbcsr_distribution_row_dist
   USE dbcsr_dist_operations, ONLY: dbcsr_get_local_cols, &
                                    dbcsr_get_local_rows, &
                                    dbcsr_get_stored_coordinates, &
                                    dbcsr_reblocking_targets, &
                                    dbcsr_transpose_dims, &
                                    dbcsr_transpose_distribution
   USE dbcsr_dist_util, ONLY: convert_sizes_to_offsets, &
                              dbcsr_checksum, &
                              dbcsr_pack_meta, &
                              dbcsr_unpack_meta, &
                              dbcsr_verify_matrix, &
                              get_internal_offsets, &
                              global_offsets_to_local, &
                              nfull_elements
   USE dbcsr_index_operations, ONLY: dbcsr_addto_index_array, &
                                     dbcsr_clearfrom_index_array, &
                                     dbcsr_repoint_index, &
                                     make_dense_index, &
                                     make_undense_index, &
                                     transpose_index_local
   USE dbcsr_iterator_operations, ONLY: dbcsr_iterator_blocks_left, &
                                        dbcsr_iterator_next_block, &
                                        dbcsr_iterator_start, &
                                        dbcsr_iterator_stop
   USE dbcsr_kinds, ONLY: dp, &
                          int_8, &
                          sp
   USE dbcsr_methods, ONLY: &
      dbcsr_col_block_sizes, dbcsr_distribution, dbcsr_get_data_type, dbcsr_get_matrix_type, &
      dbcsr_has_symmetry, dbcsr_nblkcols_total, dbcsr_nblkrows_total, dbcsr_nfullcols_total, &
      dbcsr_nfullrows_total, dbcsr_release, dbcsr_row_block_sizes, dbcsr_valid_index
   USE dbcsr_mp_methods, ONLY: &
      dbcsr_mp_grid_remove, dbcsr_mp_grid_setup, dbcsr_mp_group, dbcsr_mp_has_subgroups, &
      dbcsr_mp_my_col_group, dbcsr_mp_my_row_group, dbcsr_mp_mynode, dbcsr_mp_mypcol, &
      dbcsr_mp_myprow, dbcsr_mp_npcols, dbcsr_mp_nprows, dbcsr_mp_numnodes, dbcsr_mp_pgrid
   USE dbcsr_mp_operations, ONLY: dbcsr_allgatherv, &
                                  hybrid_alltoall_any, &
                                  hybrid_alltoall_c1, &
                                  hybrid_alltoall_d1, &
                                  hybrid_alltoall_i1, &
                                  hybrid_alltoall_s1, &
                                  hybrid_alltoall_z1
   USE dbcsr_mpiwrap, ONLY: mp_allgather, &
                            mp_alltoall, mp_comm_type
   USE dbcsr_operations, ONLY: dbcsr_set
   USE dbcsr_ptr_util, ONLY: pointer_view
   USE dbcsr_types, ONLY: &
      dbcsr_data_obj, dbcsr_distribution_obj, dbcsr_iterator, dbcsr_meta_size, dbcsr_mp_obj, &
      dbcsr_repl_col, dbcsr_repl_full, dbcsr_repl_none, dbcsr_repl_row, dbcsr_slot_blk_p, &
      dbcsr_slot_col_i, dbcsr_slot_home_pcol, dbcsr_slot_home_prow, dbcsr_slot_nblks, &
      dbcsr_slot_nze, dbcsr_slot_row_p, dbcsr_type, dbcsr_type_complex_4, dbcsr_type_complex_8, &
      dbcsr_type_no_symmetry, dbcsr_type_real_4, dbcsr_type_real_8
   USE dbcsr_work_operations, ONLY: dbcsr_create, &
                                    dbcsr_finalize, &
                                    dbcsr_work_create
#include "base/dbcsr_base_uses.f90"

   IMPLICIT NONE
   PRIVATE

   CHARACTER(len=*), PARAMETER, PRIVATE :: moduleN = 'dbcsr_transformations'

   LOGICAL, PARAMETER, PRIVATE          :: careful_mod = .FALSE.

   PUBLIC :: dbcsr_desymmetrize_deep, &
             dbcsr_new_transposed, &
             dbcsr_transposed, &
             dbcsr_complete_redistribute, &
             dbcsr_redistribute, dbcsr_make_untransposed_blocks
   PUBLIC :: dbcsr_replicate_all, dbcsr_distribute, dbcsr_datablock_redistribute
   PUBLIC :: dbcsr_make_dense, dbcsr_make_undense, dbcsr_make_dense_low

CONTAINS

   SUBROUTINE dbcsr_new_transposed(transposed, normal, shallow_data_copy, &
                                   transpose_data, transpose_distribution, transpose_index, &
                                   use_distribution, redistribute)
      !! Transposes a DBCSR matrix.
      !!
      !! Distribution options
      !! By default the distribution is transposed. If transpose_distribution
      !! is false, then an undetermined distribution is created that is
      !! compatible with the same process grid.

      TYPE(dbcsr_type), INTENT(INOUT)                    :: transposed
         !! transposed DBCSR matrix
      TYPE(dbcsr_type), INTENT(IN)                       :: normal
         !! input DBCSR matrix
      LOGICAL, INTENT(IN), OPTIONAL                      :: shallow_data_copy, transpose_data, &
                                                            transpose_distribution, transpose_index
         !! only shallow data_copy; default is no; if set, the transpose_data option is ignored
         !! transpose data blocks, default is True
         !! transpose the distribution from the input matrix, default is True
         !! transpose the index (default=yes) or turn it into BCSC
      TYPE(dbcsr_distribution_obj), INTENT(IN), OPTIONAL :: use_distribution
         !! use this distribution
      LOGICAL, INTENT(IN), OPTIONAL                      :: redistribute
         !! redistributes the matrix; default is .TRUE. unless shallow or transpose_distribution are set.

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_new_transposed'
      LOGICAL, PARAMETER                                 :: dbg = .FALSE.

      INTEGER                                            :: handle, stat
      INTEGER, ALLOCATABLE, DIMENSION(:)                 :: blk_p
      LOGICAL                                            :: redist, shallow, tr_blocks, tr_dist, &
                                                            tr_index
      TYPE(dbcsr_distribution_obj)                       :: new_dist
      TYPE(dbcsr_type)                                   :: t2

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)

      IF (.NOT. dbcsr_valid_index(normal)) &
         DBCSR_ABORT("Matrix does not exist.")
      ! Internalize options
      shallow = .FALSE.
      IF (PRESENT(shallow_data_copy)) shallow = shallow_data_copy
      tr_blocks = .TRUE.
      IF (PRESENT(transpose_data)) tr_blocks = transpose_data
      tr_dist = .TRUE.
      IF (PRESENT(transpose_distribution)) tr_dist = transpose_distribution
      tr_index = .TRUE.
      IF (PRESENT(transpose_index)) tr_index = transpose_index
      ! Prepare the distribution for the transposed matrix
      IF (PRESENT(use_distribution)) THEN
         IF (dbcsr_distribution_nrows(use_distribution) /= dbcsr_distribution_ncols(normal%dist)) &
            DBCSR_ABORT("Given distribution must be compatible with the current distribution")
         IF (dbcsr_distribution_ncols(use_distribution) /= dbcsr_distribution_nrows(normal%dist)) &
            DBCSR_ABORT("Given distribution must be compatible with the current distribution")
         new_dist = use_distribution
         CALL dbcsr_distribution_hold(new_dist)
      ELSE
         IF (tr_dist) THEN
            CALL dbcsr_transpose_distribution(new_dist, normal%dist)
         ELSE
            CALL dbcsr_transpose_dims(new_dist, normal%dist)
         END IF
      END IF
      ! Create the transposed matrix
      CALL dbcsr_create(transposed, name="transposed "//TRIM(normal%name), &
                        template=normal, &
                        dist=new_dist, &
                        row_blk_size_obj=normal%col_blk_size, &
                        col_blk_size_obj=normal%row_blk_size, &
                        matrix_type=dbcsr_get_matrix_type(normal), &
                        max_rbs=normal%max_cbs, &
                        max_cbs=normal%max_rbs, &
                        row_blk_offset=normal%col_blk_offset, &
                        col_blk_offset=normal%row_blk_offset)
      CALL dbcsr_distribution_release(new_dist)
      ! Reserve the space for the new indices.
      IF (tr_index) THEN
         CALL dbcsr_addto_index_array(transposed, dbcsr_slot_row_p, &
                                      reservation=transposed%nblkrows_total + 1, extra=transposed%nblks*2)
      ELSE
         CALL dbcsr_addto_index_array(transposed, dbcsr_slot_row_p, &
                                      reservation=normal%nblkrows_total + 1, extra=transposed%nblks*2)
      END IF
      CALL dbcsr_addto_index_array(transposed, dbcsr_slot_col_i, &
                                   reservation=normal%nblks)
      CALL dbcsr_addto_index_array(transposed, dbcsr_slot_blk_p, &
                                   reservation=normal%nblks)
      CALL dbcsr_repoint_index(transposed)
      IF (.NOT. shallow) THEN
         CALL dbcsr_data_ensure_size(transposed%data_area, &
                                     dbcsr_data_get_size_referenced(normal%data_area), &
                                     nocopy=.TRUE.)
      END IF
      !
      transposed%nblks = normal%nblks
      transposed%nze = normal%nze
      transposed%index(dbcsr_slot_nblks) = normal%nblks
      transposed%index(dbcsr_slot_nze) = normal%nze
      ! Transpose the local index.
      ALLOCATE (blk_p(normal%nblks), stat=stat)
      IF (stat /= 0) DBCSR_ABORT("blk_p")
      IF (tr_index) THEN
         CALL transpose_index_local(transposed%row_p, transposed%col_i, &
                                    normal%row_p, normal%col_i, blk_p, normal%blk_p)
         IF (dbg) THEN
            WRITE (*, *) 'orig. row_p', normal%row_p
            WRITE (*, *) 'orig. col_i', normal%col_i
            WRITE (*, *) 'orig. blk_p', normal%blk_p
            WRITE (*, *) 'new . row_p', transposed%row_p
            WRITE (*, *) 'new . col_i', transposed%col_i
            WRITE (*, *) 'new . blk_p', blk_p!transposed%blk_p
         END IF
      ELSE
         transposed%row_p(:) = normal%row_p(:)
         transposed%col_i(:) = normal%col_i(:)
         blk_p(:) = normal%blk_p(:)
         !transposed%transpose = .TRUE.
      END IF
      ! Copy the data
      IF (shallow) THEN
         CALL dbcsr_switch_data_area(transposed, normal%data_area)
         transposed%blk_p(1:transposed%nblks) = &
            -blk_p(1:transposed%nblks)
      ELSE
         CALL dbcsr_copy_sort_data(transposed%blk_p, blk_p, transposed%row_p, &
                                   transposed%col_i, array_data(transposed%row_blk_size), &
                                   array_data(transposed%col_blk_size), &
                                   transposed%data_area, normal%data_area, &
                                   mark_transposed=.not. tr_blocks, &
                                   transpose_blocks=tr_blocks)
      END IF
      transposed%valid = .TRUE.
      !CALL dbcsr_copy_sort_data (transposed%blk_p, blk_p, transposed%row_p,&
      !     transposed%col_i, array_data (transposed%row_blk_size),&
      !     array_data (transposed%col_blk_size),&
      !     transposed%data_area, normal%data_area,&
      !     transpose_blocks=.TRUE.)
      !
1315  FORMAT(I5, 1X, I5, 1X, I5, 1X, I5, 1X, I5, 1X, I5, 1X, I5, 1X, I5, 1X, I5, 1X, I5)
      IF (dbg) THEN
         WRITE (*, *) 'new FINAL index'
         WRITE (*, 1315) transposed%row_p
         WRITE (*, 1315) transposed%col_i
         WRITE (*, 1315) transposed%blk_p
      END IF
      !
      IF (tr_index) DEALLOCATE (blk_p)
      !
      IF (PRESENT(redistribute)) THEN
         redist = redistribute
      ELSE
         redist = .NOT. tr_dist .AND. .NOT. shallow
      END IF
      IF (redist) THEN
         !write (*,*)routineN//" redistributing"
         CALL dbcsr_create(t2, template=transposed)
         CALL dbcsr_redistribute(transposed, t2)
         CALL dbcsr_release(transposed)
         transposed = t2
      END IF
      CALL timestop(handle)
   END SUBROUTINE dbcsr_new_transposed

   SUBROUTINE dbcsr_transposed(transposed, normal, shallow_data_copy, &
      !! Transposes a DBCSR matrix, keeping the same distribution
                               transpose_data, transpose_distribution, use_distribution)

      TYPE(dbcsr_type), INTENT(INOUT)                    :: transposed
      TYPE(dbcsr_type), INTENT(IN)                       :: normal
      LOGICAL, INTENT(IN), OPTIONAL                      :: shallow_data_copy, transpose_data, &
                                                            transpose_distribution
      TYPE(dbcsr_distribution_obj), INTENT(IN), &
         OPTIONAL                                        :: use_distribution

      LOGICAL                                            :: myshallow_data_copy, &
                                                            mytranspose_distribution
      TYPE(dbcsr_distribution_obj)                           :: myuse_distribution

!   set some defaults to make usage a bit less painful (fschiff)

      myshallow_data_copy = .FALSE.
      myuse_distribution = normal%dist
      mytranspose_distribution = .FALSE.
      IF (PRESENT(shallow_data_copy)) myshallow_data_copy = shallow_data_copy
      IF (PRESENT(use_distribution)) myuse_distribution = use_distribution
      IF (PRESENT(transpose_distribution)) mytranspose_distribution = transpose_distribution

      CALL dbcsr_new_transposed(transposed, normal, myshallow_data_copy, &
                                transpose_data, mytranspose_distribution, &
                                use_distribution=myuse_distribution)
   END SUBROUTINE dbcsr_transposed

   SUBROUTINE dbcsr_desymmetrize_deep(sm, desm, untransposed_data)
      !! Duplicates data in symmetric matrix to make it normal (w.r.t. data
      !! structure. Can handle symmetric/antisymmetric/hermitian/skewhermitian
      !! matrices

      TYPE(dbcsr_type), INTENT(IN)                       :: sm
         !! input symmetric matrix
      TYPE(dbcsr_type), INTENT(INOUT)                    :: desm
         !! desymmetrized matrix
      LOGICAL, INTENT(IN), OPTIONAL                      :: untransposed_data

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_desymmetrize_deep'
      INTEGER, PARAMETER                                 :: idata = 2, imeta = 1, metalen = 3

      INTEGER :: blk, blk_l, blk_p, blk_ps, blks, col, col_size, dst_p, handle, &
                 nsymmetries, numproc, nze, pcol, prow, row, row_size, src_p, stored_col, stored_row, &
                 symmetry_i
      INTEGER, ALLOCATABLE, DIMENSION(:)                 :: rd_disp, recv_meta, rm_disp, sd_disp, &
                                                            sdp, send_meta, sm_disp, smp
      INTEGER, ALLOCATABLE, DIMENSION(:, :)              :: recv_count, send_count, &
                                                            total_recv_count, total_send_count
      INTEGER, DIMENSION(:), POINTER                     :: col_blk_size, col_dist, row_blk_size, &
                                                            row_dist
      INTEGER, DIMENSION(:, :), POINTER                  :: blacs2mpi
      LOGICAL                                            :: make_untr, tr
      TYPE(dbcsr_data_obj)                               :: recv_data, send_data
      TYPE(dbcsr_distribution_obj)                       :: target_dist
      TYPE(dbcsr_iterator)                               :: iter
      TYPE(dbcsr_mp_obj)                                 :: mp_obj
      TYPE(mp_comm_type)                                 :: mp_group

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)
      IF (.NOT. dbcsr_valid_index(sm)) &
         DBCSR_ABORT("Input matrix is invalid.")
      IF (dbcsr_valid_index(desm)) THEN
         CALL dbcsr_release(desm)
      END IF
      CALL dbcsr_create(desm, sm, matrix_type="N")

      IF (PRESENT(untransposed_data)) THEN
         make_untr = untransposed_data
      ELSE
         make_untr = .FALSE.
      END IF
      nsymmetries = 1
      IF (sm%symmetry) THEN
         nsymmetries = 2
      END IF
      row_blk_size => array_data(sm%row_blk_size)
      col_blk_size => array_data(sm%col_blk_size)
      target_dist = sm%dist
      row_dist => dbcsr_distribution_row_dist(target_dist)
      col_dist => dbcsr_distribution_col_dist(target_dist)
      mp_obj = dbcsr_distribution_mp(target_dist)
      blacs2mpi => dbcsr_mp_pgrid(mp_obj)
      numproc = dbcsr_mp_numnodes(mp_obj)
      mp_group = dbcsr_mp_group(mp_obj)
      !
      ALLOCATE (send_count(2, 0:numproc - 1))
      ALLOCATE (recv_count(2, 0:numproc - 1))
      ALLOCATE (total_send_count(2, 0:numproc - 1))
      ALLOCATE (total_recv_count(2, 0:numproc - 1))
      ALLOCATE (sdp(0:numproc - 1))
      ALLOCATE (sd_disp(0:numproc - 1))
      ALLOCATE (smp(0:numproc - 1))
      ALLOCATE (sm_disp(0:numproc - 1))
      ALLOCATE (rd_disp(0:numproc - 1))
      ALLOCATE (rm_disp(0:numproc - 1))
      !
      desm%negate_real = sm%negate_real
      desm%negate_imaginary = sm%negate_imaginary
      ! Count initial sizes for sending.
      send_count(:, :) = 0
      CALL dbcsr_iterator_start(iter, sm)
      DO WHILE (dbcsr_iterator_blocks_left(iter))
         CALL dbcsr_iterator_next_block(iter, row, col, blk, &
                                        row_size=row_size, col_size=col_size)
         DO symmetry_i = 1, nsymmetries
            IF (symmetry_i .EQ. 1) THEN
               stored_row = row; stored_col = col
            ELSE
               IF (row .EQ. col) CYCLE
               stored_row = col; stored_col = row
            END IF
            ! Where do we send this block?
            prow = row_dist(stored_row)
            pcol = col_dist(stored_col)
            dst_p = blacs2mpi(prow, pcol)
            nze = row_size*col_size
            send_count(imeta, dst_p) = send_count(imeta, dst_p) + 1
            send_count(idata, dst_p) = send_count(idata, dst_p) + nze
         END DO ! symmetry_i
      END DO ! iter
      CALL dbcsr_iterator_stop(iter)
      !
      CALL mp_alltoall(send_count, recv_count, 2, mp_group)
      !
      ! Allocate data structures needed for data exchange.
      CALL dbcsr_data_init(recv_data)
      CALL dbcsr_data_new(recv_data, dbcsr_get_data_type(sm), &
                          SUM(recv_count(idata, :)))
      ALLOCATE (recv_meta(metalen*SUM(recv_count(imeta, :))))
      CALL dbcsr_data_init(send_data)
      CALL dbcsr_data_new(send_data, dbcsr_get_data_type(sm), &
                          SUM(send_count(idata, :)))
      ALLOCATE (send_meta(metalen*SUM(send_count(imeta, :))))
      !
      ! Fill in the meta data structures and copy the data.
      DO dst_p = 0, numproc - 1
         total_send_count(imeta, dst_p) = send_count(imeta, dst_p)
         total_send_count(idata, dst_p) = send_count(idata, dst_p)
         total_recv_count(imeta, dst_p) = recv_count(imeta, dst_p)
         total_recv_count(idata, dst_p) = recv_count(idata, dst_p)
      END DO
      sd_disp = -1; sm_disp = -1
      rd_disp = -1; rm_disp = -1
      sd_disp(0) = 1; sm_disp(0) = 1
      rd_disp(0) = 1; rm_disp(0) = 1
      DO dst_p = 1, numproc - 1
         sm_disp(dst_p) = sm_disp(dst_p - 1) &
                          + metalen*total_send_count(imeta, dst_p - 1)
         sd_disp(dst_p) = sd_disp(dst_p - 1) &
                          + total_send_count(idata, dst_p - 1)
         rm_disp(dst_p) = rm_disp(dst_p - 1) &
                          + metalen*total_recv_count(imeta, dst_p - 1)
         rd_disp(dst_p) = rd_disp(dst_p - 1) &
                          + total_recv_count(idata, dst_p - 1)
      END DO
      sdp(:) = sd_disp
      smp(:) = sm_disp
      !
      CALL dbcsr_iterator_start(iter, sm)
      DO WHILE (dbcsr_iterator_blocks_left(iter))
         CALL dbcsr_iterator_next_block(iter, row, col, blk, blk_p=blk_p, &
                                        row_size=row_size, col_size=col_size)
         DO symmetry_i = 1, nsymmetries
            IF (symmetry_i .EQ. 1) THEN
               stored_row = row; stored_col = col; tr = .FALSE.
            ELSE
               IF (row .EQ. col) CYCLE
               stored_row = col; stored_col = row; tr = .TRUE.
            END IF
            ! Where do we send this block?
            prow = row_dist(stored_row)
            pcol = col_dist(stored_col)
            dst_p = blacs2mpi(prow, pcol)
            nze = row_size*col_size
            send_meta(smp(dst_p)) = stored_row
            send_meta(smp(dst_p) + 1) = stored_col
            send_meta(smp(dst_p) + 2) = sdp(dst_p) - sd_disp(dst_p) + 1
            IF (make_untr) THEN
               CALL dbcsr_block_partial_copy(dst=send_data, &
                                             dst_rs=row_size, dst_cs=col_size, &
                                             dst_tr=symmetry_i .GT. 1, &
                                             dst_r_lb=1, dst_c_lb=1, dst_offset=sdp(dst_p) - 1, &
                                             nrow=row_size, ncol=col_size, &
                                             src=sm%data_area, &
                                             src_rs=row_size, src_cs=col_size, &
                                             src_tr=blk_p .LT. 0, &
                                             src_r_lb=1, src_c_lb=1, &
                                             src_offset=ABS(blk_p) - 1)
               IF (tr) THEN
                  IF (sm%negate_real) THEN
                     CALL dbcsr_block_real_neg(dst=send_data, row_size=row_size, &
                                               col_size=col_size, lb=sdp(dst_p))
                  END IF
                  IF (sm%negate_imaginary) &
                     CALL dbcsr_block_conjg(dst=send_data, row_size=row_size, &
                                            col_size=col_size, lb=sdp(dst_p))
               END IF
            ELSE
               CALL dbcsr_data_copy(send_data, (/sdp(dst_p)/), (/nze/), &
                                    sm%data_area, (/ABS(blk_p)/), (/nze/))
               IF (tr) &
                  send_meta(smp(dst_p) + 2) = -send_meta(smp(dst_p) + 2)
            END IF
            smp(dst_p) = smp(dst_p) + metalen
            sdp(dst_p) = sdp(dst_p) + nze
         END DO ! symmetry_i
      END DO ! iter
      CALL dbcsr_iterator_stop(iter)
      ! Exchange the data and metadata structures.
      CALL hybrid_alltoall_any(send_data, &
                               total_send_count(idata, :), sd_disp(:) - 1, &
                               recv_data, &
                               total_recv_count(idata, :), rd_disp(:) - 1, mp_obj)
      CALL mp_alltoall(send_meta(:), metalen*total_send_count(imeta, :), sm_disp(:) - 1, &
                       recv_meta(:), metalen*total_recv_count(imeta, :), rm_disp(:) - 1, &
                       mp_group)
      ! Now fill in the data.
      CALL dbcsr_work_create(desm, &
                             SUM(recv_count(imeta, :)), &
                             SUM(recv_count(idata, :)), n=1, &
                             work_mutable=.FALSE.)
      ! Switch data data area of the work matrix with the received data
      ! (avoids copying).
      CALL dbcsr_data_hold(recv_data)
      CALL dbcsr_data_release(desm%wms(1)%data_area)
      desm%wms(1)%data_area = recv_data
      !
      blk_ps = 1
      blks = 1
      ! WRITE(*,*)rm_disp
      ! WRITE(*,*)recv_count
      DO src_p = 0, numproc - 1
         IF (careful_mod) THEN
            IF ((blks - 1)*3 /= rm_disp(src_p) - 1) &
               DBCSR_ABORT("Count mismatch")
         END IF
         blks = (rm_disp(src_p) - 1)/metalen + 1
         DO blk_l = 1, recv_count(imeta, src_p)
            stored_row = recv_meta(rm_disp(src_p) + metalen*(blk_l - 1))
            stored_col = recv_meta(rm_disp(src_p) + metalen*(blk_l - 1) + 1)
            blk_p = recv_meta(rm_disp(src_p) + metalen*(blk_l - 1) + 2)
            desm%wms(1)%row_i(blks) = stored_row
            desm%wms(1)%col_i(blks) = stored_col
            desm%wms(1)%blk_p(blks) = SIGN(ABS(blk_p) + (rd_disp(src_p) - 1), blk_p)
            nze = row_blk_size(ABS(stored_row)) &
                  *col_blk_size(stored_col)
            blk_ps = blk_ps + nze
            blks = blks + 1
         END DO
      END DO
      !
      desm%wms(1)%lastblk = blks - 1
      desm%wms(1)%datasize = blk_ps - 1
      CALL dbcsr_finalize(desm)
      DEALLOCATE (send_count)
      DEALLOCATE (recv_count)
      DEALLOCATE (sdp); DEALLOCATE (sd_disp)
      DEALLOCATE (smp); DEALLOCATE (sm_disp)
      DEALLOCATE (rd_disp)
      DEALLOCATE (rm_disp)
      CALL dbcsr_data_release(recv_data)
      DEALLOCATE (recv_meta)
      CALL dbcsr_data_release(send_data)
      DEALLOCATE (send_meta)
      CALL timestop(handle)
   END SUBROUTINE dbcsr_desymmetrize_deep

   SUBROUTINE dbcsr_distribute(matrix, fast)
      !! Distributes a matrix that is currently replicated.

      TYPE(dbcsr_type), INTENT(INOUT)                    :: matrix
         !! matrix to replicate
      LOGICAL, INTENT(in), OPTIONAL                      :: fast
         !! change just the index, don't touch the data

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_distribute'

      COMPLEX(KIND=dp), DIMENSION(:), POINTER, CONTIGUOUS :: c_dp
      COMPLEX(KIND=sp), DIMENSION(:), POINTER, CONTIGUOUS :: c_sp
      INTEGER                                            :: blk, col, handle, mynode, nblks, nze, p, &
                                                            row
      INTEGER, DIMENSION(:), POINTER, CONTIGUOUS         :: col_blk_size, row_blk_size, tmp_index
      LOGICAL                                            :: mini, tr
      REAL(KIND=dp), DIMENSION(:), POINTER, CONTIGUOUS   :: r_dp
      REAL(KIND=sp), DIMENSION(:), POINTER, CONTIGUOUS   :: r_sp
      TYPE(dbcsr_data_obj)                               :: tmp_data
      TYPE(dbcsr_distribution_obj)                       :: dist
      TYPE(dbcsr_iterator)                               :: iter
      TYPE(dbcsr_mp_obj)                                 :: mp_obj
      TYPE(dbcsr_type)                                   :: distributed

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)
      IF (.NOT. dbcsr_valid_index(matrix)) &
         DBCSR_ABORT("Matrix not initialized.")
      IF (matrix%replication_type .EQ. dbcsr_repl_none) &
         DBCSR_WARN("Distributing a non-replicated matrix makes no sense.")
      IF (PRESENT(fast)) THEN
         mini = fast
      ELSE
         mini = .FALSE.
      END IF
      SELECT CASE (matrix%data_type)
      CASE (dbcsr_type_real_8)
         CALL dbcsr_get_data(matrix%data_area, r_dp)
      CASE (dbcsr_type_real_4)
         CALL dbcsr_get_data(matrix%data_area, r_sp)
         DBCSR_ABORT("Only real double precision")
      CASE (dbcsr_type_complex_8)
         CALL dbcsr_get_data(matrix%data_area, c_dp)
         DBCSR_ABORT("Only real double precision")
      CASE (dbcsr_type_complex_4)
         CALL dbcsr_get_data(matrix%data_area, c_sp)
         DBCSR_ABORT("Only real double precision")
      END SELECT
      row_blk_size => array_data(matrix%row_blk_size)
      col_blk_size => array_data(matrix%col_blk_size)
      dist = dbcsr_distribution(matrix)
      mp_obj = dbcsr_distribution_mp(dist)
      mynode = dbcsr_mp_mynode(dbcsr_distribution_mp(dist))
      !
      IF (mini) THEN
         ! We just mark the blocks as deleted.
         CALL dbcsr_iterator_start(iter, matrix)
         DO WHILE (dbcsr_iterator_blocks_left(iter))
            CALL dbcsr_iterator_next_block(iter, row, col, r_dp, tr, blk)
            tr = .FALSE.
            CALL dbcsr_get_stored_coordinates(matrix, row, col, p)
            IF (mynode .EQ. p) THEN
               matrix%blk_p(blk) = 0
            END IF
         END DO
         CALL dbcsr_iterator_stop(iter)
         matrix%replication_type = dbcsr_repl_none
      ELSE
         CALL dbcsr_create(distributed, name='Distributed '//TRIM(matrix%name), &
                           template=matrix, &
                           matrix_type=dbcsr_type_no_symmetry, &
                           replication_type=dbcsr_repl_none)
         distributed%replication_type = dbcsr_repl_none
         ! First count how many blocks are local.
         nze = 0
         nblks = 0
         CALL dbcsr_iterator_start(iter, matrix)
         DO WHILE (dbcsr_iterator_blocks_left(iter))
            CALL dbcsr_iterator_next_block(iter, row, col, r_dp, tr, blk)
            tr = .FALSE.
            CALL dbcsr_get_stored_coordinates(matrix, row, col, p)
            IF (mynode .EQ. p) THEN
               nze = nze + row_blk_size(row)*col_blk_size(col)
               nblks = nblks + 1
            END IF
         END DO
         CALL dbcsr_iterator_stop(iter)
         ! Preallocate the array
         CALL dbcsr_work_create(distributed, nblks_guess=nblks, &
                                sizedata_guess=nze, work_mutable=.FALSE.)
         ! Now actually do the work
         CALL dbcsr_iterator_start(iter, matrix)
         DO WHILE (dbcsr_iterator_blocks_left(iter))
            CALL dbcsr_iterator_next_block(iter, row, col, r_dp, tr, blk)
            tr = .FALSE.
            CALL dbcsr_get_stored_coordinates(matrix, row, col, p)
            IF (mynode .EQ. p) THEN
               CALL dbcsr_put_block(distributed, row, col, r_dp, transposed=tr)
            END IF
         END DO
         CALL dbcsr_iterator_stop(iter)
         CALL dbcsr_finalize(distributed)
         ! Now replace the data and index
         CALL dbcsr_switch_data_area(matrix, distributed%data_area, &
                                     previous_data_area=tmp_data)
         CALL dbcsr_switch_data_area(distributed, tmp_data)
         CALL dbcsr_data_release(tmp_data)
         tmp_index => matrix%index
         matrix%index => distributed%index
         distributed%index => tmp_index
         CALL dbcsr_repoint_index(matrix)
         matrix%nze = distributed%nze
         matrix%nblks = distributed%nblks
         CALL dbcsr_release(distributed)
      END IF
      CALL timestop(handle)
   END SUBROUTINE dbcsr_distribute

   SUBROUTINE dbcsr_make_untransposed_blocks(matrix)
      !! Detransposes all blocks in a matrix

      TYPE(dbcsr_type), INTENT(INOUT)                    :: matrix
         !! DBCSR matrix

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_make_untransposed_blocks'

      INTEGER                                            :: blk, col, col_size, handle, row, row_size
      INTEGER, DIMENSION(:), POINTER                     :: cbs, rbs
      LOGICAL                                            :: sym_negation, tr
      TYPE(dbcsr_data_obj)                               :: block_data
      TYPE(dbcsr_iterator)                               :: iter

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)
      rbs => dbcsr_row_block_sizes(matrix)
      cbs => dbcsr_col_block_sizes(matrix)
      sym_negation = matrix%negate_real

!$OMP PARALLEL DEFAULT(NONE) PRIVATE(block_data,iter,row,col,tr,blk,row_size,col_size) &
!$OMP          SHARED(matrix,rbs,cbs,sym_negation)
      CALL dbcsr_data_init(block_data)
      CALL dbcsr_data_new(block_data, dbcsr_get_data_type(matrix))
      CALL dbcsr_iterator_start(iter, matrix)
      DO WHILE (dbcsr_iterator_blocks_left(iter))
         CALL dbcsr_iterator_next_block(iter, row, col, block_data, &
                                        transposed=tr, &
                                        block_number=blk)
         IF (tr) THEN
            row_size = rbs(row)
            col_size = cbs(col)
            CALL dbcsr_block_transpose(block_data, col_size, row_size)
            IF (sym_negation) THEN
               SELECT CASE (block_data%d%data_type)
               CASE (dbcsr_type_real_4)
                  block_data%d%r_sp(:) = -block_data%d%r_sp(:)
               CASE (dbcsr_type_real_8)
                  block_data%d%r_dp(:) = -block_data%d%r_dp(:)
               CASE (dbcsr_type_complex_4)
                  block_data%d%c_sp(:) = -block_data%d%c_sp(:)
               CASE (dbcsr_type_complex_8)
                  block_data%d%c_dp(:) = -block_data%d%c_dp(:)
               END SELECT
            END IF
            matrix%blk_p(blk) = -matrix%blk_p(blk)
         END IF
      END DO
      CALL dbcsr_iterator_stop(iter)
      CALL dbcsr_data_clear_pointer(block_data)
      CALL dbcsr_data_release(block_data)
!$OMP END PARALLEL

      CALL timestop(handle)
   END SUBROUTINE dbcsr_make_untransposed_blocks

   SUBROUTINE dbcsr_make_dense(matrix, dense_matrix, dense_dist, &
                               dense_row_sizes, dense_col_sizes, row_map, col_map)
      !! Makes a dense matrix, inplace.
      !! @note Used for making matrices dense/undense

      TYPE(dbcsr_type), INTENT(IN)                       :: matrix
         !! matrix to make dense
      TYPE(dbcsr_type), INTENT(OUT)                      :: dense_matrix
      TYPE(dbcsr_distribution_obj), INTENT(INOUT)        :: dense_dist
      TYPE(array_i1d_obj), INTENT(IN)                    :: dense_row_sizes, dense_col_sizes, &
                                                            row_map, col_map

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_make_dense'
      LOGICAL, PARAMETER                                 :: dbg = .FALSE.

      INTEGER                                            :: handle
      REAL(kind=dp)                                      :: cs
      TYPE(array_i1d_obj)                                :: dense_local_cols, dense_local_rows
      TYPE(dbcsr_distribution_obj)                       :: old_distribution

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)
      CALL dbcsr_create(dense_matrix, template=matrix, &
                        dist=dense_dist, &
                        row_blk_size_obj=dense_row_sizes, &
                        col_blk_size_obj=dense_col_sizes)
      !
      IF (dbg) THEN
         cs = dbcsr_checksum(matrix)
         WRITE (*, *) routineN//" prod cs pre", cs
      END IF
      old_distribution = dbcsr_distribution(matrix)
      ! Conversion of global to local offsets for the dense blocks
      CALL dbcsr_get_local_rows(dense_dist, dense_local_rows, &
                                dense_matrix%index(dbcsr_slot_home_prow))
      CALL dbcsr_get_local_cols(dense_dist, dense_local_cols, &
                                dense_matrix%index(dbcsr_slot_home_pcol))
      !
      CALL dbcsr_make_dense_low(matrix, dense_matrix, &
                                dbcsr_distribution_local_rows(old_distribution), &
                                dbcsr_distribution_local_cols(old_distribution), &
                                array_data(matrix%row_blk_offset), &
                                array_data(matrix%col_blk_offset), &
                                array_data(dense_local_rows), array_data(dense_local_cols), &
                                array_data(dense_matrix%row_blk_offset), &
                                array_data(dense_matrix%col_blk_offset), &
                                array_data(row_map), array_data(col_map), .TRUE., .TRUE.)
      IF (dbg) THEN
         cs = dbcsr_checksum(dense_matrix)
         WRITE (*, *) routineN//" prod cs pst", cs
      END IF
      CALL timestop(handle)
   END SUBROUTINE dbcsr_make_dense

   SUBROUTINE dbcsr_make_dense_low(und_matrix, dense_matrix, &
                                   und_local_rows, und_local_cols, &
                                   und_row_blk_offsets, und_col_blk_offsets, &
                                   dense_local_rows, dense_local_cols, &
                                   dense_row_blk_offsets, dense_col_blk_offsets, &
                                   row_map, col_map, join_rows, join_cols)
      !! Copies a matrix and makes its data dense.
      !! @note the dense_matrix must have been created

      TYPE(dbcsr_type), INTENT(IN)                       :: und_matrix
         !! Original non-dense matrix
      TYPE(dbcsr_type), INTENT(INOUT)                    :: dense_matrix
         !! Dense copy of und_matrix
      INTEGER, DIMENSION(:), INTENT(IN) :: und_local_rows, und_local_cols, und_row_blk_offsets, &
                                           und_col_blk_offsets, dense_local_rows, dense_local_cols, dense_row_blk_offsets, &
                                           dense_col_blk_offsets, row_map, col_map
         !! The process-grid local rows of the non-dense und_matrix
         !! The process-grid local columns of the non-dense und_matrix
         !! The block offsets of the rows of the non-dense matrix
         !! The block offsets of the columns of the non-dense matrix
         !! The process-grid local rows of the dense matrix
         !! The process-grid local columns of the dense matrix
         !! The block offsets of the rows of the dense matrix
         !! The block offsets of the columns of the dense matrix
         !! Mapping of non-dense rows to dense rows
         !! Mapping of non-dense columns to dense columns
      LOGICAL, INTENT(IN)                                :: join_rows, join_cols
         !! Make rows dense
         !! Make columns dense

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_make_dense_low'

      INTEGER :: dense_nblkcols_local, dense_nblkcols_total, dense_nblkrows_local, &
                 dense_nblkrows_total, dense_nlocal_blocks, error_handle, nfullcols, nfullrows, &
                 und_nblkcols_total, und_nblkrows_total
      INTEGER, ALLOCATABLE, DIMENSION(:) :: col_internal_offsets, dense_local_col_blk_offsets, &
                                            dense_local_row_blk_offsets, row_internal_offsets, und_local_col_blk_offsets, &
                                            und_local_row_blk_offsets
      INTEGER, DIMENSION(dbcsr_meta_size)                :: meta
      TYPE(dbcsr_data_obj)                               :: dense_data, und_data

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, error_handle)
      !
      nfullrows = nfull_elements(und_row_blk_offsets, und_local_rows)
      nfullcols = nfull_elements(und_col_blk_offsets, und_local_cols)
      !
      und_nblkrows_total = SIZE(und_row_blk_offsets) - 1
      und_nblkcols_total = SIZE(und_col_blk_offsets) - 1
      !
      ! Find the local data offsets (but indexed by the global
      ! rows/columns) for the undense data.
      ALLOCATE (und_local_row_blk_offsets(und_nblkrows_total + 1))
      ALLOCATE (und_local_col_blk_offsets(und_nblkcols_total + 1))
      CALL global_offsets_to_local( &
         und_row_blk_offsets, &
         und_local_rows, &
         und_local_row_blk_offsets)
      CALL global_offsets_to_local( &
         und_col_blk_offsets, &
         und_local_cols, &
         und_local_col_blk_offsets)
      !
      dense_nblkrows_total = SIZE(dense_row_blk_offsets) - 1
      dense_nblkcols_total = SIZE(dense_col_blk_offsets) - 1
      dense_nblkrows_local = SIZE(dense_local_rows)
      dense_nblkcols_local = SIZE(dense_local_cols)
      !
      ! Find the local data offsets (but indexed by the (dense) global
      ! rows/columns) for the dense data.
      ALLOCATE (dense_local_row_blk_offsets(dense_nblkrows_total + 1))
      ALLOCATE (dense_local_col_blk_offsets(dense_nblkcols_total + 1))
      CALL global_offsets_to_local( &
         dense_row_blk_offsets, &
         dense_local_rows, &
         dense_local_row_blk_offsets)
      CALL global_offsets_to_local( &
         dense_col_blk_offsets, &
         dense_local_cols, &
         dense_local_col_blk_offsets)
      ! Find the offset of blocks within dense rows/columns.  This is needed
      ! since the blocked rows/columns are not necessarily in the same order.
      ALLOCATE (row_internal_offsets(und_nblkrows_total))
      ALLOCATE (col_internal_offsets(und_nblkcols_total))
      CALL get_internal_offsets( &
         und_local_rows, row_map, &
         und_local_row_blk_offsets, &
         dense_local_row_blk_offsets, &
         row_internal_offsets)
      CALL get_internal_offsets( &
         und_local_cols, col_map, &
         und_local_col_blk_offsets, &
         dense_local_col_blk_offsets, &
         col_internal_offsets)
      !
      und_data = und_matrix%data_area
      CALL dbcsr_data_hold(und_data)
      CALL dbcsr_data_init(dense_data)
      CALL dbcsr_data_new(dense_data, dbcsr_data_get_type(und_data), &
                          data_size=nfullrows*nfullcols, &
                          memory_type=dbcsr_data_get_memory_type(und_data))
      !
      ! Reshuffle the data
      CALL make_dense_data(und_matrix, &
                           dense_data, nfullrows, nfullcols, &
                           und_local_row_blk_offsets, und_local_col_blk_offsets, &
                           dense_local_row_blk_offsets, dense_local_col_blk_offsets, &
                           row_map=row_map, col_map=col_map, &
                           row_internal_offsets=row_internal_offsets, &
                           col_internal_offsets=col_internal_offsets, &
                           join_rows=join_rows, join_cols=join_cols, make_tr=.FALSE.)
      CALL dbcsr_switch_data_area(dense_matrix, dense_data)
      CALL dbcsr_data_release(dense_data)
      CALL dbcsr_data_release(und_data)
      !
      ! Create the new dense index.
      dense_nlocal_blocks = dense_nblkrows_local*dense_nblkcols_local
      CALL dbcsr_addto_index_array(dense_matrix, &
                                   dbcsr_slot_row_p, &
                                   reservation=dense_nblkrows_total + 1, extra=2*dense_nlocal_blocks)
      CALL dbcsr_addto_index_array(dense_matrix, &
                                   dbcsr_slot_col_i, &
                                   reservation=dense_nlocal_blocks)
      CALL dbcsr_addto_index_array(dense_matrix, &
                                   dbcsr_slot_blk_p, &
                                   reservation=dense_nlocal_blocks)
      !
      meta = dense_matrix%index(1:dbcsr_meta_size)
      CALL dbcsr_pack_meta(dense_matrix, meta)
      meta(dbcsr_slot_nze) = nfullrows*nfullcols
      meta(dbcsr_slot_nblks) = dense_nlocal_blocks
      CALL make_dense_index(dense_matrix%row_p, &
                            dense_matrix%col_i, &
                            dense_matrix%blk_p, &
                            dense_nblkrows_total, dense_nblkcols_total, &
                            dense_local_rows, &
                            dense_local_cols, &
                            dense_local_row_blk_offsets, &
                            dense_local_col_blk_offsets, &
                            make_tr=.FALSE., &
                            meta=meta)
      CALL dbcsr_unpack_meta(dense_matrix, meta)
      !
      DEALLOCATE (und_local_row_blk_offsets)
      DEALLOCATE (und_local_col_blk_offsets)
      DEALLOCATE (dense_local_row_blk_offsets)
      DEALLOCATE (dense_local_col_blk_offsets)
      DEALLOCATE (row_internal_offsets)
      DEALLOCATE (col_internal_offsets)
      !
      CALL timestop(error_handle)
   END SUBROUTINE dbcsr_make_dense_low

   SUBROUTINE dbcsr_make_undense(matrix, undense_matrix, distribution, &
                                 row_blk_offsets, col_blk_offsets, row_blk_sizes, col_blk_sizes, &
                                 row_map, col_map)
      !! Makes a blocked matrix from a dense matrix, inplace
      !! @note Used for making matrices dense/undense

      TYPE(dbcsr_type), INTENT(IN)                       :: matrix
         !! dense matrix
      TYPE(dbcsr_type), INTENT(INOUT)                    :: undense_matrix
         !! matrix to make undense
      TYPE(dbcsr_distribution_obj), INTENT(IN)           :: distribution
         !! distribution of non-dense rows and columns
      TYPE(array_i1d_obj), INTENT(IN)                    :: row_blk_offsets, col_blk_offsets, &
                                                            row_blk_sizes, col_blk_sizes, row_map, &
                                                            col_map
         !! non-dense row block offsets
         !! non-dense column block offsets
         !! non-dense row block sizes
         !! non-dense column block sizes
         !! mapping from non-dense rows
         !! mapping from non-dense columns

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_make_undense'
      LOGICAL, PARAMETER                                 :: dbg = .FALSE.

      INTEGER                                            :: handle, nblkcols_local, nblkcols_total, &
                                                            nblkrows_local, nblkrows_total, &
                                                            nfullcols_local, nfullrows_local
      INTEGER, ALLOCATABLE, DIMENSION(:) :: col_internal_offsets, dense_local_col_blk_offsets, &
                                            dense_local_row_blk_offsets, local_col_blk_offsets, local_row_blk_offsets, &
                                            row_internal_offsets
      INTEGER, DIMENSION(:), POINTER, CONTIGUOUS         :: local_cols, local_rows, meta
      REAL(kind=dp)                                      :: cs
      TYPE(dbcsr_data_obj)                               :: blocked_data, dense_data
      TYPE(dbcsr_distribution_obj)                       :: dense_distribution

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)
      IF (dbg) THEN
         cs = dbcsr_checksum(matrix)
         WRITE (*, *) routineN//" prod cs pre", cs
      END IF
      dense_distribution = dbcsr_distribution(matrix)
      nfullrows_local = matrix%nfullrows_local
      nfullcols_local = matrix%nfullcols_local
      nblkrows_local = dbcsr_distribution_nlocal_rows(distribution)
      nblkcols_local = dbcsr_distribution_nlocal_cols(distribution)
      nblkrows_total = dbcsr_distribution_nrows(distribution)
      nblkcols_total = dbcsr_distribution_ncols(distribution)
      local_rows => dbcsr_distribution_local_rows(distribution)
      local_cols => dbcsr_distribution_local_cols(distribution)
      CALL dbcsr_create(undense_matrix, template=matrix, &
                        dist=distribution, &
                        row_blk_size_obj=row_blk_sizes, &
                        col_blk_size_obj=col_blk_sizes)
      ! Restore previous offsets, just to try to keep the same memory.
      CALL array_release(undense_matrix%row_blk_offset)
      CALL array_release(undense_matrix%col_blk_offset)
      undense_matrix%row_blk_offset = row_blk_offsets
      undense_matrix%col_blk_offset = col_blk_offsets
      CALL array_hold(undense_matrix%row_blk_offset)
      CALL array_hold(undense_matrix%col_blk_offset)
      !
      ALLOCATE (local_row_blk_offsets(nblkrows_total + 1))
      ALLOCATE (local_col_blk_offsets(nblkcols_total + 1))
      CALL dbcsr_clearfrom_index_array(undense_matrix, dbcsr_slot_row_p)
      CALL dbcsr_clearfrom_index_array(undense_matrix, dbcsr_slot_col_i)
      CALL dbcsr_clearfrom_index_array(undense_matrix, dbcsr_slot_blk_p)
      CALL dbcsr_addto_index_array(undense_matrix, dbcsr_slot_row_p, &
                                   reservation=nblkrows_total + 1, extra=nblkrows_local*nblkcols_local*2)
      CALL dbcsr_addto_index_array(undense_matrix, dbcsr_slot_col_i, &
                                   reservation=nblkrows_local*nblkcols_local)
      CALL dbcsr_addto_index_array(undense_matrix, dbcsr_slot_blk_p, &
                                   reservation=nblkrows_local*nblkcols_local)
      meta => undense_matrix%index(1:dbcsr_meta_size)
      CALL dbcsr_pack_meta(undense_matrix, meta)
      meta(dbcsr_slot_nblks) = nblkrows_local*nblkcols_local
      meta(dbcsr_slot_nze) = nfullrows_local*nfullcols_local
      CALL global_offsets_to_local(array_data(row_blk_offsets), &
                                   local_rows, local_row_blk_offsets(1:nblkrows_local + 1))
      CALL global_offsets_to_local(array_data(col_blk_offsets), &
                                   local_cols, local_col_blk_offsets(1:nblkcols_local + 1))
      CALL make_undense_index(undense_matrix%row_p, undense_matrix%col_i, undense_matrix%blk_p, &
                              distribution, &
                              local_row_blk_offsets(1:nblkrows_local + 1), &
                              local_col_blk_offsets(1:nblkcols_local + 1), &
                              meta)
      CALL dbcsr_unpack_meta(undense_matrix, meta)
      !
      CALL global_offsets_to_local(array_data(row_blk_offsets), &
                                   local_rows, local_row_blk_offsets)
      CALL global_offsets_to_local(array_data(col_blk_offsets), &
                                   local_cols, local_col_blk_offsets)
      !
      ALLOCATE (dense_local_row_blk_offsets(1 + dbcsr_distribution_nrows(dense_distribution)))
      ALLOCATE (dense_local_col_blk_offsets(1 + dbcsr_distribution_ncols(dense_distribution)))
      CALL global_offsets_to_local(array_data(matrix%row_blk_offset), &
                                   dbcsr_distribution_local_rows(dense_distribution), &
                                   dense_local_row_blk_offsets)
      CALL global_offsets_to_local(array_data(matrix%col_blk_offset), &
                                   dbcsr_distribution_local_cols(dense_distribution), &
                                   dense_local_col_blk_offsets)
      ! Find the offset of blocks within dense rows/columns.  This is needed
      ! since the blocked rows/columns are not necessarily in the same order.
      ALLOCATE (row_internal_offsets(nblkrows_total))
      ALLOCATE (col_internal_offsets(nblkcols_total))
      CALL get_internal_offsets( &
         local_rows, array_data(row_map), &
         local_row_blk_offsets, &
         dense_local_row_blk_offsets, &
         row_internal_offsets)
      CALL get_internal_offsets( &
         local_cols, array_data(col_map), &
         local_col_blk_offsets, &
         dense_local_col_blk_offsets, &
         col_internal_offsets)
      !
      dense_data = matrix%data_area
      CALL dbcsr_data_hold(dense_data)
      CALL dbcsr_data_init(blocked_data)
      CALL dbcsr_data_new(blocked_data, dbcsr_data_get_type(dense_data), &
                          data_size=nfullrows_local*nfullcols_local, &
                          memory_type=dbcsr_data_get_memory_type(dense_data))
      CALL dbcsr_switch_data_area(undense_matrix, blocked_data)
      CALL dbcsr_data_release(blocked_data)
      ! Reshuffle the data
      CALL make_undense_data(undense_matrix, dense_data, &
                             nfullrows_local, nfullcols_local, &
                             dense_local_row_blk_offsets, dense_local_col_blk_offsets, &
                             array_data(row_map), array_data(col_map), &
                             row_internal_offsets, col_internal_offsets)
      CALL dbcsr_data_release(dense_data)
      IF (dbg) THEN
         cs = dbcsr_checksum(matrix)
         WRITE (*, *) routineN//" prod cs pst", cs
      END IF
      DEALLOCATE (local_row_blk_offsets)
      DEALLOCATE (local_col_blk_offsets)
      DEALLOCATE (dense_local_row_blk_offsets)
      DEALLOCATE (dense_local_col_blk_offsets)
      DEALLOCATE (row_internal_offsets)
      DEALLOCATE (col_internal_offsets)
      CALL timestop(handle)
   END SUBROUTINE dbcsr_make_undense

   SUBROUTINE make_dense_data(matrix, dense_data, nfullrows, nfullcols, &
                              und_row_blk_offsets, und_col_blk_offsets, &
                              dense_row_blk_offsets, dense_col_blk_offsets, &
                              row_map, col_map, &
                              row_internal_offsets, col_internal_offsets, &
                              join_rows, join_cols, make_tr)
      !! Shuffles the data from blocked to standard dense form
      !! @note Used for making matrices dense/undense

      TYPE(dbcsr_type), INTENT(IN)                       :: matrix
         !! Existing blocked matrix
      TYPE(dbcsr_data_obj), INTENT(INOUT)                :: dense_data
         !! Dense data
      INTEGER, INTENT(IN)                                :: nfullrows, nfullcols
         !! size of new data
         !! size of new data
      INTEGER, DIMENSION(:), INTENT(IN) :: und_row_blk_offsets, und_col_blk_offsets, &
                                           dense_row_blk_offsets, dense_col_blk_offsets, row_map, col_map, row_internal_offsets, &
                                           col_internal_offsets
      LOGICAL, INTENT(IN)                                :: join_rows, join_cols, make_tr
         !! make rows dense, default is yes
         !! make columns dense, default is yes
         !! make the dense blocks transposed

      CHARACTER(len=*), PARAMETER :: routineN = 'make_dense_data'

      INTEGER :: blk_col, blk_col_size, blk_row, blk_row_size, dense_col, dense_row, error_handle, &
                 target_col_offset, target_cs, target_offset, target_row_offset, target_rs, tco, tro
      LOGICAL                                            :: tr
      TYPE(dbcsr_data_obj)                               :: block
      TYPE(dbcsr_iterator)                               :: iter

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, error_handle)
      IF (dbcsr_data_get_size(dense_data) < nfullrows*nfullcols) &
         DBCSR_ABORT("Dense data too small")
      IF (.NOT. join_cols .AND. .NOT. join_rows) &
         DBCSR_WARN("Joining neither rows nor columns is untested")
      !
      CALL dbcsr_data_clear(dense_data)
      IF (dbcsr_data_get_size(matrix%data_area) .GT. 0 &
          .AND. nfullrows .GT. 0 .AND. nfullcols .GT. 0) THEN
!$OMP PARALLEL DEFAULT(NONE) &
!$OMP PRIVATE (block, iter, &
!$OMP         target_rs, target_cs, blk_row, blk_col, tr, blk_row_size, blk_col_size,&
!$OMP         tro, tco, target_offset,&
!$OMP         target_row_offset, target_col_offset,&
!$OMP         dense_row, dense_col) &
!$OMP SHARED (&
!$OMP         dense_data, matrix, &
!$OMP         make_tr, join_rows, join_cols, &
!$OMP         und_row_blk_offsets, und_col_blk_offsets,&
!$OMP         dense_row_blk_offsets, dense_col_blk_offsets,&
!$OMP         row_internal_offsets, col_internal_offsets,&
!$OMP         row_map, col_map,&
!$OMP         nfullrows, nfullcols)
         CALL dbcsr_data_init(block)
         CALL dbcsr_data_new(block, &
                             dbcsr_type_1d_to_2d(dbcsr_data_get_type(dense_data)))
         CALL dbcsr_iterator_start(iter, matrix, dynamic=.TRUE., shared=.TRUE., &
                                   contiguous_pointers=.FALSE., read_only=.TRUE.)
         DO WHILE (dbcsr_iterator_blocks_left(iter))
            CALL dbcsr_iterator_next_block(iter, blk_row, blk_col, block, tr, &
                                           row_size=blk_row_size, col_size=blk_col_size)
            dense_row = row_map(blk_row)
            dense_col = col_map(blk_col)
            !
            ! Calculate the target block row/column size and the offset
            ! within the target block where the undense block is placed.
            IF (join_rows) THEN
               target_row_offset = dense_row_blk_offsets(dense_row)
               target_rs = dense_row_blk_offsets(dense_row + 1) - &
                           dense_row_blk_offsets(dense_row)
               tro = 1 + row_internal_offsets(blk_row)
            ELSE
               target_row_offset = und_row_blk_offsets(blk_row)
               target_rs = blk_row_size
               tro = 1
            END IF
            IF (join_cols) THEN
               target_col_offset = dense_col_blk_offsets(dense_col)
               target_cs = dense_col_blk_offsets(dense_col + 1) - &
                           dense_col_blk_offsets(dense_col)
               tco = 1 + col_internal_offsets(blk_col)
            ELSE
               target_col_offset = und_col_blk_offsets(blk_col)
               target_cs = blk_col_size
               tco = 1
            END IF
            target_offset = (target_row_offset - 1)*nfullcols &
                            + (target_col_offset - 1)*( &
                            dense_row_blk_offsets(dense_row + 1) - &
                            dense_row_blk_offsets(dense_row))
            CALL dbcsr_block_partial_copy(dst=dense_data, &
                                          dst_offset=target_offset, &
                                          dst_rs=target_rs, dst_cs=target_cs, dst_tr=make_tr, &
                                          dst_r_lb=tro, dst_c_lb=tco, &
                                          src=block, src_rs=blk_row_size, src_cs=blk_col_size, src_tr=tr, &
                                          src_r_lb=1, src_c_lb=1, nrow=blk_row_size, ncol=blk_col_size)
         END DO
         CALL dbcsr_iterator_stop(iter)
         CALL dbcsr_data_clear_pointer(block)
         CALL dbcsr_data_release(block)
!$OMP END PARALLEL
      END IF
      CALL timestop(error_handle)
   END SUBROUTINE make_dense_data

   SUBROUTINE make_undense_data(matrix, dense_data, nfullrows, nfullcols, &
                                dense_row_blk_offsets, dense_col_blk_offsets, &
                                row_map, col_map, row_internal_offsets, col_internal_offsets)
      !! Shuffles the data from standard dense to blocked form
      !! @note Used for making matrices dense/undense

      TYPE(dbcsr_type), INTENT(INOUT)                    :: matrix
         !! Matrix with data to fill
      TYPE(dbcsr_data_obj), INTENT(IN)                   :: dense_data
         !! Dense data
      INTEGER, INTENT(IN)                                :: nfullrows, nfullcols
         !! number of full rows in local submatrix
         !! number of full columns in local submatrix
      INTEGER, DIMENSION(:), INTENT(IN)                  :: dense_row_blk_offsets, &
                                                            dense_col_blk_offsets, row_map, &
                                                            col_map, row_internal_offsets, &
                                                            col_internal_offsets
         !! row block offsets for dense data
         !! column block offsets for dense data
         !! mapping from undense to dense rows
         !! mapping from undense to dense rows

      INTEGER :: blk_col, blk_col_size, blk_row, blk_row_size, dense_col, dense_col_offset, &
                 dense_cs, dense_offset, dense_row, dense_row_offset, dense_rs, sco, sro
      LOGICAL                                            :: tr
      TYPE(dbcsr_data_obj)                               :: block
      TYPE(dbcsr_iterator)                               :: iter

!   ---------------------------------------------------------------------------

      IF (dbcsr_data_get_size(dense_data) < nfullrows*nfullcols) &
         DBCSR_ABORT("Dense data too small")
      IF (dbcsr_data_get_size(matrix%data_area) .GT. 0) THEN
         CALL dbcsr_data_clear(matrix%data_area)
!$OMP PARALLEL DEFAULT(NONE) &
!$OMP PRIVATE (block, iter,&
!$OMP          blk_row, blk_col, tr,&
!$OMP          blk_row_size, blk_col_size, sro, sco,&
!$OMP          dense_row_offset, dense_col_offset, dense_row, dense_col,&
!$OMP          dense_cs, dense_rs,&
!$OMP          dense_offset) &
!$OMP SHARED (&
!$OMP         matrix, dense_data, &
!$OMP         nfullrows, nfullcols, &
!$OMP         dense_row_blk_offsets, dense_col_blk_offsets,&
!$OMP         row_map, col_map,&
!$OMP         row_internal_offsets, col_internal_offsets)
         CALL dbcsr_data_init(block)
         CALL dbcsr_data_new(block, &
                             dbcsr_type_1d_to_2d(dbcsr_data_get_type(dense_data)))
         CALL dbcsr_iterator_start(iter, matrix, dynamic=.TRUE., shared=.TRUE., &
                                   contiguous_pointers=.FALSE.)
         DO WHILE (dbcsr_iterator_blocks_left(iter))
            CALL dbcsr_iterator_next_block(iter, blk_row, blk_col, block, tr, &
                                           row_size=blk_row_size, col_size=blk_col_size)
            dense_row = row_map(blk_row)
            dense_col = col_map(blk_col)
            dense_row_offset = dense_row_blk_offsets(dense_row)
            dense_col_offset = dense_col_blk_offsets(dense_col)
            dense_rs = dense_row_blk_offsets(dense_row + 1) - &
                       dense_row_blk_offsets(dense_row)
            dense_cs = dense_col_blk_offsets(dense_col + 1) - &
                       dense_col_blk_offsets(dense_col)
            sro = 1 + row_internal_offsets(blk_row)
            sco = 1 + col_internal_offsets(blk_col)
            dense_offset = (dense_row_offset - 1)*nfullcols &
                           + (dense_col_offset - 1)*dense_rs
            CALL dbcsr_block_partial_copy( &
               dst=block, dst_rs=blk_row_size, dst_cs=blk_col_size, dst_tr=tr, &
               dst_r_lb=1, dst_c_lb=1, &
               src=dense_data, src_offset=dense_offset, &
               src_rs=dense_rs, src_cs=dense_cs, src_tr=.FALSE., &
               src_r_lb=sro, src_c_lb=sco, &
               nrow=blk_row_size, ncol=blk_col_size)
         END DO
         CALL dbcsr_iterator_stop(iter)
         CALL dbcsr_data_clear_pointer(block)
         CALL dbcsr_data_release(block)
!$OMP END PARALLEL
      END IF
   END SUBROUTINE make_undense_data

   SUBROUTINE dbcsr_replicate_all(matrix)
      !! Replicates a DBCSR on all processors.

      TYPE(dbcsr_type), INTENT(INOUT)                    :: matrix
         !! matrix to replicate

      CALL dbcsr_replicate(matrix, replicate_rows=.TRUE., &
                           replicate_columns=.TRUE.)
   END SUBROUTINE dbcsr_replicate_all

   SUBROUTINE dbcsr_replicate(matrix, replicate_rows, replicate_columns, &
                              restrict_source)
      !! Replicates a DBCSR matrix among process rows and columns
      !!
      !! Direction definition
      !! Row replication means that all processors in a process grid sharing
      !! the same row get the data of the entire row. (In a 1-column grid the
      !! operation has no effect.) Similar logic applies to column replication.

      TYPE(dbcsr_type), INTENT(INOUT)                    :: matrix
         !! matrix to replicate
      LOGICAL, INTENT(IN)                                :: replicate_rows, replicate_columns
         !! Row should be replicated among all processors
         !! Column should be replicated among all processors
      INTEGER, INTENT(IN), OPTIONAL                      :: restrict_source
         !! Send only from this node (ignores blocks on other nodes)

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_replicate'
      INTEGER, PARAMETER                                 :: metalen = 3
      LOGICAL, PARAMETER                                 :: dbg = .FALSE.

      CHARACTER                                          :: rep_type
      INTEGER :: blk, blk_l, blk_p, blk_ps, blks, col, col_size, data_type, dst_p, handle, &
                 mynode, mypcol, myprow, nblks, numnodes, nze, offset, row, row_size, smp, &
                 src_p, stored_blk_p, stored_col, stored_row
      INTEGER, ALLOCATABLE, DIMENSION(:)                 :: rd_disp, recv_meta, rm_disp, send_meta
      INTEGER, ALLOCATABLE, DIMENSION(:, :)              :: recv_count
      INTEGER, DIMENSION(2)                              :: send_count
      INTEGER, DIMENSION(:), POINTER, CONTIGUOUS         :: col_blk_size, col_dist, row_blk_size, &
                                                            row_dist, tmp_index
      INTEGER, DIMENSION(:, :), POINTER                  :: blacs2mpi
      LOGICAL                                            :: had_subcomms, i_am_restricted, rest_src, &
                                                            tr
      TYPE(dbcsr_data_obj)                               :: data_block, recv_data, send_data, &
                                                            tmp_data
      TYPE(dbcsr_distribution_obj)                       :: target_dist
      TYPE(dbcsr_iterator)                               :: iter
      TYPE(dbcsr_mp_obj)                                 :: mp_obj
      TYPE(dbcsr_type)                                   :: replicated
      TYPE(mp_comm_type)                                 :: mp_group

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)
      IF (.NOT. dbcsr_valid_index(matrix)) &
         DBCSR_ABORT("Matrix not initialized.")
      !IF(matrix%replication_type /= dbcsr_repl_none) &
      !   DBCSR_WARN("Replicating a non-distributed matrix makes no sense.")
      IF (replicate_rows .AND. replicate_columns) THEN
         rep_type = dbcsr_repl_full
      ELSEIF (replicate_rows .AND. .NOT. replicate_columns) THEN
         rep_type = dbcsr_repl_row
      ELSEIF (replicate_columns .AND. .NOT. replicate_rows) THEN
         rep_type = dbcsr_repl_col
      ELSE
         rep_type = dbcsr_repl_none
         IF (.NOT. replicate_rows .AND. .NOT. replicate_columns) &
            DBCSR_ABORT("Some replication must be specified")
      END IF
      data_type = dbcsr_get_data_type(matrix)
      row_blk_size => array_data(matrix%row_blk_size)
      col_blk_size => array_data(matrix%col_blk_size)
      target_dist = matrix%dist
      row_dist => dbcsr_distribution_row_dist(target_dist)
      col_dist => dbcsr_distribution_col_dist(target_dist)
      mp_obj = dbcsr_distribution_mp(target_dist)
      blacs2mpi => dbcsr_mp_pgrid(mp_obj)
      numnodes = dbcsr_mp_numnodes(mp_obj)
      mynode = dbcsr_mp_mynode(mp_obj)
      myprow = dbcsr_mp_myprow(mp_obj)
      mypcol = dbcsr_mp_mypcol(mp_obj)
      IF (MAXVAL(row_dist) .GT. UBOUND(blacs2mpi, 1)) &
         DBCSR_ABORT('Row distribution references unexistent processor rows')
      IF (dbg) THEN
         IF (MAXVAL(row_dist) .NE. UBOUND(blacs2mpi, 1)) &
            DBCSR_WARN('Range of row distribution not equal to processor rows')
      END IF
      IF (MAXVAL(col_dist) .GT. UBOUND(blacs2mpi, 2)) &
         DBCSR_ABORT('Col distribution references unexistent processor cols')
      IF (dbg) THEN
         IF (MAXVAL(col_dist) .NE. UBOUND(blacs2mpi, 2)) &
            DBCSR_WARN('Range of col distribution not equal to processor cols')
      END IF
      ! Define the number of nodes with which I will communicate. Also
      ! setup row and column communicators.
      had_subcomms = dbcsr_mp_has_subgroups(mp_obj)
      SELECT CASE (rep_type)
      CASE (dbcsr_repl_full)
         numnodes = dbcsr_mp_numnodes(mp_obj)
         mp_group = dbcsr_mp_group(mp_obj)
         mynode = dbcsr_mp_mynode(mp_obj)
      CASE (dbcsr_repl_row)
         numnodes = dbcsr_mp_npcols(mp_obj)
         CALL dbcsr_mp_grid_setup(mp_obj)
         mp_group = dbcsr_mp_my_row_group(mp_obj)
         mynode = dbcsr_mp_mypcol(mp_obj)
      CASE (dbcsr_repl_col)
         numnodes = dbcsr_mp_nprows(mp_obj)
         CALL dbcsr_mp_grid_setup(mp_obj)
         mp_group = dbcsr_mp_my_col_group(mp_obj)
         mynode = dbcsr_mp_myprow(mp_obj)
      CASE (dbcsr_repl_none)
         numnodes = 1
         mp_group = dbcsr_mp_group(mp_obj)
         mynode = 0
      END SELECT
      IF ((rep_type == dbcsr_repl_row .OR. rep_type == dbcsr_repl_col) .AND. .NOT. dbcsr_mp_has_subgroups(mp_obj)) &
         DBCSR_ABORT("Only full replication supported when subcommunicators are turned off.")
      !
      IF (PRESENT(restrict_source)) THEN
         rest_src = .TRUE.
         i_am_restricted = mynode .NE. restrict_source
      ELSE
         rest_src = .FALSE.
         i_am_restricted = .FALSE.
      END IF
      !
      ALLOCATE (recv_count(2, 0:numnodes - 1))
      ALLOCATE (rd_disp(0:numnodes - 1))
      ALLOCATE (rm_disp(0:numnodes - 1))
      CALL dbcsr_create(replicated, name='Replicated '//TRIM(matrix%name), &
                        template=matrix, &
                        matrix_type=dbcsr_type_no_symmetry, &
                        replication_type=rep_type)
      !
      ! Count initial sizes for sending. Also, ensure that blocks are on their
      ! home processors.
      send_count(1:2) = 0
      CALL dbcsr_iterator_start(iter, matrix)
      IF (.NOT. i_am_restricted) THEN
         DO WHILE (dbcsr_iterator_blocks_left(iter))
            CALL dbcsr_iterator_next_block(iter, row, col, &
                                           row_size=row_size, col_size=col_size, blk=blk)
            !tr = .FALSE.
            !CALL dbcsr_get_stored_coordinates (matrix, row, col, tr, dst_p)
            !IF(dst_p /= mynode) &
            !   DBCSR_ABORT("Matrix is not correctly distributed. Call dbcsr_redistribute.")
            nze = row_size*col_size
            send_count(1) = send_count(1) + 1
            send_count(2) = send_count(2) + nze
         END DO
         send_count(2) = dbcsr_data_get_size_referenced(matrix%data_area)
      END IF
      CALL dbcsr_iterator_stop(iter)
      ! Exchange how much data others have.
      CALL mp_allgather(send_count(1:2), recv_count, mp_group)
      CALL dbcsr_data_init(recv_data)
      nze = SUM(recv_count(2, :))
      nblks = SUM(recv_count(1, :))
      CALL dbcsr_data_new(recv_data, data_type=data_type, data_size=nze)
      ! send_data should have the correct size
      CALL dbcsr_data_init(send_data)
      IF (send_count(2) .EQ. 0) THEN
         CALL dbcsr_data_new(send_data, data_type=data_type, data_size=0)
      ELSE
         CALL dbcsr_data_new(send_data, data_type=data_type)
         send_data = pointer_view(send_data, matrix%data_area, 1, send_count(2))
      END IF
      ALLOCATE (recv_meta(metalen*nblks))
      ALLOCATE (send_meta(metalen*send_count(1)))
      recv_meta(:) = 0
      ! Fill in the meta data structures and copy the data.
      rd_disp = -1; rm_disp = -1
      rd_disp(0) = 1; rm_disp(0) = 1
      DO dst_p = 1, numnodes - 1
         rm_disp(dst_p) = rm_disp(dst_p - 1) &
                          + metalen*recv_count(1, dst_p - 1)
         rd_disp(dst_p) = rd_disp(dst_p - 1) &
                          + recv_count(2, dst_p - 1)
      END DO
      CALL dbcsr_data_init(data_block)
      CALL dbcsr_data_new(data_block, data_type=data_type)
      CALL dbcsr_iterator_start(iter, matrix)
      smp = 1
      IF (.NOT. i_am_restricted) THEN
         DO WHILE (dbcsr_iterator_blocks_left(iter))
            CALL dbcsr_iterator_next_block(iter, row, col, blk, &
                                           transposed=tr, blk_p=blk_p)
            send_meta(smp + 0) = row
            send_meta(smp + 1) = col
            send_meta(smp + 2) = blk_p
            smp = smp + metalen
         END DO
      END IF
      CALL dbcsr_iterator_stop(iter)
      CALL dbcsr_data_clear_pointer(data_block)
      CALL dbcsr_data_release(data_block)
      ! Exchange the data and metadata structures.
      CALL mp_allgather(send_meta, recv_meta, metalen*recv_count(1, :), &
                        rm_disp - 1, mp_group)
      CALL dbcsr_allgatherv(send_data, dbcsr_data_get_size(send_data), &
                            recv_data, recv_count(2, :), &
                            rd_disp - 1, mp_group)
      ! Release the send buffer. If it had a non-zero size then it was a
      ! pointer into the regular matrix and the data pointer should be
      ! cleared and not deallocated.
      IF (send_count(2) .NE. 0) THEN
         CALL dbcsr_data_clear_pointer(send_data)
      END IF
      CALL dbcsr_data_release(send_data)
      !
      ! Now fill in the data.
      CALL dbcsr_work_create(replicated, &
                             SUM(recv_count(1, :)), &
                             SUM(recv_count(2, :)), n=1, &
                             work_mutable=.FALSE.)
      CALL dbcsr_data_hold(recv_data)
      CALL dbcsr_data_release(replicated%wms(1)%data_area)
      replicated%wms(1)%data_area = recv_data
      blk_ps = 1
      blks = 1
      DO src_p = 0, numnodes - 1
         nze = recv_count(2, src_p)
         !CALL dbcsr_data_set (replicated%wms(1)%data_area, blk_ps, nze,&
         !     recv_data, rd_disp(src_p))
         offset = rd_disp(src_p) - 1
         DO blk_l = 1, recv_count(1, src_p)
            IF (dbg) WRITE (*, *) "src_p, blk_l", src_p, blk_l
            stored_row = recv_meta(rm_disp(src_p) + metalen*(blk_l - 1))
            stored_col = recv_meta(rm_disp(src_p) + metalen*(blk_l - 1) + 1)
            stored_blk_p = recv_meta(rm_disp(src_p) + metalen*(blk_l - 1) + 2)
            replicated%wms(1)%row_i(blks) = stored_row
            replicated%wms(1)%col_i(blks) = stored_col
            replicated%wms(1)%blk_p(blks) = SIGN(ABS(stored_blk_p) + offset, &
                                                 stored_blk_p)
            nze = row_blk_size(stored_row) &
                  *col_blk_size(stored_col)
            blk_ps = MAX(blk_ps, ABS(stored_blk_p) + nze + offset)
            blks = blks + 1
         END DO
      END DO
      CALL dbcsr_data_set_size_referenced(replicated%wms(1)%data_area, blk_ps - 1)
      !
      replicated%wms(1)%lastblk = blks - 1
      replicated%wms(1)%datasize = blk_ps - 1
      CALL dbcsr_finalize(replicated, reshuffle=.TRUE.)
      !
      ! Remove communicators if they were forcibly created.
      IF (had_subcomms .AND. &
          (rep_type .EQ. dbcsr_repl_row .OR. rep_type .EQ. dbcsr_repl_col)) THEN
         CALL dbcsr_mp_grid_remove(mp_obj)
      END IF
      DEALLOCATE (recv_count)
      DEALLOCATE (rd_disp)
      DEALLOCATE (rm_disp)
      CALL dbcsr_data_release(recv_data)
      DEALLOCATE (recv_meta)
      DEALLOCATE (send_meta)
      matrix%replication_type = replicated%replication_type
      ! Now replace the data and index
      CALL dbcsr_switch_data_area(matrix, replicated%data_area, &
                                  previous_data_area=tmp_data)
      CALL dbcsr_switch_data_area(replicated, tmp_data)
      CALL dbcsr_data_release(tmp_data)
      tmp_index => matrix%index
      matrix%index => replicated%index
      replicated%index => tmp_index
      CALL dbcsr_repoint_index(matrix)
      matrix%nze = replicated%nze
      matrix%nblks = replicated%nblks
      CALL dbcsr_release(replicated)
      CALL dbcsr_verify_matrix(matrix)
      CALL timestop(handle)
   END SUBROUTINE dbcsr_replicate

   SUBROUTINE dbcsr_complete_redistribute(matrix, redist, keep_sparsity, summation)
      !! Fully redistributes a DBCSR matrix.
      !! The new distribution may be arbitrary as long as the total
      !! number full rows and columns matches that of the existing
      !! matrix.

      TYPE(dbcsr_type), INTENT(IN)                       :: matrix
         !! matrix to redistribute
      TYPE(dbcsr_type), INTENT(INOUT)                    :: redist
         !! redistributed matrix
      LOGICAL, INTENT(IN), OPTIONAL                      :: keep_sparsity, summation
         !! retains the sparsity of the redist matrix
         !! sum blocks with identical row and col from different processes

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_complete_redistribute'
      INTEGER, PARAMETER                                 :: metalen = 7
      LOGICAL, PARAMETER                                 :: dbg = .FALSE.

      INTEGER :: blk, blk_col_new, blk_ps, blk_row_new, blks, cnt_fnd, cnt_new, cnt_skip, col, &
                 col_int, col_offset_new, col_offset_old, col_rle, col_size, col_size_new, data_offset_l, &
                 data_type, dst_p, handle, i, meta_l, numnodes, nze_rle, row, row_int, &
                 row_offset_new, row_offset_old, row_rle, row_size, row_size_new, src_p, stored_col_new, &
                 stored_row_new
      INTEGER, ALLOCATABLE, DIMENSION(:) :: col_end_new, col_end_old, col_start_new, &
                                            col_start_old, rd_disp, recv_meta, rm_disp, row_end_new, row_end_old, row_start_new, &
                                            row_start_old, sd_disp, sdp, send_meta, sm_disp, smp
      INTEGER, ALLOCATABLE, DIMENSION(:, :) :: col_reblocks, n_col_reblocks, n_row_reblocks, &
                                               recv_count, row_reblocks, send_count, total_recv_count, total_send_count
      INTEGER, DIMENSION(:), POINTER                     :: col_blk_size_new, col_blk_size_old, &
                                                            col_dist_new, row_blk_size_new, &
                                                            row_blk_size_old, row_dist_new
      INTEGER, DIMENSION(:, :), POINTER                  :: pgrid
      LOGICAL                                            :: found, my_keep_sparsity, my_summation, &
                                                            sym, tr, valid_block
      REAL(kind=dp)                                      :: cs1, cs2
      TYPE(dbcsr_data_obj)                               :: buff_data, data_block, recv_data, &
                                                            send_data
      TYPE(dbcsr_distribution_obj)                       :: dist_new
      TYPE(dbcsr_iterator)                               :: iter
      TYPE(dbcsr_mp_obj)                                 :: mp_obj_new
      TYPE(mp_comm_type)                                 :: mp_group

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)

      IF (.NOT. dbcsr_valid_index(matrix)) &
         DBCSR_ABORT("Input not valid.")
      IF (matrix%replication_type .NE. dbcsr_repl_none) &
         DBCSR_WARN("Can not redistribute replicated matrix.")
      IF (dbcsr_has_symmetry(matrix) .AND. .NOT. dbcsr_has_symmetry(redist)) &
         DBCSR_ABORT("Can not redistribute a symmetric matrix into a non-symmetric one")
      !
      my_keep_sparsity = .FALSE.
      IF (PRESENT(keep_sparsity)) my_keep_sparsity = keep_sparsity
      !
      my_summation = .FALSE.
      IF (PRESENT(summation)) my_summation = summation

      ! zero blocks that might be present in the target (redist) but not in the source (matrix)
      CALL dbcsr_set(redist, 0.0_dp)

      sym = dbcsr_has_symmetry(redist)
      data_type = matrix%data_type
      ! Get row and column start and end positions
      ! Old matrix
      row_blk_size_old => array_data(matrix%row_blk_size)
      col_blk_size_old => array_data(matrix%col_blk_size)
      ALLOCATE (row_start_old(dbcsr_nblkrows_total(matrix)), &
                row_end_old(dbcsr_nblkrows_total(matrix)), &
                col_start_old(dbcsr_nblkcols_total(matrix)), &
                col_end_old(dbcsr_nblkcols_total(matrix)))
      CALL convert_sizes_to_offsets(row_blk_size_old, &
                                    row_start_old, row_end_old)
      CALL convert_sizes_to_offsets(col_blk_size_old, &
                                    col_start_old, col_end_old)
      ! New matrix
      dist_new = dbcsr_distribution(redist)
      row_blk_size_new => array_data(redist%row_blk_size)
      col_blk_size_new => array_data(redist%col_blk_size)
      ALLOCATE (row_start_new(dbcsr_nblkrows_total(redist)), &
                row_end_new(dbcsr_nblkrows_total(redist)), &
                col_start_new(dbcsr_nblkcols_total(redist)), &
                col_end_new(dbcsr_nblkcols_total(redist)))
      CALL convert_sizes_to_offsets(row_blk_size_new, &
                                    row_start_new, row_end_new)
      CALL convert_sizes_to_offsets(col_blk_size_new, &
                                    col_start_new, col_end_new)
      row_dist_new => dbcsr_distribution_row_dist(dist_new)
      col_dist_new => dbcsr_distribution_col_dist(dist_new)
      ! Create mappings
      i = dbcsr_nfullrows_total(redist)
      ALLOCATE (row_reblocks(4, i))
      ALLOCATE (n_row_reblocks(2, dbcsr_nblkrows_total(matrix)))
      CALL dbcsr_reblocking_targets(row_reblocks, i, n_row_reblocks, &
                                    row_blk_size_old, row_blk_size_new)
      i = dbcsr_nfullcols_total(redist)
      ALLOCATE (col_reblocks(4, i))
      ALLOCATE (n_col_reblocks(2, dbcsr_nblkcols_total(matrix)))
      CALL dbcsr_reblocking_targets(col_reblocks, i, n_col_reblocks, &
                                    col_blk_size_old, col_blk_size_new)
      !
      mp_obj_new = dbcsr_distribution_mp(dist_new)
      pgrid => dbcsr_mp_pgrid(mp_obj_new)
      numnodes = dbcsr_mp_numnodes(mp_obj_new)
      mp_group = dbcsr_mp_group(mp_obj_new)
      !
      IF (MAXVAL(row_dist_new) > UBOUND(pgrid, 1)) &
         DBCSR_ABORT('Row distribution references unexistent processor rows')
      IF (dbg) THEN
         IF (MAXVAL(row_dist_new) .NE. UBOUND(pgrid, 1)) &
            DBCSR_WARN('Range of row distribution not equal to processor rows')
      END IF
      IF (MAXVAL(col_dist_new) > UBOUND(pgrid, 2)) &
         DBCSR_ABORT('Col distribution references unexistent processor cols')
      IF (dbg) THEN
         IF (MAXVAL(col_dist_new) .NE. UBOUND(pgrid, 2)) &
            DBCSR_WARN('Range of col distribution not equal to processor cols')
      END IF
      ALLOCATE (send_count(2, 0:numnodes - 1))
      ALLOCATE (recv_count(2, 0:numnodes - 1))
      ALLOCATE (total_send_count(2, 0:numnodes - 1))
      ALLOCATE (total_recv_count(2, 0:numnodes - 1))
      ALLOCATE (sdp(0:numnodes - 1))
      ALLOCATE (sd_disp(0:numnodes - 1))
      ALLOCATE (smp(0:numnodes - 1))
      ALLOCATE (sm_disp(0:numnodes - 1))
      ALLOCATE (rd_disp(0:numnodes - 1))
      ALLOCATE (rm_disp(0:numnodes - 1))
      IF (dbg) THEN
         cs1 = dbcsr_checksum(matrix)
      END IF
      !cs1 = dbcsr_checksum (matrix)
      !call dbcsr_print(matrix)
      !
      !
      ! Count initial sizes for sending.
      !
      ! We go through every element of every local block and determine
      ! to which processor it must be sent. It could be more efficient,
      ! but at least the index data are run-length encoded.
      send_count(:, :) = 0
      CALL dbcsr_iterator_start(iter, matrix)
      dst_p = -1
      DO WHILE (dbcsr_iterator_blocks_left(iter))
         CALL dbcsr_iterator_next_block(iter, row, col, blk)
         DO col_int = n_col_reblocks(1, col), &
            n_col_reblocks(1, col) + n_col_reblocks(2, col) - 1
            blk_col_new = col_reblocks(1, col_int)
            DO row_int = n_row_reblocks(1, row), &
               n_row_reblocks(1, row) + n_row_reblocks(2, row) - 1
               blk_row_new = row_reblocks(1, row_int)
               IF (.NOT. sym .OR. blk_col_new .GE. blk_row_new) THEN
                  tr = .FALSE.
                  CALL dbcsr_get_stored_coordinates(redist, &
                                                    blk_row_new, blk_col_new, dst_p)
                  send_count(1, dst_p) = send_count(1, dst_p) + 1
                  send_count(2, dst_p) = send_count(2, dst_p) + &
                                         col_reblocks(2, col_int)*row_reblocks(2, row_int)
               END IF
            END DO
         END DO
      END DO
      CALL dbcsr_iterator_stop(iter)
      !
      !
      CALL mp_alltoall(send_count, recv_count, 2, mp_group)
      ! Allocate data structures needed for data exchange.
      CALL dbcsr_data_init(recv_data)
      CALL dbcsr_data_new(recv_data, data_type, SUM(recv_count(2, :)))
      ALLOCATE (recv_meta(metalen*SUM(recv_count(1, :))))
      CALL dbcsr_data_init(send_data)
      CALL dbcsr_data_new(send_data, data_type, SUM(send_count(2, :)))
      ALLOCATE (send_meta(metalen*SUM(send_count(1, :))))
      ! Fill in the meta data structures and copy the data.
      DO dst_p = 0, numnodes - 1
         total_send_count(1, dst_p) = send_count(1, dst_p)
         total_send_count(2, dst_p) = send_count(2, dst_p)
         total_recv_count(1, dst_p) = recv_count(1, dst_p)
         total_recv_count(2, dst_p) = recv_count(2, dst_p)
      END DO
      sd_disp = -1; sm_disp = -1
      rd_disp = -1; rm_disp = -1
      sd_disp(0) = 1; sm_disp(0) = 1
      rd_disp(0) = 1; rm_disp(0) = 1
      DO dst_p = 1, numnodes - 1
         sm_disp(dst_p) = sm_disp(dst_p - 1) &
                          + metalen*total_send_count(1, dst_p - 1)
         sd_disp(dst_p) = sd_disp(dst_p - 1) &
                          + total_send_count(2, dst_p - 1)
         rm_disp(dst_p) = rm_disp(dst_p - 1) &
                          + metalen*total_recv_count(1, dst_p - 1)
         rd_disp(dst_p) = rd_disp(dst_p - 1) &
                          + total_recv_count(2, dst_p - 1)
      END DO
      sdp(:) = sd_disp     ! sdp points to the the next place to store
      ! data. It is postincremented.
      smp(:) = sm_disp - metalen  ! But smp points to the "working" data, not
      ! the next. It is pre-incremented, so we must
      ! first rewind it.
      !
      CALL dbcsr_data_init(data_block)
      CALL dbcsr_data_new(data_block, data_type)
      CALL dbcsr_iterator_start(iter, matrix)
      dst_p = -1
      DO WHILE (dbcsr_iterator_blocks_left(iter))
         CALL dbcsr_iterator_next_block(iter, row, col, data_block, tr, blk, &
                                        row_size=row_size, col_size=col_size)
         !IF (tr) WRITE(*,*)"block at",row,col," is transposed"
         DO col_int = n_col_reblocks(1, col), &
            n_col_reblocks(1, col) + n_col_reblocks(2, col) - 1
            blk_col_new = col_reblocks(1, col_int)
            DO row_int = n_row_reblocks(1, row), &
               n_row_reblocks(1, row) + n_row_reblocks(2, row) - 1
               blk_row_new = row_reblocks(1, row_int)
               loc_ok: IF (.NOT. sym .OR. blk_col_new .GE. blk_row_new) THEN
                  IF (dbg) &
                     WRITE (*, *) 'using block', blk_row_new, 'x', blk_col_new
                  ! Start a new RLE run
                  tr = .FALSE.
                  CALL dbcsr_get_stored_coordinates(redist, &
                                                    blk_row_new, blk_col_new, dst_p)
                  row_offset_old = row_reblocks(3, row_int)
                  col_offset_old = col_reblocks(3, col_int)
                  row_offset_new = row_reblocks(4, row_int)
                  col_offset_new = col_reblocks(4, col_int)
                  row_rle = row_reblocks(2, row_int)
                  col_rle = col_reblocks(2, col_int)
                  smp(dst_p) = smp(dst_p) + metalen
                  send_meta(smp(dst_p)) = blk_row_new   ! new blocked row
                  send_meta(smp(dst_p) + 1) = blk_col_new ! new blocked column
                  send_meta(smp(dst_p) + 2) = row_offset_new  ! row in new block
                  send_meta(smp(dst_p) + 3) = col_offset_new  ! col in new block
                  send_meta(smp(dst_p) + 4) = row_rle ! RLE rows
                  send_meta(smp(dst_p) + 5) = col_rle ! RLE columns
                  send_meta(smp(dst_p) + 6) = sdp(dst_p) - sd_disp(dst_p) ! Offset in data
                  nze_rle = row_rle*col_rle
                  ! Copy current block into the send buffer
                  CALL dbcsr_block_partial_copy( &
                     send_data, dst_offset=sdp(dst_p) - 1, &
                     dst_rs=row_rle, dst_cs=col_rle, dst_tr=.FALSE., &
                     dst_r_lb=1, dst_c_lb=1, &
                     src=data_block, &
                     src_rs=row_size, src_cs=col_size, src_tr=tr, &
                     src_r_lb=row_offset_old, src_c_lb=col_offset_old, &
                     nrow=row_rle, ncol=col_rle)
                  sdp(dst_p) = sdp(dst_p) + nze_rle
               END IF loc_ok
            END DO ! row_int
         END DO ! col_int
      END DO
      CALL dbcsr_iterator_stop(iter)
      CALL dbcsr_data_clear_pointer(data_block)
      CALL dbcsr_data_release(data_block)

      ! Exchange the data and metadata structures.
      !
      SELECT CASE (data_type)
      CASE (dbcsr_type_real_4)
         CALL hybrid_alltoall_s1( &
            send_data%d%r_sp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%r_sp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_obj_new)
      CASE (dbcsr_type_real_8)
         !CALL mp_alltoall(&
         !     send_data%d%r_dp(:), total_send_count(2,:), sd_disp(:)-1,&
         !     recv_data%d%r_dp(:), total_recv_count(2,:), rd_disp(:)-1,&
         !     mp_group)
         CALL hybrid_alltoall_d1( &
            send_data%d%r_dp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%r_dp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_obj_new)
      CASE (dbcsr_type_complex_4)
         CALL hybrid_alltoall_c1( &
            send_data%d%c_sp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%c_sp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_obj_new)
      CASE (dbcsr_type_complex_8)
         CALL hybrid_alltoall_z1( &
            send_data%d%c_dp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%c_dp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_obj_new)
      CASE default
         DBCSR_ABORT("Invalid matrix type")
      END SELECT
      CALL hybrid_alltoall_i1(send_meta(:), metalen*total_send_count(1, :), sm_disp(:) - 1, &
                              recv_meta(:), metalen*total_recv_count(1, :), rm_disp(:) - 1, mp_obj_new)
      !
      ! Now fill in the data.
      CALL dbcsr_work_create(redist, &
                             nblks_guess=SUM(recv_count(1, :)), &
                             sizedata_guess=SUM(recv_count(2, :)), work_mutable=.TRUE.)
      CALL dbcsr_data_init(buff_data)
      CALL dbcsr_data_init(data_block)
      CALL dbcsr_data_new(buff_data, dbcsr_type_1d_to_2d(data_type), &
                          redist%max_rbs, redist%max_cbs)
      CALL dbcsr_data_new(data_block, dbcsr_type_1d_to_2d(data_type))

      !blk_p = 1
      !blk = 1
      blk_ps = 0
      blks = 0
      cnt_fnd = 0; cnt_new = 0; cnt_skip = 0
      DO src_p = 0, numnodes - 1
         data_offset_l = rd_disp(src_p)
         DO meta_l = 1, recv_count(1, src_p)
            stored_row_new = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1))
            stored_col_new = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1) + 1)
            row_offset_new = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1) + 2)
            col_offset_new = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1) + 3)
            row_rle = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1) + 4)
            col_rle = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1) + 5)
            data_offset_l = rd_disp(src_p) &
                            + recv_meta(rm_disp(src_p) + metalen*(meta_l - 1) + 6)

            CALL dbcsr_data_clear_pointer(data_block)
            CALL dbcsr_get_block_p(redist, stored_row_new, stored_col_new, &
                                   data_block, tr, found)
            valid_block = found

            IF (found) cnt_fnd = cnt_fnd + 1
            IF (.NOT. found .AND. .NOT. my_keep_sparsity) THEN
               ! We have to set up a buffer block
               CALL dbcsr_data_set_pointer(data_block, &
                                           rsize=row_blk_size_new(stored_row_new), &
                                           csize=col_blk_size_new(stored_col_new), &
                                           pointee=buff_data)
               CALL dbcsr_data_clear(data_block)
               !r2_dp => r2_dp_buff(1:row_blk_size_new (stored_row_new),&
               !     1:col_blk_size_new (stored_col_new))
               !r2_dp(:,:) = 0.0_dp
               tr = .FALSE.
               blks = blks + 1
               blk_ps = blk_ps + row_blk_size_new(stored_row_new)* &
                        col_blk_size_new(stored_col_new)
               valid_block = .TRUE.
               cnt_new = cnt_new + 1
            END IF
            nze_rle = row_rle*col_rle

            IF (valid_block) THEN
               row_size_new = row_blk_size_new(stored_row_new)
               col_size_new = col_blk_size_new(stored_col_new)
               CALL dbcsr_block_partial_copy( &
                  dst=data_block, dst_tr=tr, &
                  dst_rs=row_size_new, dst_cs=col_size_new, &
                  dst_r_lb=row_offset_new, dst_c_lb=col_offset_new, &
                  src=recv_data, src_offset=data_offset_l - 1, &
                  src_rs=row_rle, src_cs=col_rle, src_tr=.FALSE., &
                  src_r_lb=1, src_c_lb=1, &
                  nrow=row_rle, ncol=col_rle)
            ELSE
               cnt_skip = cnt_skip + 1
            END IF

            data_offset_l = data_offset_l + nze_rle
            IF ((.NOT. found .OR. my_summation) .AND. valid_block) THEN
               IF (dbg) WRITE (*, *) routineN//" Adding new block at", &
                  stored_row_new, stored_col_new
               CALL dbcsr_put_block(redist, stored_row_new, stored_col_new, &
                                    data_block, transposed=tr, summation=my_summation)
               !DEALLOCATE (r2_dp)
            ELSE
               IF (.NOT. my_keep_sparsity .AND. dbg) &
                  WRITE (*, *) routineN//" Reusing block at", &
                  stored_row_new, stored_col_new
            END IF
         END DO
      END DO

      CALL dbcsr_data_clear_pointer(data_block)
      CALL dbcsr_data_release(buff_data)
      CALL dbcsr_data_release(data_block)
      !
      IF (dbg) THEN
         WRITE (*, *) routineN//" Declared blocks=", redist%wms(1)%lastblk, &
            "actual=", blks
         WRITE (*, *) routineN//" Declared data size=", redist%wms(1)%datasize, &
            "actual=", blk_ps
      END IF

      CALL dbcsr_finalize(redist)

      DEALLOCATE (send_count)
      DEALLOCATE (recv_count)
      DEALLOCATE (sdp); DEALLOCATE (sd_disp)
      DEALLOCATE (smp); DEALLOCATE (sm_disp)
      DEALLOCATE (rd_disp)
      DEALLOCATE (rm_disp)

      CALL dbcsr_data_release(recv_data)
      CALL dbcsr_data_release(send_data)

      DEALLOCATE (recv_meta)
      DEALLOCATE (send_meta)

      !if (dbg) call dbcsr_print(redist)
      IF (dbg) THEN
         cs2 = dbcsr_checksum(redist)
         WRITE (*, *) routineN//" Checksums=", cs1, cs2, cs1 - cs2
      END IF
      !IF(cs1-cs2 > 0.00001) DBCSR_ABORT("Mangled data!")
      CALL timestop(handle)
   END SUBROUTINE dbcsr_complete_redistribute

   SUBROUTINE dbcsr_redistribute(matrix, redist)
      !! Redistributes a DBCSR matrix.
      !! The new distribution should have compatible row and column blocks.

      TYPE(dbcsr_type), INTENT(IN)                       :: matrix
         !! matrix to redistribute
      TYPE(dbcsr_type), INTENT(INOUT)                    :: redist
         !! redistributed matrix, which should already be created

      CHARACTER(len=*), PARAMETER :: routineN = 'dbcsr_redistribute'
      INTEGER, PARAMETER                                 :: metalen = 2
      LOGICAL, PARAMETER                                 :: dbg = .FALSE.

      INTEGER :: blk, blk_ps, blks, col, col_size, data_type, dst_p, handle, meta_l, &
                 numnodes, nze, row, row_size, src_p, stored_col_new, stored_row_new
      INTEGER, ALLOCATABLE, DIMENSION(:)                 :: rd_disp, recv_meta, rm_disp, sd_disp, &
                                                            sdp, send_meta, sm_disp, smp
      INTEGER, ALLOCATABLE, DIMENSION(:, :)              :: recv_count, send_count, &
                                                            total_recv_count, total_send_count
      INTEGER, DIMENSION(:), POINTER                     :: col_blk_size_new, col_dist_new, &
                                                            row_blk_size_new, row_dist_new
      INTEGER, DIMENSION(:, :), POINTER                  :: pgrid
      LOGICAL                                            :: sym_tr, tr
      TYPE(dbcsr_data_obj)                               :: data_block, recv_data, send_data
      TYPE(dbcsr_distribution_obj)                       :: dist_new
      TYPE(dbcsr_iterator)                               :: iter
      TYPE(dbcsr_mp_obj)                                 :: mp_obj_new
      TYPE(mp_comm_type)                                 :: mp_group

!   ---------------------------------------------------------------------------

      CALL timeset(routineN, handle)
      !call dbcsr_print_dist (matrix%dist)
      !call dbcsr_print_dist (redist%dist)
      IF (.NOT. dbcsr_valid_index(matrix)) &
         DBCSR_ABORT("Input not valid.")
      IF (matrix%replication_type .NE. dbcsr_repl_none) &
         DBCSR_WARN("Can not redistribute replicated matrix.")
      data_type = matrix%data_type
      ! Get row and column start and end positions
      ! Old matrix
      ! New matrix
      dist_new = dbcsr_distribution(redist)
      row_blk_size_new => array_data(redist%row_blk_size)
      col_blk_size_new => array_data(redist%col_blk_size)
      row_dist_new => dbcsr_distribution_row_dist(dist_new)
      col_dist_new => dbcsr_distribution_col_dist(dist_new)
      !
      mp_obj_new = dbcsr_distribution_mp(dist_new)
      pgrid => dbcsr_mp_pgrid(mp_obj_new)
      numnodes = dbcsr_mp_numnodes(mp_obj_new)
      mp_group = dbcsr_mp_group(mp_obj_new)
      !
      IF (MAXVAL(row_dist_new) .GT. UBOUND(pgrid, 1)) &
         DBCSR_ABORT('Row distribution references unexistent processor rows')
      IF (dbg) THEN
         IF (MAXVAL(row_dist_new) .NE. UBOUND(pgrid, 1)) &
            DBCSR_WARN('Range of row distribution not equal to processor rows')
      END IF
      IF (MAXVAL(col_dist_new) .GT. UBOUND(pgrid, 2)) &
         DBCSR_ABORT('Col distribution references unexistent processor cols')
      IF (dbg) THEN
         IF (MAXVAL(col_dist_new) .NE. UBOUND(pgrid, 2)) &
            DBCSR_WARN('Range of col distribution not equal to processor cols')
      END IF
      ALLOCATE (send_count(2, 0:numnodes - 1))
      ALLOCATE (recv_count(2, 0:numnodes - 1))
      ALLOCATE (total_send_count(2, 0:numnodes - 1))
      ALLOCATE (total_recv_count(2, 0:numnodes - 1))
      ALLOCATE (sdp(0:numnodes - 1))
      ALLOCATE (sd_disp(0:numnodes - 1))
      ALLOCATE (smp(0:numnodes - 1))
      ALLOCATE (sm_disp(0:numnodes - 1))
      ALLOCATE (rd_disp(0:numnodes - 1))
      ALLOCATE (rm_disp(0:numnodes - 1))
      ! Count initial sizes for sending.
      !
      send_count(:, :) = 0
      CALL dbcsr_iterator_start(iter, matrix)
      dst_p = -1
      DO WHILE (dbcsr_iterator_blocks_left(iter))
         CALL dbcsr_iterator_next_block(iter, row, col, blk, tr, &
                                        row_size=row_size, col_size=col_size)
         sym_tr = .FALSE.
         CALL dbcsr_get_stored_coordinates(redist, &
                                           row, col, dst_p)
         nze = row_size*col_size
         send_count(1, dst_p) = send_count(1, dst_p) + 1
         send_count(2, dst_p) = send_count(2, dst_p) + nze
      END DO
      CALL dbcsr_iterator_stop(iter)
      CALL mp_alltoall(send_count, recv_count, 2, mp_group)
      ! Allocate data structures needed for data exchange.
      CALL dbcsr_data_init(recv_data)
      CALL dbcsr_data_new(recv_data, data_type, SUM(recv_count(2, :)))
      ALLOCATE (recv_meta(metalen*SUM(recv_count(1, :))))
      CALL dbcsr_data_init(send_data)
      CALL dbcsr_data_new(send_data, data_type, SUM(send_count(2, :)))
      ALLOCATE (send_meta(metalen*SUM(send_count(1, :))))
      ! Fill in the meta data structures and copy the data.
      DO dst_p = 0, numnodes - 1
         total_send_count(1, dst_p) = send_count(1, dst_p)
         total_send_count(2, dst_p) = send_count(2, dst_p)
         total_recv_count(1, dst_p) = recv_count(1, dst_p)
         total_recv_count(2, dst_p) = recv_count(2, dst_p)
      END DO
      sd_disp = -1; sm_disp = -1
      rd_disp = -1; rm_disp = -1
      sd_disp(0) = 1; sm_disp(0) = 1
      rd_disp(0) = 1; rm_disp(0) = 1
      DO dst_p = 1, numnodes - 1
         sm_disp(dst_p) = sm_disp(dst_p - 1) &
                          + metalen*total_send_count(1, dst_p - 1)
         sd_disp(dst_p) = sd_disp(dst_p - 1) &
                          + total_send_count(2, dst_p - 1)
         rm_disp(dst_p) = rm_disp(dst_p - 1) &
                          + metalen*total_recv_count(1, dst_p - 1)
         rd_disp(dst_p) = rd_disp(dst_p - 1) &
                          + total_recv_count(2, dst_p - 1)
      END DO
      sdp(:) = sd_disp     ! sdp points to the the next place to store
      ! data. It is postincremented.
      smp(:) = sm_disp - metalen  ! But smp points to the "working" data, not
      ! the next. It is pre-incremented, so we must
      ! first rewind it.
      CALL dbcsr_data_init(data_block)
      CALL dbcsr_data_new(data_block, data_type)
      CALL dbcsr_iterator_start(iter, matrix)
      dst_p = -1
      DO WHILE (dbcsr_iterator_blocks_left(iter))
         CALL dbcsr_iterator_next_block(iter, row, col, data_block, tr, blk)
         !IF (tr) WRITE(*,*)"block at",row,col," is transposed"
         sym_tr = .FALSE.
         CALL dbcsr_get_stored_coordinates(redist, &
                                           row, col, dst_p)
         smp(dst_p) = smp(dst_p) + metalen
         IF (tr) THEN
            send_meta(smp(dst_p)) = -row
         ELSE
            send_meta(smp(dst_p)) = row
         END IF
         send_meta(smp(dst_p) + 1) = col ! new blocked column
         nze = dbcsr_data_get_size(data_block)
         CALL dbcsr_data_set(send_data, lb=sdp(dst_p), data_size=nze, &
                             src=data_block, source_lb=1)
         !send_data(sdp(dst_p):sdp(dst_p)+SIZE(r_dp)-1) &
         !     = r_dp(:)
         sdp(dst_p) = sdp(dst_p) + nze
      END DO
      CALL dbcsr_iterator_stop(iter)
      CALL dbcsr_data_clear_pointer(data_block)
      ! Exchange the data and metadata structures.
      SELECT CASE (data_type)
      CASE (dbcsr_type_real_4)
         CALL hybrid_alltoall_s1( &
            send_data%d%r_sp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%r_sp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_obj_new)
      CASE (dbcsr_type_real_8)
         !CALL mp_alltoall(&
         !     send_data%d%r_dp(:), total_send_count(2,:), sd_disp(:)-1,&
         !     recv_data%d%r_dp(:), total_recv_count(2,:), rd_disp(:)-1,&
         !     mp_group)
         CALL hybrid_alltoall_d1( &
            send_data%d%r_dp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%r_dp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_obj_new)
      CASE (dbcsr_type_complex_4)
         CALL hybrid_alltoall_c1( &
            send_data%d%c_sp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%c_sp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_obj_new)
      CASE (dbcsr_type_complex_8)
         CALL hybrid_alltoall_z1( &
            send_data%d%c_dp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%c_dp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_obj_new)
      END SELECT
      !CALL mp_alltoall(send_data(:), total_send_count(2,:), sd_disp(:)-1,&
      !     recv_data(:), total_recv_count(2,:), rd_disp(:)-1, mp_group)
      CALL hybrid_alltoall_i1(send_meta(:), metalen*total_send_count(1, :), sm_disp(:) - 1, &
                              recv_meta(:), metalen*total_recv_count(1, :), rm_disp(:) - 1, mp_obj_new)
      ! Now fill in the data.
      CALL dbcsr_work_create(redist, &
                             SUM(recv_count(1, :)), &
                             SUM(recv_count(2, :)), work_mutable=.FALSE., n=1)
      !
      blk_ps = 1
      blks = 0
      DO src_p = 0, numnodes - 1
         !data_offset_l = rd_disp(src_p)
         DO meta_l = 1, recv_count(1, src_p)
            row = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1))
            tr = row .LT. 0
            stored_row_new = ABS(row)
            stored_col_new = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1) + 1)
            nze = row_blk_size_new(stored_row_new)*col_blk_size_new(stored_col_new)
            !r_dp => recv_data(blk_ps:blk_ps+nze-1)
            !CALL dbcsr_put_block(redist, stored_row_new, stored_col_new, r_dp, tr)
            !### this should be changed to be like the make images (i.e., copy data in finalize, not here & now)
            data_block = pointer_view(data_block, recv_data, blk_ps, nze)
            CALL dbcsr_put_block(redist, stored_row_new, stored_col_new, data_block, transposed=tr)
            blk_ps = blk_ps + nze
            blks = blks + 1
         END DO
      END DO
      CALL dbcsr_data_clear_pointer(data_block)
      CALL dbcsr_data_release(data_block)
      !
      IF (dbg) THEN
         WRITE (*, *) routineN//" Declared blocks=", redist%wms(1)%lastblk, &
            "actual=", blks
         WRITE (*, *) routineN//" Declared data size=", redist%wms(1)%datasize, &
            "actual=", blk_ps
      END IF
      CALL dbcsr_finalize(redist)
      CALL dbcsr_data_release(recv_data)
      CALL dbcsr_data_release(send_data)
      DEALLOCATE (send_count)
      DEALLOCATE (recv_count)
      DEALLOCATE (sdp); DEALLOCATE (sd_disp)
      DEALLOCATE (smp); DEALLOCATE (sm_disp)
      DEALLOCATE (rd_disp)
      DEALLOCATE (rm_disp)
      DEALLOCATE (recv_meta)
      DEALLOCATE (send_meta)
      CALL timestop(handle)
   END SUBROUTINE dbcsr_redistribute

   SUBROUTINE dbcsr_datablock_redistribute(dblk, row_p, col_i, blk_p, &
                                           proc_nblks, proc_darea_sizes, new_matrix)
      !! Redistributes data blocks of a DBCSR matrix read from a file.
      !! This routine should be used with dbcsr_binary_read in the module
      !! dbcsr_io.F

      TYPE(dbcsr_data_obj), INTENT(IN)         :: dblk
         !! data blocks of the DBCSR matrix that the current node possesses after reading the data file
      INTEGER, DIMENSION(:), INTENT(IN), &
         POINTER                                :: row_p, col_i, blk_p, &
                                                   proc_nblks, proc_darea_sizes
         !! row_p of the DBCSR matrix that the current node possesses after reading the data file
         !! col_i of the DBCSR matrix that the current node possesses after reading the data file
         !! blk_p of the DBCSR matrix that the current node possesses after reading the data file
         !! 1D array holding nblks of those nodes of the mp environment, that created the file, whose contents have been read by the
         !! current node of the present mp environment
         !! 1D array holding data_area_size of those nodes of the mp environment, that created the file, whose contents have been
         !! read by the current node of the present mp environment
      TYPE(dbcsr_type), INTENT(INOUT)           :: new_matrix
         !! redistributed matrix

      CHARACTER(LEN=*), PARAMETER :: routineN = 'dbcsr_datablock_redistribute'
      INTEGER, PARAMETER                       :: metalen = 2

      COMPLEX(kind=dp), DIMENSION(:), POINTER, CONTIGUOUS :: c_dp
      COMPLEX(kind=sp), DIMENSION(:), POINTER, CONTIGUOUS :: c_sp
      INTEGER :: bcol, blk, blk_ps, blk_size, blks, brow, col_size, data_type, &
                 dst_p, handle, i, ind, job_count, meta_l, &
                 nblkrows_total, numnodes, row_size, src_p, stored_col_new, &
                 stored_row_new
      INTEGER(kind=int_8)                      :: actual_blk, blkp, end_ind, &
                                                  start_ind
      INTEGER(kind=int_8), ALLOCATABLE, &
         DIMENSION(:)                           :: extra_darea_size, extra_nblks
      INTEGER, ALLOCATABLE, DIMENSION(:)       :: rd_disp, recv_meta, rm_disp, &
                                                  sd_disp, sdp, send_meta, &
                                                  sm_disp, smp
      INTEGER, ALLOCATABLE, DIMENSION(:, :)    :: recv_count, send_count, &
                                                  total_recv_count, &
                                                  total_send_count
      INTEGER, DIMENSION(:), POINTER           :: col_blk_size, row_blk_size
      LOGICAL                                  :: sym_tr, tr
      REAL(kind=dp), DIMENSION(:), POINTER, CONTIGUOUS :: r_dp
      REAL(kind=sp), DIMENSION(:), POINTER, CONTIGUOUS :: r_sp
      TYPE(dbcsr_data_obj)                     :: data_block, recv_data, &
                                                  send_data
      TYPE(dbcsr_distribution_obj)             :: dist
      TYPE(dbcsr_mp_obj)                       :: mp_env
      TYPE(mp_comm_type)                                 :: mp_group

      CALL timeset(routineN, handle)

      dist = dbcsr_distribution(new_matrix)
      mp_env = dbcsr_distribution_mp(dist)
      numnodes = dbcsr_mp_numnodes(mp_env)
      mp_group = dbcsr_mp_group(mp_env)

      data_type = dbcsr_get_data_type(new_matrix)
      nblkrows_total = dbcsr_nblkrows_total(new_matrix)
      row_blk_size => array_data(new_matrix%row_blk_size)
      col_blk_size => array_data(new_matrix%col_blk_size)

      ALLOCATE (send_count(2, 0:numnodes - 1))
      ALLOCATE (recv_count(2, 0:numnodes - 1))
      ALLOCATE (total_send_count(2, 0:numnodes - 1))
      ALLOCATE (total_recv_count(2, 0:numnodes - 1))
      ALLOCATE (sdp(0:numnodes - 1))
      ALLOCATE (sd_disp(0:numnodes - 1))
      ALLOCATE (smp(0:numnodes - 1))
      ALLOCATE (sm_disp(0:numnodes - 1))
      ALLOCATE (rd_disp(0:numnodes - 1))
      ALLOCATE (rm_disp(0:numnodes - 1))
      send_count(:, :) = 0
      dst_p = -1

      job_count = COUNT(proc_nblks .NE. 0)

      ALLOCATE (extra_nblks(job_count))
      ALLOCATE (extra_darea_size(job_count))
      IF (job_count > 0) THEN
         CALL cumsum_l(INT((/0, proc_nblks(1:job_count - 1)/), kind=int_8), extra_nblks)
         CALL cumsum_l(INT((/0, proc_darea_sizes(1:job_count - 1)/), kind=int_8), extra_darea_size)
      END IF

      i = 0
      DO ind = 1, job_count*nblkrows_total
         brow = MOD(ind - 1, nblkrows_total) + 1
         IF (brow .EQ. 1) i = i + 1
         row_size = row_blk_size(brow)
         DO blk = row_p(ind + i - 1) + 1, row_p(ind + i)
            actual_blk = INT(blk, kind=int_8) + extra_nblks(i)
            bcol = col_i(actual_blk)
            col_size = col_blk_size(bcol)
            blk_size = row_size*col_size
            sym_tr = .FALSE.
            CALL dbcsr_get_stored_coordinates(new_matrix, brow, bcol, dst_p)
            send_count(1, dst_p) = send_count(1, dst_p) + 1
            send_count(2, dst_p) = send_count(2, dst_p) + blk_size
         END DO
      END DO
      CALL mp_alltoall(send_count, recv_count, 2, mp_group)
      CALL dbcsr_data_init(recv_data)
      CALL dbcsr_data_new(recv_data, data_type, SUM(recv_count(2, :)))
      ALLOCATE (recv_meta(metalen*SUM(recv_count(1, :))))
      CALL dbcsr_data_init(send_data)
      CALL dbcsr_data_new(send_data, data_type, SUM(send_count(2, :)))
      ALLOCATE (send_meta(metalen*SUM(send_count(1, :))))
      DO dst_p = 0, numnodes - 1
         total_send_count(1, dst_p) = send_count(1, dst_p)
         total_send_count(2, dst_p) = send_count(2, dst_p)
         total_recv_count(1, dst_p) = recv_count(1, dst_p)
         total_recv_count(2, dst_p) = recv_count(2, dst_p)
      END DO
      sd_disp = -1; sm_disp = -1; rd_disp = -1; rm_disp = -1
      sd_disp(0) = 1; sm_disp(0) = 1; rd_disp(0) = 1; rm_disp(0) = 1
      DO dst_p = 1, numnodes - 1
         sm_disp(dst_p) = sm_disp(dst_p - 1) + metalen*total_send_count(1, dst_p - 1)
         sd_disp(dst_p) = sd_disp(dst_p - 1) + total_send_count(2, dst_p - 1)
         rm_disp(dst_p) = rm_disp(dst_p - 1) + metalen*total_recv_count(1, dst_p - 1)
         rd_disp(dst_p) = rd_disp(dst_p - 1) + total_recv_count(2, dst_p - 1)
      END DO
      sdp(:) = sd_disp
      smp(:) = sm_disp - metalen

      SELECT CASE (data_type)
      CASE (dbcsr_type_real_4)
         r_sp => dblk%d%r_sp
      CASE (dbcsr_type_real_8)
         r_dp => dblk%d%r_dp
      CASE (dbcsr_type_complex_4)
         c_sp => dblk%d%c_sp
      CASE (dbcsr_type_complex_8)
         c_dp => dblk%d%c_dp
      END SELECT

      CALL dbcsr_data_init(data_block)
      CALL dbcsr_data_new(data_block, data_type)
      i = 0
      dst_p = -1
      DO ind = 1, job_count*nblkrows_total
         brow = MOD(ind - 1, nblkrows_total) + 1
         IF (brow .EQ. 1) i = i + 1
         row_size = row_blk_size(brow)
         DO blk = row_p(ind + i - 1) + 1, row_p(ind + i)
            actual_blk = INT(blk, kind=int_8) + extra_nblks(i)
            bcol = col_i(actual_blk)
            col_size = col_blk_size(bcol)
            blk_size = row_size*col_size
            blkp = INT(blk_p(actual_blk), kind=int_8)
            start_ind = blkp + extra_darea_size(i)
            end_ind = blkp + extra_darea_size(i) + blk_size - 1

            SELECT CASE (data_type)
            CASE (dbcsr_type_real_4)
               data_block%d%r_sp => r_sp(start_ind:end_ind)
            CASE (dbcsr_type_real_8)
               data_block%d%r_dp => r_dp(start_ind:end_ind)
            CASE (dbcsr_type_complex_4)
               data_block%d%c_sp => c_sp(start_ind:end_ind)
            CASE (dbcsr_type_complex_8)
               data_block%d%c_dp => c_dp(start_ind:end_ind)
            END SELECT
            sym_tr = .FALSE.
            CALL dbcsr_get_stored_coordinates(new_matrix, brow, bcol, dst_p)
            smp(dst_p) = smp(dst_p) + metalen
            tr = .FALSE.
!           IF (tr) THEN
!              send_meta(smp(dst_p)) = -brow
!           ELSE
            send_meta(smp(dst_p)) = brow
!           ENDIF
            send_meta(smp(dst_p) + 1) = bcol
            blk_size = dbcsr_data_get_size(data_block)

            CALL dbcsr_data_set(send_data, lb=sdp(dst_p), &
                                data_size=blk_size, src=data_block, source_lb=1)
            sdp(dst_p) = sdp(dst_p) + blk_size
         END DO
      END DO
      CALL dbcsr_data_clear_pointer(data_block)

      SELECT CASE (data_type)
      CASE (dbcsr_type_real_4)
         CALL hybrid_alltoall_s1( &
            send_data%d%r_sp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%r_sp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_env)
      CASE (dbcsr_type_real_8)
         CALL hybrid_alltoall_d1( &
            send_data%d%r_dp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%r_dp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_env)
      CASE (dbcsr_type_complex_4)
         CALL hybrid_alltoall_c1( &
            send_data%d%c_sp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%c_sp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_env)
      CASE (dbcsr_type_complex_8)
         CALL hybrid_alltoall_z1( &
            send_data%d%c_dp(:), total_send_count(2, :), sd_disp(:) - 1, &
            recv_data%d%c_dp(:), total_recv_count(2, :), rd_disp(:) - 1, &
            mp_env)
      END SELECT
      CALL hybrid_alltoall_i1(send_meta(:), metalen*total_send_count(1, :), sm_disp(:) - 1, &
                              recv_meta(:), metalen*total_recv_count(1, :), rm_disp(:) - 1, mp_env)
      CALL dbcsr_work_create(new_matrix, SUM(recv_count(1, :)), &
                             SUM(recv_count(2, :)), work_mutable=.FALSE., n=1)

      blk_ps = 1
      blks = 0
      DO src_p = 0, numnodes - 1
         DO meta_l = 1, recv_count(1, src_p)
            brow = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1))
            tr = brow .LT. 0
            stored_row_new = ABS(brow)
            stored_col_new = recv_meta(rm_disp(src_p) + metalen*(meta_l - 1) + 1)
            blk_size = row_blk_size(stored_row_new)*col_blk_size(stored_col_new)

            data_block = pointer_view(data_block, recv_data, blk_ps, blk_size)
            CALL dbcsr_put_block(new_matrix, stored_row_new, stored_col_new, data_block, transposed=tr)

            blk_ps = blk_ps + blk_size
            blks = blks + 1
         END DO
      END DO
      CALL dbcsr_data_clear_pointer(data_block)
      DEALLOCATE (data_block%d)
      CALL dbcsr_finalize(new_matrix, reshuffle=.TRUE.)
      CALL dbcsr_data_release(recv_data)
      CALL dbcsr_data_release(send_data)

      DEALLOCATE (send_count)
      DEALLOCATE (recv_count)
      DEALLOCATE (sdp); DEALLOCATE (sd_disp)
      DEALLOCATE (smp); DEALLOCATE (sm_disp)
      DEALLOCATE (rd_disp)
      DEALLOCATE (rm_disp)
      DEALLOCATE (recv_meta)
      DEALLOCATE (send_meta)
      DEALLOCATE (extra_nblks); DEALLOCATE (extra_darea_size)

      CALL timestop(handle)
   CONTAINS

      SUBROUTINE cumsum_l(arr, cumsum)
         INTEGER(kind=int_8), DIMENSION(:), INTENT(IN)      :: arr
         INTEGER(kind=int_8), DIMENSION(:), INTENT(OUT)     :: cumsum

         INTEGER                                            :: i

         IF (SIZE(cumsum) > 0) THEN
            cumsum(1) = arr(1)
            DO i = 2, SIZE(cumsum)
               cumsum(i) = cumsum(i - 1) + arr(i)
            END DO
         END IF
      END SUBROUTINE cumsum_l
   END SUBROUTINE dbcsr_datablock_redistribute

END MODULE dbcsr_transformations