Use ETCD Service Discovery in test infra

I recently gave a talk to my team about ETCD service discovery. The same mechanism can also be used in your test infra to hide test server changes from the rest of the infra.

Some slides from my presentation

Screenshot 2019-11-25 at 9.16.55 AM

Screenshot 2019-11-25 at 9.17.04 AM

Screenshot 2019-11-25 at 9.17.12 AM

Screenshot 2019-11-25 at 9.17.19 AM

Below is the architecture. (I also used MySQL, Kafka and ES for other task requirements; you can ignore them.)

Screenshot 2019-11-25 at 9.35.22 AM

How to utilise ETCD to hide test server changes from your infra

Think about a test infra requirement:

  • You are doing API automation testing. You not only have to verify the response but also want to check that the *.log files on the server contain the correct logging info. Say a request fails: the response will be something like a 5xx error code from the server, which is generic, so you want to confirm that the log contains something like ‘balabala duplicated. Transaction xxx’ – that way you can be sure it really failed due to that specific error
  • Your servers are deployed in a Docker/microservices environment whose IPs may keep changing.
    • You may think you can ask IT to configure DNS for them. But consider that you have more than two servers serving the same APIs.

In this scenario you can use ETCD to achieve the goal:

  • Firstly, start a very basic TCP server on every server instance. The TCP server listens for requests that contain a find-text and returns the log entries that contain that find-text. The logic is very simple: it accepts a find-text, greps the log file and returns the matching log entries to the client.
    • func startFileParseServer(port string, file string) {
         ln, err := net.Listen("tcp", ":" + port)
         if err != nil {
            eChan <- err
         defer ln.Close()
         for {
            conn, err := ln.Accept()
            if err != nil {
               eChan <- err
            buf := make([]byte, 1024)
            // Read the incoming connection into the buffer.
            _, err = conn.Read(buf)
            if err != nil {
               log.Println("Error reading:", err.Error())
            buf = getValidByte([]byte(buf))
            log.Println("#" + string(buf) + "#")
            readFile, err := os.Open(file)
            if err != nil {
               log.Fatalf("failed to open file: %s", err)
            fileScanner := bufio.NewScanner(readFile)
            var fileTextLines []string
            for fileScanner.Scan() {
               fileTextLines = append(fileTextLines, fileScanner.Text())
            _ = readFile.Close()
            for _, l := range fileTextLines {
               if strings.Contains(l, string(buf)) {
                  io.WriteString(conn, l)
            // io.WriteString(conn, "")
      // getValidByte returns a copy of src with every zero (NUL) byte removed.
      //
      // startFileParseServer uses it to strip the unused, zero-filled part of its
      // fixed-size read buffer before treating the request as the text to search.
      // The result is nil when src contains no non-zero byte.
      func getValidByte(src []byte) []byte {
         var cleaned []byte
         for _, b := range src {
            if b != 0 {
               cleaned = append(cleaned, b)
            }
         }
         return cleaned
      }
  • Secondly, register your TCP servers with ETCD
    • package serviceregistry
      import (
         errUtil "xxx/component/err"
         configFactory "xxx/config"
      var (
         conf              = configFactory.Config()
         tcpUploadServices = make(map[string]string)
         tcpSearchServices = make(map[string]string)
         warn              = "[WARN]SvcR"
         cli               *clientv3.Client
      // cli *clientv3.Client, keyPrefix string
      func init() {
         log.Println("Initializing Service Registry...")
         var err error
         if cli, err = clientv3.New(clientv3.Config{
            Endpoints:   []string{conf.Etcd.Url},
            DialTimeout: 5 * time.Second,
         }); err != nil {
      func initServices(prefix string) {
         resp, err := cli.Get(context.Background(), prefix, clientv3.WithPrefix())
         if err != nil {
         for _, kv := range resp.Kvs {
            switch prefix {
            case conf.Etcd.Etcd_tcp_upload_prefix_tcp:
               tcpUploadServices[string(kv.Key)] = string(kv.Value)
            case conf.Etcd.Etcd_tcp_search_prefix_tcp:
               tcpSearchServices[string(kv.Key)] = string(kv.Value)
            log.Println("*** Available Service: " + string(kv.Key) + string(kv.Value))
      func Watch(prefix string) {
         defer func() {
            if err := cli.Close(); err != nil {
               log.Printf("%s: Somehow fail to close then etcd client v3 conn handler. Ignoring...\n", warn)
         log.Printf("Service Registry is listening for services %s.* updates...", prefix)
         rch := cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
         for n := range rch {
            for _, ev := range n.Events {
               switch ev.Type {
               case mvccpb.PUT:
                  switch prefix {
                  case conf.Etcd.Etcd_tcp_upload_prefix_tcp:
                     tcpUploadServices[string(ev.Kv.Key)] = string(ev.Kv.Value)
                  case conf.Etcd.Etcd_tcp_search_prefix_tcp:
                     tcpSearchServices[string(ev.Kv.Key)] = string(ev.Kv.Value)
                  log.Println("*** Service is PUT: " + string(ev.Kv.Key) + "#" + string(ev.Kv.Value))
               case mvccpb.DELETE:
                  switch prefix {
                  case conf.Etcd.Etcd_tcp_upload_prefix_tcp:
                     delete(tcpUploadServices, string(ev.Kv.Key))
                  case conf.Etcd.Etcd_tcp_search_prefix_tcp:
                     delete(tcpSearchServices, string(ev.Kv.Key))
                  log.Println("*** Service is DELETED: " + string(ev.Kv.Key) + "#" + string(ev.Kv.Value))
      func Resolver(prefix string) (string, error) {
         if prefix == conf.Etcd.Etcd_tcp_upload_prefix_tcp {
            return fetchServer(tcpUploadServices)
         if prefix == conf.Etcd.Etcd_tcp_search_prefix_tcp {
            return fetchServer(tcpSearchServices)
         return "", errors.New(errUtil.Lookup(errUtil.SERVER_ERR_UNSUPPORTED_OPERATION))
      func fetchServer(services map[string]string) (string, error) {
         // TODO: can implement a real strategy here, say a weighted round robin algorithm
         max := big.NewInt(int64(len(services)))
         if max.Int64() == int64(0) {
            return "", errors.New(errUtil.Lookup(errUtil.SERVER_ERR_SERVER_UNAVAILABLE))
         i, _ := rand.Int(rand.Reader, max)
         r := i.Int64()
         var count = int64(0)
         var resolve = ""
         for k, v := range services {
            if count != r {
               count = count + 1
            } else {
               log.Printf("*** Picked up [%s]:[%s] to serve", k, v)
               resolve = v
         return resolve, nil
  • Thirdly, write a simple Load Balancer (LB) that provides a single endpoint for your test infra to consume
    • package main
      import (
         configFactory "xxxx/config"
      var (
         conf = configFactory.Config()
         warn = "[WARN]LB"
      func handleConnection(in net.Conn, prefix string) {
         dest, err := serviceregistry.Resolver(prefix)
         if err != nil {
            _ = in.Close() // inform client
         log.Printf("Connecting to service: %s...\n", dest)
         out, err := net.Dial("tcp", dest)
         defer func() {
            err1 := in.Close();
            err2 := out.Close();
            if err1 != nil || err2 != nil {
               log.Printf("%s: unable to close tcp for %s. Ignoring...\n", warn, dest)
            } else {
               log.Printf("Closed tcp for %s.\n", dest)
            // try to recover from it - when there are many requests the io.Copy randomly will crash due to NPE
            if r := recover(); r != nil {
               log.Printf("%s: while tried to recover from: %s", warn, r)
         eChan := make(chan error, 2)
         cp := func(dst io.Writer, src io.Reader) {
            _, err := io.Copy(dst, src)
            eChan <- err
         go cp(out, in)
         go cp(in, out)
         err = <-eChan
         if err != nil && err != io.EOF {
            log.Printf("%s: %s\n", warn, err)
      func init() {
         go serviceregistry.Watch(conf.Etcd.Etcd_tcp_upload_prefix_tcp)
         go serviceregistry.Watch(conf.Etcd.Etcd_tcp_search_prefix_tcp)
         // await service registry initialization
         time.Sleep(2 * time.Second)
      func main() {
         messages := make(chan int)
         var wg sync.WaitGroup
         go func() {
            loadBalance(conf.Upload.Protocol, conf.Upload.Host, conf.Upload.Port, conf.Etcd.Etcd_tcp_upload_prefix_tcp)
            messages <- 1
         go func() {
            loadBalance(conf.Tcpsearch.Protocol, conf.Tcpsearch.Host, conf.Tcpsearch.Port, conf.Etcd.Etcd_tcp_search_prefix_tcp)
            messages <- 2
      func loadBalance(Protocol string, HOST string, PORT string, prefix string) {
         l, err := net.Listen(Protocol, HOST+":"+PORT)
         if err != nil {
         defer func() {
            if err := l.Close(); err != nil {
               log.Printf("%s: unable to shutdown server. Ignoring...\n", warn)
         log.Printf("Load Balancer is serving services %s.* updates...", prefix)
         for {
            c, err := l.Accept()
            if err != nil {
            go handleConnection(c, prefix)
  • Lastly, your test infra/test code can simply connect to the LB to grep logs from the remote servers
    • func GetDeductServerInfoLog(finder string) string {
         // connect to this socket
         conn, _ := net.Dial("tcp", <YOUR_LB_ENDPOINT> + ":<PORT>")
         message, _ := bufio.NewReader(conn).ReadString('\n')
         log.Print("GetDeductServerInfoLog: " + message)
         defer conn.Close()
         return message

Build Performance Testing Infra with JMeter + InfluxDB + Grafana + Perfmon

Performance Testing

First of all, if you don’t know Performance Testing much, below materials are good to kick off

So now your boss is asking you to do Performance Testing.

  • JMeter. You need to have a performance testing tool which is open source and free, can support test script recording, and can generate insightful HTML reports. Apache JMeter may be used to test performance both on static and dynamic resources, Web dynamic applications. It can be used to simulate a heavy load on a server, group of servers, network or object to test its strength or to analyze overall performance under different load types.
  • InfluxDB. JMeter can provide a very friendly and detailed HTML report after a test run. However, you still want to persist your JMeter test results in a database so that you can compare them with previous runs. InfluxDB is the one that can help you out. InfluxDB is an open-source time series database developed by InfluxData. It is written in Go and optimized for fast, high-availability storage and retrieval of time series data in fields such as operations monitoring, application metrics, Internet of Things sensor data, and real-time analytics.
  • Grafana. InfluxDB can help you store the time series JMeter data, but you still want an on-the-fly dashboard to help you monitor and visualize the real-time JMeter run and its history. Grafana is the open source analytics & monitoring solution for every database. It allows you to query, visualize, alert on and understand your metrics no matter where they are stored. Create, explore, and share dashboards with your team and foster a data-driven culture so you can analyse the performance testing reports better.
  • Perfmon. During a load test, it is important to know the health of the servers loaded. It is also nice to see if you are targeting a cluster if the load is correctly dispatched. To address this, the plugin package now supports server monitoring! Using it, you can monitor CPU, Memory, Swap, Disks I/O and Networks I/O on almost all platforms!



Supposed you have read through JMeter user manual and know how to use it. If not yet, is a very good tutorial

Script Recorder & Distribution Mode

  • Use the test script recorder to capture user’s operation:
  • JMeter is written in Java and all of its concurrent users are threads, so all of the JVM’s constraints apply to JMeter as well. A single JMeter client running on a 2-3GHz CPU (a recent CPU) can handle 300-600 threads depending on the type of test. My laptop is a MacBook Pro with a 2.8GHz processor; in my trials it handled 500 threads gracefully but ran into problems when I tried to run 1000 threads.

JMeter Report

Run JMeter from command line: avoid using JMeter UI during load tests, it can eat a lot of memory

  • jmeter -n -t JMX_FILE -l JTL_FILE -r

Since JMeter 3.0, it provides a very powerful feature to generate an HTML report dashboard

  • jmeter -g JTL_FILE -o OUTPUT_FOLDER
    • -g JTL_FILE: relative or full path to the JTL file. Example: jmeter.jtl,
    • -o OUTPUT_FOLDER: the folder in which the HTML report should be written
    • Screenshot 2019-10-02 at 9.51.14 PM


When analysing the report, you should focus on:

  • APDEX (Application Performance Index) –
    • $$\mathrm{Apdex}_{t} = \frac{\mathrm{SatisfiedCount} + \frac{\mathrm{ToleratingCount}}{2}}{\mathrm{TotalSamples}}$$
      • Example: if there are 100 samples with a target time of 3 seconds, where 60 are below 3 seconds, 30 are between 3 and 12 seconds, and the remaining 10 are above 12 seconds, the Apdex score is:
      • $$\mathrm{Apdex}_{3} = \frac{60 + \frac{30}{2}}{100} = 0.75$$
    • In JMeter, you can use apdex_satisfied_threshold (default: 500ms) and apdex_tolerated_threshold (default: 1500ms) to set the threshold
  • Throughput: Calculated as requests/unit of time. The time is calculated from the start of the first sample to the end of the last sample. The formula is: Throughput = (number of requests) / (total time).
  • Elapsed Time / Connect Time / Latency: should be as low as possible, ideally less than 1 second.
  • Median: should be close to average elapsed response time,
  • XX% line: should be as low as possible too. When it’s way higher than the average elapsed time, it indicates that the slowest (100 − XX)% of requests have dramatically higher response times than the others,
  • Standard Deviation: should be low. A high deviation indicates discrepancies in responses times, which translates to response time spikes.

InfluxDB + Grafana

Detailed Instruction: . Overall steps:

  • Install and start InfluxDB ‘influxd -config /usr/local/etc/influxdb.conf’, create database
  • Install and start Grafana ‘grafana-server --config=/usr/local/etc/grafana/grafana.ini --homepath /usr/local/share/grafana --packaging=brew cfg:default.paths.logs=/usr/local/var/log/grafana cfg:default.paths.plugins=/usr/local/var/lib/grafana/plugins’ and configure InfluxDB as a data source for it
  • Download JMeter InfluxDB Writer and configure a Backend Listener for InfluxDB. Backend Listener is Asynchronous listener that enables you to write time series data to InfluxDB which can be used by NovaTec APM Dashboard in Grafana
  • Start the JMeter run, and then you can use the command below to check whether JMeter is writing records to InfluxDB
  • Go to Grafana, if you can see the below on-the-fly dialog then it works
    • Screenshot 2019-10-02 at 6.11.01 PM
    • Screenshot 2019-10-02 at 9.48.22 PM


React Native Debugging + Android Profiler + GPU rendering

I usually use three applications to do Android Client (and react native) profiling.

React Native Debugging

  1. Download and install the React Native Debugger from
  2. Go to your react native code directory
  3. npm install and then npm start to start the server
  4. Launch the app and go to the React Native page
  5. From command line, ‘adb -s your-device shell input keyevent 82’
  6. Choose ‘Enable Remote JS Debugging’ and then you are good to go

Android Profiler

  1. View -> Tool Windows -> Android Profiler

GPU rendering:

  1. Developer Options -> Visualize GPU rendering (enable it)
  2. Developer Options -> Profile GPU rendering (On Screen as bars)
  3. device-2019-09-27-122205
  4. (fps frames per second)

Monkey Testing

Monkey testing has two disadvantages:

  1. The randomness of monkey testing often makes the bugs found difficult or impossible to reproduce. Unexpected bugs found by monkey testing can also be challenging and time consuming to analyze
  2. Difficult to cover all of the UI pages because Monkey testing (a.k.a. random testing) is based on random events

I do monkey testing against my Android products. To address the two disadvantages above, I implemented the test suite as follows:

  1. When a test case is kicked off, it starts recording a video, then pulls and backs up the video after the test is done, so we can see what happened when an error occurs
  2. Based on my functional UI regression test suites, I add the monkey testing step in each UI page so the monkey events can be exercised in a specific UI page
from threading import Thread
from time import sleep
import os
import subprocess
import time
from shutil import copyfile
from configs import ocha_configs as CONFIG

class Monkey:

    def record():
        # os.system('adb -s T21718CJ40232 shell screenrecord /sdcard/demo.mp4 --time-limit 30')
        result =
            ['adb', '-s', CONFIG.DEVICE_NAME, 'shell', 'screenrecord', '/sdcard/ocha.mp4', '--time-limit', '60'],

    def pull_video():
        result =
            ['adb', '-s', CONFIG.DEVICE_NAME, 'pull', '/sdcard/ocha.mp4'],

        except OSError:

        # print(result)
        copyfile('ocha.mp4', 'golden_video/ocha-screen-video-' + time.strftime("%Y-%m-%d-%H-%M-%S") + '.mp4')

    def perform():
        result =
            ['adb', '-s', CONFIG.DEVICE_NAME, 'shell', 'monkey', '-p', '', '--pct-syskeys', '0', '-v',
        assert result.returncode == 0, '!!! Monkey Testing Failed around {} !!!'.format( time.strftime("%Y-%m-%d-%H-%M-%S"))

    def play():
        print('Monkey is playing')
        record_screen_thread = Thread(target=Monkey.record)
        run_monkey_thread = Thread(target=Monkey.perform)
        # run_monkey_thread.join()
        print('Monkey is out')


import actions
from configs import ocha_configs as CONFIG
import pytest
import utils

class TestOchaSettingsPrinters:

    def test_update_receipt_footer_save_new_line_format(self, driver):
        actions.printers.update_receipt_settings(driver, receipt_format_type=CONFIG.RECEIPT_FORMAT_TYPE['NEW_LINE'])

    def test_update_cashier_printer_wifi(self, driver):

Large set of unknown developers (LSUDs) VS. Small set of known developers (SSKDs) and HTTP idempotent f(f(x)) = f(x)

A very thoughtful post about one API design principle.

  • Large set of unknown developers (LSUDs) VS. Small set of known developers (SSKDs)

HTTP 1.1:

Methods can also have the property of “idempotence” in that (aside from error or expiration issues) the side-effects of N > 0 identical requests is the same as for a single request.

In my current project, all of the developers have to follow SSKD (because we are working on an in-house product) and our APIs must support idempotent operations.

For idempotent operations, we include a ‘version’ in each request. If the ‘version’ of a request has already been processed, the server will not process subsequent duplicate requests again; it will return the same result for the same request.


Post or Put:

Host SSL Cert Checker

Just copied and modified a cert check script from

The script will validate the cert of hosts and fail the run if the expiry date of any certs < 30 days.

# -*- encoding: utf-8 -*-
# requires a recent enough python with idna support in socket
# pyopenssl, cryptography and idna
# all credits to

from OpenSSL import SSL
from cryptography import x509
from cryptography.x509.oid import NameOID
import idna

from socket import socket
from collections import namedtuple
from datetime import datetime

HostInfo = namedtuple(field_names='cert hostname peername', typename='HostInfo')

    ('', 443),

    ('', 443),
    ('', 443),

def verify_cert(cert, hostname):
    # verify notAfter/notBefore, CA trusted, servername/sni/hostname
    # service_identity.pyopenssl.verify_hostname(client_ssl, hostname)
    # issuer

def get_certificate(hostname, port):
    hostname_idna = idna.encode(hostname)
    sock = socket()

    sock.connect((hostname, port))
    peername = sock.getpeername()
    ctx = SSL.Context(SSL.SSLv23_METHOD)  # most compatible
    ctx.check_hostname = False
    ctx.verify_mode = SSL.VERIFY_NONE

    sock_ssl = SSL.Connection(ctx, sock)
    cert = sock_ssl.get_peer_certificate()
    crypto_cert = cert.to_cryptography()

    return HostInfo(cert=crypto_cert, peername=peername, hostname=hostname)

def get_alt_names(cert):
    """Return the DNS names listed in the certificate's Subject Alternative Name extension.

    :param cert: a ``cryptography`` x509 certificate object.
    :return: the list of DNS names, or ``None`` when the certificate has no
        Subject Alternative Name extension.
    """
    try:
        ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        return ext.value.get_values_for_type(x509.DNSName)
    except x509.ExtensionNotFound:
        return None

def get_common_name(cert):
    """Return the first Common Name (CN) of the certificate subject.

    :param cert: a ``cryptography`` x509 certificate object.
    :return: the CN value, or ``None`` when the subject carries no CN attribute.
    """
    try:
        names = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        return names[0].value
    except (x509.ExtensionNotFound, IndexError):
        # A subject without a CN yields an empty list, so names[0] raises
        # IndexError; ExtensionNotFound is kept for backward compatibility.
        return None

def get_issuer(cert):
    """Return the first Common Name (CN) of the certificate issuer.

    :param cert: a ``cryptography`` x509 certificate object.
    :return: the issuer CN value, or ``None`` when the issuer carries no CN attribute.
    """
    try:
        names = cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)
        return names[0].value
    except (x509.ExtensionNotFound, IndexError):
        # An issuer without a CN yields an empty list, so names[0] raises
        # IndexError; ExtensionNotFound is kept for backward compatibility.
        return None

def find_expiring_cert(hostinfo):
    basic_info = '''{hostname} - {peername} | commonName: {commonname} | SAN: {SAN} | issuer: {issuer} | notBefore: {notbefore} | notAfter:  {notafter}
    time_between_insertion = hostinfo.cert.not_valid_after -
    print('Expiring after: ' + str(time_between_insertion.days)) + ' days'
    found = False
    if int(time_between_insertion.days) < 30:
        found = True

    return found

def check_it_out(hostname, port):
    hostinfo = get_certificate(hostname, port)

def test_ssl_check():
    """Fail the run when find_expiring_cert flags the certificate of any host in HOSTS.

    Every host is checked before asserting (no short-circuit), so the output
    printed by find_expiring_cert appears for all hosts.
    """
    flags = [
        find_expiring_cert(get_certificate(entry[0], entry[1]))
        for entry in HOSTS
    ]
    assert any(flags) is False, 'Found cert which is expiring in 30 days!!!'

Rest API Maturity Level and HATEOAS

I just read an article about the maturity levels of REST API design.

So basically,

Level0 – You are starting to use HTTP

Level1 – All are resources in your API design

Level2 – All are about Verb, post, put, delete, get, patch, option, etc.

Level3 – HATEOAS – Hypermedia As The Engine Of Application State.

One example of HATEOAS:

{
  "name": "John Doe",
  "links": [
    {
      "rel": "self",
      "href": "http://localhost:8080/users/123"
    },
    {
      "rel": "posts",
      "href": "http://localhost:8080/users/123/posts"
    },
    {
      "rel": "address",
      "href": "http://localhost:8080/users/123/address"
    }
  ]
}

One of the projects I am currently working on uses HATEOAS. We are building a CRM product and using React to build the user interfaces, which we then embed into Android and iOS. Because of this design, we have to control the display and client logic from the server. Hence, in the server API response, we have to tell the clients (both Android and iOS) what the next steps are, e.g. as a link relation (rel) and its URL (href).


Client Testing Process

To achieve the objectives below, I defined the testing process for my team as shown in the screenshot below.

  • Objective – I wanted to test my feature ASAP
    • Use development branch – dev can check in code into development branch any time directly without code review
    • QA can update, or request dev to modify, configuration or code in the development branch at any time for testing convenience
    • Dev environment / or Dev client-apk/ipa is supposed to be unstable
    • We conduct/focus on new feature / bug fixes on Dev branch
    • Limited regression testing will be conducted on Dev branch only
    • Jump into the code review / Merge Request (MR) / commits to improve test efficiency
  • Objective – I wanted to build this or that into the next .apk / deploy this or that to QA/Staging environment
    • Become the admin of Jenkins
    • Make Jenkins build friendly to QA
    • Jump into Git to control the code change scope
    • Don’t do this way, ‘hey developer can you help me build a new apk’ – this is anti-pattern

Screen Shot 2018-10-17 at 5.44.35 PM

A typical User Case:

  • Step 1 – dev creates a new branch ‘my-mom-cooks-dinner-for-me-every-day’
  • Step 2 – dev wants to involve QA for testing before raising MR. He merges to pre-dev branch directly
  • Step 3 – Jenkins builds an apk
  • Step 4 – QA gets the apk and test quickly
  • Step 5 – QA feedbacks quickly
    • if the test passes, ask dev to raise an MR and tell the reviewer that the test passed
    • else, ask dev to fix
    • QA can also review the code changes to improve test coverage and test productivity
  • Step 6 – QA gives a GO, reviewer reviews code, merge code to development branch
  • Step 7 – development branch triggers Jenkins to build .apk automatically
  • Step 8 – QA verifies the build when s/he has time
  • Step 9 – QA verifies test result
  • Step 10
    • if test passed, QA merge the code to master branch directly
    • else, inform dev to fix
  • Step 11 – master branch triggers Jenkins to build .apk automatically – If QA has time continue to test
  • Step 12 – QA gets the master branch .apk and test
  • Step 13
    •  If test passes, QA can trigger Jenkins to build a public version manually
    • else feedbacks to dev immediately


  • We have several feature branches and some of them may not be released in next public release. Should I still merge to pre-dev branch? — Yes! Integrate as soon as you think it is ready for testing. You can even raise MRs for it and just do the merge when it becomes a release candidate
  • I have done my work on a feature branch but I am a bit worried the implementation is not quite stable; can I merge to the pre-dev branch? — Yes! Just merge it. If it is fragile, then merge frequently to make it stronger. QA welcomes it!
  • So Dev will not do the merge from development branch to master branch, true? — Yes! QA will do it and do it every day and will only involve Dev’s attention for code conflicts.

Image Compare Check – Android

Google provided a tool ‘monkeyrunner’ which is written in Jython

One of its APIs, MonkeyImage, is a “monkeyrunner class [that] can be used to hold an image of the device or emulator’s screen. The image is copied from the screen buffer during a screenshot. This object’s methods allow you to convert the image into various storage formats, write the image to a file, copy parts of the image, and compare this object to other MonkeyImage objects.”

Jython script (firstly you need to install the Jython. I used Jython 2.7):

from import MonkeyRunner, MonkeyDevice, MonkeyImage
import sys
import os

device = MonkeyRunner.waitForConnection(deviceId=sys.argv[1])
result = device.takeSnapshot()

image_format = 'png'
golden_image = sys.argv[2] + '/' + sys.argv[3] + '.' + image_format
image_exists = os.path.isfile(golden_image)

if image_exists is False:
    print('Image does not exist. Creating golden image')
    except OSError:
    result.writeToFile(golden_image, image_format)

ref = MonkeyRunner.loadImageFromFile(golden_image, image_format)

diff_image = sys.argv[2] + '/' + sys.argv[3] + '-diff.' + image_format
is_same = result.sameAs(ref, float(sys.argv[4]))

if is_same is False:
    print('Image is differed. Creating diffed image')
    result.writeToFile(diff_image, image_format)
    print('Image is matched')

In your Python (my Python is 3.7) script, you can invoke it,

import subprocess
import os
from configs import ocha_configs as CONFIG
import inspect
import re

class ImageUtils:

    def diff_image(image_dir, image_name, should_diff=True):
        if should_diff is False:

        jython_script = re.sub(r'test_suite(\S+)', '{}'.format('golden_images/'), image_dir)

        image_dir_translated = image_dir.replace('.py', '').replace('test_suite',
                                                                    'golden_images/' + os.getenv('DEVICE_NAME'))
        result =
            ['monkeyrunner', jython_script, os.getenv('DEVICE_NAME'), image_dir_translated, image_name,

        assert result.returncode == 0, 'Image is mismatched'